blob: 376eb41b39c6e9d68352115cdeeb8eee00609e20 [file] [log] [blame]
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package buildstats contains code to sync the coordinator's build
// logs from Datastore to BigQuery.
package buildstats // import "golang.org/x/build/internal/buildstats"
import (
"context"
"fmt"
"log"
"reflect"
"sort"
"strings"
"time"
"cloud.google.com/go/bigquery"
"cloud.google.com/go/datastore"
"golang.org/x/build/buildenv"
"golang.org/x/build/types"
"google.golang.org/api/googleapi"
"google.golang.org/api/iterator"
)
// Verbose controls logging verbosity.
// When it is true, SyncBuilds and SyncSpans additionally log table
// metadata, schema details and the timestamp they resume syncing from.
var Verbose = false
// SyncBuilds syncs the datastore "Build" entities to the BigQuery "Builds" table.
// This stores information on each build as a whole, without details.
//
// If the Builds table does not exist it is created, and if it has no schema
// one is inferred from types.BuildRecord. Only entities whose EndTime is later
// than the newest EndTime already present in BigQuery are copied. They are
// read in EndTime order and uploaded in batches of at most 1000 rows, using
// the encoded datastore key of each entity as its BigQuery insert ID.
//
// SyncBuilds returns nil once there is nothing left to copy, or the first
// error it encounters.
func SyncBuilds(ctx context.Context, env *buildenv.Environment) error {
	bq, err := bigquery.NewClient(ctx, env.ProjectName)
	if err != nil {
		return err
	}
	defer bq.Close()

	buildsTable := bq.Dataset("builds").Table("Builds")
	meta, err := buildsTable.Metadata(ctx)
	if ae, ok := err.(*googleapi.Error); ok && ae.Code == 404 {
		// The table does not exist yet: create it and re-read its metadata.
		if Verbose {
			log.Printf("Creating table Builds...")
		}
		err = buildsTable.Create(ctx, nil)
		if err == nil {
			meta, err = buildsTable.Metadata(ctx)
		}
	}
	if err != nil {
		return fmt.Errorf("getting Builds table metadata: %v", err)
	}
	if Verbose {
		log.Printf("buildstats: Builds metadata: %#v", meta)
	}
	if len(meta.Schema) == 0 {
		if Verbose {
			log.Printf("buildstats: builds table has empty schema")
		}
		schema, err := bigquery.InferSchema(types.BuildRecord{})
		if err != nil {
			return fmt.Errorf("InferSchema: %v", err)
		}
		// NOTE(review): the empty etag presumably requests an unconditional
		// update — confirm against the bigquery Table.Update documentation.
		blindWrite := ""
		meta, err = buildsTable.Update(ctx, bigquery.TableMetadataToUpdate{Schema: schema}, blindWrite)
		if err != nil {
			return fmt.Errorf("table.Update schema: %v", err)
		}
	}
	if Verbose {
		for i, fs := range meta.Schema {
			log.Printf(" schema[%v]: %+v", i, fs)
			for j, fs := range fs.Schema {
				log.Printf(" .. schema[%v]: %+v", j, fs)
			}
		}
	}

	// Find the newest EndTime already stored in BigQuery; syncing resumes
	// just after it.
	q := bq.Query("SELECT MAX(EndTime) FROM builds.Builds")
	it, err := q.Read(ctx)
	if err != nil {
		return fmt.Errorf("Read: %v", err)
	}
	var values []bigquery.Value
	err = it.Next(&values)
	if err == iterator.Done {
		return fmt.Errorf("No result.")
	}
	if err != nil {
		return fmt.Errorf("Next: %v", err)
	}
	var since time.Time
	switch t := values[0].(type) {
	case nil:
		// NULL. No rows.
		if Verbose {
			log.Printf("buildstats: syncing Builds from the beginning")
		}
	case time.Time:
		since = t
	default:
		return fmt.Errorf("MAX(EndTime) = %T: want nil or time.Time", t)
	}
	if Verbose {
		log.Printf("Max is %v (%v)", since, since.Location())
	}

	ds, err := datastore.NewClient(ctx, env.ProjectName)
	if err != nil {
		return fmt.Errorf("datastore.NewClient: %v", err)
	}
	defer ds.Close()

	up := buildsTable.Uploader()
	if Verbose {
		log.Printf("buildstats: Builds max time: %v", since)
	}
	dsq := datastore.NewQuery("Build")
	if !since.IsZero() {
		// Only look at the 90 days following the last synced EndTime.
		dsq = dsq.Filter("EndTime >", since).Filter("EndTime <", since.Add(24*90*time.Hour))
	} else {
		// Ignore rows without endtime.
		dsq = dsq.Filter("EndTime >", time.Unix(1, 0))
	}
	dsq = dsq.Order("EndTime")
	dsit := ds.Run(ctx, dsq)

	// The uploaded columns are the same for every row: every table column
	// except those whose name starts with an underscore.
	var putSchema bigquery.Schema
	for _, fs := range meta.Schema {
		if strings.HasPrefix(fs.Name, "_") {
			continue
		}
		putSchema = append(putSchema, fs)
	}

	const batchSize = 1000
	var maxPut time.Time
	for {
		var rows []*bigquery.ValuesSaver
		for len(rows) < batchSize {
			var s types.BuildRecord
			key, err := dsit.Next(&s)
			if err == iterator.Done {
				break
			}
			if err != nil {
				return fmt.Errorf("reading Build entities from datastore: %v", err)
			}
			if s.EndTime.IsZero() {
				return fmt.Errorf("got zero EndTime")
			}
			rv := reflect.ValueOf(s)
			row := make([]bigquery.Value, 0, len(putSchema))
			for _, fs := range putSchema {
				fv := rv.FieldByName(fs.Name)
				if !fv.IsValid() {
					// Report schema drift instead of panicking in Interface.
					return fmt.Errorf("Builds table column %q has no matching field in %T", fs.Name, s)
				}
				row = append(row, fv.Interface())
			}
			maxPut = s.EndTime
			rows = append(rows, &bigquery.ValuesSaver{
				Schema:   putSchema,
				InsertID: key.Encode(),
				Row:      row,
			})
		}
		if len(rows) == 0 {
			// The datastore iterator is exhausted and nothing is pending.
			return nil
		}
		err = up.Put(ctx, rows)
		log.Printf("buildstats: Build sync put %d rows, up to %v. error = %v", len(rows), maxPut, err)
		if err != nil {
			return err
		}
	}
}
// SyncSpans syncs the datastore "Span" entities to the BigQuery "Spans" table.
// These contain the fine-grained timing details of how a build ran.
//
// If the Spans table does not exist it is created, and if it has no schema
// one is inferred from types.SpanRecord. Only entities whose EndTime is later
// than the newest EndTime already present in BigQuery are copied. They are
// read in EndTime order and uploaded in batches of at most 1000 rows, using
// the encoded datastore key of each entity as its BigQuery insert ID.
//
// SyncSpans returns nil once there is nothing left to copy, or the first
// error it encounters. It never terminates the process.
func SyncSpans(ctx context.Context, env *buildenv.Environment) error {
	bq, err := bigquery.NewClient(ctx, env.ProjectName)
	if err != nil {
		return err
	}
	defer bq.Close()

	table := bq.Dataset("builds").Table("Spans")
	meta, err := table.Metadata(ctx)
	if ae, ok := err.(*googleapi.Error); ok && ae.Code == 404 {
		// The table does not exist yet: create it and re-read its metadata.
		log.Printf("Creating table Spans...")
		err = table.Create(ctx, nil)
		if err == nil {
			meta, err = table.Metadata(ctx)
		}
	}
	if err != nil {
		return fmt.Errorf("Metadata: %#v", err)
	}
	if Verbose {
		log.Printf("buildstats: Spans metadata: %#v", meta)
	}

	// schema is the authoritative table schema from here on. meta.Schema is
	// stale (empty) whenever the schema had to be inferred below.
	schema := meta.Schema
	if len(schema) == 0 {
		if Verbose {
			log.Printf("EMPTY SCHEMA")
		}
		schema, err = bigquery.InferSchema(types.SpanRecord{})
		if err != nil {
			return fmt.Errorf("InferSchema: %v", err)
		}
		// NOTE(review): the empty etag presumably requests an unconditional
		// update — confirm against the bigquery Table.Update documentation.
		blindWrite := ""
		updated, err := table.Update(ctx, bigquery.TableMetadataToUpdate{Schema: schema}, blindWrite)
		if err != nil {
			return fmt.Errorf("table.Update schema: %v", err)
		}
		schema = updated.Schema
	}
	if Verbose {
		for i, fs := range schema {
			log.Printf(" schema[%v]: %+v", i, fs)
			for j, fs := range fs.Schema {
				log.Printf(" .. schema[%v]: %+v", j, fs)
			}
		}
	}

	// Find the newest EndTime already stored in BigQuery; syncing resumes
	// just after it.
	q := bq.Query("SELECT MAX(EndTime) FROM builds.Spans")
	it, err := q.Read(ctx)
	if err != nil {
		return fmt.Errorf("Read: %v", err)
	}
	var since time.Time
	var values []bigquery.Value
	if err := it.Next(&values); err != nil {
		if err == iterator.Done {
			return fmt.Errorf("Expected at least one row from MAX(EndTime) query; got none.")
		}
		return fmt.Errorf("Next: %v", err)
	}
	switch t := values[0].(type) {
	case nil:
		// NULL. No rows.
		log.Printf("starting from the beginning...")
	case time.Time:
		since = t
	default:
		return fmt.Errorf("MAX(EndTime) = %T: want nil or time.Time", t)
	}
	if since.IsZero() {
		since = time.Unix(1, 0) // arbitrary
	}

	ds, err := datastore.NewClient(ctx, env.ProjectName)
	if err != nil {
		return fmt.Errorf("datastore.NewClient: %v", err)
	}
	defer ds.Close()

	up := table.Uploader()
	if Verbose {
		log.Printf("buildstats: Span max time: %v", since)
	}
	dsit := ds.Run(ctx, datastore.NewQuery("Span").Filter("EndTime >", since).Order("EndTime"))

	// The uploaded columns are the same for every row: every table column
	// except those whose name starts with an underscore.
	var putSchema bigquery.Schema
	for _, fs := range schema {
		if strings.HasPrefix(fs.Name, "_") {
			continue
		}
		putSchema = append(putSchema, fs)
	}

	const batchSize = 1000
	var maxPut time.Time
	for {
		var rows []*bigquery.ValuesSaver
		for len(rows) < batchSize {
			var s types.SpanRecord
			key, err := dsit.Next(&s)
			if err == iterator.Done {
				break
			}
			if err != nil {
				return fmt.Errorf("reading Span entities from datastore: %v", err)
			}
			if s.EndTime.IsZero() {
				return fmt.Errorf("got zero endtime")
			}
			rv := reflect.ValueOf(s)
			row := make([]bigquery.Value, 0, len(putSchema))
			for _, fs := range putSchema {
				fv := rv.FieldByName(fs.Name)
				if !fv.IsValid() {
					// Report schema drift instead of panicking in Interface.
					return fmt.Errorf("Spans table column %q has no matching field in %T", fs.Name, s)
				}
				row = append(row, fv.Interface())
			}
			maxPut = s.EndTime
			rows = append(rows, &bigquery.ValuesSaver{
				Schema:   putSchema,
				InsertID: key.Encode(),
				Row:      row,
			})
		}
		if len(rows) == 0 {
			// The datastore iterator is exhausted and nothing is pending.
			return nil
		}
		err = up.Put(ctx, rows)
		log.Printf("buildstats: Spans sync put %d rows, up to %v. error = %v", len(rows), maxPut, err)
		if err != nil {
			return err
		}
	}
}
// TestStats holds stats for cmd/dist tests, grouped by build
// configuration (a "builder"), as of a single point in time.
//
// The Duration method tolerates a nil *TestStats and then falls back to
// its default value.
type TestStats struct {
	// AsOf is the time that the stats were queried from BigQuery.
	AsOf time.Time
	// BuilderTestStats maps from a builder name to that builder's
	// test stats.
	BuilderTestStats map[string]*BuilderTestStats
}
// Duration reports the median time testName takes to run on builder.
// When ts is nil, or the builder or the test has no recorded median,
// it returns an arbitrary default of three seconds instead.
func (ts *TestStats) Duration(builder, testName string) time.Duration {
	// fallback is some arbitrary value used when nothing is known.
	const fallback = 3 * time.Second

	if ts == nil {
		return fallback
	}
	perBuilder, found := ts.BuilderTestStats[builder]
	if !found {
		return fallback
	}
	median, known := perBuilder.MedianDuration[testName]
	if !known {
		return fallback
	}
	return median
}
// Builders returns the names of all builders that have test stats,
// sorted in increasing order. It returns nil when ts is nil, so that a
// nil *TestStats is as safe to use here as it is with Duration.
func (ts *TestStats) Builders() []string {
	if ts == nil {
		return nil
	}
	names := make([]string, 0, len(ts.BuilderTestStats))
	for name := range ts.BuilderTestStats {
		names = append(names, name)
	}
	sort.Strings(names)
	return names
}
// BuilderTestStats holds the per-test stats of a single builder.
// Both maps are keyed by cmd/dist test name.
type BuilderTestStats struct {
	// Builder is which build configuration this is for.
	Builder string
	// Runs is how many times tests have run recently, for some
	// fuzzy definition of "recently".
	// The map key is a cmd/dist test name.
	Runs map[string]int
	// MedianDuration is the median duration for a test to
	// pass on this BuilderTestStat's Builder.
	// The map key is a cmd/dist test name.
	MedianDuration map[string]time.Duration
}
// Tests returns the names of every test that has an entry in Runs,
// sorted in increasing order.
func (bs *BuilderTestStats) Tests() []string {
	names := make([]string, len(bs.Runs))
	next := 0
	for name := range bs.Runs {
		names[next] = name
		next++
	}
	sort.Sort(sort.StringSlice(names))
	return names
}
// QueryTestStats returns stats on all tests for all builders.
//
// It queries the builds.Spans BigQuery table for "run_test:" events of the
// "go" repo that have an empty Error and started within the last 500 hours,
// and records, per builder and test, the run count and the approximate
// median duration. At most 50000 result rows are consumed; any rows beyond
// that are silently ignored.
func QueryTestStats(ctx context.Context, env *buildenv.Environment) (*TestStats, error) {
	// statsRow mirrors the columns selected by medianQuery.
	type statsRow struct {
		Builder   string
		Event     string
		MedianSec float64
		N         int
	}
	const (
		maxRows     = 50000
		eventPrefix = "run_test:"
		medianQuery = `
SELECT
Builder, Event, APPROX_QUANTILES(Seconds, 100)[OFFSET(50)] as MedianSec, COUNT(*) as N
FROM
builds.Spans
WHERE
Error='' AND
StartTime > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 500 HOUR)
AND Repo = "go"
AND Event LIKE 'run_test:%'
GROUP BY 1, 2
`
	)

	stats := &TestStats{
		AsOf:             time.Now(),
		BuilderTestStats: map[string]*BuilderTestStats{},
	}

	client, err := bigquery.NewClient(ctx, env.ProjectName)
	if err != nil {
		return nil, err
	}
	defer client.Close()

	// Cancel the derived context on every return path, including the
	// early exits from the row loop below.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	rowIter, err := client.Query(medianQuery).Read(ctx)
	if err != nil {
		return nil, err
	}

	// seen is the 1-based index of the row being read.
	for seen := 1; ; seen++ {
		var r statsRow
		err := rowIter.Next(&r)
		if err == iterator.Done {
			break
		}
		if err != nil {
			return nil, err
		}
		if seen > maxRows {
			break
		}

		perBuilder, ok := stats.BuilderTestStats[r.Builder]
		if !ok {
			perBuilder = &BuilderTestStats{
				Builder:        r.Builder,
				Runs:           map[string]int{},
				MedianDuration: map[string]time.Duration{},
			}
			stats.BuilderTestStats[r.Builder] = perBuilder
		}

		testName := strings.TrimPrefix(r.Event, eventPrefix)
		perBuilder.Runs[testName] = r.N
		// MedianSec is in seconds; convert it to nanoseconds.
		perBuilder.MedianDuration[testName] = time.Duration(r.MedianSec * float64(time.Second))
	}
	return stats, nil
}