internal/worker: add tracing
Add support for OpenTelemetry tracing of certain
important functions, using the event package.
We don't have a burning need for traces, but they
are nice to have, and this was an important exercise
for validating the approach of the event package.
Change-Id: I37d1f56f06f425f3b1eb885877a0d2f5ac85a098
Reviewed-on: https://go-review.googlesource.com/c/vulndb/+/380440
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
Reviewed-by: Julie Qiu <julie@golang.org>
diff --git a/checks.bash b/checks.bash
index e0ed57e..323feba 100755
--- a/checks.bash
+++ b/checks.bash
@@ -61,10 +61,8 @@
# check_unparam runs unparam on source files.
check_unparam() {
- if [[ $(go version) = *go1.17* ]]; then
- ensure_go_binary mvdan.cc/unparam
- runcmd unparam ./...
- fi
+ ensure_go_binary mvdan.cc/unparam
+ runcmd unparam ./...
}
# check_vet runs go vet on source files.
diff --git a/cmd/worker/main.go b/cmd/worker/main.go
index 1d7470f..acfcfb2 100644
--- a/cmd/worker/main.go
+++ b/cmd/worker/main.go
@@ -21,6 +21,7 @@
"text/tabwriter"
"time"
+ "golang.org/x/exp/event"
"golang.org/x/vulndb/internal/cvelistrepo"
"golang.org/x/vulndb/internal/gitrepo"
"golang.org/x/vulndb/internal/issues"
@@ -85,7 +86,8 @@
dieWithUsage("%v", err)
}
- ctx := log.WithLineLogger(context.Background())
+ ctx := event.WithExporter(context.Background(),
+ event.NewExporter(log.NewLineHandler(os.Stderr), nil))
if img := os.Getenv("DOCKER_IMAGE"); img != "" {
log.Infof(ctx, "running in docker image %s", img)
}
diff --git a/go.mod b/go.mod
index 0d7e7b8..75c38b3 100644
--- a/go.mod
+++ b/go.mod
@@ -10,6 +10,8 @@
require (
cloud.google.com/go/errorreporting v0.1.0
cloud.google.com/go/firestore v1.6.1
+ github.com/GoogleCloudPlatform/opentelemetry-operations-go v1.0.0
+ github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace v1.0.0
github.com/client9/misspell v0.3.4
github.com/go-git/go-billy/v5 v5.3.1
github.com/go-git/go-git/v5 v5.4.2
@@ -17,6 +19,9 @@
github.com/google/go-github/v41 v41.0.0
github.com/google/safehtml v0.0.2
github.com/jba/templatecheck v0.6.0
+ go.opentelemetry.io/otel v1.3.0
+ go.opentelemetry.io/otel/sdk v1.3.0
+ go.opentelemetry.io/otel/trace v1.3.0
golang.org/x/exp v0.0.0-20220124173137-7a6bfc487013
golang.org/x/exp/vulncheck v0.0.0-20220114162006-9d54fb35363c
golang.org/x/mod v0.6.0-dev.0.20211013180041-c96bc1413d57
@@ -33,6 +38,7 @@
require (
cloud.google.com/go v0.97.0 // indirect
+ cloud.google.com/go/trace v1.0.0 // indirect
github.com/BurntSushi/toml v0.3.1 // indirect
github.com/Microsoft/go-winio v0.4.16 // indirect
github.com/ProtonMail/go-crypto v0.0.0-20210428141323-04723f9f07d7 // indirect
@@ -45,7 +51,9 @@
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0 // indirect
github.com/envoyproxy/protoc-gen-validate v0.1.0 // indirect
github.com/go-git/gcfg v1.5.0 // indirect
- github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
+ github.com/go-logr/logr v1.2.2 // indirect
+ github.com/go-logr/stdr v1.2.0 // indirect
+ github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/googleapis/gax-go/v2 v2.1.1 // indirect
diff --git a/internal/gitrepo/gitrepo.go b/internal/gitrepo/gitrepo.go
index 62057c8..d8af35d 100644
--- a/internal/gitrepo/gitrepo.go
+++ b/internal/gitrepo/gitrepo.go
@@ -16,6 +16,7 @@
"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/plumbing/object"
"github.com/go-git/go-git/v5/storage/memory"
+ "golang.org/x/exp/event"
"golang.org/x/tools/txtar"
"golang.org/x/vulndb/internal/derrors"
"golang.org/x/vulndb/internal/worker/log"
@@ -24,6 +25,9 @@
// Clone returns a repo by cloning the repo at repoURL.
func Clone(ctx context.Context, repoURL string) (repo *git.Repository, err error) {
defer derrors.Wrap(&err, "gitrepo.Clone(%q)", repoURL)
+ ctx = event.Start(ctx, "gitrepo.Clone")
+ defer event.End(ctx)
+
log.Infof(ctx, "Cloning repo %q at HEAD", repoURL)
return git.Clone(memory.NewStorage(), nil, &git.CloneOptions{
URL: repoURL,
@@ -37,6 +41,9 @@
// Open returns a repo by opening the repo at the local path dirpath.
func Open(ctx context.Context, dirpath string) (repo *git.Repository, err error) {
defer derrors.Wrap(&err, "gitrepo.Open(%q)", dirpath)
+ ctx = event.Start(ctx, "gitrepo.Open")
+ defer event.End(ctx)
+
log.Infof(ctx, "Opening repo at %q", dirpath)
repo, err = git.PlainOpen(dirpath)
if err != nil {
diff --git a/internal/worker/false_positives.go b/internal/worker/false_positives.go
index ce35ad1..ec2b640 100644
--- a/internal/worker/false_positives.go
+++ b/internal/worker/false_positives.go
@@ -7,6 +7,7 @@
import (
"context"
+ "golang.org/x/exp/event"
"golang.org/x/vulndb/internal/derrors"
"golang.org/x/vulndb/internal/worker/store"
)
@@ -14,6 +15,8 @@
// updateFalsePositives makes sure the store reflects the list of false positives.
func updateFalsePositives(ctx context.Context, st store.Store) (err error) {
defer derrors.Wrap(&err, "updateFalsePositives")
+ ctx = event.Start(ctx, "updateFalsePositives")
+ defer event.End(ctx)
for i := 0; i < len(falsePositives); i += maxTransactionWrites {
j := i + maxTransactionWrites
diff --git a/internal/worker/log/gcpjson.go b/internal/worker/log/gcpjson.go
index 0b12267..c6d9810 100644
--- a/internal/worker/log/gcpjson.go
+++ b/internal/worker/log/gcpjson.go
@@ -8,18 +8,16 @@
"context"
"fmt"
"io"
- "os"
"sync"
"time"
"golang.org/x/exp/event"
)
-// WithGCPJSONLogger returns a context which will log events in a format that is
+// NewGCPJSONLogger returns a handler which logs events in a format that is
// understood by Google Cloud Platform logging.
-func WithGCPJSONLogger(ctx context.Context, traceID string) context.Context {
- return event.WithExporter(ctx,
- event.NewExporter(&gcpJSONHandler{w: os.Stderr, traceID: traceID}, nil))
+func NewGCPJSONHandler(w io.Writer, traceID string) event.Handler {
+ return &gcpJSONHandler{w: w, traceID: traceID}
}
type gcpJSONHandler struct {
diff --git a/internal/worker/log/log.go b/internal/worker/log/log.go
index d5e3db2..2006af5 100644
--- a/internal/worker/log/log.go
+++ b/internal/worker/log/log.go
@@ -9,7 +9,6 @@
"context"
"fmt"
"io"
- "os"
"reflect"
"strings"
"sync"
@@ -19,12 +18,13 @@
"golang.org/x/exp/event/severity"
)
-func WithLineLogger(ctx context.Context) context.Context {
- return event.WithExporter(ctx, event.NewExporter(&lineHandler{w: os.Stderr}, nil))
+// NewLineHandler returns an event Handler that writes log events one per line
+// in an easy-to-read format:
+// time level message label1=value1 label2=value2 ...
+func NewLineHandler(w io.Writer) event.Handler {
+ return &lineHandler{w: w}
}
-// lineHandler writes log events one per line in an easy-to-read format:
-// time level message label1=value1 label2=value2 ...
type lineHandler struct {
mu sync.Mutex // ensure a log line is not interrupted
w io.Writer
diff --git a/internal/worker/server.go b/internal/worker/server.go
index 0cab728..4e2fb62 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -19,6 +19,7 @@
"cloud.google.com/go/errorreporting"
"github.com/google/safehtml/template"
+ "golang.org/x/exp/event"
"golang.org/x/sync/errgroup"
"golang.org/x/vulndb/internal/cvelistrepo"
"golang.org/x/vulndb/internal/derrors"
@@ -26,6 +27,13 @@
"golang.org/x/vulndb/internal/issues"
"golang.org/x/vulndb/internal/worker/log"
"golang.org/x/vulndb/internal/worker/store"
+
+ texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
+ gcppropagator "github.com/GoogleCloudPlatform/opentelemetry-operations-go/propagator"
+ "go.opentelemetry.io/otel/propagation"
+ sdktrace "go.opentelemetry.io/otel/sdk/trace"
+ otrace "go.opentelemetry.io/otel/trace"
+ eotel "golang.org/x/exp/event/otel"
)
const pkgsiteURL = "https://pkg.go.dev"
@@ -36,6 +44,9 @@
cfg Config
indexTemplate *template.Template
issueClient issues.Client
+ traceHandler event.Handler
+ propagator propagation.TextMapPropagator
+ afterRequest func()
}
func NewServer(ctx context.Context, cfg Config) (_ *Server, err error) {
@@ -43,6 +54,20 @@
s := &Server{cfg: cfg}
+ tracerProvider, err := initOpenTelemetry(cfg.Project)
+ if err != nil {
+ return nil, err
+ }
+ s.traceHandler = eotel.NewTraceHandler(tracerProvider.Tracer("vulndb-worker"))
+ s.afterRequest = func() { tracerProvider.ForceFlush(ctx) }
+ // The propagator extracts incoming trace IDs so that we can connect our trace spans
+ // to the incoming ones constructed by Cloud Run.
+ s.propagator = propagation.NewCompositeTextMapPropagator(
+ propagation.TraceContext{},
+ propagation.Baggage{},
+ gcppropagator.New(),
+ )
+
if cfg.UseErrorReporting {
reportingClient, err := errorreporting.NewClient(ctx, cfg.Project, errorreporting.Config{
ServiceName: serviceID,
@@ -89,9 +114,16 @@
func (s *Server) handle(_ context.Context, pattern string, handler func(w http.ResponseWriter, r *http.Request) error) {
http.HandleFunc(pattern, func(w http.ResponseWriter, r *http.Request) {
+ log.Debugf(r.Context(), "#### SpanContext: %+v\n", otrace.SpanContextFromContext(r.Context()))
start := time.Now()
+ defer s.afterRequest()
traceID := r.Header.Get("X-Cloud-Trace-Context")
- ctx := log.WithGCPJSONLogger(r.Context(), traceID)
+ exporter := event.NewExporter(eventHandlers{
+ log.NewGCPJSONHandler(os.Stderr, traceID),
+ s.traceHandler,
+ }, nil)
+ ctx := event.WithExporter(r.Context(), exporter)
+ ctx = s.propagator.Extract(ctx, propagation.HeaderCarrier(r.Header))
r = r.WithContext(ctx)
log.With("httpRequest", r).Infof(ctx, "starting %s", r.URL.Path)
@@ -314,3 +346,27 @@
}
return s.handleIssues(w, r)
}
+
+func initOpenTelemetry(projectID string) (tp *sdktrace.TracerProvider, err error) {
+ defer derrors.Wrap(&err, "initOpenTelemetry(%q)", projectID)
+
+ exporter, err := texporter.New(texporter.WithProjectID(projectID))
+ if err != nil {
+ return nil, err
+ }
+ tp = sdktrace.NewTracerProvider(
+ // Enable tracing if there is no incoming request, or if the incoming
+ // request is sampled.
+ sdktrace.WithSampler(sdktrace.ParentBased(sdktrace.AlwaysSample())),
+ sdktrace.WithBatcher(exporter))
+ return tp, nil
+}
+
+type eventHandlers []event.Handler
+
+func (eh eventHandlers) Event(ctx context.Context, ev *event.Event) context.Context {
+ for _, h := range eh {
+ ctx = h.Event(ctx, ev)
+ }
+ return ctx
+}
diff --git a/internal/worker/update.go b/internal/worker/update.go
index 7433be1..d6472b3 100644
--- a/internal/worker/update.go
+++ b/internal/worker/update.go
@@ -13,6 +13,7 @@
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/plumbing/object"
+ "golang.org/x/exp/event"
"golang.org/x/vulndb/internal/cvelistrepo"
"golang.org/x/vulndb/internal/cveschema"
"golang.org/x/vulndb/internal/derrors"
@@ -66,6 +67,8 @@
// transaction can do, so the CVE files in the repo are processed in
// batches, one transaction per batch.
defer derrors.Wrap(&err, "updater.update(%s)", u.commit.Hash)
+ ctx = event.Start(ctx, "updater.update")
+ defer event.End(ctx)
defer func() {
if err != nil {
@@ -122,7 +125,7 @@
}
if stats.skipped {
skippedDirs = append(skippedDirs, dirFiles[0].DirPath)
- if len(skippedDirs) > logSkippedEvery {
+ if len(skippedDirs) >= logSkippedEvery {
log.Infof(ctx, "skipping directory %s and %d others because the hashes match",
skippedDirs[0], len(skippedDirs)-1)
skippedDirs = nil
diff --git a/internal/worker/worker.go b/internal/worker/worker.go
index f52f679..a326cfe 100644
--- a/internal/worker/worker.go
+++ b/internal/worker/worker.go
@@ -17,6 +17,7 @@
"github.com/go-git/go-git/v5/plumbing"
"github.com/go-git/go-git/v5/plumbing/object"
+ "golang.org/x/exp/event"
"golang.org/x/sync/errgroup"
"golang.org/x/time/rate"
vulnc "golang.org/x/vuln/client"
@@ -78,6 +79,9 @@
// It verifies that there is not an update currently in progress,
// and it makes sure that the update is to a more recent commit.
func checkUpdate(ctx context.Context, commit *object.Commit, st store.Store) error {
+ ctx = event.Start(ctx, "checkUpdate")
+ defer event.End(ctx)
+
urs, err := st.ListCommitUpdateRecords(ctx, 1)
if err != nil {
return err
@@ -171,7 +175,9 @@
var issueRateLimiter = rate.NewLimiter(rate.Every(time.Duration(1000/float64(issueQPS))*time.Millisecond), 1)
func CreateIssues(ctx context.Context, st store.Store, ic issues.Client, limit int) (err error) {
- derrors.Wrap(&err, "CreateIssues(destination: %s)", ic.Destination())
+ defer derrors.Wrap(&err, "CreateIssues(destination: %s)", ic.Destination())
+ ctx = event.Start(ctx, "CreateIssues")
+ defer event.End(ctx)
needsIssue, err := st.ListCVERecordsWithTriageState(ctx, store.TriageStateNeedsIssue)
if err != nil {
diff --git a/internal/worker/worker_test.go b/internal/worker/worker_test.go
index d15bb2f..9d8baa9 100644
--- a/internal/worker/worker_test.go
+++ b/internal/worker/worker_test.go
@@ -10,12 +10,14 @@
import (
"context"
"math"
+ "os"
"strings"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
+ "golang.org/x/exp/event"
"golang.org/x/vulndb/internal/cveschema"
"golang.org/x/vulndb/internal/gitrepo"
"golang.org/x/vulndb/internal/issues"
@@ -95,7 +97,8 @@
}
func TestCreateIssues(t *testing.T) {
- ctx := log.WithLineLogger(context.Background())
+ ctx := event.WithExporter(context.Background(),
+ event.NewExporter(log.NewLineHandler(os.Stderr), nil))
mstore := store.NewMemStore()
ic := issues.NewFakeClient()
ctime := time.Date(2020, 1, 2, 0, 0, 0, 0, time.UTC)