| // Copyright 2022 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package app |
| |
| import ( |
| "bytes" |
| "context" |
| "fmt" |
| "io" |
| "log" |
| "net/http" |
| "strings" |
| "time" |
| |
| "cloud.google.com/go/compute/metadata" |
| secretmanager "cloud.google.com/go/secretmanager/apiv1" |
| "cloud.google.com/go/secretmanager/apiv1/secretmanagerpb" |
| influxdb2 "github.com/influxdata/influxdb-client-go/v2" |
| "golang.org/x/build/internal/influx" |
| "golang.org/x/build/perfdata" |
| "golang.org/x/perf/benchfmt" |
| "golang.org/x/perf/benchseries" |
| "google.golang.org/api/idtoken" |
| ) |
| |
| const ( |
| backfillWindow = 30 * 24 * time.Hour // 30 days. |
| ) |
| |
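// influxClient returns a client for the configured InfluxDB instance
// (InfluxHost), authenticated with the token from findInfluxToken. The
// caller is responsible for closing the returned client.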
| func (a *App) influxClient(ctx context.Context) (influxdb2.Client, error) { |
| if a.InfluxHost == "" { |
| return nil, fmt.Errorf("Influx host unknown (set INFLUX_HOST?)") |
| } |
| |
| token, err := a.findInfluxToken(ctx) |
| if err != nil { |
| return nil, fmt.Errorf("error finding Influx token: %w", err) |
| } |
| |
| return influxdb2.NewClient(a.InfluxHost, token), nil |
| } |
| |
| // syncInflux handles /cron/syncinflux, which updates an InfluxDB instance with |
| // the latest data from perfdata.golang.org (i.e. storage), or backfills it. |
| func (a *App) syncInflux(w http.ResponseWriter, r *http.Request) { |
| ctx := r.Context() |
| |
| if a.AuthCronEmail != "" { |
| if err := checkCronAuth(ctx, r, a.AuthCronEmail); err != nil { |
| log.Printf("Dropping invalid request to /cron/syncinflux: %v", err) |
		http.Error(w, err.Error(), http.StatusForbidden)
| return |
| } |
| } |
| |
| ifxc, err := a.influxClient(ctx) |
| if err != nil { |
| log.Printf("Error getting Influx client: %v", err) |
		http.Error(w, err.Error(), http.StatusInternalServerError)
| return |
| } |
| defer ifxc.Close() |
| |
| log.Printf("Connecting to influx...") |
| |
| lastPush, err := latestInfluxTimestamp(ctx, ifxc) |
| if err != nil { |
		http.Error(w, err.Error(), http.StatusInternalServerError)
| return |
| } |
| if lastPush.IsZero() { |
| // Pick the backfill window. |
| lastPush = time.Now().Add(-backfillWindow) |
| } |
| |
| log.Printf("Last push to influx: %v", lastPush) |
| |
| uploads, err := a.uploadsSince(ctx, lastPush) |
| if err != nil { |
		http.Error(w, err.Error(), http.StatusInternalServerError)
| return |
| } |
| |
| log.Printf("Uploads since last push: %d", len(uploads)) |
| |
| var errs []error |
| for _, u := range uploads { |
| log.Printf("Processing upload %s...", u.UploadID) |
| if err := a.pushRunToInflux(ctx, ifxc, u); err != nil { |
| errs = append(errs, err) |
| log.Printf("Error processing upload %s: %v", u.UploadID, err) |
| } |
| } |
| if len(errs) > 0 { |
| var failures strings.Builder |
| for _, err := range errs { |
| failures.WriteString(err.Error()) |
| failures.WriteString("\n") |
| } |
		http.Error(w, failures.String(), http.StatusInternalServerError)
| } |
| } |
| |
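// checkCronAuth verifies that the request carries an
// "Authorization: Bearer <token>" header containing a Google-issued OIDC
// identity token for the /cron/syncinflux audience whose email claim
// matches wantEmail.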
| func checkCronAuth(ctx context.Context, r *http.Request, wantEmail string) error { |
| const audience = "/cron/syncinflux" |
| |
| const authHeaderPrefix = "Bearer " |
| authHeader := r.Header.Get("Authorization") |
| if !strings.HasPrefix(authHeader, authHeaderPrefix) { |
| return fmt.Errorf("missing Authorization header") |
| } |
| token := authHeader[len(authHeaderPrefix):] |
| |
| p, err := idtoken.Validate(ctx, token, audience) |
| if err != nil { |
| return err |
| } |
| |
| if p.Issuer != "https://accounts.google.com" { |
| return fmt.Errorf("issuer must be https://accounts.google.com, but is %s", p.Issuer) |
| } |
| |
| e, ok := p.Claims["email"] |
| if !ok { |
| return fmt.Errorf("email missing from token") |
| } |
| email, ok := e.(string) |
| if !ok { |
| return fmt.Errorf("email unexpected type %T", e) |
| } |
| |
| if email != wantEmail { |
| return fmt.Errorf("email got %s want %s", email, wantEmail) |
| } |
| |
| return nil |
| } |
| |
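// findInfluxToken returns the Influx auth token, either from explicit
// configuration (INFLUX_TOKEN) or, failing that, from Secret Manager in
// the configured GCP project (INFLUX_PROJECT, defaulting to the project
// reported by the metadata server).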
| func (a *App) findInfluxToken(ctx context.Context) (string, error) { |
| if a.InfluxToken != "" { |
| return a.InfluxToken, nil |
| } |
| |
| var project string |
| if a.InfluxProject != "" { |
| project = a.InfluxProject |
| } else { |
| var err error |
| project, err = metadata.ProjectID() |
| if err != nil { |
| return "", fmt.Errorf("error determining GCP project ID (set INFLUX_TOKEN or INFLUX_PROJECT?): %w", err) |
| } |
| } |
| |
| log.Printf("Fetching Influx token from %s...", project) |
| |
| token, err := fetchInfluxToken(ctx, project) |
| if err != nil { |
| return "", fmt.Errorf("error fetching Influx token: %w", err) |
| } |
| |
| return token, nil |
| } |
| |
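// fetchInfluxToken fetches the latest version of the Influx admin token
// secret from Secret Manager in the given GCP project.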
| func fetchInfluxToken(ctx context.Context, project string) (string, error) { |
| client, err := secretmanager.NewClient(ctx) |
| if err != nil { |
| return "", fmt.Errorf("error creating secret manager client: %w", err) |
| } |
| defer client.Close() |
| |
| req := &secretmanagerpb.AccessSecretVersionRequest{ |
| Name: "projects/" + project + "/secrets/" + influx.AdminTokenSecretName + "/versions/latest", |
| } |
| |
| result, err := client.AccessSecretVersion(ctx, req) |
| if err != nil { |
| return "", fmt.Errorf("failed to access secret version: %w", err) |
| } |
| |
| return string(result.Payload.Data), nil |
| } |
| |
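// latestInfluxTimestamp returns the upload time of the most recent
// benchmark result in Influx, or the zero time if there are none within
// the backfill window.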
| func latestInfluxTimestamp(ctx context.Context, ifxc influxdb2.Client) (time.Time, error) { |
| qc := ifxc.QueryAPI(influx.Org) |
	// Find the latest upload within the backfill window.
| q := fmt.Sprintf(`from(bucket:%q) |
| |> range(start: -%dh) |
| |> filter(fn: (r) => r["_measurement"] == "benchmark-result") |
| |> filter(fn: (r) => r["_field"] == "upload-time") |
| |> group() |
| |> sort(columns: ["_value"], desc: true) |
| |> limit(n: 1)`, influx.Bucket, backfillWindow/time.Hour) |
| result, err := influxQuery(ctx, qc, q) |
| if err != nil { |
| return time.Time{}, err |
| } |
| for result.Next() { |
		// Timestamps other than the point timestamp are stored as
		// strings, specifically in RFC3339Nano format.
		//
		// We only care about the first result, and there should be
		// just one.
| return time.Parse(time.RFC3339Nano, result.Record().Value().(string)) |
| } |
| return time.Time{}, result.Err() |
| } |
| |
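// uploadsSince returns the post-submit uploads to perfdata.golang.org made
// by the builder service account since the given time.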
| func (a *App) uploadsSince(ctx context.Context, since time.Time) ([]perfdata.UploadInfo, error) { |
| query := strings.Join([]string{ |
| // Limit results to the window from since to now. |
| "upload-time>" + since.UTC().Format(time.RFC3339), |
| // Only take results generated by the coordinator. This ensures that nobody can |
| // just upload data to perfdata.golang.org and spoof us (accidentally or intentionally). |
| "by:public-worker-builder@golang-ci-luci.iam.gserviceaccount.com", |
| // Only take results that were generated from post-submit runs, not trybots. |
| "post-submit:true", |
| }, " ") |
| uploadList := a.StorageClient.ListUploads( |
| ctx, |
| query, |
| nil, |
| 500, // TODO(mknyszek): page results if this isn't enough. |
| ) |
| defer uploadList.Close() |
| |
| var uploads []perfdata.UploadInfo |
| for uploadList.Next() { |
| uploads = append(uploads, uploadList.Info()) |
| } |
| if err := uploadList.Err(); err != nil { |
| return nil, err |
| } |
| return uploads, nil |
| } |
| |
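// pushRunToInflux fetches the benchmark results for upload u from storage
// and pushes a standard set of comparisons (baseline vs. experiment with
// and without PGO, and PGO off vs. on) to Influx.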
| func (a *App) pushRunToInflux(ctx context.Context, ifxc influxdb2.Client, u perfdata.UploadInfo) error { |
| s, err := a.StorageClient.Query(ctx, fmt.Sprintf("upload:%s", u.UploadID)) |
| if err != nil { |
| return err |
| } |
| |
	// We need to read the upload multiple times via benchfmt.Reader, so
	// copy it into a buffer that we can re-read from the beginning.
| var buf bytes.Buffer |
| if _, err := io.Copy(&buf, s); err != nil { |
| return fmt.Errorf("error reading upload: %w", err) |
| } |
| if err := s.Close(); err != nil { |
| return fmt.Errorf("error closing upload: %w", err) |
| } |
| |
| comparisons := []struct { |
| suffix string |
| compare string |
| numerator string |
| denominator string |
| filter string |
| }{ |
| { |
| // Default: toolchain:baseline vs experiment without PGO |
| compare: "toolchain", |
| numerator: "experiment", |
| denominator: "baseline", |
| filter: "-pgo:on", // "off" or unset (bent doesn't set pgo). |
| }, |
| { |
| // toolchain:baseline vs experiment with PGO |
| suffix: "/pgo=on,toolchain:baseline-vs-experiment", |
| compare: "toolchain", |
| numerator: "experiment", |
| denominator: "baseline", |
| filter: "pgo:on", |
| }, |
| { |
| // pgo:off vs on with experiment toolchain (impact of enabling PGO) |
| suffix: "/toolchain:experiment,pgo=off-vs-on", |
| compare: "pgo", |
| numerator: "on", |
| denominator: "off", |
| filter: "toolchain:experiment", |
| }, |
| } |
| for _, c := range comparisons { |
| r := bytes.NewReader(buf.Bytes()) |
| fmtr := benchfmt.NewReader(r, u.UploadID) |
| |
		// Use the default comparisons. Namely:
		// 1. Build a series out of commit dates (in our case, this is length 1).
		// 2. Split out comparisons by benchmark name (and unit, which we get for free).
		//
		// Copy the options so we can mutate them.
| opts := *benchseries.DefaultBuilderOptions() |
| opts.Compare = c.compare |
| opts.Numerator = c.numerator |
| opts.Denominator = c.denominator |
| if opts.Filter == "" { |
| opts.Filter = c.filter |
| } else { |
| opts.Filter += " " + c.filter |
| } |
| |
| if err := a.compareAndPush(ctx, ifxc, fmtr, &opts, c.suffix); err != nil { |
| return fmt.Errorf("error in compareAndPush(%s): %w", c.suffix, err) |
| } |
| } |
| |
| return nil |
| } |
| |
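// compareAndPush builds comparison series from the benchmark results in r
// according to opts, computes their summaries, and writes one point per
// series, benchmark, and unit to Influx, with suffix appended to the
// benchmark name tag.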
| func (a *App) compareAndPush(ctx context.Context, ifxc influxdb2.Client, r *benchfmt.Reader, opts *benchseries.BuilderOptions, suffix string) error { |
| // Scan the results into a benchseries builder. |
| builder, err := benchseries.NewBuilder(opts) |
| if err != nil { |
| return fmt.Errorf("failed to create benchseries builder: %v", err) |
| } |
| for r.Scan() { |
| rec := r.Result() |
| if err, ok := rec.(*benchfmt.SyntaxError); ok { |
| // Non-fatal result parse error. Warn |
| // but keep going. |
| log.Printf("Parse error: %v", err) |
| continue |
| } |
| res := rec.(*benchfmt.Result) |
| builder.Add(res) |
| } |
| if err := r.Err(); err != nil { |
| return err |
| } |
| |
| // Run the comparison. We don't have any existing results so our |
| // duplicate policy doesn't matter here. Just pick replacement. |
| comparisons, err := builder.AllComparisonSeries(nil, benchseries.DUPE_REPLACE) |
| if err != nil { |
| return fmt.Errorf("failed to creation comparison series: %w", err) |
| } |
| |
| const ( |
| confidence = 0.95 |
| bootstrap = 1000 |
| ) |
| |
| // Iterate over the comparisons, extract the results, and push them to Influx. |
| wapi := ifxc.WriteAPIBlocking(influx.Org, influx.Bucket) |
| comparisonLoop: |
| for _, cs := range comparisons { |
| cs.AddSummaries(confidence, bootstrap) |
| |
| summaries := cs.Summaries |
| |
		// Build a map of residues with single values. Our benchmark pipeline
		// enforces that the only key whose value may differ across benchmark
		// runs of the same name and unit is "toolchain".
		//
		// Most other keys are singular for *all* benchmarks in a run (like
		// "goos"), but even those that are not (like "pkg") remain the same
		// when "toolchain" differs.
		//
		// We build a map instead of using the residues directly because we
		// need to decide at upload time whether each key is an Influx tag or
		// field.
| residues := make(map[string]string) |
| for _, r := range cs.Residues { |
| if len(r.Slice) > 1 { |
| log.Printf("found non-singular key %q with values %v; comparison may be invalid, skipping...", r.S, r.Slice) |
| continue comparisonLoop |
| } |
| residues[r.S] = r.Slice[0] |
| } |
| |
		// N.B. In our case Series should have length 1, because we're
		// processing a single upload here. By default the string value
		// here is the commit date.
| for i, series := range cs.Series { |
| for j, benchmarkName := range cs.Benchmarks { |
| sum := summaries[i][j] |
| if !sum.Defined() { |
| log.Printf("Summary not defined for %s %s", series, benchmarkName) |
| continue |
| } |
| |
| measurement := "benchmark-result" // measurement |
| benchmarkName = benchmarkName + suffix // tag |
| series = series // time |
| center, low, high := sum.Center, sum.Low, sum.High // fields |
| unit := cs.Unit // tag |
| uploadTime := residues["upload-time"] // field |
| cpu := residues["cpu"] // tag |
| goarch := residues["goarch"] // tag |
| goos := residues["goos"] // tag |
| benchmarksCommit := residues["benchmarks-commit"] // field |
| baselineCommit := cs.HashPairs[series].DenHash // field |
| experimentCommit := cs.HashPairs[series].NumHash // field |
| repository := residues["repository"] // tag |
| branch := residues["branch"] // tag |
| |
| // cmd/bench didn't set repository prior to |
| // CL 413915. Older runs are all against go. |
| if repository == "" { |
| repository = "go" |
| } |
| |
| // Push to influx. |
| t, err := benchseries.ParseNormalizedDateString(series) |
| if err != nil { |
| return fmt.Errorf("error parsing normalized date: %w", err) |
| } |
| fields := map[string]interface{}{ |
| "center": center, |
| "low": low, |
| "high": high, |
| "upload-time": uploadTime, |
| "benchmarks-commit": benchmarksCommit, |
| "baseline-commit": baselineCommit, |
| "experiment-commit": experimentCommit, |
| } |
| tags := map[string]string{ |
| "name": benchmarkName, |
| "unit": unit, |
| "cpu": cpu, |
| "goarch": goarch, |
| "goos": goos, |
| "repository": repository, |
| "branch": branch, |
| // TODO(prattmic): Add pkg, which |
| // benchseries currently can't handle. |
| } |
| p := influxdb2.NewPoint(measurement, tags, fields, t) |
| if err := wapi.WritePoint(ctx, p); err != nil { |
| return fmt.Errorf("error writing point: %w", err) |
| } |
| } |
| } |
| } |
| return nil |
| } |