internal/worker: insert module if not in module_version_states

If a module is introduced for the first time to pkgsite through frontend
fetch, and not the index, it won't have a row in module_version_states.

A row is now inserted before the fetch process begins if the proxy's
info endpoint tells us that this is a valid module.

This fixes a bug in the fetch flow where
module_version_states.last_processed_at is not updated in
upsertModuleVersionState if a row did not already exist.

For golang/go#46985

Change-Id: I2a166228df20b1fb935bbabebb5a651da0e2c1ba
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/341892
Trust: Julie Qiu <julie@golang.org>
Run-TryBot: Julie Qiu <julie@golang.org>
TryBot-Result: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
diff --git a/internal/discovery.go b/internal/discovery.go
index 73f3438..2e54d3d 100644
--- a/internal/discovery.go
+++ b/internal/discovery.go
@@ -180,7 +180,11 @@
 	// IndexTimestamp is the timestamp received from the Index for this version,
 	// which should correspond to the time this version was committed to the
 	// Index.
-	IndexTimestamp time.Time
+	//
+	// This may be nil if the request only came via frontend fetch, or the
+	// status is not a 2xx status.
+	IndexTimestamp *time.Time
+
 	// CreatedAt is the time this version was originally inserted into the
 	// module version state table.
 	CreatedAt time.Time
