internal/observe, etc.: remove event tracing
Use OpenTelemetry tracing directly instead of the exp/event package.
This removes all dependence on exp/event's OpenTelemetry code.
Change-Id: Id2846db3d2c35efea66244c314385b59ef605d9d
Reviewed-on: https://go-review.googlesource.com/c/vulndb/+/552656
Reviewed-by: Tatiana Bradley <tatianabradley@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
diff --git a/go.mod b/go.mod
index 67c6747..6f2b609 100644
--- a/go.mod
+++ b/go.mod
@@ -24,6 +24,7 @@
github.com/shurcooL/githubv4 v0.0.0-20231126234147-1cffa1f02456
go.opentelemetry.io/otel v1.21.0
go.opentelemetry.io/otel/sdk v1.21.0
+ go.opentelemetry.io/otel/trace v1.21.0
golang.org/x/exp v0.0.0-20231219180239-dc181d75b848
golang.org/x/exp/event v0.0.0-20231219180239-dc181d75b848
golang.org/x/mod v0.14.0
@@ -77,7 +78,6 @@
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/sdk/metric v1.21.0 // indirect
- go.opentelemetry.io/otel/trace v1.21.0 // indirect
golang.org/x/crypto v0.16.0 // indirect
golang.org/x/exp/typeparams v0.0.0-20221208152030-732eee02a75a // indirect
golang.org/x/net v0.19.0 // indirect
diff --git a/internal/gitrepo/gitrepo.go b/internal/gitrepo/gitrepo.go
index 30077b9..ecaa0a8 100644
--- a/internal/gitrepo/gitrepo.go
+++ b/internal/gitrepo/gitrepo.go
@@ -17,17 +17,17 @@
"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/plumbing/object"
"github.com/go-git/go-git/v5/storage/memory"
- "golang.org/x/exp/event"
"golang.org/x/tools/txtar"
"golang.org/x/vulndb/internal/derrors"
+ "golang.org/x/vulndb/internal/observe"
"golang.org/x/vulndb/internal/worker/log"
)
// Clone returns a bare repo by cloning the repo at repoURL.
func Clone(ctx context.Context, repoURL string) (repo *git.Repository, err error) {
defer derrors.Wrap(&err, "gitrepo.Clone(%q)", repoURL)
- ctx = event.Start(ctx, "gitrepo.Clone")
- defer event.End(ctx)
+ ctx, span := observe.Start(ctx, "gitrepo.Clone")
+ defer span.End()
return CloneAt(ctx, repoURL, plumbing.HEAD)
}
@@ -47,8 +47,8 @@
// PlainClone returns a (non-bare) repo with its history by cloning the repo at repoURL.
func PlainClone(ctx context.Context, dir, repoURL string) (repo *git.Repository, err error) {
defer derrors.Wrap(&err, "gitrepo.PlainClone(%q)", repoURL)
- ctx = event.Start(ctx, "gitrepo.PlainClone")
- defer event.End(ctx)
+ ctx, span := observe.Start(ctx, "gitrepo.PlainClone")
+ defer span.End()
log.Infof(ctx, "Plain cloning repo %q at HEAD", repoURL)
return git.PlainCloneContext(ctx, dir, false, &git.CloneOptions{
@@ -63,8 +63,8 @@
// Open returns a repo by opening the repo at the local path dirpath.
func Open(ctx context.Context, dirpath string) (repo *git.Repository, err error) {
defer derrors.Wrap(&err, "gitrepo.Open(%q)", dirpath)
- ctx = event.Start(ctx, "gitrepo.Open")
- defer event.End(ctx)
+ ctx, span := observe.Start(ctx, "gitrepo.Open")
+ defer span.End()
log.Infof(ctx, "Opening repo at %q", dirpath)
repo, err = git.PlainOpen(dirpath)
diff --git a/internal/observe/observe.go b/internal/observe/observe.go
index 5568306..19df286 100644
--- a/internal/observe/observe.go
+++ b/internal/observe/observe.go
@@ -3,7 +3,7 @@
// license that can be found in the LICENSE file.
// Package observe provides metric and tracing support for Go servers.
-// It uses OpenTelemetry and the golang.org/x/exp/events package.
+// It uses OpenTelemetry.
package observe
import (
@@ -21,14 +21,15 @@
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/instrumentation"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
- eotel "golang.org/x/exp/event/otel"
+ "go.opentelemetry.io/otel/trace"
+ tnoop "go.opentelemetry.io/otel/trace/noop"
)
// An Observer handles tracing and metrics exporting.
type Observer struct {
ctx context.Context
tracerProvider *sdktrace.TracerProvider
- traceHandler *eotel.TraceHandler
+ tracer trace.Tracer
propagator propagation.TextMapPropagator
// LogHandlerFunc is invoked in [Observer.Observe] to obtain an
@@ -49,6 +50,12 @@
if err != nil {
return nil, err
}
+ tp := sdktrace.NewTracerProvider(
+ // Enable tracing if there is no incoming request, or if the incoming
+ // request is sampled.
+ sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.AlwaysSample())),
+ sdktrace.WithBatcher(exporter))
+
// Create exporter.
mex, err := mexporter.New(mexporter.WithProjectID(projectID))
if err != nil {
@@ -60,15 +67,10 @@
return !strings.HasPrefix(name, "runtime/")
})
- tp := sdktrace.NewTracerProvider(
- // Enable tracing if there is no incoming request, or if the incoming
- // request is sampled.
- sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.AlwaysSample())),
- sdktrace.WithBatcher(exporter))
return &Observer{
ctx: ctx,
tracerProvider: tp,
- traceHandler: eotel.NewTraceHandler(tp.Tracer(serverName)),
+ tracer: tp.Tracer(serverName),
// The propagator extracts incoming trace IDs so that we can connect our trace spans
// to the incoming ones constructed by Cloud Run.
propagator: propagation.NewCompositeTextMapPropagator(
@@ -78,6 +80,8 @@
}, nil
}
+type key struct{}
+
// Observe adds metrics and tracing to an http.Handler.
func (o *Observer) Observe(h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -88,6 +92,7 @@
exporter := event.NewExporter(eventHandler{o, otherHandler}, nil)
ctx := event.WithExporter(r.Context(), exporter)
ctx = o.propagator.Extract(ctx, propagation.HeaderCarrier(r.Header))
+ ctx = context.WithValue(ctx, key{}, o)
defer o.tracerProvider.ForceFlush(o.ctx)
h.ServeHTTP(w, r.WithContext(ctx))
})
@@ -101,7 +106,14 @@
// Event implements event.Handler.
func (h eventHandler) Event(ctx context.Context, ev *event.Event) context.Context {
if h.eh != nil {
- ctx = h.eh.Event(ctx, ev)
+ return h.eh.Event(ctx, ev)
}
- return h.o.traceHandler.Event(ctx, ev)
+ return ctx
+}
+
+func Start(ctx context.Context, name string) (context.Context, trace.Span) {
+ if obs, ok := ctx.Value(key{}).(*Observer); ok {
+ return obs.tracer.Start(ctx, name)
+ }
+ return ctx, tnoop.Span{}
}
diff --git a/internal/worker/false_positives.go b/internal/worker/false_positives.go
index ec2b640..330f88e 100644
--- a/internal/worker/false_positives.go
+++ b/internal/worker/false_positives.go
@@ -7,16 +7,16 @@
import (
"context"
- "golang.org/x/exp/event"
"golang.org/x/vulndb/internal/derrors"
+ "golang.org/x/vulndb/internal/observe"
"golang.org/x/vulndb/internal/worker/store"
)
// updateFalsePositives makes sure the store reflects the list of false positives.
func updateFalsePositives(ctx context.Context, st store.Store) (err error) {
defer derrors.Wrap(&err, "updateFalsePositives")
- ctx = event.Start(ctx, "updateFalsePositives")
- defer event.End(ctx)
+ ctx, span := observe.Start(ctx, "updateFalsePositives")
+ defer span.End()
for i := 0; i < len(falsePositives); i += maxTransactionWrites {
j := i + maxTransactionWrites
diff --git a/internal/worker/update.go b/internal/worker/update.go
index 07425a1..9cbe4d5 100644
--- a/internal/worker/update.go
+++ b/internal/worker/update.go
@@ -14,11 +14,11 @@
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/plumbing/object"
- "golang.org/x/exp/event"
"golang.org/x/vulndb/internal/cvelistrepo"
"golang.org/x/vulndb/internal/cveschema"
"golang.org/x/vulndb/internal/derrors"
"golang.org/x/vulndb/internal/ghsa"
+ "golang.org/x/vulndb/internal/observe"
"golang.org/x/vulndb/internal/worker/log"
"golang.org/x/vulndb/internal/worker/store"
)
@@ -69,8 +69,8 @@
// transaction can do, so the CVE files in the repo are processed in
// batches, one transaction per batch.
defer derrors.Wrap(&err, "cveUpdater.update(%s)", u.commit.Hash)
- ctx = event.Start(ctx, "cveUpdater.update")
- defer event.End(ctx)
+ ctx, span := observe.Start(ctx, "cveUpdater.update")
+ defer span.End()
defer func() {
if err == nil {
@@ -476,8 +476,8 @@
func updateGHSAs(ctx context.Context, listSAs GHSAListFunc, since time.Time, st store.Store) (stats UpdateGHSAStats, err error) {
defer derrors.Wrap(&err, "updateGHSAs(%s)", since)
- ctx = event.Start(ctx, "updateGHSAs")
- defer event.End(ctx)
+ ctx, span := observe.Start(ctx, "updateGHSAs")
+ defer span.End()
defer func() {
if err == nil {
diff --git a/internal/worker/worker.go b/internal/worker/worker.go
index f3d969b..c79d8dc 100644
--- a/internal/worker/worker.go
+++ b/internal/worker/worker.go
@@ -17,7 +17,6 @@
"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/plumbing/object"
- "golang.org/x/exp/event"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
"golang.org/x/time/rate"
@@ -27,6 +26,7 @@
"golang.org/x/vulndb/internal/ghsa"
"golang.org/x/vulndb/internal/gitrepo"
"golang.org/x/vulndb/internal/issues"
+ "golang.org/x/vulndb/internal/observe"
"golang.org/x/vulndb/internal/proxy"
"golang.org/x/vulndb/internal/report"
"golang.org/x/vulndb/internal/worker/log"
@@ -85,8 +85,8 @@
// It verifies that there is not an update currently in progress,
// and it makes sure that the update is to a more recent commit.
func checkCVEUpdate(ctx context.Context, commit *object.Commit, st store.Store) error {
- ctx = event.Start(ctx, "checkUpdate")
- defer event.End(ctx)
+ ctx, span := observe.Start(ctx, "checkUpdate")
+ defer span.End()
urs, err := st.ListCommitUpdateRecords(ctx, 1)
if err != nil {
@@ -175,8 +175,8 @@
// CreateIssues creates issues on the x/vulndb issue tracker for allReports.
func CreateIssues(ctx context.Context, st store.Store, client *issues.Client, pc *proxy.Client, allReports map[string]*report.Report, limit int) (err error) {
defer derrors.Wrap(&err, "CreateIssues(destination: %s)", client.Destination())
- ctx = event.Start(ctx, "CreateIssues")
- defer event.End(ctx)
+ ctx, span := observe.Start(ctx, "CreateIssues")
+ defer span.End()
if err := createCVEIssues(ctx, st, client, pc, allReports, limit); err != nil {
return err