{cmd,internal}/worker: export DB info metrics
Add metrics for the numbers of active and waiting DB processes.
For golang/go#48010
Change-Id: Ia3c14e492b29c07371ee903182c7ba55f04c584a
Reviewed-on: https://go-review.googlesource.com/c/pkgsite/+/349310
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
TryBot-Result: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jamal Carvalho <jamal@golang.org>
Reviewed-by: Julie Qiu <julie@golang.org>
diff --git a/cmd/worker/main.go b/cmd/worker/main.go
index 821f11a..6633a97 100644
--- a/cmd/worker/main.go
+++ b/cmd/worker/main.go
@@ -115,6 +115,8 @@
worker.ProcessingLag,
worker.UnprocessedModules,
worker.UnprocessedNewModules,
+ worker.DBProcesses,
+ worker.DBWaitingProcesses,
worker.SheddedFetchCount,
worker.FetchLatencyDistribution,
worker.FetchResponseCount,
diff --git a/internal/worker/metrics.go b/internal/worker/metrics.go
index ff155df..5c183d4 100644
--- a/internal/worker/metrics.go
+++ b/internal/worker/metrics.go
@@ -12,6 +12,7 @@
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
+ "golang.org/x/pkgsite/internal/postgres"
)
var (
@@ -69,6 +70,32 @@
Aggregation: view.LastValue(),
Description: "number of unprocessed new modules",
}
+
+ dbProcesses = stats.Int64(
+ "go-discovery/db_processes_count",
+ "Number of active DB worker processes",
+ stats.UnitDimensionless,
+ )
+
+ DBProcesses = &view.View{
+ Name: "go-discovery/db_processes/count",
+ Measure: dbProcesses,
+ Aggregation: view.LastValue(),
+ Description: "number of active DB worker processes",
+ }
+
+ dbWaitingProcesses = stats.Int64(
+ "go-discovery/db_waiting_processes_count",
+ "Number of active DB worker processes waiting for locks",
+ stats.UnitDimensionless,
+ )
+
+ DBWaitingProcesses = &view.View{
+ Name: "go-discovery/db_waiting_processes/count",
+ Measure: dbWaitingProcesses,
+ Aggregation: view.LastValue(),
+ Description: "number of waiting DB worker processes",
+ }
)
func recordEnqueue(ctx context.Context, status int) {
@@ -85,3 +112,10 @@
stats.Record(ctx, unprocessedModules.M(int64(total)))
stats.Record(ctx, unprocessedNewModules.M(int64(new)))
}
+
+func recordWorkerDBInfo(ctx context.Context, dbi *postgres.UserInfo) {
+ if dbi != nil {
+ stats.Record(ctx, dbProcesses.M(int64(dbi.NumTotal)))
+ stats.Record(ctx, dbWaitingProcesses.M(int64(dbi.NumWaiting)))
+ }
+}
diff --git a/internal/worker/server.go b/internal/worker/server.go
index 14ba178..33debc8 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -96,7 +96,7 @@
}
// Update information about DB locks, etc. every few seconds.
- p := poller.New(nil, func(ctx context.Context) (interface{}, error) {
+ p := poller.New(&postgres.UserInfo{}, func(ctx context.Context) (interface{}, error) {
return scfg.DB.GetUserInfo(ctx, "worker")
}, func(err error) { log.Error(context.Background(), err) })
p.Start(context.Background(), 10*time.Second)
@@ -394,6 +394,7 @@
log.Infof(ctx, "Inserted %d modules from the index", len(modules))
s.computeProcessingLag(ctx)
s.computeUnprocessedModules(ctx)
+ recordWorkerDBInfo(ctx, s.workerDBInfo())
return nil
}