internal/observe, etc.: remove event tracing

Use OpenTelemetry tracing directly instead of the exp/event package.

This removes all dependence on exp/event's OpenTelemetry code.

Change-Id: Id2846db3d2c35efea66244c314385b59ef605d9d
Reviewed-on: https://go-review.googlesource.com/c/vulndb/+/552656
Reviewed-by: Tatiana Bradley <tatianabradley@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
diff --git a/go.mod b/go.mod
index 67c6747..6f2b609 100644
--- a/go.mod
+++ b/go.mod
@@ -24,6 +24,7 @@
 	github.com/shurcooL/githubv4 v0.0.0-20231126234147-1cffa1f02456
 	go.opentelemetry.io/otel v1.21.0
 	go.opentelemetry.io/otel/sdk v1.21.0
+	go.opentelemetry.io/otel/trace v1.21.0
 	golang.org/x/exp v0.0.0-20231219180239-dc181d75b848
 	golang.org/x/exp/event v0.0.0-20231219180239-dc181d75b848
 	golang.org/x/mod v0.14.0
@@ -77,7 +78,6 @@
 	go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.46.1 // indirect
 	go.opentelemetry.io/otel/metric v1.21.0 // indirect
 	go.opentelemetry.io/otel/sdk/metric v1.21.0 // indirect
-	go.opentelemetry.io/otel/trace v1.21.0 // indirect
 	golang.org/x/crypto v0.16.0 // indirect
 	golang.org/x/exp/typeparams v0.0.0-20221208152030-732eee02a75a // indirect
 	golang.org/x/net v0.19.0 // indirect
diff --git a/internal/gitrepo/gitrepo.go b/internal/gitrepo/gitrepo.go
index 30077b9..ecaa0a8 100644
--- a/internal/gitrepo/gitrepo.go
+++ b/internal/gitrepo/gitrepo.go
@@ -17,17 +17,17 @@
 	"github.com/go-git/go-git/v5/plumbing"
 	"github.com/go-git/go-git/v5/plumbing/object"
 	"github.com/go-git/go-git/v5/storage/memory"
-	"golang.org/x/exp/event"
 	"golang.org/x/tools/txtar"
 	"golang.org/x/vulndb/internal/derrors"
+	"golang.org/x/vulndb/internal/observe"
 	"golang.org/x/vulndb/internal/worker/log"
 )
 
 // Clone returns a bare repo by cloning the repo at repoURL.
 func Clone(ctx context.Context, repoURL string) (repo *git.Repository, err error) {
 	defer derrors.Wrap(&err, "gitrepo.Clone(%q)", repoURL)
-	ctx = event.Start(ctx, "gitrepo.Clone")
-	defer event.End(ctx)
+	ctx, span := observe.Start(ctx, "gitrepo.Clone")
+	defer span.End()
 
 	return CloneAt(ctx, repoURL, plumbing.HEAD)
 }
