blob: 5c183d41fa6bf6d143af7c062bca917f673b8618 [file] [log] [blame]
// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package worker
import (
"context"
"strconv"
"time"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"golang.org/x/pkgsite/internal/postgres"
)
// Census measures recorded by the worker, and the views defined over them.
// Each unexported stats.Int64 measure below is followed by the exported
// *view.View whose Measure field refers to it.
var (
// keyEnqueueStatus is a census tag used to keep track of the status
// of the modules being enqueued.
keyEnqueueStatus = tag.MustNewKey("enqueue.status")
// enqueueStatus is the measure recorded by recordEnqueue, which tags each
// measurement with keyEnqueueStatus.
enqueueStatus = stats.Int64(
"go-discovery/worker_enqueue_count",
"The status of a module version enqueued to Cloud Tasks.",
stats.UnitDimensionless,
)
// EnqueueResponseCount counts worker enqueue responses by response type.
// It is a Count aggregation over enqueueStatus, keyed by keyEnqueueStatus.
EnqueueResponseCount = &view.View{
Name: "go-discovery/worker-enqueue/count",
Measure: enqueueStatus,
Aggregation: view.Count(),
Description: "Worker enqueue request count",
TagKeys: []tag.Key{keyEnqueueStatus},
}
// processingLag is the measure recorded by recordProcessingLag. Its unit
// is seconds; per its description it is the time from a module appearing
// in the index to being processed.
processingLag = stats.Int64(
"go-discovery/worker_processing_lag",
"Time from appearing in the index to being processed.",
stats.UnitSeconds,
)
// ProcessingLag is a LastValue view over processingLag.
ProcessingLag = &view.View{
Name: "go-discovery/worker_processing_lag",
Measure: processingLag,
Aggregation: view.LastValue(),
Description: "worker processing lag",
}
// unprocessedModules is the first of the two measures recorded by
// recordUnprocessedModules (its "total" argument).
unprocessedModules = stats.Int64(
"go-discovery/unprocessed_modules_count",
"Number of unprocessed modules (status = 0 or >= 500).",
stats.UnitDimensionless,
)
// UnprocessedModules is a LastValue view over unprocessedModules.
UnprocessedModules = &view.View{
Name: "go-discovery/unprocessed_modules/count",
Measure: unprocessedModules,
Aggregation: view.LastValue(),
Description: "number of unprocessed modules",
}
// unprocessedNewModules is the second of the two measures recorded by
// recordUnprocessedModules (its "new" argument). Note that its description
// gives a narrower status condition (0 or 500) than unprocessedModules
// (0 or >= 500).
unprocessedNewModules = stats.Int64(
"go-discovery/unprocessed_new_modules_count",
"Number of new unprocessed modules (status = 0 or 500).",
stats.UnitDimensionless,
)
// UnprocessedNewModules is a LastValue view over unprocessedNewModules.
UnprocessedNewModules = &view.View{
Name: "go-discovery/unprocessed_new_modules/count",
Measure: unprocessedNewModules,
Aggregation: view.LastValue(),
Description: "number of unprocessed new modules",
}
// dbProcesses is recorded by recordWorkerDBInfo from the NumTotal field of
// a postgres.UserInfo.
dbProcesses = stats.Int64(
"go-discovery/db_processes_count",
"Number of active DB worker processes",
stats.UnitDimensionless,
)
// DBProcesses is a LastValue view over dbProcesses.
DBProcesses = &view.View{
Name: "go-discovery/db_processes/count",
Measure: dbProcesses,
Aggregation: view.LastValue(),
Description: "number of active DB worker processes",
}
// dbWaitingProcesses is recorded by recordWorkerDBInfo from the NumWaiting
// field of a postgres.UserInfo.
dbWaitingProcesses = stats.Int64(
"go-discovery/db_waiting_processes_count",
"Number of active DB worker processes waiting for locks",
stats.UnitDimensionless,
)
// DBWaitingProcesses is a LastValue view over dbWaitingProcesses.
DBWaitingProcesses = &view.View{
Name: "go-discovery/db_waiting_processes/count",
Measure: dbWaitingProcesses,
Aggregation: view.LastValue(),
Description: "number of waiting DB worker processes",
}
)
func recordEnqueue(ctx context.Context, status int) {
stats.RecordWithTags(ctx,
[]tag.Mutator{tag.Upsert(keyEnqueueStatus, strconv.Itoa(status))},
enqueueStatus.M(int64(status)))
}
// recordProcessingLag records d as a processingLag measurement.
//
// The measure is an integer number of seconds, so d is truncated toward zero
// to whole seconds before it is recorded.
func recordProcessingLag(ctx context.Context, d time.Duration) {
	wholeSeconds := int64(d / time.Second)
	stats.Record(ctx, processingLag.M(wholeSeconds))
}
// recordUnprocessedModules records total as an unprocessedModules measurement
// and new as an unprocessedNewModules measurement, in that order.
//
// Note that the parameter named new shadows the builtin of the same name
// inside this function.
func recordUnprocessedModules(ctx context.Context, total, new int) {
	totalCount, newCount := int64(total), int64(new)
	stats.Record(ctx, unprocessedModules.M(totalCount))
	stats.Record(ctx, unprocessedNewModules.M(newCount))
}
// recordWorkerDBInfo records dbi.NumTotal as a dbProcesses measurement and
// dbi.NumWaiting as a dbWaitingProcesses measurement.
//
// If dbi is nil, nothing is recorded.
func recordWorkerDBInfo(ctx context.Context, dbi *postgres.UserInfo) {
	if dbi == nil {
		return
	}
	totalProcs := int64(dbi.NumTotal)
	waitingProcs := int64(dbi.NumWaiting)
	stats.Record(ctx, dbProcesses.M(totalProcs))
	stats.Record(ctx, dbWaitingProcesses.M(waitingProcs))
}