internal: add metric for Watchers

Export a metric for the latest time of gaby Watchers.
This will let us track their progress.

A single metric is used for all Watchers, with the different
watchers distinguished by an attribute called "name".
Having one metric is convenient, but the default GCP "Sum" aggregation
is a poor choice; it is better to view the metrics unaggregated.

The Watcher.cutoff method returns the latest DBTime, but it requires
the Watcher lock to be held, and that lock may be held for a while. It's
not a good idea to wait a long time for a lock during metric collection,
so instead we track the latest time in an atomic variable and expose
its value with the Watcher.Latest method. The various users of Watchers
themselves provide Latest functions or methods to their users.
Ultimately the gaby main program is responsible for collecting
and naming all the Watcher values and registering the metric.

This CL also refactors a bit, removing from the gcpmetrics package the
convenience layer that was hiding OpenTelemetry and giving that job to
the gaby program.

Change-Id: Ic69e647a6f259e0a5b38bdc246992e9078a64fad
Reviewed-on: https://go-review.googlesource.com/c/oscar/+/607155
Reviewed-by: Tatiana Bradley <tatianabradley@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
diff --git a/internal/commentfix/fix.go b/internal/commentfix/fix.go
index c2217cd..a5d07eb 100644
--- a/internal/commentfix/fix.go
+++ b/internal/commentfix/fix.go
@@ -367,6 +367,11 @@
 	}
 }
 
+// Latest returns the latest known DBTime marked old by the Fixer's Watcher, as reported by that Watcher's Latest method.
+func (f *Fixer) Latest() timed.DBTime {
+	return f.watcher.Latest()
+}
+
 type issueOrComment struct {
 	issue   *github.Issue
 	comment *github.IssueComment
diff --git a/internal/crawldocs/sync.go b/internal/crawldocs/sync.go
index 771f747..698a947 100644
--- a/internal/crawldocs/sync.go
+++ b/internal/crawldocs/sync.go
@@ -10,6 +10,7 @@
 
 	"golang.org/x/oscar/internal/crawl"
 	"golang.org/x/oscar/internal/docs"
+	"golang.org/x/oscar/internal/storage/timed"
 )
 
 // Sync reads new HTML pages from cr, splits them into sections using [Split],
@@ -43,3 +44,8 @@
 func Restart(ctx context.Context, lg *slog.Logger, cr *crawl.Crawler) {
 	cr.PageWatcher("crawldocs").Restart()
 }
+
+// Latest returns the latest known DBTime marked old by cr's page Watcher named "crawldocs".
+func Latest(cr *crawl.Crawler) timed.DBTime {
+	return cr.PageWatcher("crawldocs").Latest()
+}
diff --git a/internal/embeddocs/sync.go b/internal/embeddocs/sync.go
index 5ce8790..961f9ff 100644
--- a/internal/embeddocs/sync.go
+++ b/internal/embeddocs/sync.go
@@ -83,3 +83,8 @@
 		}
 	}
 }
+
+// Latest returns the latest known DBTime marked old by dc's document Watcher named "embeddocs".
+func Latest(dc *docs.Corpus) timed.DBTime {
+	return dc.DocWatcher("embeddocs").Latest()
+}
diff --git a/internal/gaby/main.go b/internal/gaby/main.go
index 48d7c78..dc46034 100644
--- a/internal/gaby/main.go
+++ b/internal/gaby/main.go
@@ -19,6 +19,7 @@
 	"time"
 
 	"cloud.google.com/go/compute/metadata"
+	ometric "go.opentelemetry.io/otel/metric"
 	"golang.org/x/oscar/internal/commentfix"
 	"golang.org/x/oscar/internal/crawl"
 	"golang.org/x/oscar/internal/crawldocs"
@@ -36,6 +37,7 @@
 	"golang.org/x/oscar/internal/related"
 	"golang.org/x/oscar/internal/secret"
 	"golang.org/x/oscar/internal/storage"