@@ -47,8 +47,8 @@
 // PlainClone returns a (non-bare) repo with its history by cloning the repo at repoURL.
 func PlainClone(ctx context.Context, dir, repoURL string) (repo *git.Repository, err error) {
 	defer derrors.Wrap(&err, "gitrepo.PlainClone(%q)", repoURL)
-	ctx = event.Start(ctx, "gitrepo.PlainClone")
-	defer event.End(ctx)
+	ctx, span := observe.Start(ctx, "gitrepo.PlainClone")
+	defer span.End()
 
 	log.Infof(ctx, "Plain cloning repo %q at HEAD", repoURL)
 	return git.PlainCloneContext(ctx, dir, false, &git.CloneOptions{
@@ -63,8 +63,8 @@
 // Open returns a repo by opening the repo at the local path dirpath.
 func Open(ctx context.Context, dirpath string) (repo *git.Repository, err error) {
 	defer derrors.Wrap(&err, "gitrepo.Open(%q)", dirpath)
-	ctx = event.Start(ctx, "gitrepo.Open")
-	defer event.End(ctx)
+	ctx, span := observe.Start(ctx, "gitrepo.Open")
+	defer span.End()
 
 	log.Infof(ctx, "Opening repo at %q", dirpath)
 	repo, err = git.PlainOpen(dirpath)
diff --git a/internal/observe/observe.go b/internal/observe/observe.go
index 5568306..19df286 100644
--- a/internal/observe/observe.go
+++ b/internal/observe/observe.go
@@ -3,7 +3,7 @@
 // license that can be found in the LICENSE file.
 
 // Package observe provides metric and tracing support for Go servers.
-// It uses OpenTelemetry and the golang.org/x/exp/events package.
+// It uses OpenTelemetry.
 package observe
 
 import (
@@ -21,14 +21,15 @@
 	"go.opentelemetry.io/otel/propagation"
 	"go.opentelemetry.io/otel/sdk/instrumentation"
 	sdktrace "go.opentelemetry.io/otel/sdk/trace"
-	eotel "golang.org/x/exp/event/otel"
+	"go.opentelemetry.io/otel/trace"
+	tnoop "go.opentelemetry.io/otel/trace/noop"
 )
 
 // An Observer handles tracing and metrics exporting.
 type Observer struct {
 	ctx            context.Context
 	tracerProvider *sdktrace.TracerProvider
-	traceHandler   *eotel.TraceHandler
+	tracer         trace.Tracer
 	propagator     propagation.TextMapPropagator
 
 	// LogHandlerFunc is invoked in [Observer.Observe] to obtain an
@@ -49,6 +50,12 @@
 	if err != nil {
 		return nil, err
 	}
+	tp := sdktrace.NewTracerProvider(
+		// Enable tracing if there is no incoming request, or if the incoming
+		// request is sampled.
+		sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.AlwaysSample())),
+		sdktrace.WithBatcher(exporter))
+
 	// Create exporter.
 	mex, err := mexporter.New(mexporter.WithProjectID(projectID))
 	if err != nil {
@@ -60,15 +67,10 @@
 		return !strings.HasPrefix(name, "runtime/")
 	})
 
-	tp := sdktrace.NewTracerProvider(
-		// Enable tracing if there is no incoming request, or if the incoming
-		// request is sampled.
-		sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.AlwaysSample())),
-		sdktrace.WithBatcher(exporter))
 	return &Observer{
 		ctx:            ctx,
 		tracerProvider: tp,
-		traceHandler:   eotel.NewTraceHandler(tp.Tracer(serverName)),
+		tracer:         tp.Tracer(serverName),
 		// The propagator extracts incoming trace IDs so that we can connect our trace spans
 		// to the incoming ones constructed by Cloud Run.
 		propagator: propagation.NewCompositeTextMapPropagator(
@@ -78,6 +80,8 @@
 	}, nil
 }
 
+type key struct{}
+
 // Observe adds metrics and tracing to an http.Handler.
 func (o *Observer) Observe(h http.Handler) http.Handler {
 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -88,6 +92,7 @@
 		exporter := event.NewExporter(eventHandler{o, otherHandler}, nil)
 		ctx := event.WithExporter(r.Context(), exporter)
 		ctx = o.propagator.Extract(ctx, propagation.HeaderCarrier(r.Header))
+		ctx = context.WithValue(ctx, key{}, o)
 		defer o.tracerProvider.ForceFlush(o.ctx)
 		h.ServeHTTP(w, r.WithContext(ctx))
 	})
@@ -101,7 +106,14 @@
 // Event implements event.Handler.
 func (h eventHandler) Event(ctx context.Context, ev *event.Event) context.Context {
 	if h.eh != nil {
-		ctx = h.eh.Event(ctx, ev)
+		return h.eh.Event(ctx, ev)
 	}
-	return h.o.traceHandler.Event(ctx, ev)
+	return ctx
+}
+
+func Start(ctx context.Context, name string) (context.Context, trace.Span) {
+	if obs, ok := ctx.Value(key{}).(*Observer); ok {
+		return obs.tracer.Start(ctx, name)
+	}
+	return ctx, tnoop.Span{}
 }
