internal/worker: insert module if not in module_version_states
If a module is introduced for the first time to pkgsite through frontend
fetch, and not the index, it won't have a row in module_version_states.
A row is now inserted before the fetch process begins if the proxy's
info endpoint tells us that this is a valid module.
This fixes a bug in the fetch flow where
module_version_states.last_processed_at was not updated by
upsertModuleVersionState if a row did not already exist.
For golang/go#46985
Change-Id: I2a166228df20b1fb935bbabebb5a651da0e2c1ba
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/341892
Trust: Julie Qiu <julie@golang.org>
Run-TryBot: Julie Qiu <julie@golang.org>
TryBot-Result: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
diff --git a/internal/discovery.go b/internal/discovery.go
index 73f3438..2e54d3d 100644
--- a/internal/discovery.go
+++ b/internal/discovery.go
@@ -180,7 +180,11 @@
// IndexTimestamp is the timestamp received from the Index for this version,
// which should correspond to the time this version was committed to the
// Index.
- IndexTimestamp time.Time
+ //
+ // This may be nil if the request only came via frontend fetch, or the
+ // status is not a 2xx status.
+ IndexTimestamp *time.Time
+
// CreatedAt is the time this version was originally inserted into the
// module version state table.
CreatedAt time.Time
diff --git a/internal/postgres/versionstate.go b/internal/postgres/versionstate.go
index 6f04372..85f8d54 100644
--- a/internal/postgres/versionstate.go
+++ b/internal/postgres/versionstate.go
@@ -24,19 +24,49 @@
// table with a status of zero.
func (db *DB) InsertIndexVersions(ctx context.Context, versions []*internal.IndexVersion) (err error) {
defer derrors.WrapStack(&err, "InsertIndexVersions(ctx, %v)", versions)
- var vals []interface{}
- for _, v := range versions {
- vals = append(vals, v.Path, v.Version, version.ForSorting(v.Version), v.Timestamp, 0, "", "", version.IsIncompatible(v.Version))
- }
- cols := []string{"module_path", "version", "sort_version", "index_timestamp", "status", "error", "go_mod_path", "incompatible"}
conflictAction := `
ON CONFLICT
(module_path, version)
DO UPDATE SET
index_timestamp=excluded.index_timestamp,
- next_processed_after=CURRENT_TIMESTAMP
- `
- return db.db.Transact(ctx, sql.LevelDefault, func(tx *database.DB) error {
+ next_processed_after=CURRENT_TIMESTAMP`
+ return insertIndexVersions(ctx, db.db, versions, conflictAction)
+}
+
+// InsertNewModuleVersionFromFrontendFetch inserts a module version requested
+// via frontend fetch into module_version_states with a status of zero and no
+// index timestamp supplied. ON CONFLICT DO NOTHING keeps any existing row.
+func (db *DB) InsertNewModuleVersionFromFrontendFetch(ctx context.Context, modulePath, resolvedVersion string) (err error) {
+ defer derrors.WrapStack(&err, "InsertNewModuleVersionFromFrontendFetch(ctx, %q, %q)", modulePath, resolvedVersion)
+ conflictAction := `ON CONFLICT (module_path, version) DO NOTHING`
+ return insertIndexVersions(ctx, db.db, []*internal.IndexVersion{{Path: modulePath, Version: resolvedVersion}}, conflictAction)
+}
+
+func insertIndexVersions(ctx context.Context, ddb *database.DB, versions []*internal.IndexVersion, conflictAction string) (err error) {
+ var vals []interface{}
+ for _, v := range versions {
+ vals = append(vals,
+ v.Path,
+ v.Version,
+ version.ForSorting(v.Version),
+ 0,
+ "",
+ "",
+ version.IsIncompatible(v.Version),
+ v.Timestamp,
+ )
+ }
+ cols := []string{
+ "module_path",
+ "version",
+ "sort_version",
+ "status",
+ "error",
+ "go_mod_path",
+ "incompatible",
+ "index_timestamp",
+ }
+ return ddb.Transact(ctx, sql.LevelDefault, func(tx *database.DB) error {
var updates [][2]string // (module_path, version) to update status
err := tx.BulkInsertReturning(ctx, "module_version_states", cols, vals, conflictAction,
[]string{"module_path", "version", "status"},
@@ -280,15 +310,20 @@
func scanModuleVersionState(scan func(dest ...interface{}) error) (*internal.ModuleVersionState, error) {
var (
v internal.ModuleVersionState
+ indexTimestamp pq.NullTime
lastProcessedAt pq.NullTime
numPackages sql.NullInt64
hasGoMod sql.NullBool
)
- if err := scan(&v.ModulePath, &v.Version, &v.IndexTimestamp, &v.CreatedAt, &v.Status, &v.Error,
+ if err := scan(&v.ModulePath, &v.Version, &indexTimestamp, &v.CreatedAt, &v.Status, &v.Error,
&v.TryCount, &v.LastProcessedAt, &v.NextProcessedAfter, &v.AppVersion, &hasGoMod, &v.GoModPath,
&numPackages); err != nil {
return nil, err
}
+ if indexTimestamp.Valid {
+ it := indexTimestamp.Time
+ v.IndexTimestamp = &it
+ }
if lastProcessedAt.Valid {
lp := lastProcessedAt.Time
v.LastProcessedAt = &lp
diff --git a/internal/postgres/versionstate_test.go b/internal/postgres/versionstate_test.go
index 4b0d5ab..cfd0f5f 100644
--- a/internal/postgres/versionstate_test.go
+++ b/internal/postgres/versionstate_test.go
@@ -118,8 +118,8 @@
}
wantVersions := []*internal.ModuleVersionState{
- {ModulePath: "baz.com/quux", Version: "v2.0.1", IndexTimestamp: bazVersion.Timestamp},
- {ModulePath: "foo.com/bar", Version: "v1.0.0", IndexTimestamp: fooVersion.Timestamp},
+ {ModulePath: "baz.com/quux", Version: "v2.0.1", IndexTimestamp: &bazVersion.Timestamp},
+ {ModulePath: "foo.com/bar", Version: "v1.0.0", IndexTimestamp: &fooVersion.Timestamp},
}
ignore := cmpopts.IgnoreFields(internal.ModuleVersionState{}, "CreatedAt", "LastProcessedAt", "NextProcessedAfter")
if diff := cmp.Diff(wantVersions, gotVersions, ignore); diff != "" {
@@ -152,7 +152,7 @@
wantFooState := &internal.ModuleVersionState{
ModulePath: "foo.com/bar",
Version: "v1.0.0",
- IndexTimestamp: now,
+ IndexTimestamp: &now,
TryCount: 1,
GoModPath: goModPath,
Error: errString,
diff --git a/internal/worker/fetch.go b/internal/worker/fetch.go
index 1e6c370..a6aa35e 100644
--- a/internal/worker/fetch.go
+++ b/internal/worker/fetch.go
@@ -140,6 +140,7 @@
// to do load-shedding, but it's also important to make the proxy aware
// of the version if it isn't already, as can happen when we arrive here via
// frontend fetch.
+ //
// Don't fail on a non-nil error. If we return here, we won't record
// the error state in the DB.
info, err := getInfo(ctx, modulePath, requestedVersion, f.ProxyClient)
@@ -160,6 +161,20 @@
}
startFetchInfo(fi)
defer func() { finishFetchInfo(fi, status, err) }()
+
+ // If this is a valid module, insert it into module_version_states.
+ //
+ // If something fails later on, this insert makes sure we retry. Also,
+ // modules that are introduced to pkgsite for the first time via frontend
+ // fetch rather than index.golang.org won't otherwise have a row in
+ // module_version_states, so inserting one here ensures that the logic
+ // below works properly for them as well.
+ //
+ // Leave index_timestamp empty. It will be populated when the module
+ // appears in the index.
+ if err := f.DB.InsertNewModuleVersionFromFrontendFetch(ctx, modulePath, info.Version); err != nil {
+ return derrors.ToStatus(err), "", err
+ }
}
// Get the latest-version information first, and update the DB. We'll need
diff --git a/internal/worker/server_test.go b/internal/worker/server_test.go
index d1ccd0a..6708806 100644
--- a/internal/worker/server_test.go
+++ b/internal/worker/server_test.go
@@ -105,7 +105,7 @@
}
return &internal.ModuleVersionState{
ModulePath: version.Path,
- IndexTimestamp: version.Timestamp,
+ IndexTimestamp: &version.Timestamp,
Status: code,
TryCount: tryCount,
Version: version.Version,