{cmd,internal}/worker: export DB info metrics

Add metrics for the numbers of active and waiting DB processes.

For golang/go#48010

Change-Id: Ia3c14e492b29c07371ee903182c7ba55f04c584a
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/349310
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
TryBot-Result: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jamal Carvalho <jamal@golang.org>
Reviewed-by: Julie Qiu <julie@golang.org>
diff --git a/cmd/worker/main.go b/cmd/worker/main.go
index 821f11a..6633a97 100644
--- a/cmd/worker/main.go
+++ b/cmd/worker/main.go
@@ -115,6 +115,8 @@
 		worker.ProcessingLag,
 		worker.UnprocessedModules,
 		worker.UnprocessedNewModules,
+		worker.DBProcesses,
+		worker.DBWaitingProcesses,
 		worker.SheddedFetchCount,
 		worker.FetchLatencyDistribution,
 		worker.FetchResponseCount,
diff --git a/internal/worker/metrics.go b/internal/worker/metrics.go
index ff155df..5c183d4 100644
--- a/internal/worker/metrics.go
+++ b/internal/worker/metrics.go
@@ -12,6 +12,7 @@
 	"go.opencensus.io/stats"
 	"go.opencensus.io/stats/view"
 	"go.opencensus.io/tag"
+	"golang.org/x/pkgsite/internal/postgres"
 )
 
 var (
@@ -69,6 +70,32 @@
 		Aggregation: view.LastValue(),
 		Description: "number of unprocessed new modules",
 	}
+
+	dbProcesses = stats.Int64(
+		"go-discovery/db_processes_count",
+		"Number of active DB worker processes",
+		stats.UnitDimensionless,
+	)
+
+	DBProcesses = &view.View{
+		Name:        "go-discovery/db_processes/count",
+		Measure:     dbProcesses,
+		Aggregation: view.LastValue(),
+		Description: "number of active DB worker processes",
+	}
+
+	dbWaitingProcesses = stats.Int64(
+		"go-discovery/db_waiting_processes_count",
+		"Number of active DB worker processes waiting for locks",
+		stats.UnitDimensionless,
+	)
+
+	DBWaitingProcesses = &view.View{
+		Name:        "go-discovery/db_waiting_processes/count",
+		Measure:     dbWaitingProcesses,
+		Aggregation: view.LastValue(),
+		Description: "number of waiting DB worker processes",
+	}
 )
 
 func recordEnqueue(ctx context.Context, status int) {
@@ -85,3 +112,10 @@
 	stats.Record(ctx, unprocessedModules.M(int64(total)))
 	stats.Record(ctx, unprocessedNewModules.M(int64(new)))
 }
+
+func recordWorkerDBInfo(ctx context.Context, dbi *postgres.UserInfo) {
+	if dbi != nil {
+		stats.Record(ctx, dbProcesses.M(int64(dbi.NumTotal)))
+		stats.Record(ctx, dbWaitingProcesses.M(int64(dbi.NumWaiting)))
+	}
+}
diff --git a/internal/worker/server.go b/internal/worker/server.go
index 14ba178..33debc8 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -96,7 +96,7 @@
 	}
 
 	// Update information about DB locks, etc. every few seconds.
-	p := poller.New(nil, func(ctx context.Context) (interface{}, error) {
+	p := poller.New(&postgres.UserInfo{}, func(ctx context.Context) (interface{}, error) {
 		return scfg.DB.GetUserInfo(ctx, "worker")
 	}, func(err error) { log.Error(context.Background(), err) })
 	p.Start(context.Background(), 10*time.Second)
@@ -394,6 +394,7 @@
 	log.Infof(ctx, "Inserted %d modules from the index", len(modules))
 	s.computeProcessingLag(ctx)
 	s.computeUnprocessedModules(ctx)
+	recordWorkerDBInfo(ctx, s.workerDBInfo())
 	return nil
 }