+	"golang.org/x/oscar/internal/storage/timed"
 )
 
 var flags struct {
@@ -71,6 +73,7 @@
 	docs      *docs.Corpus     // document corpus to use
 	embed     llm.Embedder     // LLM embedder to use
 	github    *github.Client   // github client to use
+	meter     ometric.Meter    // used to create Open Telemetry instruments
 }
 
 func main() {
@@ -91,12 +94,15 @@
 	defer shutdown()
 
 	var syncs, changes []func(context.Context)
+	// Named functions to retrieve latest Watcher times.
+	watcherLatests := map[string]func() timed.DBTime{}
 
 	g.github = github.New(g.slog, g.db, g.secret, g.http)
 	syncs = append(syncs, g.github.Sync)
 
 	g.docs = docs.New(g.db)
 	syncs = append(syncs, func(ctx context.Context) { githubdocs.Sync(ctx, g.slog, g.docs, g.github) })
+	watcherLatests["githubdocs"] = func() timed.DBTime { return githubdocs.Latest(g.github) }
 
 	ai, err := gemini.NewClient(g.ctx, g.slog, g.secret, g.http, "text-embedding-004")
 	if err != nil {
@@ -104,6 +110,7 @@
 	}
 	g.embed = ai
 	syncs = append(syncs, func(ctx context.Context) { embeddocs.Sync(ctx, g.slog, g.vector, g.embed, g.docs) })
+	watcherLatests["embeddocs"] = func() timed.DBTime { return embeddocs.Latest(g.docs) }
 
 	cr := crawl.New(g.slog, g.db, g.http)
 	cr.Add("https://go.dev/")
@@ -116,6 +123,7 @@
 	if false {
 		syncs = append(syncs, cr.Run)
 		syncs = append(syncs, func(ctx context.Context) { crawldocs.Sync(ctx, g.slog, g.docs, cr) })
+		watcherLatests["crawldocs"] = func() timed.DBTime { return crawldocs.Latest(cr) }
 	}
 
 	if flags.search {
@@ -129,6 +137,7 @@
 	cf.ReplaceURL(`\Qhttps://go-review.git.corp.google.com/\E`, "https://go-review.googlesource.com/")
 	cf.EnableEdits()
 	changes = append(changes, cf.Run)
+	watcherLatests["gerritlinks fix"] = cf.Latest
 
 	rp := related.New(g.slog, g.db, g.github, g.vector, g.docs, "related")
 	rp.EnableProject("golang/go")
@@ -137,6 +146,7 @@
 	rp.SkipTitleSuffix(" backport]")
 	rp.EnablePosts()
 	changes = append(changes, rp.Run)
+	watcherLatests["related"] = rp.Latest
 
 	if flags.enablesync {
 		g.crons = append(g.crons, syncs...)
@@ -145,8 +155,11 @@
 		g.crons = append(g.crons, changes...)
 	}
 
+	// Install a metric that observes the latest values of the watchers each time metrics are sampled.
+	g.registerWatcherMetric(watcherLatests)
+
 	g.serveHTTP()
-	log.Printf("serving %s\n", g.addr)
+	log.Printf("serving %s", g.addr)
 
 	if !g.cloud {
 		// Simulate Cloud Scheduler.
@@ -219,11 +232,17 @@
 	}
 	g.secret = sdb
 
-	shutdown, err = gcpmetrics.Init(g.ctx, g.slog, flags.project)
+	// Initialize metric collection.
+	mp, err := gcpmetrics.NewMeterProvider(g.ctx, g.slog, flags.project)
 	if err != nil {
 		log.Fatal(err)
 	}
-	return shutdown
+	g.meter = mp.Meter("gcp")
+	return func() {
+		if err := mp.Shutdown(g.ctx); err != nil {
+			log.Fatal(err)
+		}
+	}
 }
 
 // searchLoop runs an interactive search loop.
@@ -250,7 +269,7 @@
 
 // serveHTTP serves HTTP endpoints for Gaby.
 func (g *Gaby) serveHTTP() {
-	cronCounter := gcpmetrics.NewCounter(metricName("crons"), "number of /cron requests")
+	cronCounter := g.newCounter("crons", "number of /cron requests")
 
 	http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
 		fmt.Fprintf(w, "Gaby\n")
@@ -318,19 +337,6 @@
 	return os.Getenv("K_SERVICE") != "" && os.Getenv("K_REVISION") != ""
 }
 
-// metricName returns the full metric name for the given short name.
-// The names are chosen to display nicely on the Metric Explorer's "select a metric"
-// dropdown. Production metrics will group under "Gaby", while others will
-// have their own, distinct groups.
-func metricName(shortName string) string {
-	if flags.firestoredb == "prod" {
-		return "gaby/" + shortName
-	}
-	// Using a hyphen or slash after "gaby" puts the metric in the "Gaby" group.
-	// We want non-prod metrics to be in a different group.
-	return "gaby_" + flags.firestoredb + "/" + shortName
-}
-
 // Crawling parameters
 
 var godevAllow = []string{
diff --git a/internal/gaby/metrics.go b/internal/gaby/metrics.go
new file mode 100644
index 0000000..bff0b1f
--- /dev/null
+++ b/internal/gaby/metrics.go
@@ -0,0 +1,55 @@
+// Copyright 2024 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+	"context"
+
+	"go.opentelemetry.io/otel/attribute"
+	ometric "go.opentelemetry.io/otel/metric"
+	"golang.org/x/oscar/internal/storage/timed"
+)
+
+// newCounter creates an integer counter instrument on g.meter, registered under metricName(name).
+// It logs the failure and panics if the counter cannot be created.
+func (g *Gaby) newCounter(name, description string) ometric.Int64Counter {
+	c, err := g.meter.Int64Counter(metricName(name), ometric.WithDescription(description))
+	if err != nil {
+		g.slog.Error("counter creation failed", "name", name)
+		panic(err)
+	}
+	return c
+}
+
+// registerWatcherMetric adds an observable gauge named metricName("watcher-latest") for the latest times of Watchers.
+// The latests map contains the functions to compute the latest times, each labeled by a string which
+// becomes the value of the "name" attribute in the metric. It logs and panics if the gauge cannot be created.
+func (g *Gaby) registerWatcherMetric(latests map[string]func() timed.DBTime) {
+	_, err := g.meter.Int64ObservableGauge(metricName("watcher-latest"),
+		ometric.WithDescription("latest DBTime of watcher"),
+		ometric.WithInt64Callback(func(_ context.Context, observer ometric.Int64Observer) error {
+			for name, f := range latests {
+				observer.Observe(int64(f()), ometric.WithAttributes(attribute.String("name", name)))
+			}
+			return nil
+		}))
+	if err != nil {
+		g.slog.Error("watcher gauge creation failed")
+		panic(err)
+	}
+}
+
+// metricName returns the full metric name for the given short name: "gaby/<shortName>" when
+// flags.firestoredb is "prod", and "gaby_<firestoredb>/<shortName>" otherwise.
+// The names are chosen to display nicely on the Metric Explorer's "select a metric" dropdown.
+// Production metrics will group under "Gaby", while others will have their own, distinct groups.
+func metricName(shortName string) string {
+	if flags.firestoredb == "prod" {
+		return "gaby/" + shortName
+	}
+	// Using a hyphen or slash after "gaby" puts the metric in the "Gaby" group.
+	// We want non-prod metrics to be in a different group.
+	return "gaby_" + flags.firestoredb + "/" + shortName
+}
diff --git a/internal/gcp/gcpmetrics/metrics.go b/internal/gcp/gcpmetrics/metrics.go
index 400fb0c..e90b2ba 100644
--- a/internal/gcp/gcpmetrics/metrics.go
+++ b/internal/gcp/gcpmetrics/metrics.go
@@ -16,17 +16,14 @@
 
 	gcpexporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric"
 	"go.opentelemetry.io/contrib/detectors/gcp"
-	ometric "go.opentelemetry.io/otel/metric"
 	sdkmetric "go.opentelemetry.io/otel/sdk/metric"
 	"go.opentelemetry.io/otel/sdk/metric/metricdata"
 	"go.opentelemetry.io/otel/sdk/resource"
 )
 
