// Copyright 2022 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package app

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"log"
"math"
"math/rand/v2"
"net/http"
"slices"
"strings"
"time"
"cloud.google.com/go/compute/metadata"
secretmanager "cloud.google.com/go/secretmanager/apiv1"
"cloud.google.com/go/secretmanager/apiv1/secretmanagerpb"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"golang.org/x/build/internal/influx"
"golang.org/x/build/perf/app/internal/benchtab"
"golang.org/x/build/perfdata"
"golang.org/x/perf/benchfmt"
"golang.org/x/perf/benchmath"
"golang.org/x/perf/benchproc"
"golang.org/x/perf/benchunit"
"google.golang.org/api/idtoken"
)

const (
backfillWindow = 30 * 24 * time.Hour // 30 days.
)
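
// influxClient returns a client for the InfluxDB instance at a.InfluxHost,
// authenticating with the token located by findInfluxToken.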
func (a *App) influxClient(ctx context.Context) (influxdb2.Client, error) {
if a.InfluxHost == "" {
return nil, fmt.Errorf("Influx host unknown (set INFLUX_HOST?)")
}
token, err := a.findInfluxToken(ctx)
if err != nil {
return nil, fmt.Errorf("error finding Influx token: %w", err)
}
return influxdb2.NewClient(a.InfluxHost, token), nil
}

// syncInflux handles /cron/syncinflux, which updates an InfluxDB instance with
// the latest data from perfdata.golang.org (i.e. storage), or backfills it.
func (a *App) syncInflux(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
if a.AuthCronEmail != "" {
if err := checkCronAuth(ctx, r, a.AuthCronEmail); err != nil {
log.Printf("Dropping invalid request to /cron/syncinflux: %v", err)
http.Error(w, err.Error(), 403)
return
}
}
ifxc, err := a.influxClient(ctx)
if err != nil {
log.Printf("Error getting Influx client: %v", err)
http.Error(w, err.Error(), 500)
return
}
defer ifxc.Close()
log.Printf("Connecting to influx...")
lastPush, err := latestInfluxTimestamp(ctx, ifxc)
if err != nil {
log.Printf("Error getting latest Influx timestamp: %v", err)
http.Error(w, err.Error(), 500)
return
}
if lastPush.IsZero() {
// Pick the backfill window.
lastPush = time.Now().Add(-backfillWindow)
}
log.Printf("Last push to influx: %v", lastPush)
uploads, err := a.uploadsSince(ctx, lastPush)
if err != nil {
log.Printf("Error getting new uploads: %v", err)
http.Error(w, err.Error(), 500)
return
}
log.Printf("Uploads since last push: %d", len(uploads))
var errs []error
for _, u := range uploads {
log.Printf("Processing upload %s...", u.UploadID)
if err := a.pushRunToInflux(ctx, ifxc, u); err != nil {
errs = append(errs, err)
log.Printf("Error processing upload %s: %v", u.UploadID, err)
}
}
if len(errs) > 0 {
var failures strings.Builder
for _, err := range errs {
failures.WriteString(err.Error())
failures.WriteString("\n")
}
http.Error(w, failures.String(), 500)
}
}
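
// checkCronAuth verifies that r carries a valid Google-signed OIDC identity
// token whose email claim matches wantEmail. The token is expected in a
// header of the form:
//
//	Authorization: Bearer <identity token>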
func checkCronAuth(ctx context.Context, r *http.Request, wantEmail string) error {
const audience = "/cron/syncinflux"
const authHeaderPrefix = "Bearer "
authHeader := r.Header.Get("Authorization")
if !strings.HasPrefix(authHeader, authHeaderPrefix) {
return fmt.Errorf("missing Authorization header")
}
token := authHeader[len(authHeaderPrefix):]
p, err := idtoken.Validate(ctx, token, audience)
if err != nil {
return err
}
if p.Issuer != "https://accounts.google.com" {
return fmt.Errorf("issuer must be https://accounts.google.com, but is %s", p.Issuer)
}
e, ok := p.Claims["email"]
if !ok {
return fmt.Errorf("email missing from token")
}
email, ok := e.(string)
if !ok {
return fmt.Errorf("email unexpected type %T", e)
}
if email != wantEmail {
return fmt.Errorf("email got %s want %s", email, wantEmail)
}
return nil
}
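
// findInfluxToken returns the InfluxDB admin token: the statically configured
// token if set, otherwise the token stored in Secret Manager in the
// configured (or metadata-derived) GCP project.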
func (a *App) findInfluxToken(ctx context.Context) (string, error) {
if a.InfluxToken != "" {
return a.InfluxToken, nil
}
var project string
if a.InfluxProject != "" {
project = a.InfluxProject
} else {
var err error
project, err = metadata.ProjectID()
if err != nil {
return "", fmt.Errorf("error determining GCP project ID (set INFLUX_TOKEN or INFLUX_PROJECT?): %w", err)
}
}
log.Printf("Fetching Influx token from %s...", project)
token, err := fetchInfluxToken(ctx, project)
if err != nil {
return "", fmt.Errorf("error fetching Influx token: %w", err)
}
return token, nil
}
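
// fetchInfluxToken reads the latest version of the Influx admin token secret
// from project's Secret Manager.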
func fetchInfluxToken(ctx context.Context, project string) (string, error) {
client, err := secretmanager.NewClient(ctx)
if err != nil {
return "", fmt.Errorf("error creating secret manager client: %w", err)
}
defer client.Close()
req := &secretmanagerpb.AccessSecretVersionRequest{
Name: "projects/" + project + "/secrets/" + influx.AdminTokenSecretName + "/versions/latest",
}
result, err := client.AccessSecretVersion(ctx, req)
if err != nil {
return "", fmt.Errorf("failed to access secret version: %w", err)
}
return string(result.Payload.Data), nil
}
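
// latestInfluxTimestamp returns the most recent upload-time recorded in
// Influx within the backfill window, or the zero time if there are no
// results.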
func latestInfluxTimestamp(ctx context.Context, ifxc influxdb2.Client) (time.Time, error) {
qc := ifxc.QueryAPI(influx.Org)
// Find the latest upload in the last month.
q := fmt.Sprintf(`from(bucket:%q)
|> range(start: -%dh)
|> filter(fn: (r) => r["_measurement"] == "benchmark-result")
|> filter(fn: (r) => r["_field"] == "upload-time")
|> group()
|> sort(columns: ["_value"], desc: true)
|> limit(n: 1)`, influx.Bucket, backfillWindow/time.Hour)
result, err := influxQuery(ctx, qc, q)
if err != nil {
return time.Time{}, err
}
for result.Next() {
// Except for the point timestamp, all other timestamps are stored as
// strings, specifically in RFC3339Nano format.
//
// We only care about the first result, and there should be just one.
return time.Parse(time.RFC3339Nano, result.Record().Value().(string))
}
return time.Time{}, result.Err()
}
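
// uploadsSince returns the uploads to perfdata.golang.org made by the
// coordinator from post-submit runs since the given time.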
func (a *App) uploadsSince(ctx context.Context, since time.Time) ([]perfdata.UploadInfo, error) {
query := strings.Join([]string{
// Limit results to the window from since to now.
"upload-time>" + since.UTC().Format(time.RFC3339),
// Only take results generated by the coordinator. This ensures that nobody can
// just upload data to perfdata.golang.org and spoof us (accidentally or intentionally).
"by:public-worker-builder@golang-ci-luci.iam.gserviceaccount.com",
// Only take results that were generated from post-submit runs, not trybots.
"post-submit:true",
}, " ")
uploadList := a.StorageClient.ListUploads(
ctx,
query,
nil,
500, // TODO(mknyszek): page results if this isn't enough.
)
defer uploadList.Close()
var uploads []perfdata.UploadInfo
for uploadList.Next() {
uploads = append(uploads, uploadList.Info())
}
if err := uploadList.Err(); err != nil {
return nil, err
}
return uploads, nil
}
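
// pushRunToInflux downloads the results in upload u from perfdata storage,
// runs each comparison configuration over them, and writes the summarized
// comparison points to Influx.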
func (a *App) pushRunToInflux(ctx context.Context, ifxc influxdb2.Client, u perfdata.UploadInfo) error {
s, err := a.StorageClient.Query(ctx, fmt.Sprintf("upload:%s", u.UploadID))
if err != nil {
return err
}
// We need to read the upload multiple times via benchfmt.Reader, so
// copy it into a buffer that we can re-read from the start for each pass.
var buf bytes.Buffer
if _, err := io.Copy(&buf, s); err != nil {
return fmt.Errorf("error reading upload: %w", err)
}
if err := s.Close(); err != nil {
return fmt.Errorf("error closing upload: %w", err)
}
// Iterate over the comparisons, extract the results, and push them to Influx.
wapi := ifxc.WriteAPIBlocking(influx.Org, influx.Bucket)
for _, cfg := range []comparisonConfig{pgoOff, pgoOn, pgoVs} {
r := bytes.NewReader(buf.Bytes())
forAll, err := benchCompare(r, u.UploadID, cfg)
if err != nil {
return fmt.Errorf("performing comparison for %s: %v", u.UploadID, err)
}
// TODO(mknyszek): Use new iterators when possible.
forAll(func(c comparison, err error) bool {
comparisonID := c.keys["experiment-commit"]
if err != nil {
// Just log this error. We don't want to quit early if we have other good comparisons.
log.Printf("error: %s: %s: %s: %v", comparisonID, c.benchmarkName, c.unit, err)
return true
}
measurement := "benchmark-result" // measurement
benchmarkName := c.benchmarkName + cfg.suffix // tag
timestamp := c.keys["experiment-commit-time"] // time
center, low, high := c.center, c.lo, c.hi // fields
unit := c.unit // tag
uploadTime := c.keys["upload-time"] // field
cpu := c.keys["cpu"] // tag
goarch := c.keys["goarch"] // tag
goos := c.keys["goos"] // tag
benchmarksCommit := c.keys["benchmarks-commit"] // field
baselineCommit := c.keys["baseline-commit"] // field
experimentCommit := c.keys["experiment-commit"] // field
repository := c.keys["repository"] // tag
branch := c.keys["branch"] // tag
// cmd/bench didn't set repository prior to
// CL 413915. Older runs are all against go.
if repository == "" {
repository = "go"
}
// Push to influx.
t, err := time.Parse(time.RFC3339Nano, timestamp)
if err != nil {
log.Printf("error: %s: %s: %s: parsing experiment-commit-time: %v", comparisonID, c.benchmarkName, c.unit, err)
return true
}
fields := map[string]any{
"center": center,
"low": low,
"high": high,
"upload-time": uploadTime,
"benchmarks-commit": benchmarksCommit,
"baseline-commit": baselineCommit,
"experiment-commit": experimentCommit,
}
tags := map[string]string{
"name": benchmarkName,
"unit": unit,
"cpu": cpu,
"goarch": goarch,
"goos": goos,
"repository": repository,
"branch": branch,
// TODO(mknyszek): Revisit adding pkg, now that we're not using benchseries.
}
p := influxdb2.NewPoint(measurement, tags, fields, t)
if err := wapi.WritePoint(ctx, p); err != nil {
log.Printf("%s: %s: %s: error writing point: %v", comparisonID, c.benchmarkName, c.unit, err)
return true
}
return true
})
}
return nil
}
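
// comparisonConfig describes one comparison to compute over an upload:
// filter selects the results to include, columnExpr names the key that
// splits them into the two compared sides, ignore lists additional keys to
// project away, and suffix is appended to benchmark names pushed to Influx.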
type comparisonConfig struct {
suffix string
columnExpr string
filter string
ignore string
}

var (
pgoOff = comparisonConfig{
// Default: toolchain:baseline vs experiment without PGO
columnExpr: "toolchain@(baseline experiment)",
filter: "-pgo:on", // "off" or unset (bent doesn't set pgo).
ignore: "pgo", // Ignore pgo entirely; bent doesn't set it, which messes up the geomean.
}
pgoOn = comparisonConfig{
// toolchain:baseline vs experiment with PGO
suffix: "/pgo=on,toolchain:baseline-vs-experiment",
columnExpr: "toolchain@(baseline experiment)",
filter: "pgo:on",
}
pgoVs = comparisonConfig{
// pgo:off vs on with experiment toolchain (impact of enabling PGO)
suffix: "/toolchain:experiment,pgo=off-vs-on",
columnExpr: "pgo@(off on)",
filter: "toolchain:experiment",
}
)
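
// comparison is one summarized result: the ratio of experiment to baseline
// for a single benchmark and unit, as a center value with a [lo, hi]
// confidence interval, plus the configuration keys shared by the inputs.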
type comparison struct {
benchmarkName string
unit string
keys map[string]string
lo, center, hi float64
}

// benchCompare reads benchmark data from rr and performs the comparison
// described by c, returning an iterator over the resulting summaries.
func benchCompare(rr io.Reader, name string, c comparisonConfig) (func(func(comparison, error) bool), error) {
r := benchfmt.NewReader(rr, name)
filter, err := benchproc.NewFilter(c.filter)
if err != nil {
return nil, fmt.Errorf("parsing filter: %s", err)
}
var parser benchproc.ProjectionParser
var parseErr error
mustParse := func(name, val string, unit bool) *benchproc.Projection {
var proj *benchproc.Projection
var err error
if unit {
proj, _, err = parser.ParseWithUnit(val, filter)
} else {
proj, err = parser.Parse(val, filter)
}
if err != nil && parseErr == nil {
parseErr = fmt.Errorf("parsing %s: %s", name, err)
}
return proj
}
tableBy := mustParse("table", ".config", true)
rowBy := mustParse("row", ".fullname", false)
colBy := mustParse("col", c.columnExpr, false)
ignore := "go,tip,base,bentstamp,shortname,suite,pkg"
if c.ignore != "" {
ignore += "," + c.ignore
}
mustParse("ignore", ignore, false)
residue := parser.Residue()
// Check parse error.
if parseErr != nil {
return nil, fmt.Errorf("internal error: failed to parse projections for configuration: %v", parseErr)
}
// Scan the results into a benchseries builder.
stat := benchtab.NewBuilder(tableBy, rowBy, colBy, residue)
for r.Scan() {
switch rec := r.Result(); rec := rec.(type) {
case *benchfmt.SyntaxError:
// Non-fatal result parse error. Warn
// but keep going.
log.Printf("Parse error: %v", err)
case *benchfmt.Result:
if ok, _ := filter.Apply(rec); !ok {
continue
}
stat.Add(rec)
}
}
if err := r.Err(); err != nil {
return nil, err
}
// Prepopulate some assumptions about binary size units.
// bent does emit these, but they get stripped by perfdata.
// TODO(mknyszek): Remove this once perfdata stops doing that.
units := r.Units()
assumeExact := func(unit string) {
_, tidyUnit := benchunit.Tidy(1, unit)
key := benchfmt.UnitMetadataKey{Unit: tidyUnit, Key: "assume"}
if _, ok := units[key]; ok {
return // There was an assumption in the benchmark data.
}
units[key] = &benchfmt.UnitMetadata{
UnitMetadataKey: key,
OrigUnit: unit,
Value: "exact",
}
}
assumeExact("total-bytes")
assumeExact("text-bytes")
assumeExact("data-bytes")
assumeExact("rodata-bytes")
assumeExact("pclntab-bytes")
assumeExact("debug-bytes")
// Build the comparison table.
const confidence = 0.95
thresholds := benchmath.DefaultThresholds
tables := stat.ToTables(benchtab.TableOpts{
Confidence: confidence,
Thresholds: &thresholds,
Units: r.Units(),
})
// Iterate over the comparisons and extract the results
return func(yield func(sum comparison, err error) bool) {
for t, table := range tables.Tables {
// All the other keys, which should be identical, are captured as
// sub-fields of .config, our table projection.
keys := make(map[string]string)
for _, f := range tableBy.Fields()[0].Sub {
keys[f.Name] = tables.Keys[t].Get(f)
}
for _, row := range table.Rows {
benchmarkName := row.StringValues()
for _, col := range table.Cols {
cell, ok := table.Cells[benchtab.TableKey{Row: row, Col: col}]
if !ok {
// Cell not present due to missing data.
err := fmt.Errorf("summary not defined %s", benchmarkName)
if !yield(comparison{}, err) {
return
}
continue
}
if cell.Baseline == nil {
// Non-comparison cell.
continue
}
if len(cell.Summary.Warnings) != 0 {
// TODO(mknyszek): Make this an actual failure once it stops failing for x/tools.
// x/tools has 5 runs per benchmark, but we need 6 for 0.95 confidence.
log.Printf("warning: %s: %s: %s: %v", name, benchmarkName, table.Unit, errors.Join(cell.Summary.Warnings...))
}
lo, center, hi := ratioSummary(cell.Baseline.Sample, cell.Sample, confidence, 1000)
if !yield(comparison{benchmarkName, table.Unit, keys, lo, center, hi}, nil) {
return
}
}
}
// Emit the geomean as a separate timeline.
//
// The "benchmark name" for this will look something like "geomean/go/gotip/vs_release/c2s16".
builder, ok := keys["builder"]
if !ok {
log.Printf("warning: not emitting summary, no builder value found")
continue
}
repo, mod, machineType, err := parseBuilder(builder)
if err != nil {
log.Printf("warning: not emitting summary, can't interpret builder: %v", err)
continue
}
if table.SummaryLabel != "geomean" {
log.Printf("warning: not emitting summary, summary is not a geomean")
continue
}
emitted := false
for _, col := range table.Cols {
sum, ok := table.Summary[col]
if !ok {
continue
}
if sum.HasRatio {
geomeanName := table.SummaryLabel + "/" + repo + "/" + mod + "/" + machineType
if !yield(comparison{geomeanName, table.Unit, keys, sum.Ratio, sum.Ratio, sum.Ratio}, nil) {
return
}
emitted = true
break
}
}
if !emitted {
log.Printf("warning: not emitting summary, failed to find ratio")
}
}
}, nil
}
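
// parseBuilder extracts the repository, run mods, and machine type from a
// LUCI perf builder name. For example (a hypothetical name following this
// convention):
//
//	repo, mod, machineType, err := parseBuilder("gotip-linux-amd64_c2s16-perf_vs_release")
//
// yields repo "go", mod "vs_release", and machineType "c2s16".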
func parseBuilder(builder string) (repo, mod, machineType string, err error) {
if strings.HasPrefix(builder, "x_") || strings.HasPrefix(builder, "z_") {
// Repository.
m := strings.SplitN(builder, "-", 2)
if len(m) != 2 {
return "", "", "", fmt.Errorf("unexpected builder name: %s", builder)
}
repo = m[0]
builder = m[1]
} else {
repo = "go"
}
components := strings.SplitN(builder, "-", 4)
// components[0] -> Go branch name, already handled elsewhere
// components[1] -> GOOS, already handled elsewhere
// components[2] -> GOARCH, mostly handled, but there's a machine specifier here after an _
// components[3] -> rest of the builder name; perf builders will have this start with "perf".
if len(components) != 4 {
return "", "", "", fmt.Errorf("unexpected builder name: %s", builder)
}
// Extract the machine type (platform specifier).
m := strings.SplitN(components[2], "_", 2)
if len(m) == 2 {
machineType = m[1]
}
// Extract the run mods suffix.
if rest, ok := strings.CutPrefix(components[3], "perf_"); ok {
mod = rest
} else {
return "", "", "", fmt.Errorf("unexpected builder name: missing -perf as first run mod: %s", builder)
}
return
}
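
// ratioSummary estimates the ratio median(experiment)/median(baseline) by
// bootstrap: it resamples both sides bootstrapN times and returns the median
// of the resampled ratios as center, with [lo, hi] spanning the requested
// confidence level. A minimal sketch of a call (names are illustrative):
//
//	lo, center, hi := ratioSummary(base, exp, 0.95, 1000)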
func ratioSummary(baseline, experiment *benchmath.Sample, confidence float64, bootstrapN int) (lo, center, hi float64) {
ratios := make([]float64, bootstrapN)
sampleNum := make([]float64, len(experiment.Values))
sampleDen := make([]float64, len(baseline.Values))
for i := range ratios {
resampleInto(experiment.Values, sampleNum)
resampleInto(baseline.Values, sampleDen)
den := median(sampleDen)
if den == 0 {
num := median(sampleNum)
if num >= 0 {
ratios[i] = (num + 1)
} else {
ratios[i] = (num - 1)
}
} else {
ratios[i] = median(sampleNum) / den
}
}
slices.Sort(ratios)
p := (1 - confidence) / 2
lo = percentile(ratios, p)
hi = percentile(ratios, 1-p)
center = median(ratios)
return
}
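
// percentile returns the p'th quantile (0 <= p <= 1) of the sorted slice a,
// linearly interpolating between adjacent elements. For example,
// percentile([]float64{1, 2, 3, 4}, 0.6) interpolates between a[2] and a[3]
// to give 3.4.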
func percentile(a []float64, p float64) float64 {
if len(a) == 0 {
return math.NaN()
}
if p == 0 {
return a[0]
}
n := len(a)
if p == 1 {
return a[n-1]
}
f := float64(float64(n) * p) // Suppress fused-multiply-add
i := int(f)
x := f - float64(i)
r := a[i]
if x > 0 && i+1 < len(a) {
r = float64(r*(1-x)) + float64(a[i+1]*x) // Suppress fused-multiply-add
}
return r
}
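
// median returns the median of the sorted, non-empty slice a, averaging the
// two middle elements when the length is even: median([]float64{1, 2, 3, 4})
// is 2.5.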
func median(a []float64) float64 {
l := len(a)
if l&1 == 1 {
return a[l/2]
}
return (a[l/2] + a[l/2-1]) / 2
}
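
// norm returns the power mean of order l of |x| over the finite elements of
// a, i.e. (sum(|x|^l)/n)^(1/l); l=2 gives the root mean square.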
func norm(a []float64, l float64) float64 {
if len(a) == 0 {
return math.NaN()
}
n := 0.0
sum := 0.0
for _, x := range a {
if math.IsInf(x, 0) || math.IsNaN(x) {
continue
}
sum += math.Pow(math.Abs(x), l)
n++
}
return math.Pow(sum/n, 1/l)
}
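
// resampleInto fills dst with values drawn from sample uniformly at random
// with replacement, then sorts dst so that callers can take order statistics
// directly.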
func resampleInto(sample, dst []float64) {
for i := range dst {
dst[i] = sample[rand.N[int](len(sample))]
}
slices.Sort(dst)
}