internal/postgres: method to compute index timestamp for latency metric

Add GetOldestUnprocessedIndexTime, which computes the index timestamp
of the oldest unprocessed module.

The current time minus this value is a lower bound on the time from
when a module enters the index until it is processed.

Change-Id: I0b74f14e94d8d070496df8a8baa11c31decebfd8
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/278552
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
Reviewed-by: Robert Findley <rfindley@google.com>
diff --git a/internal/postgres/postgres.go b/internal/postgres/postgres.go
index 596067a..41fc4aa 100644
--- a/internal/postgres/postgres.go
+++ b/internal/postgres/postgres.go
@@ -8,9 +8,11 @@
 
 import (
 	"context"
+	"database/sql"
 	"time"
 
 	"golang.org/x/pkgsite/internal/database"
+	"golang.org/x/pkgsite/internal/derrors"
 	"golang.org/x/pkgsite/internal/log"
 	"golang.org/x/pkgsite/internal/poller"
 )
@@ -64,3 +66,43 @@
 func (db *DB) Underlying() *database.DB {
 	return db.db
 }
+
+// StalenessTimestamp returns the index timestamp of the oldest
+// module that is newer than the index timestamp of the youngest module we have
+// processed. That is, let T be the maximum index timestamp of all processed
+// modules. Then this function return the minimum index timestamp of unprocessed
+// modules that is no less than T, or an error that wraps derrors.NotFound if
+// there is none.
+//
+// The name of the function is imprecise: there may be an older unprocessed
+// module, if one newer than it has been processed.
+//
+// We use this function to compute a metric that is a lower bound on the time
+// it takes to process a module since it appeared in the index.
+func (db *DB) StalenessTimestamp(ctx context.Context) (time.Time, error) {
+	var ts time.Time
+	err := db.db.QueryRow(ctx, `
+		SELECT m.index_timestamp
+		FROM module_version_states m
+		CROSS JOIN (
+			-- the index timestamp of the youngest processed module
+			SELECT index_timestamp
+			FROM module_version_states
+			WHERE last_processed_at IS NOT NULL
+			ORDER BY 1 DESC
+			LIMIT 1
+		) yp
+		WHERE m.index_timestamp > yp.index_timestamp
+		AND last_processed_at IS NULL
+		ORDER BY m.index_timestamp ASC
+		LIMIT 1
+	`).Scan(&ts)
+	switch err {
+	case nil:
+		return ts, nil
+	case sql.ErrNoRows:
+		return time.Time{}, derrors.NotFound
+	default:
+		return time.Time{}, err
+	}
+}
diff --git a/internal/postgres/postgres_test.go b/internal/postgres/postgres_test.go
index 352a9c4..607eb94 100644
--- a/internal/postgres/postgres_test.go
+++ b/internal/postgres/postgres_test.go
@@ -5,8 +5,13 @@
 package postgres
 
 import (
+	"context"
+	"errors"
+	"fmt"
 	"testing"
 	"time"
+
+	"golang.org/x/pkgsite/internal/derrors"
 )
 
 const testTimeout = 5 * time.Second
@@ -16,3 +21,90 @@
 func TestMain(m *testing.M) {
 	RunDBTests("discovery_postgres_test", m, &testDB)
 }
+
+func TestGetOldestUnprocessedIndexTime(t *testing.T) {
+	ctx := context.Background()
+
+	type modTimes struct {
+		indexTimestamp  string // time in Kitchen format
+		lastProcessedAt string // ditto, empty means NULL
+	}
+
+	for _, test := range []struct {
+		name string
+		mods []modTimes
+		want string // empty => error
+	}{
+		{
+			"no modules",
+			nil,
+			"",
+		},
+		{
+			"no unprocessed modules",
+			[]modTimes{
+				{"7:00AM", "7:02AM"}, // index says 7am, processed at 7:02
+			},
+			"",
+		},
+		{
+			"no processed modules",
+			[]modTimes{
+				{"7:00AM", ""}, // index says 7am, never processed
+			},
+			"",
+		},
+		{
+			"several modules",
+			[]modTimes{
+				{"5:00AM", ""}, // old, never processed
+				{"6:00AM", "6:35AM"},
+				{"7:00AM", "7:02AM"}, // youngest processed module
+				{"8:00AM", ""},       // oldest unprocessed after youngest processed
+				{"9:00AM", ""},
+			},
+			"8:00AM",
+		},
+	} {
+		t.Run(test.name, func(t *testing.T) {
+			defer ResetTestDB(testDB, t)
+			for i, m := range test.mods {
+				path := fmt.Sprintf("m%d", i)
+				it, err := time.Parse(time.Kitchen, m.indexTimestamp)
+				if err != nil {
+					t.Fatal(err)
+				}
+				var lpt *time.Time
+				if m.lastProcessedAt != "" {
+					p, err := time.Parse(time.Kitchen, m.lastProcessedAt)
+					if err != nil {
+						t.Fatal(err)
+					}
+					lpt = &p
+				}
+				if _, err := testDB.db.Exec(ctx, `
+					INSERT INTO module_version_states (module_path, version, index_timestamp, last_processed_at, sort_version, incompatible)
+					VALUES ($1, 'v1.0.0', $2, $3, 'x', false)
+				`, path, it, lpt); err != nil {
+					t.Fatal(err)
+				}
+			}
+			got, err := testDB.StalenessTimestamp(ctx)
+			if err != nil && errors.Is(err, derrors.NotFound) {
+				if test.want != "" {
+					t.Fatalf("got unexpected error %v", err)
+				}
+			} else if err != nil {
+				t.Fatal(err)
+			} else {
+				want, err := time.Parse(time.Kitchen, test.want)
+				if err != nil {
+					t.Fatal(err)
+				}
+				if !got.Equal(want) {
+					t.Errorf("got %s, want %s", got, want)
+				}
+			}
+		})
+	}
+}