| // Copyright 2023 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package main |
| |
| import ( |
| "bufio" |
| "context" |
| "encoding/json" |
| "errors" |
| "flag" |
| "fmt" |
| "io/fs" |
| "log" |
| "net/http" |
| "os" |
| "strings" |
| "time" |
| |
| cloudtasks "cloud.google.com/go/cloudtasks/apiv2" |
| taskspb "cloud.google.com/go/cloudtasks/apiv2/cloudtaskspb" |
| "golang.org/x/exp/slog" |
| "golang.org/x/mod/semver" |
| "golang.org/x/telemetry/godev/internal/config" |
| "golang.org/x/telemetry/godev/internal/content" |
| ilog "golang.org/x/telemetry/godev/internal/log" |
| "golang.org/x/telemetry/godev/internal/middleware" |
| "golang.org/x/telemetry/godev/internal/storage" |
| tconfig "golang.org/x/telemetry/internal/config" |
| contentfs "golang.org/x/telemetry/internal/content" |
| "golang.org/x/telemetry/internal/telemetry" |
| "golang.org/x/telemetry/internal/unionfs" |
| ) |
| |
| func main() { |
| flag.Parse() |
| ctx := context.Background() |
| cfg := config.NewConfig() |
| |
| if cfg.UseGCS { |
| slog.SetDefault(slog.New(ilog.NewGCPLogHandler())) |
| } |
| |
| buckets, err := storage.NewAPI(ctx, cfg) |
| if err != nil { |
| log.Fatal(err) |
| } |
| ucfg, err := tconfig.ReadConfig(cfg.UploadConfig) |
| if err != nil { |
| log.Fatal(err) |
| } |
| fsys := fsys(cfg.DevMode) |
| cserv := content.Server(fsys) |
| mux := http.NewServeMux() |
| |
| mux.Handle("/", cserv) |
| mux.Handle("/merge/", handleMerge(buckets)) |
| mux.Handle("/chart/", handleChart(ucfg, buckets)) |
| mux.Handle("/queue-tasks/", handleTasks(cfg)) |
| |
| mw := middleware.Chain( |
| middleware.Log(slog.Default()), |
| middleware.Timeout(cfg.RequestTimeout), |
| middleware.RequestSize(cfg.MaxRequestBytes), |
| middleware.Recover(), |
| ) |
| |
| fmt.Printf("server listening at http://localhost:%s\n", cfg.WorkerPort) |
| log.Fatal(http.ListenAndServe(":"+cfg.WorkerPort, mw(mux))) |
| } |
| |
| // handleTasks will populate the task queue that processes report |
| // data. Cloud Scheduler will be instrumented to call this endpoint |
| // daily to merge reports and generate chart data. The merge tasks |
| // will merge the previous weeks reports and the chart tasks will do |
| // the same minus one day. |
| // TODO(golang/go#62575): adjust the date range to align with report |
| // upload cutoff. |
| func handleTasks(cfg *config.Config) content.HandlerFunc { |
| return func(w http.ResponseWriter, r *http.Request) error { |
| now := time.Now().UTC() |
| for i := 7; i > 0; i-- { |
| date := now.AddDate(0, 0, -1*i).Format("2006-01-02") |
| url := cfg.WorkerURL + "/merge/?date=" + date |
| if _, err := createHTTPTask(cfg, url); err != nil { |
| return err |
| } |
| } |
| for i := 8; i > 1; i-- { |
| date := now.AddDate(0, 0, -1*i).Format("2006-01-02") |
| url := cfg.WorkerURL + "/chart/?date=" + date |
| if _, err := createHTTPTask(cfg, url); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| } |
| |
| // createHTTPTask constructs a task with a authorization token |
| // and HTTP target then adds it to a Queue. |
| func createHTTPTask(cfg *config.Config, url string) (*taskspb.Task, error) { |
| ctx := context.Background() |
| client, err := cloudtasks.NewClient(ctx) |
| if err != nil { |
| return nil, fmt.Errorf("cloudtasks.NewClient: %w", err) |
| } |
| defer client.Close() |
| |
| queuePath := fmt.Sprintf("projects/%s/locations/%s/queues/%s", cfg.ProjectID, cfg.LocationID, cfg.QueueID) |
| req := &taskspb.CreateTaskRequest{ |
| Parent: queuePath, |
| Task: &taskspb.Task{ |
| MessageType: &taskspb.Task_HttpRequest{ |
| HttpRequest: &taskspb.HttpRequest{ |
| HttpMethod: taskspb.HttpMethod_POST, |
| Url: url, |
| AuthorizationHeader: &taskspb.HttpRequest_OidcToken{ |
| OidcToken: &taskspb.OidcToken{ |
| ServiceAccountEmail: cfg.IAPServiceAccount, |
| Audience: cfg.ClientID, |
| }, |
| }, |
| }, |
| }, |
| }, |
| } |
| |
| createdTask, err := client.CreateTask(ctx, req) |
| if err != nil { |
| return nil, fmt.Errorf("cloudtasks.CreateTask: %w", err) |
| } |
| return createdTask, nil |
| } |
| |
| // TODO: monitor duration and processed data volume. |
| func handleMerge(s *storage.API) content.HandlerFunc { |
| return func(w http.ResponseWriter, r *http.Request) error { |
| ctx := r.Context() |
| date := r.URL.Query().Get("date") |
| if _, err := time.Parse("2006-01-02", date); err != nil { |
| return content.Error(err, http.StatusBadRequest) |
| } |
| it := s.Upload.Objects(ctx, date) |
| mergeWriter, err := s.Merge.Object(date + ".json").NewWriter(ctx) |
| if err != nil { |
| return err |
| } |
| defer mergeWriter.Close() |
| encoder := json.NewEncoder(mergeWriter) |
| var count int |
| for { |
| obj, err := it.Next() |
| if errors.Is(err, storage.ErrObjectIteratorDone) { |
| break |
| } |
| if err != nil { |
| return err |
| } |
| count++ |
| reader, err := s.Upload.Object(obj).NewReader(ctx) |
| if err != nil { |
| return err |
| } |
| defer reader.Close() |
| var report telemetry.Report |
| if err := json.NewDecoder(reader).Decode(&report); err != nil { |
| return err |
| } |
| if err := encoder.Encode(report); err != nil { |
| return err |
| } |
| if err := reader.Close(); err != nil { |
| return err |
| } |
| } |
| if err := mergeWriter.Close(); err != nil { |
| return err |
| } |
| msg := fmt.Sprintf("merged %d reports into %s/%s", count, s.Merge.URI(), date) |
| return content.Text(w, msg, http.StatusOK) |
| } |
| } |
| |
| func handleChart(cfg *tconfig.Config, s *storage.API) content.HandlerFunc { |
| return func(w http.ResponseWriter, r *http.Request) error { |
| ctx := r.Context() |
| // TODO: use start date and end date to create a timeseries of data. |
| date := r.URL.Query().Get("date") |
| if _, err := time.Parse("2006-01-02", date); err != nil { |
| return content.Error(err, http.StatusBadRequest) |
| } |
| in, err := s.Merge.Object(date + ".json").NewReader(ctx) |
| if errors.Is(err, storage.ErrObjectNotExist) { |
| return content.Error(err, http.StatusNotFound) |
| } |
| if err != nil { |
| return err |
| } |
| defer in.Close() |
| |
| var reports []*telemetry.Report |
| var xs []float64 |
| scanner := bufio.NewScanner(in) |
| for scanner.Scan() { |
| var report telemetry.Report |
| if err := json.Unmarshal(scanner.Bytes(), &report); err != nil { |
| return err |
| } |
| reports = append(reports, &report) |
| xs = append(xs, report.X) |
| } |
| if err := in.Close(); err != nil { |
| return err |
| } |
| |
| data := nest(reports) |
| charts := charts(cfg, date, data, xs) |
| obj := fmt.Sprintf("%s.json", date) |
| out, err := s.Chart.Object(obj).NewWriter(ctx) |
| if err != nil { |
| return err |
| } |
| defer out.Close() |
| |
| if err := json.NewEncoder(out).Encode(charts); err != nil { |
| return err |
| } |
| if err := out.Close(); err != nil { |
| return err |
| } |
| |
| msg := fmt.Sprintf("processed %d reports into %s", len(reports), s.Chart.URI()+"/"+obj) |
| return content.Text(w, msg, http.StatusOK) |
| } |
| } |
| |
| type chartdata struct { |
| DateRange [2]string |
| Programs []*program |
| NumReports int |
| } |
| |
| type program struct { |
| ID string |
| Name string |
| Charts []*chart |
| } |
| |
| type chart struct { |
| ID string |
| Name string |
| Type string |
| Data []*datum |
| } |
| |
| func (c *chart) String() string { |
| bytes, _ := json.Marshal(c) |
| return string(bytes) |
| } |
| |
| type datum struct { |
| Week string |
| Key string |
| Value float64 |
| } |
| |
| func charts(cfg *tconfig.Config, date string, d data, xs []float64) *chartdata { |
| result := &chartdata{DateRange: [2]string{date, date}, NumReports: len(xs)} |
| for _, p := range cfg.Programs { |
| prog := &program{ID: "charts:" + p.Name, Name: p.Name} |
| result.Programs = append(result.Programs, prog) |
| var charts []*chart |
| if !telemetry.IsToolchainProgram(p.Name) { |
| charts = append(charts, partition(d, p.Name, "Version", p.Versions)) |
| } |
| charts = append(charts, |
| partition(d, p.Name, "GOOS", cfg.GOOS), |
| partition(d, p.Name, "GOARCH", cfg.GOARCH), |
| partition(d, p.Name, "GoVersion", cfg.GoVersion)) |
| for _, c := range p.Counters { |
| // TODO: add support for histogram counters by getting the counter type |
| // from the chart config. |
| charts = append(charts, partition(d, p.Name, c.Name, tconfig.Expand(c.Name))) |
| } |
| for _, p := range charts { |
| if p != nil { |
| prog.Charts = append(prog.Charts, p) |
| } |
| } |
| } |
| return result |
| } |
| |
| func histogram(dat data, program, name string, counters []string, xs []float64) *chart { |
| count := &chart{ |
| ID: "charts:" + program + ":" + name, |
| Name: name, |
| Type: "histogram", |
| } |
| for wk := range dat { |
| for _, c := range counters { |
| prefix, _ := splitCounterName(c) |
| pk := programKey{program} |
| gk := graphKey{prefix} |
| ck := counterKey{c} |
| |
| // TODO: consider using pre-computed distribution buckets to reduce the size |
| // of the output. |
| for _, x := range xs { |
| xk := xKey{x} |
| v := dat[wk][pk][gk][ck][xk] |
| total := dat[wk][pk][gk][counterKey{gk.prefix}][xk] |
| _, bucket := splitCounterName(c) |
| if total == 0 { |
| d := &datum{ |
| Week: wk.date, |
| Key: bucket, |
| } |
| count.Data = append(count.Data, d) |
| continue |
| } |
| d := &datum{ |
| Week: wk.date, |
| Key: bucket, |
| Value: float64(v) / float64(total), |
| } |
| count.Data = append(count.Data, d) |
| } |
| } |
| } |
| return count |
| } |
| |
| // partition builds a chart for the program and the counter. It can return nil |
| // if there is no data for the counter in dat. |
| func partition(dat data, program, counterPrefix string, counters []string) *chart { |
| count := &chart{ |
| ID: "charts:" + program + ":" + counterPrefix, |
| Name: counterPrefix, |
| Type: "partition", |
| } |
| pk := programKey{program} |
| prefix, _ := splitCounterName(counterPrefix) |
| gk := graphKey{prefix} |
| for wk := range dat { |
| // TODO: when should this be number of reports? |
| // total := len(xs) |
| total := len(dat[wk][pk][gk][counterKey{gk.prefix}]) |
| if total == 0 { |
| return nil |
| } |
| // We group versions into major minor buckets, we must skip |
| // major minor versions we've already added to the dataset. |
| seen := make(map[string]bool) |
| for _, b := range counters { |
| // TODO(hyangah): let caller normalize names in counters. |
| counter := normalizeCounterName(counterPrefix, b) |
| if seen[counter] { |
| continue |
| } |
| seen[counter] = true |
| ck := counterKey{counter} |
| // number of reports where count prefix:bucket > 0 |
| n := len(dat[wk][pk][gk][ck]) |
| _, bucket := splitCounterName(counter) |
| d := &datum{ |
| Week: wk.date, |
| Key: bucket, |
| Value: float64(n) / float64(total), |
| } |
| count.Data = append(count.Data, d) |
| } |
| } |
| return count |
| } |
| |
| type weekKey struct { |
| date string |
| } |
| |
| type programKey struct { |
| program string |
| } |
| |
| type graphKey struct { |
| prefix string |
| } |
| |
| type counterKey struct { |
| counter string |
| } |
| |
| type xKey struct { |
| X float64 |
| } |
| |
| type data map[weekKey]map[programKey]map[graphKey]map[counterKey]map[xKey]int64 |
| |
| // Names of special counters. |
| // Unlike other counters, these are constructed from the metadata in the report. |
| const ( |
| versionCounter = "Version" |
| goosCounter = "GOOS" |
| goarchCounter = "GOARCH" |
| goversionCounter = "GoVersion" |
| ) |
| |
| // nest groups the report data by week, program, prefix, counter, and x value |
| // summing together counter values for each program report in a report. |
| func nest(reports []*telemetry.Report) data { |
| result := make(data) |
| for _, r := range reports { |
| for _, p := range r.Programs { |
| result.writeCount(r.Week, p.Program, versionCounter, p.Version, r.X, 1) |
| result.writeCount(r.Week, p.Program, goosCounter, p.GOOS, r.X, 1) |
| result.writeCount(r.Week, p.Program, goarchCounter, p.GOARCH, r.X, 1) |
| result.writeCount(r.Week, p.Program, goversionCounter, p.GoVersion, r.X, 1) |
| for c, value := range p.Counters { |
| prefix, _ := splitCounterName(c) |
| result.writeCount(r.Week, p.Program, prefix, c, r.X, value) |
| } |
| } |
| } |
| return result |
| } |
| |
| // readCount reads the count value based on the input keys. |
| // Return nil if any key does not exist. |
| func (d data) readCount(week, program, prefix, counter string, x float64) (int64, error) { |
| wk := weekKey{week} |
| if _, ok := d[wk]; !ok { |
| return -1, fmt.Errorf("missing weekKey %q", week) |
| } |
| pk := programKey{program} |
| if _, ok := d[wk][pk]; !ok { |
| return -1, fmt.Errorf("missing programKey %q", program) |
| } |
| gk := graphKey{prefix} |
| if _, ok := d[wk][pk][gk]; !ok { |
| return -1, fmt.Errorf("missing graphKey key %q", prefix) |
| } |
| ck := counterKey{counter} |
| if _, ok := d[wk][pk][gk][ck]; !ok { |
| return -1, fmt.Errorf("missing counterKey %v", counter) |
| } |
| return d[wk][pk][gk][ck][xKey{x}], nil |
| } |
| |
| // writeCount writes the counter values to the result. When a report contains |
| // multiple program reports for the same program, the value of the counters |
| // in that report are summed together. |
| func (d data) writeCount(week, program, prefix, counter string, x float64, value int64) { |
| wk := weekKey{week} |
| if _, ok := d[wk]; !ok { |
| d[wk] = make(map[programKey]map[graphKey]map[counterKey]map[xKey]int64) |
| } |
| pk := programKey{program} |
| if _, ok := d[wk][pk]; !ok { |
| d[wk][pk] = make(map[graphKey]map[counterKey]map[xKey]int64) |
| } |
| gk := graphKey{prefix} |
| if _, ok := d[wk][pk][gk]; !ok { |
| d[wk][pk][gk] = make(map[counterKey]map[xKey]int64) |
| } |
| // TODO(hyangah): let caller pass the normalized counter name. |
| counter = normalizeCounterName(prefix, counter) |
| ck := counterKey{counter} |
| if _, ok := d[wk][pk][gk][ck]; !ok { |
| d[wk][pk][gk][ck] = make(map[xKey]int64) |
| } |
| xk := xKey{x} |
| d[wk][pk][gk][ck][xk] += value |
| // record the total for all counters with the prefix |
| // as the bucket name. |
| if prefix != counter { |
| ck = counterKey{prefix} |
| if _, ok := d[wk][pk][gk][ck]; !ok { |
| d[wk][pk][gk][ck] = make(map[xKey]int64) |
| } |
| d[wk][pk][gk][ck][xk] += value |
| } |
| } |
| |
| // normalizeCounterName normalizes the counter name. |
| // More specifically, program version, goos, goarch, and goVersion |
| // are not a real counter, but information from the metadata in the report. |
| // This function constructs pseudo counter names to handle them |
| // like other normal counters in aggregation and chart drawing. |
| // To limit the cardinality of version and goVersion, this function |
| // uses only major and minor version numbers in the pseudo-counter names. |
| // If the counter is a normal counter name, it is returned as is. |
| func normalizeCounterName(prefix, counter string) string { |
| switch prefix { |
| case versionCounter: |
| if counter == "devel" { |
| return prefix + ":" + counter |
| } |
| if strings.HasPrefix(counter, "go") { |
| return prefix + ":" + goMajorMinor(counter) |
| } |
| return prefix + ":" + semver.MajorMinor(counter) |
| case goosCounter: |
| return prefix + ":" + counter |
| case goarchCounter: |
| return prefix + ":" + counter |
| case goversionCounter: |
| return prefix + ":" + goMajorMinor(counter) |
| } |
| return counter |
| } |
| |
| // splitCounterName gets splits the prefix and bucket splitCounterName of a counter name |
| // or a bucket name. For an input with no bucket part prefix and bucket |
| // are the same. |
| func splitCounterName(name string) (prefix, bucket string) { |
| prefix, bucket, found := strings.Cut(name, ":") |
| if !found { |
| bucket = prefix |
| } |
| return prefix, bucket |
| } |
| |
| // goMajorMinor gets the go<Maj>,<Min> version for a given go version. |
| // For example, go1.20.1 -> go1.20. |
| // TODO(hyangah): replace with go/version.Lang (available from go1.22) |
| // after our builders stop running go1.21. |
| func goMajorMinor(v string) string { |
| v = v[2:] |
| maj, x, ok := cutInt(v) |
| if !ok { |
| return "" |
| } |
| x = x[1:] |
| min, _, ok := cutInt(x) |
| if !ok { |
| return "" |
| } |
| return fmt.Sprintf("go%s.%s", maj, min) |
| } |
| |
| // cutInt scans the leading decimal number at the start of x to an integer |
| // and returns that value and the rest of the string. |
| func cutInt(x string) (n, rest string, ok bool) { |
| i := 0 |
| for i < len(x) && '0' <= x[i] && x[i] <= '9' { |
| i++ |
| } |
| if i == 0 || x[0] == '0' && i != 1 { |
| return "", "", false |
| } |
| return x[:i], x[i:], true |
| } |
| func fsys(fromOS bool) fs.FS { |
| var f fs.FS = contentfs.FS |
| if fromOS { |
| f = os.DirFS("internal/content") |
| } |
| f, err := unionfs.Sub(f, "worker", "shared") |
| if err != nil { |
| log.Fatal(err) |
| } |
| return f |
| } |