blob: dfce7da19afcc3826b0288fef5f4def1a5277930 [file] [log] [blame]
// Copyright 2022 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package app
import (
"context"
"fmt"
"log"
"net/http"
"strings"
"time"
"cloud.google.com/go/compute/metadata"
secretmanager "cloud.google.com/go/secretmanager/apiv1"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"golang.org/x/build/internal/influx"
"golang.org/x/build/perfdata"
"golang.org/x/perf/benchfmt"
"golang.org/x/perf/benchseries"
"google.golang.org/api/idtoken"
secretmanagerpb "google.golang.org/genproto/googleapis/cloud/secretmanager/v1"
)
const (
backfillWindow = 30 * 24 * time.Hour // 30 days.
)
// syncInflux handles /cron/syncinflux, which updates an InfluxDB instance with
// the latest data from perfdata.golang.org (i.e. storage), or backfills it.
func (a *App) syncInflux(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
if a.AuthCronEmail != "" {
if err := checkCronAuth(ctx, r, a.AuthCronEmail); err != nil {
log.Printf("Dropping invalid request to /cron/syncinflux: %v", err)
http.Error(w, err.Error(), 403)
return
}
}
if a.InfluxHost == "" {
s := "Influx host unknown (set INFLUX_HOST?)"
log.Printf(s)
http.Error(w, s, 500)
return
}
token, err := a.findInfluxToken(ctx)
if err != nil {
log.Printf("Error finding Influx token: %v", err)
http.Error(w, err.Error(), 500)
return
}
ifxc := influxdb2.NewClient(a.InfluxHost, token)
defer ifxc.Close()
log.Printf("Connecting to influx...")
lastPush, err := latestInfluxTimestamp(ctx, ifxc)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
if lastPush.IsZero() {
// Pick the backfill window.
lastPush = time.Now().Add(-backfillWindow)
}
log.Printf("Last push to influx: %v", lastPush)
uploads, err := a.uploadsSince(ctx, lastPush)
if err != nil {
http.Error(w, err.Error(), 500)
return
}
log.Printf("Uploads since last push: %d", len(uploads))
var errs []error
for _, u := range uploads {
log.Printf("Processing upload %s...", u.UploadID)
if err := a.pushRunToInflux(ctx, ifxc, u); err != nil {
errs = append(errs, err)
log.Printf("Error processing upload %s: %v", u.UploadID, err)
}
}
if len(errs) > 0 {
var failures strings.Builder
for _, err := range errs {
failures.WriteString(err.Error())
failures.WriteString("\n")
}
http.Error(w, failures.String(), 500)
}
}
func checkCronAuth(ctx context.Context, r *http.Request, wantEmail string) error {
const audience = "/cron/syncinflux"
const authHeaderPrefix = "Bearer "
authHeader := r.Header.Get("Authorization")
if !strings.HasPrefix(authHeader, authHeaderPrefix) {
return fmt.Errorf("missing Authorization header")
}
token := authHeader[len(authHeaderPrefix):]
p, err := idtoken.Validate(ctx, token, audience)
if err != nil {
return err
}
if p.Issuer != "https://accounts.google.com" {
return fmt.Errorf("issuer must be https://accounts.google.com, but is %s", p.Issuer)
}
e, ok := p.Claims["email"]
if !ok {
return fmt.Errorf("email missing from token")
}
email, ok := e.(string)
if !ok {
return fmt.Errorf("email unexpected type %T", e)
}
if email != wantEmail {
return fmt.Errorf("email got %s want %s", email, wantEmail)
}
return nil
}
func (a *App) findInfluxToken(ctx context.Context) (string, error) {
if a.InfluxToken != "" {
return a.InfluxToken, nil
}
var project string
if a.InfluxProject != "" {
project = a.InfluxProject
} else {
var err error
project, err = metadata.ProjectID()
if err != nil {
return "", fmt.Errorf("error determining GCP project ID (set INFLUX_TOKEN or INFLUX_PROJECT?): %w", err)
}
}
log.Printf("Fetching Influx token from %s...", project)
token, err := fetchInfluxToken(ctx, project)
if err != nil {
return "", fmt.Errorf("error fetching Influx token: %w", err)
}
return token, nil
}
func fetchInfluxToken(ctx context.Context, project string) (string, error) {
client, err := secretmanager.NewClient(ctx)
if err != nil {
return "", fmt.Errorf("error creating secret manager client: %w", err)
}
defer client.Close()
req := &secretmanagerpb.AccessSecretVersionRequest{
Name: "projects/" + project + "/secrets/" + influx.AdminTokenSecretName + "/versions/latest",
}
result, err := client.AccessSecretVersion(ctx, req)
if err != nil {
return "", fmt.Errorf("failed to access secret version: %w", err)
}
return string(result.Payload.Data), nil
}
func latestInfluxTimestamp(ctx context.Context, ifxc influxdb2.Client) (time.Time, error) {
qc := ifxc.QueryAPI(influx.Org)
// Find the latest upload in the last month.
q := fmt.Sprintf(`from(bucket:%q)
|> range(start: -%dh)
|> filter(fn: (r) => r["_measurement"] == "benchmark-result")
|> filter(fn: (r) => r["_field"] == "upload-time")
|> group()
|> sort(columns: ["_value"], desc: true)
|> limit(n: 1)`, influx.Bucket, backfillWindow/time.Hour)
result, err := qc.Query(ctx, q)
if err != nil {
return time.Time{}, err
}
for result.Next() {
// Except for the point timestamp, all other timestamps are stored as strings, specifically
// as the RFC3339Nano format.
//
// We only care about the first result, and there should be just one.
return time.Parse(time.RFC3339Nano, result.Record().Value().(string))
}
return time.Time{}, result.Err()
}
func (a *App) uploadsSince(ctx context.Context, since time.Time) ([]perfdata.UploadInfo, error) {
query := strings.Join([]string{
// Limit results to the window from since to now.
"upload-time>" + since.UTC().Format(time.RFC3339),
// Only take results generated by the coordinator. This ensures that nobody can
// just upload data to perfdata.golang.org and spoof us (accidentally or intentionally).
"by:coordinator@symbolic-datum-552.iam.gserviceaccount.com",
// Only take results that were generated from post-submit runs, not trybots.
"post-submit:true",
// Limit to just the master branch for now.
// TODO(mknyszek): Support other branches.
"branch:master",
}, " ")
uploadList := a.StorageClient.ListUploads(
ctx,
query,
nil,
500, // TODO(mknyszek): page results if this isn't enough.
)
defer uploadList.Close()
var uploads []perfdata.UploadInfo
for uploadList.Next() {
uploads = append(uploads, uploadList.Info())
}
if err := uploadList.Err(); err != nil {
return nil, err
}
return uploads, nil
}
func (a *App) pushRunToInflux(ctx context.Context, ifxc influxdb2.Client, u perfdata.UploadInfo) error {
s, err := a.StorageClient.Query(ctx, fmt.Sprintf("upload:%s", u.UploadID))
if err != nil {
return err
}
defer s.Close()
r := benchfmt.NewReader(s, u.UploadID)
// Scan the results into a benchseries builder.
//
// Use the default comparisons. Namely:
// 1. Compare across "toolchain," specifically "baseline" vs. "experiment."
// 2. Build a series out of commit dates (in our case, this is length 1).
// 3. Split out comparisons by benchmark name (unit we get for free).
builder, err := benchseries.NewBuilder(benchseries.DefaultBuilderOptions())
if err != nil {
return fmt.Errorf("failed to create benchseries builder: %v", err)
}
for r.Scan() {
rec := r.Result()
if err, ok := rec.(*benchfmt.SyntaxError); ok {
// Non-fatal result parse error. Warn
// but keep going.
log.Printf("Parse error: %v", err)
continue
}
res := rec.(*benchfmt.Result)
builder.Add(res)
}
if err := r.Err(); err != nil {
return err
}
// Run the comparison. We don't have any existing results so our
// duplicate policy doesn't matter here. Just pick replacement.
comparisons := builder.AllComparisonSeries(nil, benchseries.DUPE_REPLACE)
const (
confidence = 0.95
bootstrap = 1000
)
// Iterate over the comparisons, extract the results, and push them to Influx.
wapi := ifxc.WriteAPIBlocking(influx.Org, influx.Bucket)
comparisonLoop:
for _, cs := range comparisons {
cs.AddSummaries(confidence, bootstrap)
summaries := cs.Summaries
// Build a map of residues with single values. Our benchmark pipeline enforces
// that the only key that has a differing value across benchmark runs of the same
// name and unit is "toolchain."
//
// Most other keys are singular for *all* benchmarks in a run (like "goos") but
// even those that are not (like "pkg") remain the same even if "toolchain" differs.
//
// We build a map instead of just using them because we need to decide at upload
// time whether the key is an Influx tag or field.
residues := make(map[string]string)
for _, r := range cs.Residues {
if len(r.Slice) > 1 {
log.Printf("found non-singular key %q with values %v; comparison may be invalid, skipping...", r.S, r.Slice)
continue comparisonLoop
}
residues[r.S] = r.Slice[0]
}
// N.B. In our case Series should have length 1, because we're processing
// a single result here. By default the string value here is the commit date.
for i, series := range cs.Series {
for j, benchmarkName := range cs.Benchmarks {
sum := summaries[i][j]
if !sum.Defined() {
log.Printf("Summary not defined for %s %s", series, benchmarkName)
continue
}
measurement := "benchmark-result" // measurement
benchmarkName = benchmarkName // tag
series = series // time
center, low, high := sum.Center, sum.Low, sum.High // fields
unit := cs.Unit // tag
uploadTime := residues["upload-time"] // field
cpu := residues["cpu"] // tag
goarch := residues["goarch"] // tag
goos := residues["goos"] // tag
benchmarksCommit := residues["benchmarks-commit"] // field
baselineCommit := cs.HashPairs[series].DenHash // field
experimentCommit := cs.HashPairs[series].NumHash // field
branch := residues["branch"] // tag
// Push to influx.
t, err := benchseries.ParseNormalizedDateString(series)
if err != nil {
return fmt.Errorf("error parsing normalized date: %w", err)
}
fields := map[string]interface{}{
"center": center,
"low": low,
"high": high,
"upload-time": uploadTime,
"benchmarks-commit": benchmarksCommit,
"baseline-commit": baselineCommit,
"experiment-commit": experimentCommit,
}
tags := map[string]string{
"name": benchmarkName,
"unit": unit,
"cpu": cpu,
"goarch": goarch,
"goos": goos,
"branch": branch,
// TODO(prattmic): Add pkg, which
// benchseries currently can't handle.
}
p := influxdb2.NewPoint(measurement, tags, fields, t)
if err := wapi.WritePoint(ctx, p); err != nil {
return fmt.Errorf("error writing point: %w", err)
}
}
}
}
return nil
}