-// The meter for creating metric instruments (counters and so on).
-var meter ometric.Meter
-var logger *slog.Logger
-
-func Init(ctx context.Context, lg *slog.Logger, projectID string) (shutdown func(), err error) {
+// NewMeterProvider creates an [sdkmetric.MeterProvider] that exports metrics to GCP's Monitoring service.
+// Call Shutdown on the MeterProvider after use.
+func NewMeterProvider(ctx context.Context, lg *slog.Logger, projectID string) (*sdkmetric.MeterProvider, error) {
 	// Create an exporter to send metrics to the GCP Monitoring service.
 	ex, err := gcpexporter.New(gcpexporter.WithProjectID(projectID))
 	if err != nil {
@@ -48,28 +45,10 @@
 		return nil, err
 	}
 	lg.Info("creating OTel MeterProvider", "resource", res.String())
-	mp := sdkmetric.NewMeterProvider(
+	return sdkmetric.NewMeterProvider(
 		sdkmetric.WithResource(res),
 		sdkmetric.WithReader(r),
-	)
-	logger = lg
-	meter = mp.Meter("gcp")
-	return func() {
-		if err := mp.Shutdown(context.Background()); err != nil {
-			lg.Warn("metric shutdown failed", "err", err)
-		}
-	}, nil
-}
-
-// NewCounter creates an integer counter instrument.
-// It panics if the counter cannot be created.
-func NewCounter(name, description string) ometric.Int64Counter {
-	c, err := meter.Int64Counter(name, ometric.WithDescription(description))
-	if err != nil {
-		logger.Error("counter creation failed", "name", name)
-		panic(err)
-	}
-	return c
+	), nil
 }
 
 // A loggingExporter wraps an [sdkmetric.Exporter] with logging.
diff --git a/internal/gcp/gcpmetrics/metrics_test.go b/internal/gcp/gcpmetrics/metrics_test.go
index d1eb892..88fc934 100644
--- a/internal/gcp/gcpmetrics/metrics_test.go
+++ b/internal/gcp/gcpmetrics/metrics_test.go
@@ -22,18 +22,21 @@
 	}
 	ctx := context.Background()
 
