internal/postgres: split insertUnits into multiple functions
insertUnits is currently about 200 lines and hard to parse. It is split
into multiple functions to improve readability.
Change-Id: I9c9ac51cb771c164bb9e500284dc2bc676c4624d
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/280159
Trust: Julie Qiu <julie@golang.org>
Run-TryBot: Julie Qiu <julie@golang.org>
TryBot-Result: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
diff --git a/internal/postgres/insert_module.go b/internal/postgres/insert_module.go
index 21df64f..df57736 100644
--- a/internal/postgres/insert_module.go
+++ b/internal/postgres/insert_module.go
@@ -275,54 +275,14 @@
for _, u := range m.Units {
sort.Strings(u.Imports)
}
-
- // Add new unit paths to the paths table.
- pathToID := map[string]int{}
-
- collect := func(rows *sql.Rows) error {
- var (
- pathID int
- path string
- )
- if err := rows.Scan(&pathID, &path); err != nil {
- return err
- }
- pathToID[path] = pathID
- return nil
- }
-
- // Read all existing paths for this module, to avoid a large bulk upsert.
- // (We've seen these bulk upserts hang for so long that they time out (10
- // minutes)).
- var curPaths []string
- for _, u := range m.Units {
- curPaths = append(curPaths, u.Path)
- }
- if err := db.RunQuery(ctx, `SELECT id, path FROM paths WHERE path = ANY($1)`,
- collect, pq.Array(curPaths)); err != nil {
+ pathToID, err := insertPaths(ctx, db, m)
+ if err != nil {
return err
}
- // Insert any unit paths that we don't already have.
- var pathValues []interface{}
- for _, u := range m.Units {
- if _, ok := pathToID[u.Path]; !ok {
- pathValues = append(pathValues, u.Path)
- }
- }
- if len(pathValues) > 0 {
- // Insert data into the paths table.
- pathCols := []string{"path"}
- uniquePathCols := []string{"path"}
- returningPathCols := []string{"id", "path"}
- if err := db.BulkUpsertReturning(ctx, "paths", pathCols, pathValues, uniquePathCols, returningPathCols, collect); err != nil {
- return err
- }
- }
-
var (
unitValues []interface{}
- pathToUnitID = map[string]int{}
+ paths []string
pathToReadme = map[string]*internal.Readme{}
pathToDoc = map[string]*internal.Documentation{}
pathToImports = map[string][]string{}
@@ -364,7 +324,69 @@
if len(u.Imports) > 0 {
pathToImports[u.Path] = u.Imports
}
+ paths = append(paths, u.Path)
}
+ pathToUnitID, err := insertUnits(ctx, db, unitValues)
+ if err != nil {
+ return err
+ }
+ if err := insertReadmes(ctx, db, paths, pathToUnitID, pathToReadme); err != nil {
+ return err
+ }
+ if err := insertDoc(ctx, db, paths, pathToUnitID, pathToDoc); err != nil {
+ return err
+ }
+ return insertImports(ctx, db, paths, pathToUnitID, pathToImports)
+}
+
+func insertPaths(ctx context.Context, db *database.DB, m *internal.Module) (pathToID map[string]int, err error) {
+ // Add new unit paths to the paths table.
+ pathToID = map[string]int{}
+ collect := func(rows *sql.Rows) error {
+ var (
+ pathID int
+ path string
+ )
+ if err := rows.Scan(&pathID, &path); err != nil {
+ return err
+ }
+ pathToID[path] = pathID
+ return nil
+ }
+
+ // Read all existing paths for this module, to avoid a large bulk upsert.
+ // (We've seen these bulk upserts hang for so long that they time out (10
+ // minutes)).
+ var curPaths []string
+ for _, u := range m.Units {
+ curPaths = append(curPaths, u.Path)
+ }
+ if err := db.RunQuery(ctx, `SELECT id, path FROM paths WHERE path = ANY($1)`,
+ collect, pq.Array(curPaths)); err != nil {
+ return nil, err
+ }
+
+ // Insert any unit paths that we don't already have.
+ var pathValues []interface{}
+ for _, u := range m.Units {
+ if _, ok := pathToID[u.Path]; !ok {
+ pathValues = append(pathValues, u.Path)
+ }
+ }
+ if len(pathValues) > 0 {
+ // Insert data into the paths table.
+ pathCols := []string{"path"}
+ uniquePathCols := []string{"path"}
+ returningPathCols := []string{"id", "path"}
+ if err := db.BulkUpsertReturning(ctx, "paths", pathCols, pathValues, uniquePathCols, returningPathCols, collect); err != nil {
+ return nil, err
+ }
+ }
+ return pathToID, nil
+}
+
+func insertUnits(ctx context.Context, db *database.DB, unitValues []interface{}) (pathToUnitID map[string]int, err error) {
+ defer derrors.Wrap(&err, "insertUnits")
// Insert data into the units table.
unitCols := []string{
@@ -380,65 +402,49 @@
uniqueUnitCols := []string{"path", "module_id"}
returningUnitCols := []string{"id", "path"}
- var paths []string
- if err := db.BulkUpsertReturning(ctx, "units", unitCols, unitValues, uniqueUnitCols, returningUnitCols, func(rows *sql.Rows) error {
- var (
- unitID int
- path string
- )
- if err := rows.Scan(&unitID, &path); err != nil {
- return err
- }
- pathToUnitID[path] = unitID
- paths = append(paths, path)
- return nil
- }); err != nil {
- return err
- }
-
- // Sort to ensure proper lock ordering, avoiding deadlocks. We have seen
- // deadlocks on package_imports and documentation. They can occur when
- // processing two versions of the same module, which happens regularly.
- sort.Strings(paths)
- if len(pathToReadme) > 0 {
- var readmeValues []interface{}
- for _, path := range paths {
- readme, ok := pathToReadme[path]
- if !ok {
- continue
+ pathToUnitID = map[string]int{}
+ if err := db.BulkUpsertReturning(ctx, "units", unitCols, unitValues,
+ uniqueUnitCols, returningUnitCols, func(rows *sql.Rows) error {
+ var (
+ unitID int
+ path string
+ )
+ if err := rows.Scan(&unitID, &path); err != nil {
+ return err
}
-
- // Do not add a readme with empty or zero contents.
- readmeContents := makeValidUnicode(readme.Contents)
- if len(readmeContents) == 0 {
- continue
- }
-
- unitID := pathToUnitID[path]
- readmeValues = append(readmeValues, unitID, readme.Filepath, readmeContents)
- }
- readmeCols := []string{"unit_id", "file_path", "contents"}
- if err := db.BulkUpsert(ctx, "readmes", readmeCols, readmeValues, []string{"unit_id"}); err != nil {
- return err
- }
+ pathToUnitID[path] = unitID
+ return nil
+ }); err != nil {
+ return nil, err
}
+ return pathToUnitID, nil
+}
- if len(pathToDoc) > 0 {
- var docValues []interface{}
- for _, path := range paths {
- doc := pathToDoc[path]
- if doc == nil {
- continue
- }
- unitID := pathToUnitID[path]
- docValues = append(docValues, unitID, doc.GOOS, doc.GOARCH, doc.Synopsis, doc.Source)
+func insertDoc(ctx context.Context, db *database.DB,
+ paths []string,
+ pathToUnitID map[string]int,
+ pathToDoc map[string]*internal.Documentation) (err error) {
+ defer derrors.Wrap(&err, "insertDoc")
+
+ var docValues []interface{}
+ for _, path := range paths {
+ doc := pathToDoc[path]
+ if doc == nil {
+ continue
}
- uniqueCols := []string{"unit_id", "goos", "goarch"}
- docCols := append(uniqueCols, "synopsis", "source")
- if err := db.BulkUpsert(ctx, "documentation", docCols, docValues, uniqueCols); err != nil {
- return err
- }
+ unitID := pathToUnitID[path]
+ docValues = append(docValues, unitID, doc.GOOS, doc.GOARCH, doc.Synopsis, doc.Source)
}
+ uniqueCols := []string{"unit_id", "goos", "goarch"}
+ docCols := append(uniqueCols, "synopsis", "source")
+ return db.BulkUpsert(ctx, "documentation", docCols, docValues, uniqueCols)
+}
+
+func insertImports(ctx context.Context, db *database.DB,
+ paths []string,
+ pathToUnitID map[string]int,
+ pathToImports map[string][]string) (err error) {
+ defer derrors.Wrap(&err, "insertImports")
var importValues []interface{}
for _, pkgPath := range paths {
@@ -455,6 +461,32 @@
return db.BulkUpsert(ctx, "package_imports", importCols, importValues, importCols)
}
+func insertReadmes(ctx context.Context, db *database.DB,
+ paths []string,
+ pathToUnitID map[string]int,
+ pathToReadme map[string]*internal.Readme) (err error) {
+ defer derrors.Wrap(&err, "insertReadmes")
+
+ var readmeValues []interface{}
+ for _, path := range paths {
+ readme, ok := pathToReadme[path]
+ if !ok {
+ continue
+ }
+
+ // Do not add a readme with empty or zero contents.
+ readmeContents := makeValidUnicode(readme.Contents)
+ if len(readmeContents) == 0 {
+ continue
+ }
+
+ unitID := pathToUnitID[path]
+ readmeValues = append(readmeValues, unitID, readme.Filepath, readmeContents)
+ }
+ readmeCols := []string{"unit_id", "file_path", "contents"}
+ return db.BulkUpsert(ctx, "readmes", readmeCols, readmeValues, []string{"unit_id"})
+}
+
// lock obtains an exclusive, transaction-scoped advisory lock on modulePath.
func lock(ctx context.Context, tx *database.DB, modulePath string) (err error) {
defer derrors.Wrap(&err, "lock(%s)", modulePath)