diff --git a/internal/worker/false_positives.go b/internal/worker/false_positives.go
index ec2b640..330f88e 100644
--- a/internal/worker/false_positives.go
+++ b/internal/worker/false_positives.go
@@ -7,16 +7,16 @@
 import (
 	"context"
 
-	"golang.org/x/exp/event"
 	"golang.org/x/vulndb/internal/derrors"
+	"golang.org/x/vulndb/internal/observe"
 	"golang.org/x/vulndb/internal/worker/store"
 )
 
 // updateFalsePositives makes sure the store reflects the list of false positives.
 func updateFalsePositives(ctx context.Context, st store.Store) (err error) {
 	defer derrors.Wrap(&err, "updateFalsePositives")
-	ctx = event.Start(ctx, "updateFalsePositives")
-	defer event.End(ctx)
+	ctx, span := observe.Start(ctx, "updateFalsePositives")
+	defer span.End()
 
 	for i := 0; i < len(falsePositives); i += maxTransactionWrites {
 		j := i + maxTransactionWrites
diff --git a/internal/worker/update.go b/internal/worker/update.go
index 07425a1..9cbe4d5 100644
--- a/internal/worker/update.go
+++ b/internal/worker/update.go
@@ -14,11 +14,11 @@
 
 	"github.com/go-git/go-git/v5"
 	"github.com/go-git/go-git/v5/plumbing/object"
-	"golang.org/x/exp/event"
 	"golang.org/x/vulndb/internal/cvelistrepo"
 	"golang.org/x/vulndb/internal/cveschema"
 	"golang.org/x/vulndb/internal/derrors"
 	"golang.org/x/vulndb/internal/ghsa"
+	"golang.org/x/vulndb/internal/observe"
 	"golang.org/x/vulndb/internal/worker/log"
 	"golang.org/x/vulndb/internal/worker/store"
 )
@@ -69,8 +69,8 @@
 	// transaction can do, so the CVE files in the repo are processed in
 	// batches, one transaction per batch.
 	defer derrors.Wrap(&err, "cveUpdater.update(%s)", u.commit.Hash)
-	ctx = event.Start(ctx, "cveUpdater.update")
-	defer event.End(ctx)
+	ctx, span := observe.Start(ctx, "cveUpdater.update")
+	defer span.End()
 
 	defer func() {
 		if err == nil {
@@ -476,8 +476,8 @@
 
 func updateGHSAs(ctx context.Context, listSAs GHSAListFunc, since time.Time, st store.Store) (stats UpdateGHSAStats, err error) {
 	defer derrors.Wrap(&err, "updateGHSAs(%s)", since)
-	ctx = event.Start(ctx, "updateGHSAs")
-	defer event.End(ctx)
+	ctx, span := observe.Start(ctx, "updateGHSAs")
+	defer span.End()
 
 	defer func() {
 		if err == nil {
diff --git a/internal/worker/worker.go b/internal/worker/worker.go
index f3d969b..c79d8dc 100644
--- a/internal/worker/worker.go
+++ b/internal/worker/worker.go
@@ -17,7 +17,6 @@
 
 	"github.com/go-git/go-git/v5/plumbing"
 	"github.com/go-git/go-git/v5/plumbing/object"
-	"golang.org/x/exp/event"
 	"golang.org/x/exp/maps"
 	"golang.org/x/exp/slices"
 	"golang.org/x/time/rate"
@@ -27,6 +26,7 @@
 	"golang.org/x/vulndb/internal/ghsa"
 	"golang.org/x/vulndb/internal/gitrepo"
 	"golang.org/x/vulndb/internal/issues"
+	"golang.org/x/vulndb/internal/observe"
 	"golang.org/x/vulndb/internal/proxy"
 	"golang.org/x/vulndb/internal/report"
 	"golang.org/x/vulndb/internal/worker/log"
@@ -85,8 +85,8 @@
 // It verifies that there is not an update currently in progress,
 // and it makes sure that the update is to a more recent commit.
 func checkCVEUpdate(ctx context.Context, commit *object.Commit, st store.Store) error {
-	ctx = event.Start(ctx, "checkUpdate")
-	defer event.End(ctx)
+	ctx, span := observe.Start(ctx, "checkUpdate")
+	defer span.End()
 
 	urs, err := st.ListCommitUpdateRecords(ctx, 1)
 	if err != nil {
@@ -175,8 +175,8 @@
 // CreateIssues creates issues on the x/vulndb issue tracker for allReports.
 func CreateIssues(ctx context.Context, st store.Store, client *issues.Client, pc *proxy.Client, allReports map[string]*report.Report, limit int) (err error) {
 	defer derrors.Wrap(&err, "CreateIssues(destination: %s)", client.Destination())
-	ctx = event.Start(ctx, "CreateIssues")
-	defer event.End(ctx)
+	ctx, span := observe.Start(ctx, "CreateIssues")
+	defer span.End()
 
 	if err := createCVEIssues(ctx, st, client, pc, allReports, limit); err != nil {
 		return err