internal/worker: record processing lag metric

Create a metric for the age of the oldest unprocessed module, and record it periodically.

We record it whenever the /poll endpoint is hit. Since Cloud Scheduler
hits that endpoint once a minute, we can be sure of getting exactly one
data point per minute. The downside is that we won't have finer
granularity than that, but since the main purpose of this metric is to
detect large lags, that doesn't matter.
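
For illustration, here is a minimal, self-contained sketch (example
names only, not part of this change) of how the LastValue aggregation
used below behaves: each recording replaces the previous data point,
so recording once per /poll yields a simple gauge.

package main

import (
	"context"
	"fmt"
	"log"

	"go.opencensus.io/stats"
	"go.opencensus.io/stats/view"
)

var (
	// Example measure and view; the real ones live in
	// internal/worker/metrics.go below.
	exampleLag = stats.Int64("example/processing_lag",
		"Example processing lag.", stats.UnitSeconds)

	exampleLagView = &view.View{
		Name:        "example/processing_lag",
		Measure:     exampleLag,
		Aggregation: view.LastValue(), // keep only the most recent recording
		Description: "example processing lag",
	}
)

func main() {
	if err := view.Register(exampleLagView); err != nil {
		log.Fatal(err)
	}
	ctx := context.Background()

	// Two successive /poll cycles, each recording the current lag in seconds.
	stats.Record(ctx, exampleLag.M(90))
	stats.Record(ctx, exampleLag.M(30))

	rows, err := view.RetrieveData(exampleLagView.Name)
	if err != nil {
		log.Fatal(err)
	}
	for _, row := range rows {
		// With LastValue, only the most recent recording (30) is reported.
		fmt.Println(row.Data.(*view.LastValueData).Value)
	}
}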

Change-Id: I7ce3f043cceaf567fb0c871676063eda91bb2420
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/278614
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
Reviewed-by: Julie Qiu <julie@golang.org>
Reviewed-by: Robert Findley <rfindley@google.com>
diff --git a/cmd/worker/main.go b/cmd/worker/main.go
index 2016c8d..36fda83 100644
--- a/cmd/worker/main.go
+++ b/cmd/worker/main.go
@@ -114,6 +114,7 @@
 
 	views := append(dcensus.ServerViews,
 		worker.EnqueueResponseCount,
+		worker.ProcessingLag,
 		fetch.FetchLatencyDistribution,
 		fetch.FetchResponseCount,
 		fetch.SheddedFetchCount,
diff --git a/internal/worker/metrics.go b/internal/worker/metrics.go
index d6812cd..bcdc0a0 100644
--- a/internal/worker/metrics.go
+++ b/internal/worker/metrics.go
@@ -7,6 +7,7 @@
 import (
 	"context"
 	"strconv"
+	"time"
 
 	"go.opencensus.io/stats"
 	"go.opencensus.io/stats/view"
@@ -30,6 +31,18 @@
 		Description: "Worker enqueue request count",
 		TagKeys:     []tag.Key{keyEnqueueStatus},
 	}
+
+	processingLag = stats.Int64(
+		"go-discovery/worker_processing_lag",
+		"Time from appearing in the index to being processed.",
+		stats.UnitSeconds,
+	)
+	ProcessingLag = &view.View{
+		Name:        "go-discovery/worker_processing_lag",
+		Measure:     processingLag,
+		Aggregation: view.LastValue(),
+		Description: "worker processing lag",
+	}
 )
 
 func recordEnqueue(ctx context.Context, status int) {
@@ -37,3 +50,7 @@
 		[]tag.Mutator{tag.Upsert(keyEnqueueStatus, strconv.Itoa(status))},
 		enqueueStatus.M(int64(status)))
 }
+
+func recordProcessingLag(ctx context.Context, d time.Duration) {
+	stats.Record(ctx, processingLag.M(int64(d/time.Second)))
+}
diff --git a/internal/worker/server.go b/internal/worker/server.go
index d2bf348..b53d8cc 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -336,9 +336,28 @@
 		return err
 	}
 	log.Infof(ctx, "Inserted %d modules from the index", len(modules))
+	s.computeProcessingLag(ctx)
 	return nil
 }
 
+// computeProcessingLag records the age of the oldest unprocessed module
+// as the processing lag, or zero if there are no unprocessed modules.
+func (s *Server) computeProcessingLag(ctx context.Context) {
+	ot, err := s.db.StalenessTimestamp(ctx)
+	if errors.Is(err, derrors.NotFound) {
+		recordProcessingLag(ctx, 0)
+	} else if err != nil {
+		log.Warningf(ctx, "StalenessTimestamp: %v", err)
+		return
+	} else {
+		// If the times on this machine and the machine that wrote the index
+		// timestamp into the DB are out of sync, then the difference we compute
+		// here will be off. But that is unlikely since both machines are
+		// running on GCP.
+		recordProcessingLag(ctx, time.Since(ot))
+	}
+}
+
 // handleEnqueue queries the module_version_states table for the next batch of
 // module versions to process, and enqueues them for processing. Note that this
 // may cause duplicate processing.
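
Not shown in this diff is StalenessTimestamp, which supplies the
timestamp that computeProcessingLag subtracts from the current time.
A rough sketch of what such a query could look like, written against
database/sql with assumed column names on module_version_states (the
real pkgsite implementation may differ):

// Sketch only; not the actual pkgsite code.
package example

import (
	"context"
	"database/sql"
	"errors"
	"time"
)

// errNotFound stands in for derrors.NotFound in this sketch.
var errNotFound = errors.New("not found")

// stalenessTimestamp returns the index timestamp of the oldest module
// that has not been processed yet, or errNotFound if there is none.
func stalenessTimestamp(ctx context.Context, db *sql.DB) (time.Time, error) {
	var ts time.Time
	err := db.QueryRowContext(ctx, `
		SELECT index_timestamp            -- assumed column
		FROM module_version_states
		WHERE last_processed_at IS NULL   -- assumed column
		ORDER BY index_timestamp
		LIMIT 1`).Scan(&ts)
	if errors.Is(err, sql.ErrNoRows) {
		return time.Time{}, errNotFound
	}
	return ts, err
}

computeProcessingLag treats the not-found case as zero lag, which is
why the sketch returns a distinguishable error rather than a zero time
when every module has been processed.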