internal/worker: add tracing

Add support for OpenTelemetry tracing of certain
important functions, using the event package.

We don't have a burning need for traces, but they
are nice to have, and this was an important exercise
for validating the approach of the event package.

Change-Id: I37d1f56f06f425f3b1eb885877a0d2f5ac85a098
Reviewed-on: https://go-review.googlesource.com/c/vulndb/+/380440
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
Reviewed-by: Julie Qiu <julie@golang.org>
diff --git a/checks.bash b/checks.bash
index e0ed57e..323feba 100755
--- a/checks.bash
+++ b/checks.bash
@@ -61,10 +61,8 @@
 
 # check_unparam runs unparam on source files.
 check_unparam() {
-  if [[ $(go version) = *go1.17* ]]; then
-    ensure_go_binary mvdan.cc/unparam
-    runcmd unparam ./...
-  fi
+  ensure_go_binary mvdan.cc/unparam
+  runcmd unparam ./...
 }
 
 # check_vet runs go vet on source files.
diff --git a/cmd/worker/main.go b/cmd/worker/main.go
index 1d7470f..acfcfb2 100644
--- a/cmd/worker/main.go
+++ b/cmd/worker/main.go
@@ -21,6 +21,7 @@
 	"text/tabwriter"
 	"time"
 
+	"golang.org/x/exp/event"
 	"golang.org/x/vulndb/internal/cvelistrepo"
 	"golang.org/x/vulndb/internal/gitrepo"
 	"golang.org/x/vulndb/internal/issues"
@@ -85,7 +86,8 @@
 		dieWithUsage("%v", err)
 	}
 
-	ctx := log.WithLineLogger(context.Background())
+	ctx := event.WithExporter(context.Background(),
+		event.NewExporter(log.NewLineHandler(os.Stderr), nil))
 	if img := os.Getenv("DOCKER_IMAGE"); img != "" {
 		log.Infof(ctx, "running in docker image %s", img)
 	}