-	shutdown, err := Init(ctx, testutil.Slogger(t), *project)
+	mp, err := NewMeterProvider(ctx, testutil.Slogger(t), *project)
 	if err != nil {
 		t.Fatal(err)
 	}
-	c := NewCounter("test-counter", "a counter for testing")
+	meter := mp.Meter("test")
+	c, err := meter.Int64Counter("test-counter")
 	if err != nil {
 		t.Fatal(err)
 	}
 	c.Add(ctx, 1)
 
 	// Force an export even if the interval hasn't passed.
-	shutdown()
+	if err := mp.Shutdown(ctx); err != nil {
+		t.Fatal(err)
+	}
 
 	if g, w := totalExports.Load(), int64(1); g != w {
 		t.Errorf("total exports: got %d, want %d", g, w)
diff --git a/internal/githubdocs/sync.go b/internal/githubdocs/sync.go
index 3a4d6fb..6ef3007 100644
--- a/internal/githubdocs/sync.go
+++ b/internal/githubdocs/sync.go
@@ -13,6 +13,7 @@
 
 	"golang.org/x/oscar/internal/docs"
 	"golang.org/x/oscar/internal/github"
+	"golang.org/x/oscar/internal/storage/timed"
 )
 
 // Sync writes to dc docs corresponding to each issue in gh that is
