blob: c29b452bd617982bce023a8600d593be6b427186 [file] [log] [blame]
// Copyright 2022 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
4
5package app
6
7import (
Michael Pratt03ab3b02023-05-25 13:26:14 -04008 "bytes"
Michael Prattbf722552022-04-08 12:41:06 -04009 "context"
10 "fmt"
Michael Pratt03ab3b02023-05-25 13:26:14 -040011 "io"
Michael Prattbf722552022-04-08 12:41:06 -040012 "log"
13 "net/http"
14 "strings"
15 "time"
16
17 "cloud.google.com/go/compute/metadata"
18 secretmanager "cloud.google.com/go/secretmanager/apiv1"
Dmitri Shuralyov39811cb2023-10-01 14:41:57 -040019 "cloud.google.com/go/secretmanager/apiv1/secretmanagerpb"
Michael Prattbf722552022-04-08 12:41:06 -040020 influxdb2 "github.com/influxdata/influxdb-client-go/v2"
21 "golang.org/x/build/internal/influx"
22 "golang.org/x/build/perfdata"
23 "golang.org/x/perf/benchfmt"
24 "golang.org/x/perf/benchseries"
25 "google.golang.org/api/idtoken"
Michael Prattbf722552022-04-08 12:41:06 -040026)
27
// backfillWindow bounds how far back the Influx sync looks: it is both the
// range of the "latest upload" query and the fallback starting point used
// when that query finds nothing.
const backfillWindow = 30 * 24 * time.Hour // 30 days.
31
Michael Pratt2e87f3b2022-06-13 15:29:38 -040032func (a *App) influxClient(ctx context.Context) (influxdb2.Client, error) {
33 if a.InfluxHost == "" {
34 return nil, fmt.Errorf("Influx host unknown (set INFLUX_HOST?)")
35 }
36
37 token, err := a.findInfluxToken(ctx)
38 if err != nil {
39 return nil, fmt.Errorf("error finding Influx token: %w", err)
40 }
41
42 return influxdb2.NewClient(a.InfluxHost, token), nil
43}
44
Michael Prattbf722552022-04-08 12:41:06 -040045// syncInflux handles /cron/syncinflux, which updates an InfluxDB instance with
46// the latest data from perfdata.golang.org (i.e. storage), or backfills it.
47func (a *App) syncInflux(w http.ResponseWriter, r *http.Request) {
48 ctx := r.Context()
49
50 if a.AuthCronEmail != "" {
51 if err := checkCronAuth(ctx, r, a.AuthCronEmail); err != nil {
52 log.Printf("Dropping invalid request to /cron/syncinflux: %v", err)
53 http.Error(w, err.Error(), 403)
54 return
55 }
56 }
57
Michael Pratt2e87f3b2022-06-13 15:29:38 -040058 ifxc, err := a.influxClient(ctx)
Michael Prattbf722552022-04-08 12:41:06 -040059 if err != nil {
Michael Pratt2e87f3b2022-06-13 15:29:38 -040060 log.Printf("Error getting Influx client: %v", err)
Michael Prattbf722552022-04-08 12:41:06 -040061 http.Error(w, err.Error(), 500)
62 return
63 }
Michael Prattbf722552022-04-08 12:41:06 -040064 defer ifxc.Close()
65
66 log.Printf("Connecting to influx...")
67
68 lastPush, err := latestInfluxTimestamp(ctx, ifxc)
69 if err != nil {
70 http.Error(w, err.Error(), 500)
71 return
72 }
73 if lastPush.IsZero() {
74 // Pick the backfill window.
75 lastPush = time.Now().Add(-backfillWindow)
76 }
77
78 log.Printf("Last push to influx: %v", lastPush)
79
80 uploads, err := a.uploadsSince(ctx, lastPush)
81 if err != nil {
82 http.Error(w, err.Error(), 500)
83 return
84 }
85
86 log.Printf("Uploads since last push: %d", len(uploads))
87
88 var errs []error
89 for _, u := range uploads {
90 log.Printf("Processing upload %s...", u.UploadID)
91 if err := a.pushRunToInflux(ctx, ifxc, u); err != nil {
92 errs = append(errs, err)
93 log.Printf("Error processing upload %s: %v", u.UploadID, err)
94 }
95 }
96 if len(errs) > 0 {
97 var failures strings.Builder
98 for _, err := range errs {
99 failures.WriteString(err.Error())
100 failures.WriteString("\n")
101 }
102 http.Error(w, failures.String(), 500)
103 }
104}
105
106func checkCronAuth(ctx context.Context, r *http.Request, wantEmail string) error {
107 const audience = "/cron/syncinflux"
108
109 const authHeaderPrefix = "Bearer "
110 authHeader := r.Header.Get("Authorization")
111 if !strings.HasPrefix(authHeader, authHeaderPrefix) {
112 return fmt.Errorf("missing Authorization header")
113 }
114 token := authHeader[len(authHeaderPrefix):]
115
116 p, err := idtoken.Validate(ctx, token, audience)
117 if err != nil {
118 return err
119 }
120
121 if p.Issuer != "https://accounts.google.com" {
122 return fmt.Errorf("issuer must be https://accounts.google.com, but is %s", p.Issuer)
123 }
124
125 e, ok := p.Claims["email"]
126 if !ok {
127 return fmt.Errorf("email missing from token")
128 }
129 email, ok := e.(string)
130 if !ok {
131 return fmt.Errorf("email unexpected type %T", e)
132 }
133
134 if email != wantEmail {
135 return fmt.Errorf("email got %s want %s", email, wantEmail)
136 }
137
138 return nil
139}
140
141func (a *App) findInfluxToken(ctx context.Context) (string, error) {
142 if a.InfluxToken != "" {
143 return a.InfluxToken, nil
144 }
145
146 var project string
147 if a.InfluxProject != "" {
148 project = a.InfluxProject
149 } else {
150 var err error
151 project, err = metadata.ProjectID()
152 if err != nil {
153 return "", fmt.Errorf("error determining GCP project ID (set INFLUX_TOKEN or INFLUX_PROJECT?): %w", err)
154 }
155 }
156
157 log.Printf("Fetching Influx token from %s...", project)
158
159 token, err := fetchInfluxToken(ctx, project)
160 if err != nil {
161 return "", fmt.Errorf("error fetching Influx token: %w", err)
162 }
163
164 return token, nil
165}
166
167func fetchInfluxToken(ctx context.Context, project string) (string, error) {
168 client, err := secretmanager.NewClient(ctx)
169 if err != nil {
170 return "", fmt.Errorf("error creating secret manager client: %w", err)
171 }
172 defer client.Close()
173
174 req := &secretmanagerpb.AccessSecretVersionRequest{
175 Name: "projects/" + project + "/secrets/" + influx.AdminTokenSecretName + "/versions/latest",
176 }
177
178 result, err := client.AccessSecretVersion(ctx, req)
179 if err != nil {
180 return "", fmt.Errorf("failed to access secret version: %w", err)
181 }
182
183 return string(result.Payload.Data), nil
184}
185
186func latestInfluxTimestamp(ctx context.Context, ifxc influxdb2.Client) (time.Time, error) {
187 qc := ifxc.QueryAPI(influx.Org)
188 // Find the latest upload in the last month.
189 q := fmt.Sprintf(`from(bucket:%q)
190 |> range(start: -%dh)
191 |> filter(fn: (r) => r["_measurement"] == "benchmark-result")
192 |> filter(fn: (r) => r["_field"] == "upload-time")
193 |> group()
194 |> sort(columns: ["_value"], desc: true)
195 |> limit(n: 1)`, influx.Bucket, backfillWindow/time.Hour)
Michael Pratt6ae4b692022-12-21 16:54:36 -0500196 result, err := influxQuery(ctx, qc, q)
Michael Prattbf722552022-04-08 12:41:06 -0400197 if err != nil {
198 return time.Time{}, err
199 }
200 for result.Next() {
201 // Except for the point timestamp, all other timestamps are stored as strings, specifically
202 // as the RFC3339Nano format.
203 //
204 // We only care about the first result, and there should be just one.
205 return time.Parse(time.RFC3339Nano, result.Record().Value().(string))
206 }
207 return time.Time{}, result.Err()
208}
209
210func (a *App) uploadsSince(ctx context.Context, since time.Time) ([]perfdata.UploadInfo, error) {
211 query := strings.Join([]string{
212 // Limit results to the window from since to now.
213 "upload-time>" + since.UTC().Format(time.RFC3339),
214 // Only take results generated by the coordinator. This ensures that nobody can
215 // just upload data to perfdata.golang.org and spoof us (accidentally or intentionally).
Michael Anthony Knyszekd8b3a642024-02-09 22:27:02 +0000216 "by:public-worker-builder@golang-ci-luci.iam.gserviceaccount.com",
Michael Prattbf722552022-04-08 12:41:06 -0400217 // Only take results that were generated from post-submit runs, not trybots.
218 "post-submit:true",
Michael Prattbf722552022-04-08 12:41:06 -0400219 }, " ")
220 uploadList := a.StorageClient.ListUploads(
221 ctx,
222 query,
223 nil,
224 500, // TODO(mknyszek): page results if this isn't enough.
225 )
226 defer uploadList.Close()
227
228 var uploads []perfdata.UploadInfo
229 for uploadList.Next() {
230 uploads = append(uploads, uploadList.Info())
231 }
232 if err := uploadList.Err(); err != nil {
233 return nil, err
234 }
235 return uploads, nil
236}
237
238func (a *App) pushRunToInflux(ctx context.Context, ifxc influxdb2.Client, u perfdata.UploadInfo) error {
239 s, err := a.StorageClient.Query(ctx, fmt.Sprintf("upload:%s", u.UploadID))
240 if err != nil {
241 return err
242 }
Michael Prattbf722552022-04-08 12:41:06 -0400243
Michael Pratt03ab3b02023-05-25 13:26:14 -0400244 // We need to read the upload multiple times via benchfmt.Reader, so
245 // copy to a buffer we can seek back to the beginning.
246 var buf bytes.Buffer
247 if _, err := io.Copy(&buf, s); err != nil {
248 return fmt.Errorf("error reading upload: %w", err)
249 }
250 if err := s.Close(); err != nil {
251 return fmt.Errorf("error closing upload: %w", err)
252 }
253
cui fliter5a5198d2023-08-18 10:28:05 +0800254 comparisons := []struct {
Michael Pratt03ab3b02023-05-25 13:26:14 -0400255 suffix string
256 compare string
257 numerator string
258 denominator string
259 filter string
260 }{
261 {
262 // Default: toolchain:baseline vs experiment without PGO
263 compare: "toolchain",
264 numerator: "experiment",
265 denominator: "baseline",
cui fliter5a5198d2023-08-18 10:28:05 +0800266 filter: "-pgo:on", // "off" or unset (bent doesn't set pgo).
Michael Pratt03ab3b02023-05-25 13:26:14 -0400267 },
268 {
269 // toolchain:baseline vs experiment with PGO
270 suffix: "/pgo=on,toolchain:baseline-vs-experiment",
271 compare: "toolchain",
272 numerator: "experiment",
273 denominator: "baseline",
274 filter: "pgo:on",
275 },
276 {
277 // pgo:off vs on with experiment toolchain (impact of enabling PGO)
278 suffix: "/toolchain:experiment,pgo=off-vs-on",
279 compare: "pgo",
280 numerator: "on",
281 denominator: "off",
282 filter: "toolchain:experiment",
283 },
284 }
285 for _, c := range comparisons {
286 r := bytes.NewReader(buf.Bytes())
287 fmtr := benchfmt.NewReader(r, u.UploadID)
288
289 // Use the default comparisons. Namely:
290 // 1. Build a series out of commit dates (in our case, this is length 1).
291 // 2. Split out comparisons by benchmark name (unit we get for free).
292 //
293 // Copy the options for mutation.
294 opts := *benchseries.DefaultBuilderOptions()
295 opts.Compare = c.compare
296 opts.Numerator = c.numerator
297 opts.Denominator = c.denominator
298 if opts.Filter == "" {
299 opts.Filter = c.filter
300 } else {
301 opts.Filter += " " + c.filter
302 }
303
304 if err := a.compareAndPush(ctx, ifxc, fmtr, &opts, c.suffix); err != nil {
305 return fmt.Errorf("error in compareAndPush(%s): %w", c.suffix, err)
306 }
307 }
308
309 return nil
310}
311
312func (a *App) compareAndPush(ctx context.Context, ifxc influxdb2.Client, r *benchfmt.Reader, opts *benchseries.BuilderOptions, suffix string) error {
Michael Prattbf722552022-04-08 12:41:06 -0400313 // Scan the results into a benchseries builder.
Michael Pratt03ab3b02023-05-25 13:26:14 -0400314 builder, err := benchseries.NewBuilder(opts)
Michael Prattbf722552022-04-08 12:41:06 -0400315 if err != nil {
316 return fmt.Errorf("failed to create benchseries builder: %v", err)
317 }
318 for r.Scan() {
319 rec := r.Result()
320 if err, ok := rec.(*benchfmt.SyntaxError); ok {
321 // Non-fatal result parse error. Warn
322 // but keep going.
323 log.Printf("Parse error: %v", err)
324 continue
325 }
326 res := rec.(*benchfmt.Result)
327 builder.Add(res)
328 }
329 if err := r.Err(); err != nil {
330 return err
331 }
332
333 // Run the comparison. We don't have any existing results so our
334 // duplicate policy doesn't matter here. Just pick replacement.
Michael Pratt1d025ee2022-12-22 12:11:58 -0500335 comparisons, err := builder.AllComparisonSeries(nil, benchseries.DUPE_REPLACE)
336 if err != nil {
337 return fmt.Errorf("failed to creation comparison series: %w", err)
338 }
Michael Prattbf722552022-04-08 12:41:06 -0400339
340 const (
341 confidence = 0.95
342 bootstrap = 1000
343 )
344
345 // Iterate over the comparisons, extract the results, and push them to Influx.
346 wapi := ifxc.WriteAPIBlocking(influx.Org, influx.Bucket)
347comparisonLoop:
348 for _, cs := range comparisons {
349 cs.AddSummaries(confidence, bootstrap)
350
351 summaries := cs.Summaries
352
353 // Build a map of residues with single values. Our benchmark pipeline enforces
354 // that the only key that has a differing value across benchmark runs of the same
355 // name and unit is "toolchain."
356 //
357 // Most other keys are singular for *all* benchmarks in a run (like "goos") but
358 // even those that are not (like "pkg") remain the same even if "toolchain" differs.
359 //
360 // We build a map instead of just using them because we need to decide at upload
361 // time whether the key is an Influx tag or field.
362 residues := make(map[string]string)
363 for _, r := range cs.Residues {
364 if len(r.Slice) > 1 {
365 log.Printf("found non-singular key %q with values %v; comparison may be invalid, skipping...", r.S, r.Slice)
366 continue comparisonLoop
367 }
368 residues[r.S] = r.Slice[0]
369 }
370
371 // N.B. In our case Series should have length 1, because we're processing
372 // a single result here. By default the string value here is the commit date.
373 for i, series := range cs.Series {
374 for j, benchmarkName := range cs.Benchmarks {
375 sum := summaries[i][j]
376 if !sum.Defined() {
377 log.Printf("Summary not defined for %s %s", series, benchmarkName)
378 continue
379 }
380
381 measurement := "benchmark-result" // measurement
Michael Pratt03ab3b02023-05-25 13:26:14 -0400382 benchmarkName = benchmarkName + suffix // tag
Michael Prattbf722552022-04-08 12:41:06 -0400383 series = series // time
384 center, low, high := sum.Center, sum.Low, sum.High // fields
385 unit := cs.Unit // tag
386 uploadTime := residues["upload-time"] // field
387 cpu := residues["cpu"] // tag
388 goarch := residues["goarch"] // tag
389 goos := residues["goos"] // tag
390 benchmarksCommit := residues["benchmarks-commit"] // field
391 baselineCommit := cs.HashPairs[series].DenHash // field
392 experimentCommit := cs.HashPairs[series].NumHash // field
Michael Prattbb9973b2022-06-24 13:41:27 -0400393 repository := residues["repository"] // tag
Michael Prattbf722552022-04-08 12:41:06 -0400394 branch := residues["branch"] // tag
395
Michael Prattbb9973b2022-06-24 13:41:27 -0400396 // cmd/bench didn't set repository prior to
397 // CL 413915. Older runs are all against go.
398 if repository == "" {
399 repository = "go"
400 }
401
Michael Prattbf722552022-04-08 12:41:06 -0400402 // Push to influx.
403 t, err := benchseries.ParseNormalizedDateString(series)
404 if err != nil {
405 return fmt.Errorf("error parsing normalized date: %w", err)
406 }
407 fields := map[string]interface{}{
408 "center": center,
409 "low": low,
410 "high": high,
411 "upload-time": uploadTime,
412 "benchmarks-commit": benchmarksCommit,
413 "baseline-commit": baselineCommit,
414 "experiment-commit": experimentCommit,
415 }
416 tags := map[string]string{
Michael Prattbb9973b2022-06-24 13:41:27 -0400417 "name": benchmarkName,
418 "unit": unit,
419 "cpu": cpu,
420 "goarch": goarch,
421 "goos": goos,
422 "repository": repository,
423 "branch": branch,
Michael Prattbf722552022-04-08 12:41:06 -0400424 // TODO(prattmic): Add pkg, which
425 // benchseries currently can't handle.
426 }
427 p := influxdb2.NewPoint(measurement, tags, fields, t)
428 if err := wapi.WritePoint(ctx, p); err != nil {
429 return fmt.Errorf("error writing point: %w", err)
430 }
431 }
432 }
433 }
434 return nil
435}