diff --git a/go.mod b/go.mod
index 0d7e7b8..75c38b3 100644
--- a/go.mod
+++ b/go.mod
@@ -10,6 +10,8 @@
 require (
 	cloud.google.com/go/errorreporting v0.1.0
 	cloud.google.com/go/firestore v1.6.1
+	github.com/GoogleCloudPlatform/opentelemetry-operations-go v1.0.0
+	github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v1.0.0
 	github.com/client9/misspell v0.3.4
 	github.com/go-git/go-billy/v5 v5.3.1
 	github.com/go-git/go-git/v5 v5.4.2
@@ -17,6 +19,9 @@
 	github.com/google/go-github/v41 v41.0.0
 	github.com/google/safehtml v0.0.2
 	github.com/jba/templatecheck v0.6.0
+	go.opentelemetry.io/otel v1.3.0
+	go.opentelemetry.io/otel/sdk v1.3.0
+	go.opentelemetry.io/otel/trace v1.3.0
 	golang.org/x/exp v0.0.0-20220124173137-7a6bfc487013
 	golang.org/x/exp/vulncheck v0.0.0-20220114162006-9d54fb35363c
 	golang.org/x/mod v0.6.0-dev.0.20211013180041-c96bc1413d57
@@ -33,6 +38,7 @@
 
 require (
 	cloud.google.com/go v0.97.0 // indirect
+	cloud.google.com/go/trace v1.0.0 // indirect
 	github.com/BurntSushi/toml v0.3.1 // indirect
 	github.com/Microsoft/go-winio v0.4.16 // indirect
 	github.com/ProtonMail/go-crypto v0.0.0-20210428141323-04723f9f07d7 // indirect
@@ -45,7 +51,9 @@
 	github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0 // indirect
 	github.com/envoyproxy/protoc-gen-validate v0.1.0 // indirect
 	github.com/go-git/gcfg v1.5.0 // indirect
-	github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
+	github.com/go-logr/logr v1.2.2 // indirect
+	github.com/go-logr/stdr v1.2.0 // indirect
+	github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
 	github.com/golang/protobuf v1.5.2 // indirect
 	github.com/google/go-querystring v1.1.0 // indirect
 	github.com/googleapis/gax-go/v2 v2.1.1 // indirect
diff --git a/internal/gitrepo/gitrepo.go b/internal/gitrepo/gitrepo.go
index 62057c8..d8af35d 100644
--- a/internal/gitrepo/gitrepo.go
+++ b/internal/gitrepo/gitrepo.go
@@ -16,6 +16,7 @@
 	"github.com/go-git/go-git/v5/plumbing"
 	"github.com/go-git/go-git/v5/plumbing/object"
 	"github.com/go-git/go-git/v5/storage/memory"
+	"golang.org/x/exp/event"
 	"golang.org/x/tools/txtar"
 	"golang.org/x/vulndb/internal/derrors"
 	"golang.org/x/vulndb/internal/worker/log"
@@ -24,6 +25,9 @@
 // Clone returns a repo by cloning the repo at repoURL.
 func Clone(ctx context.Context, repoURL string) (repo *git.Repository, err error) {
 	defer derrors.Wrap(&err, "gitrepo.Clone(%q)", repoURL)
+	ctx = event.Start(ctx, "gitrepo.Clone")
+	defer event.End(ctx)
+
 	log.Infof(ctx, "Cloning repo %q at HEAD", repoURL)
 	return git.Clone(memory.NewStorage(), nil, &git.CloneOptions{
 		URL:           repoURL,
@@ -37,6 +41,9 @@
 // Open returns a repo by opening the repo at the local path dirpath.
 func Open(ctx context.Context, dirpath string) (repo *git.Repository, err error) {
 	defer derrors.Wrap(&err, "gitrepo.Open(%q)", dirpath)
+	ctx = event.Start(ctx, "gitrepo.Open")
+	defer event.End(ctx)
+
 	log.Infof(ctx, "Opening repo at %q", dirpath)
 	repo, err = git.PlainOpen(dirpath)
 	if err != nil {
diff --git a/internal/worker/false_positives.go b/internal/worker/false_positives.go
index ce35ad1..ec2b640 100644
--- a/internal/worker/false_positives.go
+++ b/internal/worker/false_positives.go
@@ -7,6 +7,7 @@
 import (
 	"context"
 
+	"golang.org/x/exp/event"
 	"golang.org/x/vulndb/internal/derrors"
 	"golang.org/x/vulndb/internal/worker/store"
 )
@@ -14,6 +15,8 @@
 // updateFalsePositives makes sure the store reflects the list of false positives.
 func updateFalsePositives(ctx context.Context, st store.Store) (err error) {
 	defer derrors.Wrap(&err, "updateFalsePositives")
+	ctx = event.Start(ctx, "updateFalsePositives")
+	defer event.End(ctx)
 
 	for i := 0; i < len(falsePositives); i += maxTransactionWrites {
 		j := i + maxTransactionWrites
diff --git a/internal/worker/log/gcpjson.go b/internal/worker/log/gcpjson.go
index 0b12267..c6d9810 100644
--- a/internal/worker/log/gcpjson.go
+++ b/internal/worker/log/gcpjson.go
@@ -8,18 +8,16 @@
 	"context"
 	"fmt"
 	"io"
-	"os"
 	"sync"
 	"time"
 
 	"golang.org/x/exp/event"
 )
 
-// WithGCPJSONLogger returns a context which will log events in a format that is
+// NewGCPJSONHandler returns a handler which logs events in a format that is
 // understood by Google Cloud Platform logging.
-func WithGCPJSONLogger(ctx context.Context, traceID string) context.Context {
-	return event.WithExporter(ctx,
-		event.NewExporter(&gcpJSONHandler{w: os.Stderr, traceID: traceID}, nil))
+func NewGCPJSONHandler(w io.Writer, traceID string) event.Handler {
+	return &gcpJSONHandler{w: w, traceID: traceID}
 }
 
 type gcpJSONHandler struct {
diff --git a/internal/worker/log/log.go b/internal/worker/log/log.go
index d5e3db2..2006af5 100644
--- a/internal/worker/log/log.go
+++ b/internal/worker/log/log.go
@@ -9,7 +9,6 @@
 	"context"
 	"fmt"
 	"io"
-	"os"
 	"reflect"
 	"strings"
 	"sync"
@@ -19,12 +18,13 @@
 	"golang.org/x/exp/event/severity"
 )
 
-func WithLineLogger(ctx context.Context) context.Context {
-	return event.WithExporter(ctx, event.NewExporter(&lineHandler{w: os.Stderr}, nil))
+// NewLineHandler returns an event Handler that writes log events one per line
+// in an easy-to-read format:
+//   time level message label1=value1 label2=value2 ...
+func NewLineHandler(w io.Writer) event.Handler {
+	return &lineHandler{w: w}
 }
 
-// lineHandler writes log events one per line in an easy-to-read format:
-// time level message label1=value1 label2=value2 ...
 type lineHandler struct {
 	mu sync.Mutex // ensure a log line is not interrupted
 	w  io.Writer
diff --git a/internal/worker/server.go b/internal/worker/server.go
index 0cab728..4e2fb62 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -19,6 +19,7 @@
 
 	"cloud.google.com/go/errorreporting"
 	"github.com/google/safehtml/template"
+	"golang.org/x/exp/event"
 	"golang.org/x/sync/errgroup"
 	"golang.org/x/vulndb/internal/cvelistrepo"
 	"golang.org/x/vulndb/internal/derrors"
@@ -26,6 +27,13 @@
 	"golang.org/x/vulndb/internal/issues"
 	"golang.org/x/vulndb/internal/worker/log"
 	"golang.org/x/vulndb/internal/worker/store"
+
+	texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
+	gcppropagator "github.com/GoogleCloudPlatform/opentelemetry-operations-go/propagator"
+	"go.opentelemetry.io/otel/propagation"
+	sdktrace "go.opentelemetry.io/otel/sdk/trace"
+	otrace "go.opentelemetry.io/otel/trace"
+	eotel "golang.org/x/exp/event/otel"
 )
 
 const pkgsiteURL = "https://pkg.go.dev"
@@ -36,6 +44,9 @@
 	cfg           Config
 	indexTemplate *template.Template
 	issueClient   issues.Client
+	traceHandler  event.Handler
+	propagator    propagation.TextMapPropagator
+	afterRequest  func()
 }
 
 func NewServer(ctx context.Context, cfg Config) (_ *Server, err error) {
@@ -43,6 +54,20 @@
 
 	s := &Server{cfg: cfg}
 
+	tracerProvider, err := initOpenTelemetry(cfg.Project)
+	if err != nil {
+		return nil, err
+	}
+	s.traceHandler = eotel.NewTraceHandler(tracerProvider.Tracer("vulndb-worker"))
+	s.afterRequest = func() { tracerProvider.ForceFlush(ctx) }
+	// The propagator extracts incoming trace IDs so that we can connect our trace spans
+	// to the incoming ones constructed by Cloud Run.
+	s.propagator = propagation.NewCompositeTextMapPropagator(
+		propagation.TraceContext{},
+		propagation.Baggage{},
+		gcppropagator.New(),
+	)
+
 	if cfg.UseErrorReporting {
 		reportingClient, err := errorreporting.NewClient(ctx, cfg.Project, errorreporting.Config{
 			ServiceName: serviceID,
@@ -89,9 +114,16 @@
 
 func (s *Server) handle(_ context.Context, pattern string, handler func(w http.ResponseWriter, r *http.Request) error) {
 	http.HandleFunc(pattern, func(w http.ResponseWriter, r *http.Request) {
+		log.Debugf(r.Context(), "#### SpanContext: %+v\n", otrace.SpanContextFromContext(r.Context()))
 		start := time.Now()
+		defer s.afterRequest()
 		traceID := r.Header.Get("X-Cloud-Trace-Context")
-		ctx := log.WithGCPJSONLogger(r.Context(), traceID)
+		exporter := event.NewExporter(eventHandlers{
+			log.NewGCPJSONHandler(os.Stderr, traceID),
+			s.traceHandler,
+		}, nil)
+		ctx := event.WithExporter(r.Context(), exporter)
+		ctx = s.propagator.Extract(ctx, propagation.HeaderCarrier(r.Header))
 		r = r.WithContext(ctx)
 
 		log.With("httpRequest", r).Infof(ctx, "starting %s", r.URL.Path)
@@ -314,3 +346,27 @@
 	}
 	return s.handleIssues(w, r)
 }
+
+func initOpenTelemetry(projectID string) (tp *sdktrace.TracerProvider, err error) {
+	defer derrors.Wrap(&err, "initOpenTelemetry(%q)", projectID)
+
+	exporter, err := texporter.New(texporter.WithProjectID(projectID))
+	if err != nil {
+		return nil, err
+	}
+	tp = sdktrace.NewTracerProvider(
+		// Enable tracing if there is no incoming trace context, or if the
+		// incoming request's trace is sampled.
+		sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.AlwaysSample())),
+		sdktrace.WithBatcher(exporter))
+	return tp, nil
+}
+
+type eventHandlers []event.Handler
+
+func (eh eventHandlers) Event(ctx context.Context, ev *event.Event) context.Context {
+	for _, h := range eh {
+		ctx = h.Event(ctx, ev)
+	}
+	return ctx
+}
diff --git a/internal/worker/update.go b/internal/worker/update.go
index 7433be1..d6472b3 100644
--- a/internal/worker/update.go
+++ b/internal/worker/update.go
@@ -13,6 +13,7 @@
 
 	"github.com/go-git/go-git/v5"
 	"github.com/go-git/go-git/v5/plumbing/object"
+	"golang.org/x/exp/event"
 	"golang.org/x/vulndb/internal/cvelistrepo"
 	"golang.org/x/vulndb/internal/cveschema"
 	"golang.org/x/vulndb/internal/derrors"
@@ -66,6 +67,8 @@
 	// transaction can do, so the CVE files in the repo are processed in
 	// batches, one transaction per batch.
 	defer derrors.Wrap(&err, "updater.update(%s)", u.commit.Hash)
+	ctx = event.Start(ctx, "updater.update")
+	defer event.End(ctx)
 
 	defer func() {
 		if err != nil {
@@ -122,7 +125,7 @@
 		}
 		if stats.skipped {
 			skippedDirs = append(skippedDirs, dirFiles[0].DirPath)
-			if len(skippedDirs) > logSkippedEvery {
+			if len(skippedDirs) >= logSkippedEvery {
 				log.Infof(ctx, "skipping directory %s and %d others because the hashes match",
 					skippedDirs[0], len(skippedDirs)-1)
 				skippedDirs = nil
diff --git a/internal/worker/worker.go b/internal/worker/worker.go
index f52f679..a326cfe 100644
--- a/internal/worker/worker.go
+++ b/internal/worker/worker.go
@@ -17,6 +17,7 @@
 
 	"github.com/go-git/go-git/v5/plumbing"
 	"github.com/go-git/go-git/v5/plumbing/object"
+	"golang.org/x/exp/event"
 	"golang.org/x/sync/errgroup"
 	"golang.org/x/time/rate"
 	vulnc "golang.org/x/vuln/client"
@@ -78,6 +79,9 @@
 // It verifies that there is not an update currently in progress,
 // and it makes sure that the update is to a more recent commit.
 func checkUpdate(ctx context.Context, commit *object.Commit, st store.Store) error {
+	ctx = event.Start(ctx, "checkUpdate")
+	defer event.End(ctx)
+
 	urs, err := st.ListCommitUpdateRecords(ctx, 1)
 	if err != nil {
 		return err
@@ -171,7 +175,9 @@
 var issueRateLimiter = rate.NewLimiter(rate.Every(time.Duration(1000/float64(issueQPS))*time.Millisecond), 1)
 
 func CreateIssues(ctx context.Context, st store.Store, ic issues.Client, limit int) (err error) {
-	derrors.Wrap(&err, "CreateIssues(destination: %s)", ic.Destination())
+	defer derrors.Wrap(&err, "CreateIssues(destination: %s)", ic.Destination())
+	ctx = event.Start(ctx, "CreateIssues")
+	defer event.End(ctx)
 
 	needsIssue, err := st.ListCVERecordsWithTriageState(ctx, store.TriageStateNeedsIssue)
 	if err != nil {
diff --git a/internal/worker/worker_test.go b/internal/worker/worker_test.go
index d15bb2f..9d8baa9 100644
--- a/internal/worker/worker_test.go
+++ b/internal/worker/worker_test.go
@@ -10,12 +10,14 @@
 import (
 	"context"
 	"math"
+	"os"
 	"strings"
 	"testing"
 	"time"
 
 	"github.com/google/go-cmp/cmp"
 	"github.com/google/go-cmp/cmp/cmpopts"
+	"golang.org/x/exp/event"
 	"golang.org/x/vulndb/internal/cveschema"
 	"golang.org/x/vulndb/internal/gitrepo"
 	"golang.org/x/vulndb/internal/issues"
@@ -95,7 +97,8 @@
 }
 
 func TestCreateIssues(t *testing.T) {
-	ctx := log.WithLineLogger(context.Background())
+	ctx := event.WithExporter(context.Background(),
+		event.NewExporter(log.NewLineHandler(os.Stderr), nil))
 	mstore := store.NewMemStore()
 	ic := issues.NewFakeClient()
 	ctime := time.Date(2020, 1, 2, 0, 0, 0, 0, time.UTC)