internal/postgres: update imported-by counts in chunks

If too many rows change imported-by counts, then the
/update-imported-by-count endpoint times out after 30 minutes, and
nothing gets inserted because it runs as one transaction.

Instead, break it into multiple smaller transactions. Since we only
update changed counts, then as long as one transaction completes we
have made some counts identical, so we will make progress.

For golang/go#47555

Change-Id: I546aaabcc5e0f99d71efe38748475274871382c4
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/341249
Reviewed-by: Jamal Carvalho <jamal@golang.org>
Reviewed-by: Julie Qiu <julie@golang.org>
Trust: Julie Qiu <julie@golang.org>
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Julie Qiu <julie@golang.org>
diff --git a/internal/postgres/search.go b/internal/postgres/search.go
index d6cc887..7a08708 100644
--- a/internal/postgres/search.go
+++ b/internal/postgres/search.go
@@ -845,16 +845,27 @@
 	return db.UpdateSearchDocumentsImportedByCountWithCounts(ctx, changedCounts)
 }
 
+// How many imported-by counts to update at a time.
+// A variable for testing.
+var countBatchSize = 20_000
+
 func (db *DB) UpdateSearchDocumentsImportedByCountWithCounts(ctx context.Context, counts map[string]int) (nUpdated int64, err error) {
 	defer derrors.WrapStack(&err, "UpdateSearchDocumentsImportedByCountWithCounts")
-	err = db.db.Transact(ctx, sql.LevelDefault, func(tx *database.DB) error {
-		if err := insertImportedByCounts(ctx, tx, counts); err != nil {
+	for len(counts) > 0 {
+		var nu int64
+		err := db.db.Transact(ctx, sql.LevelDefault, func(tx *database.DB) error {
+			if err := insertImportedByCounts(ctx, tx, counts, countBatchSize); err != nil {
+				return err
+			}
+			nu, err = updateImportedByCounts(ctx, tx)
 			return err
+		})
+		if err != nil {
+			return nUpdated, err
 		}
-		nUpdated, err = updateImportedByCounts(ctx, tx)
-		return err
-	})
-	return nUpdated, err
+		nUpdated += nu
+	}
+	return nUpdated, nil
 }
 
 // getSearchPackages returns the set of package paths that are in the search_documents table,
@@ -916,7 +927,10 @@
 	return newCounts, nil
 }
 
-func insertImportedByCounts(ctx context.Context, db *database.DB, counts map[string]int) (err error) {
+// insertImportedByCounts creates a temporary table and inserts at most limit
+// rows into it, where each row is a key and value from the counts map. The
+// inserted keys are deleted from counts.
+func insertImportedByCounts(ctx context.Context, db *database.DB, counts map[string]int, limit int) (err error) {
 	defer derrors.WrapStack(&err, "insertImportedByCounts(ctx, db, counts)")
 
 	const createTableQuery = `
@@ -928,9 +942,15 @@
 	if _, err := db.Exec(ctx, createTableQuery); err != nil {
 		return fmt.Errorf("CREATE TABLE: %v", err)
 	}
-	values := make([]interface{}, 0, 2*len(counts))
+	var values []interface{}
+	i := 0
 	for p, c := range counts {
+		if i >= limit {
+			break
+		}
 		values = append(values, p, c)
+		delete(counts, p)
+		i++
 	}
 	columns := []string{"package_path", "imported_by_count"}
 	return db.BulkInsert(ctx, "computed_imported_by_counts", columns, values, "")
diff --git a/internal/postgres/search_test.go b/internal/postgres/search_test.go
index c1991fb..785de58 100644
--- a/internal/postgres/search_test.go
+++ b/internal/postgres/search_test.go
@@ -1061,7 +1061,7 @@
 }
 
 func TestUpdateSearchDocumentsImportedByCount(t *testing.T) {
-	t.Parallel()
+	// Dont' run in parallel because it changes countBatchSize.
 	ctx := context.Background()
 
 	pkgPath := func(m *internal.Module) string { return m.Packages()[0].Path }
@@ -1189,6 +1189,24 @@
 		updateImportedByCount(testDB)
 		validateImportedByCountAndGetSearchDocument(t, testDB, "mod.com/B/B", 1)
 	})
+	t.Run("multiple", func(t *testing.T) {
+		defer func(old int) { countBatchSize = old }(countBatchSize)
+		countBatchSize = 1
+
+		testDB, release := acquire(t)
+		defer release()
+
+		// Two modules with importers.
+		mA := insertPackageVersion(t, testDB, "A", "v1.0.0", nil)
+		mB := insertPackageVersion(t, testDB, "B", "v1.0.0", nil)
+		insertPackageVersion(t, testDB, "C", "v1.0.0", []string{"A"})
+		insertPackageVersion(t, testDB, "D", "v1.0.0", []string{"A"})
+		insertPackageVersion(t, testDB, "E", "v1.0.0", []string{"B"})
+
+		updateImportedByCount(testDB)
+		_ = validateImportedByCountAndGetSearchDocument(t, testDB, pkgPath(mA), 2)
+		_ = validateImportedByCountAndGetSearchDocument(t, testDB, pkgPath(mB), 1)
+	})
 }
 
 func TestGetPackagesForSearchDocumentUpsert(t *testing.T) {