@@ -48,6 +49,11 @@
 	gh.EventWatcher("githubdocs").Restart()
 }
 
+// Latest returns the latest known DBTime marked old by gh's event Watcher named "githubdocs".
+func Latest(gh *github.Client) timed.DBTime {
+	return gh.EventWatcher("githubdocs").Latest()
+}
+
 // cleanTitle should clean the title for indexing.
 // For now we assume the LLM is good enough at Markdown not to bother.
 func cleanTitle(title string) string {
diff --git a/internal/related/related.go b/internal/related/related.go
index 0aa5463..131899e 100644
--- a/internal/related/related.go
+++ b/internal/related/related.go
@@ -250,6 +250,11 @@
 	}
 }
 
+// Latest returns the latest known DBTime marked old by the Poster's Watcher, as reported by that Watcher's Latest method.
+func (p *Poster) Latest() timed.DBTime {
+	return p.watcher.Latest()
+}
+
 var markdownEscaper = strings.NewReplacer(
 	"_", `\_`,
 	"*", `\*`,
diff --git a/internal/storage/timed/timed.go b/internal/storage/timed/timed.go
index 647a7e1..038ef17 100644
--- a/internal/storage/timed/timed.go
+++ b/internal/storage/timed/timed.go
@@ -273,6 +273,7 @@
 	kind   string
 	decode func(*Entry) T
 	locked atomic.Bool
+	latest atomic.Int64 // highest known DBTime marked old, for fast retrieval by metrics
 }
 
 // NewWatcher returns a new named Watcher reading keys of the given kind from db.
@@ -320,6 +321,9 @@
 			w.db.Panic("watcher decode", "dval", storage.Fmt(dval), "err", err)
 		}
 	}
+	if w.latest.Load() < t {
+		w.latest.Store(t)
+	}
 	return DBTime(t)
 }
 
@@ -388,6 +392,7 @@
 		return
 	}
 	w.db.Set(w.dkey, ordered.Encode(int64(t)))
+	w.latest.Store(int64(t))
 }
 
 // Flush flushes the definition of recent (changed by MarkOld) to the database.
@@ -396,3 +401,13 @@
 func (w *Watcher[T]) Flush() {
 	w.db.Flush()
 }
+
+// Latest returns the latest known DBTime marked old by the Watcher.
+// It reads an atomically stored copy of that time, so it does not require the lock to be held.
+//
+// Latest may return a stale value. In particular, it will return
+// 0 until the first call to [Watcher.Recent] or [Watcher.MarkOld] in
+// the process.
+func (w *Watcher[T]) Latest() DBTime {
+	return DBTime(w.latest.Load())
+}
diff --git a/internal/storage/timed/timed_test.go b/internal/storage/timed/timed_test.go
index d21dbce..3264795 100644
--- a/internal/storage/timed/timed_test.go
+++ b/internal/storage/timed/timed_test.go
@@ -116,6 +116,10 @@
 	last = 0
 	keys = nil
 	w := NewWatcher(db, "name", "kind", func(e *Entry) *Entry { return e })
+	if latest, want := w.Latest(), DBTime(0); latest != want {
+		t.Errorf("Watcher.Latest() = %d, want %d", latest, want)
+	}
+
 	for e := range w.Recent() {
 		do(e)
 		w.MarkOld(e.ModTime)
@@ -124,6 +128,9 @@
 	if want := []string{"k1", "k3", "k2"}; !slices.Equal(keys, want) {
 		t.Errorf("Watcher.Recent() = %v, want %v", keys, want)
 	}
+	if got := w.Latest(); got != last {
+		t.Errorf("Watcher.Latest() = %d, want %d", got, last)
+	}
 
 	// Timed iteration with break.
 	last = 0