diff --git a/internal/postgres/versionstate.go b/internal/postgres/versionstate.go
index 6f04372..85f8d54 100644
--- a/internal/postgres/versionstate.go
+++ b/internal/postgres/versionstate.go
@@ -24,19 +24,49 @@
 // table with a status of zero.
 func (db *DB) InsertIndexVersions(ctx context.Context, versions []*internal.IndexVersion) (err error) {
 	defer derrors.WrapStack(&err, "InsertIndexVersions(ctx, %v)", versions)
-	var vals []interface{}
-	for _, v := range versions {
-		vals = append(vals, v.Path, v.Version, version.ForSorting(v.Version), v.Timestamp, 0, "", "", version.IsIncompatible(v.Version))
-	}
-	cols := []string{"module_path", "version", "sort_version", "index_timestamp", "status", "error", "go_mod_path", "incompatible"}
 	conflictAction := `
 		ON CONFLICT
 			(module_path, version)
 		DO UPDATE SET
 			index_timestamp=excluded.index_timestamp,
-			next_processed_after=CURRENT_TIMESTAMP
-	`
-	return db.db.Transact(ctx, sql.LevelDefault, func(tx *database.DB) error {
+			next_processed_after=CURRENT_TIMESTAMP`
+	return insertIndexVersions(ctx, db.db, versions, conflictAction)
+}
+
+// InsertNewModuleVersionFromFrontendFetch inserts a new module version into
+// the module_version_states table with a status of zero that was requested
+// from frontend fetch.
+func (db *DB) InsertNewModuleVersionFromFrontendFetch(ctx context.Context, modulePath, resolvedVersion string) (err error) {
+	defer derrors.WrapStack(&err, "InsertIndexVersion(ctx, %v)", resolvedVersion)
+	conflictAction := `ON CONFLICT (module_path, version) DO NOTHING`
+	return insertIndexVersions(ctx, db.db, []*internal.IndexVersion{{Path: modulePath, Version: resolvedVersion}}, conflictAction)
+}
+
+func insertIndexVersions(ctx context.Context, ddb *database.DB, versions []*internal.IndexVersion, conflictAction string) (err error) {
+	var vals []interface{}
+	for _, v := range versions {
+		vals = append(vals,
+			v.Path,
+			v.Version,
+			version.ForSorting(v.Version),
+			0,
+			"",
+			"",
+			version.IsIncompatible(v.Version),
+			v.Timestamp,
+		)
+	}
+	cols := []string{
+		"module_path",
+		"version",
+		"sort_version",
+		"status",
+		"error",
+		"go_mod_path",
+		"incompatible",
+		"index_timestamp",
+	}
+	return ddb.Transact(ctx, sql.LevelDefault, func(tx *database.DB) error {
 		var updates [][2]string // (module_path, version) to update status
 		err := tx.BulkInsertReturning(ctx, "module_version_states", cols, vals, conflictAction,
 			[]string{"module_path", "version", "status"},
@@ -280,15 +310,20 @@
 func scanModuleVersionState(scan func(dest ...interface{}) error) (*internal.ModuleVersionState, error) {
 	var (
 		v               internal.ModuleVersionState
+		indexTimestamp  pq.NullTime
 		lastProcessedAt pq.NullTime
 		numPackages     sql.NullInt64
 		hasGoMod        sql.NullBool
 	)
-	if err := scan(&v.ModulePath, &v.Version, &v.IndexTimestamp, &v.CreatedAt, &v.Status, &v.Error,
+	if err := scan(&v.ModulePath, &v.Version, &indexTimestamp, &v.CreatedAt, &v.Status, &v.Error,
 		&v.TryCount, &v.LastProcessedAt, &v.NextProcessedAfter, &v.AppVersion, &hasGoMod, &v.GoModPath,
 		&numPackages); err != nil {
 		return nil, err
 	}
+	if indexTimestamp.Valid {
+		it := indexTimestamp.Time
+		v.IndexTimestamp = &it
+	}
 	if lastProcessedAt.Valid {
 		lp := lastProcessedAt.Time
 		v.LastProcessedAt = &lp
diff --git a/internal/postgres/versionstate_test.go b/internal/postgres/versionstate_test.go
index 4b0d5ab..cfd0f5f 100644
--- a/internal/postgres/versionstate_test.go
+++ b/internal/postgres/versionstate_test.go
@@ -118,8 +118,8 @@
 	}
 
 	wantVersions := []*internal.ModuleVersionState{
-		{ModulePath: "baz.com/quux", Version: "v2.0.1", IndexTimestamp: bazVersion.Timestamp},
-		{ModulePath: "foo.com/bar", Version: "v1.0.0", IndexTimestamp: fooVersion.Timestamp},
+		{ModulePath: "baz.com/quux", Version: "v2.0.1", IndexTimestamp: &bazVersion.Timestamp},
+		{ModulePath: "foo.com/bar", Version: "v1.0.0", IndexTimestamp: &fooVersion.Timestamp},
 	}
 	ignore := cmpopts.IgnoreFields(internal.ModuleVersionState{}, "CreatedAt", "LastProcessedAt", "NextProcessedAfter")
 	if diff := cmp.Diff(wantVersions, gotVersions, ignore); diff != "" {
@@ -152,7 +152,7 @@
 	wantFooState := &internal.ModuleVersionState{
 		ModulePath:     "foo.com/bar",
 		Version:        "v1.0.0",
-		IndexTimestamp: now,
+		IndexTimestamp: &now,
 		TryCount:       1,
 		GoModPath:      goModPath,
 		Error:          errString,
diff --git a/internal/worker/fetch.go b/internal/worker/fetch.go
index 1e6c370..a6aa35e 100644
--- a/internal/worker/fetch.go
+++ b/internal/worker/fetch.go
@@ -140,6 +140,7 @@
 	// to do load-shedding, but it's also important to make the proxy aware
 	// of the version if it isn't already, as can happen when we arrive here via
 	// frontend fetch.
+	//
 	// Don't fail on a non-nil error. If we return here, we won't record
 	// the error state in the DB.
 	info, err := getInfo(ctx, modulePath, requestedVersion, f.ProxyClient)
@@ -160,6 +161,20 @@
 		}
 		startFetchInfo(fi)
 		defer func() { finishFetchInfo(fi, status, err) }()
+
+		// If this is a valid module, insert it into module_version_states.
+		//
+		// In case something happens later on, this will make sure we retry. Also,
+		// modules that are introduced to pkgsite for the first time via frontend
+		// fetch and not index.golang.org won't have a row in
+		// module_version_states, so that ensures the logic below works properly as
+		// well.
+		//
+		// Leave the index_timestamp as empty. This will be populated when the
+		// module appears in the index.
+		if err := f.DB.InsertNewModuleVersionFromFrontendFetch(ctx, modulePath, info.Version); err != nil {
+			return derrors.ToStatus(err), "", err
+		}
 	}
 
 	// Get the latest-version information first, and update the DB. We'll need
diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go
index d1ccd0a..6708806 100644
--- a/internal/worker/server_test.go
+++ b/internal/worker/server_test.go
@@ -105,7 +105,7 @@
 			}
 			return &internal.ModuleVersionState{
 				ModulePath:     version.Path,
-				IndexTimestamp: version.Timestamp,
+				IndexTimestamp: &version.Timestamp,
 				Status:         code,
 				TryCount:       tryCount,
 				Version:        version.Version,