internal/postgres: split insertUnits into multiple functions

insertUnits is currently about 200 lines and hard to parse. It is split
into multiple functions to improve readability.

Change-Id: I9c9ac51cb771c164bb9e500284dc2bc676c4624d
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/280159
Trust: Julie Qiu <julie@golang.org>
Run-TryBot: Julie Qiu <julie@golang.org>
TryBot-Result: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
diff --git a/internal/postgres/insert_module.go b/internal/postgres/insert_module.go
index 21df64f..df57736 100644
--- a/internal/postgres/insert_module.go
+++ b/internal/postgres/insert_module.go
@@ -275,54 +275,14 @@
 	for _, u := range m.Units {
 		sort.Strings(u.Imports)
 	}
-
-	// Add new unit paths to the paths table.
-	pathToID := map[string]int{}
-
-	collect := func(rows *sql.Rows) error {
-		var (
-			pathID int
-			path   string
-		)
-		if err := rows.Scan(&pathID, &path); err != nil {
-			return err
-		}
-		pathToID[path] = pathID
-		return nil
-	}
-
-	// Read all existing paths for this module, to avoid a large bulk upsert.
-	// (We've seen these bulk upserts hang for so long that they time out (10
-	// minutes)).
-	var curPaths []string
-	for _, u := range m.Units {
-		curPaths = append(curPaths, u.Path)
-	}
-	if err := db.RunQuery(ctx, `SELECT id, path FROM paths WHERE path = ANY($1)`,
-		collect, pq.Array(curPaths)); err != nil {
+	pathToID, err := insertPaths(ctx, db, m)
+	if err != nil {
 		return err
 	}
 
-	// Insert any unit paths that we don't already have.
-	var pathValues []interface{}
-	for _, u := range m.Units {
-		if _, ok := pathToID[u.Path]; !ok {
-			pathValues = append(pathValues, u.Path)
-		}
-	}
-	if len(pathValues) > 0 {
-		// Insert data into the paths table.
-		pathCols := []string{"path"}
-		uniquePathCols := []string{"path"}
-		returningPathCols := []string{"id", "path"}
-		if err := db.BulkUpsertReturning(ctx, "paths", pathCols, pathValues, uniquePathCols, returningPathCols, collect); err != nil {
-			return err
-		}
-	}
-
 	var (
 		unitValues    []interface{}
-		pathToUnitID  = map[string]int{}
+		paths         []string
 		pathToReadme  = map[string]*internal.Readme{}
 		pathToDoc     = map[string]*internal.Documentation{}
 		pathToImports = map[string][]string{}
@@ -364,7 +324,69 @@
 		if len(u.Imports) > 0 {
 			pathToImports[u.Path] = u.Imports
 		}
+		paths = append(paths, u.Path)
 	}
+	pathToUnitID, err := insertUnits(ctx, db, unitValues)
+	if err != nil {
+		return err
+	}
+	if err := insertReadmes(ctx, db, paths, pathToUnitID, pathToReadme); err != nil {
+		return err
+	}
+	if err := insertDoc(ctx, db, paths, pathToUnitID, pathToDoc); err != nil {
+		return err
+	}
+	return insertImports(ctx, db, paths, pathToUnitID, pathToImports)
+}
+
+func insertPaths(ctx context.Context, db *database.DB, m *internal.Module) (pathToID map[string]int, err error) {
+	// Add new unit paths to the paths table.
+	pathToID = map[string]int{}
+	collect := func(rows *sql.Rows) error {
+		var (
+			pathID int
+			path   string
+		)
+		if err := rows.Scan(&pathID, &path); err != nil {
+			return err
+		}
+		pathToID[path] = pathID
+		return nil
+	}
+
+	// Read all existing paths for this module, to avoid a large bulk upsert.
+	// (We've seen these bulk upserts hang for so long that they time out (10
+	// minutes)).
+	var curPaths []string
+	for _, u := range m.Units {
+		curPaths = append(curPaths, u.Path)
+	}
+	if err := db.RunQuery(ctx, `SELECT id, path FROM paths WHERE path = ANY($1)`,
+		collect, pq.Array(curPaths)); err != nil {
+		return nil, err
+	}
+
+	// Insert any unit paths that we don't already have.
+	var pathValues []interface{}
+	for _, u := range m.Units {
+		if _, ok := pathToID[u.Path]; !ok {
+			pathValues = append(pathValues, u.Path)
+		}
+	}
+	if len(pathValues) > 0 {
+		// Insert data into the paths table.
+		pathCols := []string{"path"}
+		uniquePathCols := []string{"path"}
+		returningPathCols := []string{"id", "path"}
+		if err := db.BulkUpsertReturning(ctx, "paths", pathCols, pathValues, uniquePathCols, returningPathCols, collect); err != nil {
+			return nil, err
+		}
+	}
+	return pathToID, nil
+}
+
+func insertUnits(ctx context.Context, db *database.DB, unitValues []interface{}) (pathToUnitID map[string]int, err error) {
+	defer derrors.Wrap(&err, "insertUnits")
 
 	// Insert data into the units table.
 	unitCols := []string{
@@ -380,65 +402,49 @@
 	uniqueUnitCols := []string{"path", "module_id"}
 	returningUnitCols := []string{"id", "path"}
 
-	var paths []string
-	if err := db.BulkUpsertReturning(ctx, "units", unitCols, unitValues, uniqueUnitCols, returningUnitCols, func(rows *sql.Rows) error {
-		var (
-			unitID int
-			path   string
-		)
-		if err := rows.Scan(&unitID, &path); err != nil {
-			return err
-		}
-		pathToUnitID[path] = unitID
-		paths = append(paths, path)
-		return nil
-	}); err != nil {
-		return err
-	}
-
-	// Sort to ensure proper lock ordering, avoiding deadlocks. We have seen
-	// deadlocks on package_imports and documentation. They can occur when
-	// processing two versions of the same module, which happens regularly.
-	sort.Strings(paths)
-	if len(pathToReadme) > 0 {
-		var readmeValues []interface{}
-		for _, path := range paths {
-			readme, ok := pathToReadme[path]
-			if !ok {
-				continue
+	pathToUnitID = map[string]int{}
+	if err := db.BulkUpsertReturning(ctx, "units", unitCols, unitValues,
+		uniqueUnitCols, returningUnitCols, func(rows *sql.Rows) error {
+			var (
+				unitID int
+				path   string
+			)
+			if err := rows.Scan(&unitID, &path); err != nil {
+				return err
 			}
-
-			// Do not add a readme with empty or zero contents.
-			readmeContents := makeValidUnicode(readme.Contents)
-			if len(readmeContents) == 0 {
-				continue
-			}
-
-			unitID := pathToUnitID[path]
-			readmeValues = append(readmeValues, unitID, readme.Filepath, readmeContents)
-		}
-		readmeCols := []string{"unit_id", "file_path", "contents"}
-		if err := db.BulkUpsert(ctx, "readmes", readmeCols, readmeValues, []string{"unit_id"}); err != nil {
-			return err
-		}
+			pathToUnitID[path] = unitID
+			return nil
+		}); err != nil {
+		return nil, err
 	}
+	return pathToUnitID, nil
+}
 
-	if len(pathToDoc) > 0 {
-		var docValues []interface{}
-		for _, path := range paths {
-			doc := pathToDoc[path]
-			if doc == nil {
-				continue
-			}
-			unitID := pathToUnitID[path]
-			docValues = append(docValues, unitID, doc.GOOS, doc.GOARCH, doc.Synopsis, doc.Source)
+func insertDoc(ctx context.Context, db *database.DB,
+	paths []string,
+	pathToUnitID map[string]int,
+	pathToDoc map[string]*internal.Documentation) (err error) {
+	defer derrors.Wrap(&err, "insertDoc")
+
+	var docValues []interface{}
+	for _, path := range paths {
+		doc := pathToDoc[path]
+		if doc == nil {
+			continue
 		}
-		uniqueCols := []string{"unit_id", "goos", "goarch"}
-		docCols := append(uniqueCols, "synopsis", "source")
-		if err := db.BulkUpsert(ctx, "documentation", docCols, docValues, uniqueCols); err != nil {
-			return err
-		}
+		unitID := pathToUnitID[path]
+		docValues = append(docValues, unitID, doc.GOOS, doc.GOARCH, doc.Synopsis, doc.Source)
 	}
+	uniqueCols := []string{"unit_id", "goos", "goarch"}
+	docCols := append(uniqueCols, "synopsis", "source")
+	return db.BulkUpsert(ctx, "documentation", docCols, docValues, uniqueCols)
+}
+
+func insertImports(ctx context.Context, db *database.DB,
+	paths []string,
+	pathToUnitID map[string]int,
+	pathToImports map[string][]string) (err error) {
+	defer derrors.Wrap(&err, "insertImports")
 
 	var importValues []interface{}
 	for _, pkgPath := range paths {
@@ -455,6 +461,32 @@
 	return db.BulkUpsert(ctx, "package_imports", importCols, importValues, importCols)
 }
 
+func insertReadmes(ctx context.Context, db *database.DB,
+	paths []string,
+	pathToUnitID map[string]int,
+	pathToReadme map[string]*internal.Readme) (err error) {
+	defer derrors.Wrap(&err, "insertReadmes")
+
+	var readmeValues []interface{}
+	for _, path := range paths {
+		readme, ok := pathToReadme[path]
+		if !ok {
+			continue
+		}
+
+		// Do not add a readme with empty or zero contents.
+		readmeContents := makeValidUnicode(readme.Contents)
+		if len(readmeContents) == 0 {
+			continue
+		}
+
+		unitID := pathToUnitID[path]
+		readmeValues = append(readmeValues, unitID, readme.Filepath, readmeContents)
+	}
+	readmeCols := []string{"unit_id", "file_path", "contents"}
+	return db.BulkUpsert(ctx, "readmes", readmeCols, readmeValues, []string{"unit_id"})
+}
+
 // lock obtains an exclusive, transaction-scoped advisory lock on modulePath.
 func lock(ctx context.Context, tx *database.DB, modulePath string) (err error) {
 	defer derrors.Wrap(&err, "lock(%s)", modulePath)