internal/postgres: method to compute index timestamp for latency metric
Add GetOldestUnprocessedIndexTime, which computes the index timestamp
of the oldest unprocessed module.
The current time minus this value is a lower bound on the time from
when a module enters the index until it is processed.
Change-Id: I0b74f14e94d8d070496df8a8baa11c31decebfd8
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/278552
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
Reviewed-by: Robert Findley <rfindley@google.com>
diff --git a/internal/postgres/postgres.go b/internal/postgres/postgres.go
index 596067a..41fc4aa 100644
--- a/internal/postgres/postgres.go
+++ b/internal/postgres/postgres.go
@@ -8,9 +8,11 @@
import (
"context"
+ "database/sql"
"time"
"golang.org/x/pkgsite/internal/database"
+ "golang.org/x/pkgsite/internal/derrors"
"golang.org/x/pkgsite/internal/log"
"golang.org/x/pkgsite/internal/poller"
)
@@ -64,3 +66,43 @@
func (db *DB) Underlying() *database.DB {
return db.db
}
+
+// StalenessTimestamp returns the index timestamp of the oldest
+// module that is newer than the index timestamp of the youngest module we have
+// processed. That is, let T be the maximum index timestamp of all processed
+// modules. Then this function return the minimum index timestamp of unprocessed
+// modules that is no less than T, or an error that wraps derrors.NotFound if
+// there is none.
+//
+// The name of the function is imprecise: there may be an older unprocessed
+// module, if one newer than it has been processed.
+//
+// We use this function to compute a metric that is a lower bound on the time
+// it takes to process a module since it appeared in the index.
+func (db *DB) StalenessTimestamp(ctx context.Context) (time.Time, error) {
+ var ts time.Time
+ err := db.db.QueryRow(ctx, `
+ SELECT m.index_timestamp
+ FROM module_version_states m
+ CROSS JOIN (
+ -- the index timestamp of the youngest processed module
+ SELECT index_timestamp
+ FROM module_version_states
+ WHERE last_processed_at IS NOT NULL
+ ORDER BY 1 DESC
+ LIMIT 1
+ ) yp
+ WHERE m.index_timestamp > yp.index_timestamp
+ AND last_processed_at IS NULL
+ ORDER BY m.index_timestamp ASC
+ LIMIT 1
+ `).Scan(&ts)
+ switch err {
+ case nil:
+ return ts, nil
+ case sql.ErrNoRows:
+ return time.Time{}, derrors.NotFound
+ default:
+ return time.Time{}, err
+ }
+}
diff --git a/internal/postgres/postgres_test.go b/internal/postgres/postgres_test.go
index 352a9c4..607eb94 100644
--- a/internal/postgres/postgres_test.go
+++ b/internal/postgres/postgres_test.go
@@ -5,8 +5,13 @@
package postgres
import (
+ "context"
+ "errors"
+ "fmt"
"testing"
"time"
+
+ "golang.org/x/pkgsite/internal/derrors"
)
const testTimeout = 5 * time.Second
@@ -16,3 +21,90 @@
func TestMain(m *testing.M) {
RunDBTests("discovery_postgres_test", m, &testDB)
}
+
+func TestGetOldestUnprocessedIndexTime(t *testing.T) {
+ ctx := context.Background()
+
+ type modTimes struct {
+ indexTimestamp string // time in Kitchen format
+ lastProcessedAt string // ditto, empty means NULL
+ }
+
+ for _, test := range []struct {
+ name string
+ mods []modTimes
+ want string // empty => error
+ }{
+ {
+ "no modules",
+ nil,
+ "",
+ },
+ {
+ "no unprocessed modules",
+ []modTimes{
+ {"7:00AM", "7:02AM"}, // index says 7am, processed at 7:02
+ },
+ "",
+ },
+ {
+ "no processed modules",
+ []modTimes{
+ {"7:00AM", ""}, // index says 7am, never processed
+ },
+ "",
+ },
+ {
+ "several modules",
+ []modTimes{
+ {"5:00AM", ""}, // old, never processed
+ {"6:00AM", "6:35AM"},
+ {"7:00AM", "7:02AM"}, // youngest processed module
+ {"8:00AM", ""}, // oldest unprocessed after youngest processed
+ {"9:00AM", ""},
+ },
+ "8:00AM",
+ },
+ } {
+ t.Run(test.name, func(t *testing.T) {
+ defer ResetTestDB(testDB, t)
+ for i, m := range test.mods {
+ path := fmt.Sprintf("m%d", i)
+ it, err := time.Parse(time.Kitchen, m.indexTimestamp)
+ if err != nil {
+ t.Fatal(err)
+ }
+ var lpt *time.Time
+ if m.lastProcessedAt != "" {
+ p, err := time.Parse(time.Kitchen, m.lastProcessedAt)
+ if err != nil {
+ t.Fatal(err)
+ }
+ lpt = &p
+ }
+ if _, err := testDB.db.Exec(ctx, `
+ INSERT INTO module_version_states (module_path, version, index_timestamp, last_processed_at, sort_version, incompatible)
+ VALUES ($1, 'v1.0.0', $2, $3, 'x', false)
+ `, path, it, lpt); err != nil {
+ t.Fatal(err)
+ }
+ }
+ got, err := testDB.StalenessTimestamp(ctx)
+ if err != nil && errors.Is(err, derrors.NotFound) {
+ if test.want != "" {
+ t.Fatalf("got unexpected error %v", err)
+ }
+ } else if err != nil {
+ t.Fatal(err)
+ } else {
+ want, err := time.Parse(time.Kitchen, test.want)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !got.Equal(want) {
+ t.Errorf("got %s, want %s", got, want)
+ }
+ }
+ })
+ }
+}