| // Copyright 2023 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package main |
| |
| import ( |
| "bufio" |
| "context" |
| "encoding/json" |
| "errors" |
| "flag" |
| "fmt" |
| "io/fs" |
| "log" |
| "net/http" |
| "net/url" |
| "os" |
| "sort" |
| "strings" |
| "time" |
| |
| cloudtasks "cloud.google.com/go/cloudtasks/apiv2" |
| taskspb "cloud.google.com/go/cloudtasks/apiv2/cloudtaskspb" |
| "golang.org/x/exp/slog" |
| "golang.org/x/mod/semver" |
| "golang.org/x/sync/errgroup" |
| "golang.org/x/telemetry/godev/internal/config" |
| "golang.org/x/telemetry/godev/internal/content" |
| ilog "golang.org/x/telemetry/godev/internal/log" |
| "golang.org/x/telemetry/godev/internal/middleware" |
| "golang.org/x/telemetry/godev/internal/storage" |
| tconfig "golang.org/x/telemetry/internal/config" |
| contentfs "golang.org/x/telemetry/internal/content" |
| "golang.org/x/telemetry/internal/telemetry" |
| "golang.org/x/telemetry/internal/unionfs" |
| ) |
| |
| func main() { |
| flag.Parse() |
| ctx := context.Background() |
| cfg := config.NewConfig() |
| |
| if cfg.UseGCS { |
| slog.SetDefault(slog.New(ilog.NewGCPLogHandler())) |
| } |
| |
| buckets, err := storage.NewAPI(ctx, cfg) |
| if err != nil { |
| log.Fatal(err) |
| } |
| ucfg, err := tconfig.ReadConfig(cfg.UploadConfig) |
| if err != nil { |
| log.Fatal(err) |
| } |
| fsys := fsys(cfg.DevMode) |
| cserv := content.Server(fsys) |
| mux := http.NewServeMux() |
| |
| mux.Handle("/", cserv) |
| mux.Handle("/merge/", handleMerge(buckets)) |
| mux.Handle("/chart/", handleChart(ucfg, buckets)) |
| mux.Handle("/queue-tasks/", handleTasks(cfg)) |
| mux.Handle("/copy/", handleCopy(cfg, buckets)) |
| |
| mw := middleware.Chain( |
| middleware.Log(slog.Default()), |
| middleware.Timeout(cfg.RequestTimeout), |
| middleware.RequestSize(cfg.MaxRequestBytes), |
| middleware.Recover(), |
| ) |
| |
| fmt.Printf("server listening at http://localhost:%s\n", cfg.WorkerPort) |
| log.Fatal(http.ListenAndServe(":"+cfg.WorkerPort, mw(mux))) |
| } |
| |
| // handleCopy copies uploaded reports from prod gcs bucket to dev gcs buckets. |
| func handleCopy(cfg *config.Config, dest *storage.API) content.HandlerFunc { |
| return func(w http.ResponseWriter, r *http.Request) error { |
| const prodBucket = "prod-telemetry-uploaded" |
| |
| if cfg.UploadBucket == prodBucket { |
| return content.Text(w, fmt.Sprintf("do not need to copy from %s to %s", prodBucket, cfg.UploadBucket), http.StatusOK) |
| } |
| |
| ctx := r.Context() |
| sourceBucket, err := storage.NewBucket(ctx, cfg, "prod-telemetry-uploaded") |
| if err != nil { |
| return err |
| } |
| |
| destBucket := dest.Upload |
| |
| start, end, err := parseDateRange(r.URL) |
| if err != nil { |
| return err |
| } |
| |
| // Copy files concurrently. |
| const concurrency = 10 |
| g, ctx := errgroup.WithContext(ctx) |
| g.SetLimit(concurrency) |
| |
| for date := start; !date.After(end); date = date.AddDate(0, 0, 1) { |
| it := sourceBucket.Objects(ctx, date.Format(time.DateOnly)) |
| for { |
| fileName, err := it.Next() |
| if errors.Is(err, storage.ErrObjectIteratorDone) { |
| break |
| } |
| g.Go(func() error { |
| if err != nil { |
| return err |
| } |
| return storage.Copy(ctx, destBucket.Object(fileName), sourceBucket.Object(fileName)) |
| }) |
| } |
| } |
| |
| return g.Wait() |
| } |
| } |
| |
| // handleTasks will populate the task queue that processes report |
| // data. Cloud Scheduler will be instrumented to call this endpoint |
| // daily to copy uploaded reports, merge reports and generate chart data. |
| // The copy tasks will copy uploaded data from prod to dev. |
| // The merge tasks will merge the previous 7 days reports. |
| // The chart tasks generate daily and weekly charts for the 7 days preceding |
| // today. |
| // - Daily chart: utilizes data exclusively from the specific date. |
| // - Weekly chart: encompasses 7 days of data, concluding on the specified date. |
| // TODO(golang/go#62575): adjust the date range to align with report |
| // upload cutoff. |
| func handleTasks(cfg *config.Config) content.HandlerFunc { |
| return func(w http.ResponseWriter, r *http.Request) error { |
| now := time.Now().UTC() |
| |
| // Copy the past 20 days uploaded reports from prod to dev gcs bucket. |
| if cfg.Env != "prod" { |
| url := cfg.WorkerURL + "/copy/?start=" + now.AddDate(0, 0, -1*20).Format(time.DateOnly) + "&end=" + now.Format(time.DateOnly) |
| if _, err := createHTTPTask(cfg, url); err != nil { |
| return err |
| } |
| } |
| for i := 7; i > 0; i-- { |
| date := now.AddDate(0, 0, -1*i).Format(time.DateOnly) |
| url := cfg.WorkerURL + "/merge/?date=" + date |
| if _, err := createHTTPTask(cfg, url); err != nil { |
| return err |
| } |
| } |
| // TODO(hxjiang): have an endpoint to produce all the json instead of a hard |
| // coded one day delay. |
| for i := 8; i > 1; i-- { |
| // Daily chart: generate chart using one day's data. |
| date := now.AddDate(0, 0, -1*i).Format(time.DateOnly) |
| url := cfg.WorkerURL + "/chart/?date=" + date |
| if _, err := createHTTPTask(cfg, url); err != nil { |
| return err |
| } |
| |
| // Weekly chart: generate chart using past 7 days' data. |
| end := now.AddDate(0, 0, -1*i) |
| start := end.AddDate(0, 0, -6) |
| url = cfg.WorkerURL + "/chart/?start=" + start.Format(time.DateOnly) + "&end=" + end.Format(time.DateOnly) |
| if _, err := createHTTPTask(cfg, url); err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| } |
| |
| // createHTTPTask constructs a task with a authorization token |
| // and HTTP target then adds it to a Queue. |
| func createHTTPTask(cfg *config.Config, url string) (*taskspb.Task, error) { |
| ctx := context.Background() |
| client, err := cloudtasks.NewClient(ctx) |
| if err != nil { |
| return nil, fmt.Errorf("cloudtasks.NewClient: %w", err) |
| } |
| defer client.Close() |
| |
| queuePath := fmt.Sprintf("projects/%s/locations/%s/queues/%s", cfg.ProjectID, cfg.LocationID, cfg.QueueID) |
| req := &taskspb.CreateTaskRequest{ |
| Parent: queuePath, |
| Task: &taskspb.Task{ |
| MessageType: &taskspb.Task_HttpRequest{ |
| HttpRequest: &taskspb.HttpRequest{ |
| HttpMethod: taskspb.HttpMethod_POST, |
| Url: url, |
| AuthorizationHeader: &taskspb.HttpRequest_OidcToken{ |
| OidcToken: &taskspb.OidcToken{ |
| ServiceAccountEmail: cfg.IAPServiceAccount, |
| Audience: cfg.ClientID, |
| }, |
| }, |
| }, |
| }, |
| }, |
| } |
| |
| createdTask, err := client.CreateTask(ctx, req) |
| if err != nil { |
| return nil, fmt.Errorf("cloudtasks.CreateTask: %w", err) |
| } |
| return createdTask, nil |
| } |
| |
| // TODO: monitor duration and processed data volume. |
| func handleMerge(s *storage.API) content.HandlerFunc { |
| return func(w http.ResponseWriter, r *http.Request) error { |
| ctx := r.Context() |
| date := r.URL.Query().Get("date") |
| if _, err := time.Parse(time.DateOnly, date); err != nil { |
| return content.Error(err, http.StatusBadRequest) |
| } |
| it := s.Upload.Objects(ctx, date) |
| mergeWriter, err := s.Merge.Object(date + ".json").NewWriter(ctx) |
| if err != nil { |
| return err |
| } |
| defer mergeWriter.Close() |
| encoder := json.NewEncoder(mergeWriter) |
| var count int |
| for { |
| obj, err := it.Next() |
| if errors.Is(err, storage.ErrObjectIteratorDone) { |
| break |
| } |
| if err != nil { |
| return err |
| } |
| count++ |
| reader, err := s.Upload.Object(obj).NewReader(ctx) |
| if err != nil { |
| return err |
| } |
| defer reader.Close() |
| var report telemetry.Report |
| if err := json.NewDecoder(reader).Decode(&report); err != nil { |
| return err |
| } |
| if err := encoder.Encode(report); err != nil { |
| return err |
| } |
| if err := reader.Close(); err != nil { |
| return err |
| } |
| } |
| if err := mergeWriter.Close(); err != nil { |
| return err |
| } |
| msg := fmt.Sprintf("merged %d reports into %s/%s", count, s.Merge.URI(), date) |
| return content.Text(w, msg, http.StatusOK) |
| } |
| } |
| |
// fileName returns the object name used for a chart covering the dates from
// start to end: "<end>.json" when both are the same instant, otherwise
// "<start>_<end>.json". Dates are formatted as YYYY-MM-DD.
func fileName(start, end time.Time) string {
	last := end.Format(time.DateOnly)
	if !start.Equal(end) {
		return start.Format(time.DateOnly) + "_" + last + ".json"
	}
	return last + ".json"
}
| |
| // parseDateRange returns the start and end date from the given url. |
| func parseDateRange(url *url.URL) (start, end time.Time, _ error) { |
| if dateString := url.Query().Get("date"); dateString != "" { |
| if url.Query().Get("start") != "" || url.Query().Get("end") != "" { |
| return time.Time{}, time.Time{}, content.Error(fmt.Errorf("start or end key should be empty when date key is being used"), http.StatusBadRequest) |
| } |
| date, err := time.Parse(time.DateOnly, dateString) |
| if err != nil { |
| return time.Time{}, time.Time{}, content.Error(err, http.StatusBadRequest) |
| } |
| return date, date, nil |
| } |
| |
| var err error |
| startString := url.Query().Get("start") |
| start, err = time.Parse(time.DateOnly, startString) |
| if err != nil { |
| return time.Time{}, time.Time{}, content.Error(err, http.StatusBadRequest) |
| } |
| endString := url.Query().Get("end") |
| end, err = time.Parse(time.DateOnly, endString) |
| if err != nil { |
| return time.Time{}, time.Time{}, content.Error(err, http.StatusBadRequest) |
| } |
| if end.Before(start) { |
| return time.Time{}, time.Time{}, content.Error(fmt.Errorf("end date is earlier than start"), http.StatusBadRequest) |
| } |
| return start, end, nil |
| } |
| |
| func readMergedReports(ctx context.Context, fileName string, s *storage.API) ([]telemetry.Report, error) { |
| in, err := s.Merge.Object(fileName).NewReader(ctx) |
| if errors.Is(err, storage.ErrObjectNotExist) { |
| return nil, content.Error(fmt.Errorf("merge file %s not found", fileName), http.StatusNotFound) |
| } |
| if err != nil { |
| return nil, err |
| } |
| defer in.Close() |
| |
| var reports []telemetry.Report |
| scanner := bufio.NewScanner(in) |
| for scanner.Scan() { |
| var report telemetry.Report |
| if err := json.Unmarshal(scanner.Bytes(), &report); err != nil { |
| return nil, err |
| } |
| reports = append(reports, report) |
| } |
| |
| return reports, nil |
| } |
| |
| func handleChart(cfg *tconfig.Config, s *storage.API) content.HandlerFunc { |
| return func(w http.ResponseWriter, r *http.Request) error { |
| ctx := r.Context() |
| |
| start, end, err := parseDateRange(r.URL) |
| if err != nil { |
| return err |
| } |
| |
| var reports []telemetry.Report |
| var xs []float64 |
| for date := start; !date.After(end); date = date.AddDate(0, 0, 1) { |
| dailyReports, err := readMergedReports(ctx, date.Format(time.DateOnly)+".json", s) |
| if err != nil { |
| return err |
| } |
| for _, r := range dailyReports { |
| reports = append(reports, r) |
| xs = append(xs, r.X) |
| } |
| } |
| |
| data := nest(reports) |
| charts := charts(cfg, start.Format(time.DateOnly), end.Format(time.DateOnly), data, xs) |
| |
| obj := fileName(start, end) |
| out, err := s.Chart.Object(obj).NewWriter(ctx) |
| if err != nil { |
| return err |
| } |
| defer out.Close() |
| |
| if err := json.NewEncoder(out).Encode(charts); err != nil { |
| return err |
| } |
| if err := out.Close(); err != nil { |
| return err |
| } |
| |
| msg := fmt.Sprintf("processed %d reports from date %s to %s into %s", len(reports), start.Format(time.DateOnly), end.Format(time.DateOnly), s.Chart.URI()+"/"+obj) |
| return content.Text(w, msg, http.StatusOK) |
| } |
| } |
| |
type (
	// chartdata is the top-level chart document produced for a date range.
	chartdata struct {
		DateRange  [2]string  // start and end date of the covered range
		Programs   []*program // one entry per configured program
		NumReports int        // number of reports the charts were built from
	}

	// program groups the charts that belong to a single program.
	program struct {
		ID     string
		Name   string
		Charts []*chart
	}

	// chart is a single named chart and its data points.
	chart struct {
		ID   string
		Name string
		Type string
		Data []*datum
	}

	// datum is one data point of a chart: the value of bucket Key for Week.
	datum struct {
		Week  string
		Key   string
		Value float64
	}
)

// String returns the JSON encoding of c. If c cannot be encoded the result
// is the empty string; the encoding error is deliberately dropped.
func (c *chart) String() string {
	encoded, _ := json.Marshal(c)
	return string(encoded)
}
| |
| func charts(cfg *tconfig.Config, start, end string, d data, xs []float64) *chartdata { |
| result := &chartdata{DateRange: [2]string{start, end}, NumReports: len(xs)} |
| for _, p := range cfg.Programs { |
| prog := &program{ID: "charts:" + p.Name, Name: p.Name} |
| result.Programs = append(result.Programs, prog) |
| var charts []*chart |
| if !telemetry.IsToolchainProgram(p.Name) { |
| charts = append(charts, d.partition(p.Name, "Version", p.Versions)) |
| } |
| charts = append(charts, |
| d.partition(p.Name, "GOOS", cfg.GOOS), |
| d.partition(p.Name, "GOARCH", cfg.GOARCH), |
| d.partition(p.Name, "GoVersion", cfg.GoVersion)) |
| for _, c := range p.Counters { |
| // TODO: add support for histogram counters by getting the counter type |
| // from the chart config. |
| charts = append(charts, d.partition(p.Name, c.Name, tconfig.Expand(c.Name))) |
| } |
| for _, p := range charts { |
| if p != nil { |
| prog.Charts = append(prog.Charts, p) |
| } |
| } |
| } |
| return result |
| } |
| |
| // partition builds a chart for the program and the counter. It can return nil |
| // if there is no data for the counter in dat. |
| func (d data) partition(program, counterPrefix string, counters []string) *chart { |
| count := &chart{ |
| ID: "charts:" + program + ":" + counterPrefix, |
| Name: counterPrefix, |
| Type: "partition", |
| } |
| pk := programName(program) |
| prefix, _ := splitCounterName(counterPrefix) |
| gk := graphName(prefix) |
| |
| var ( |
| counts = make(map[string]float64) // bucket name -> total count |
| end weekName // latest week observed |
| ) |
| for wk := range d { |
| if wk >= end { |
| end = wk |
| } |
| // TODO: when should this be number of reports? |
| // total := len(xs) |
| if total := len(d[wk][pk][gk][counterName(gk)]); total == 0 { |
| continue |
| } |
| // We group versions into major minor buckets, we must skip |
| // major minor versions we've already added to the dataset. |
| seen := make(map[string]bool) |
| for _, b := range counters { |
| // TODO(hyangah): let caller normalize names in counters. |
| counter := normalizeCounterName(counterPrefix, b) |
| if seen[counter] { |
| continue |
| } |
| seen[counter] = true |
| ck := counterName(counter) |
| // number of reports where count prefix:bucket > 0 |
| n := len(d[wk][pk][gk][ck]) |
| _, bucket := splitCounterName(counter) |
| |
| counts[bucket] += float64(n) |
| } |
| } |
| |
| if len(counts) == 0 { |
| return nil |
| } |
| |
| // datum.Week always points to the end date |
| for k, v := range counts { |
| d := &datum{ |
| Week: string(end), |
| Key: k, |
| Value: v, |
| } |
| count.Data = append(count.Data, d) |
| } |
| // Sort the data based on bucket name to ensure deterministic output. |
| sort.Slice(count.Data, func(i, j int) bool { |
| return count.Data[i].Key < count.Data[j].Key |
| }) |
| |
| return count |
| } |
| |
type (
	// weekName is the date of the report week in the format "YYYY-MM-DD".
	weekName string

	// programName is the package path of the program, as used in
	// telemetry.ProgramReport and chartconfig.Program.
	// e.g. golang.org/x/tools/gopls, cmd/go.
	programName string

	// graphName is the graph name.
	// A graph plots distribution or timeseries of related counters.
	graphName string

	// counterName is the counter name.
	counterName string

	// reportID is the upload report ID.
	// The current implementation uses telemetry.Report.X,
	// a random number, computed by the uploader when creating a Report object.
	// See x/telemetry/internal/upload.(*uploader).createReport.
	reportID float64

	// data holds counter values keyed, from the outside in, by week,
	// program, graph, counter and report ID.
	data map[weekName]map[programName]map[graphName]map[counterName]map[reportID]int64
)

// Names of special counters.
// Unlike other counters, these are constructed from the metadata in the report.
const (
	versionCounter   = "Version"   // program version
	goosCounter      = "GOOS"      // operating system
	goarchCounter    = "GOARCH"    // architecture
	goversionCounter = "GoVersion" // Go version
)
| |
| // nest groups the report data by week, program, prefix, counter, and x value |
| // summing together counter values for each program report in a report. |
| func nest(reports []telemetry.Report) data { |
| result := make(data) |
| for _, r := range reports { |
| for _, p := range r.Programs { |
| result.writeCount(r.Week, p.Program, versionCounter, p.Version, r.X, 1) |
| result.writeCount(r.Week, p.Program, goosCounter, p.GOOS, r.X, 1) |
| result.writeCount(r.Week, p.Program, goarchCounter, p.GOARCH, r.X, 1) |
| result.writeCount(r.Week, p.Program, goversionCounter, p.GoVersion, r.X, 1) |
| for c, value := range p.Counters { |
| prefix, _ := splitCounterName(c) |
| result.writeCount(r.Week, p.Program, prefix, c, r.X, value) |
| } |
| } |
| } |
| return result |
| } |
| |
| // readCount reads the count value based on the input keys. |
| // Return error if any key does not exist. |
| func (d data) readCount(week, program, prefix, counter string, x float64) (int64, error) { |
| wk := weekName(week) |
| if _, ok := d[wk]; !ok { |
| return -1, fmt.Errorf("missing weekKey %q", week) |
| } |
| pk := programName(program) |
| if _, ok := d[wk][pk]; !ok { |
| return -1, fmt.Errorf("missing programKey %q", program) |
| } |
| gk := graphName(prefix) |
| if _, ok := d[wk][pk][gk]; !ok { |
| return -1, fmt.Errorf("missing graphKey key %q", prefix) |
| } |
| ck := counterName(counter) |
| if _, ok := d[wk][pk][gk][ck]; !ok { |
| return -1, fmt.Errorf("missing counterKey %v", counter) |
| } |
| return d[wk][pk][gk][ck][reportID(x)], nil |
| } |
| |
| // writeCount writes the counter values to the result. When a report contains |
| // multiple program reports for the same program, the value of the counters |
| // in that report are summed together. |
| func (d data) writeCount(week, program, prefix, counter string, x float64, value int64) { |
| wk := weekName(week) |
| if _, ok := d[wk]; !ok { |
| d[wk] = make(map[programName]map[graphName]map[counterName]map[reportID]int64) |
| } |
| pk := programName(program) |
| if _, ok := d[wk][pk]; !ok { |
| d[wk][pk] = make(map[graphName]map[counterName]map[reportID]int64) |
| } |
| // We want to group and plot bucket/histogram counters with the same prefix. |
| // Use the prefix as the graph name. |
| gk := graphName(prefix) |
| if _, ok := d[wk][pk][gk]; !ok { |
| d[wk][pk][gk] = make(map[counterName]map[reportID]int64) |
| } |
| // TODO(hyangah): let caller pass the normalized counter name. |
| counter = normalizeCounterName(prefix, counter) |
| ck := counterName(counter) |
| if _, ok := d[wk][pk][gk][ck]; !ok { |
| d[wk][pk][gk][ck] = make(map[reportID]int64) |
| } |
| |
| // x is a random number sent with each upload report. |
| // Since there is no identifier for the uploader, we use x as the uploader ID |
| // to approximate the number of unique uploader. |
| id := reportID(x) |
| d[wk][pk][gk][ck][id] += value |
| // TODO: each uploader should send the report only once. |
| // Shouldn't we overwrite, instead of summing? |
| |
| // If the counter is an instance of a bucket counter or histogram counter |
| // record the value with a special counter (prefix). For example, if |
| // there are gopls/client:vscode-go, gopls/vlient:vim-go, ..., |
| // we compute the total number of gopls/client:* by summing up all values |
| // with a special counter name "gopls/client". |
| // TODO(hyangah): why do we want to compute the fraction, instead of showing |
| // the absolute number of reports? |
| if prefix != counter { |
| ck = counterName(prefix) |
| if _, ok := d[wk][pk][gk][ck]; !ok { |
| d[wk][pk][gk][ck] = make(map[reportID]int64) |
| } |
| d[wk][pk][gk][ck][id] += value |
| } |
| } |
| |
| // normalizeCounterName normalizes the counter name. |
| // More specifically, program version, goos, goarch, and goVersion |
| // are not a real counter, but information from the metadata in the report. |
| // This function constructs pseudo counter names to handle them |
| // like other normal counters in aggregation and chart drawing. |
| // To limit the cardinality of version and goVersion, this function |
| // uses only major and minor version numbers in the pseudo-counter names. |
| // If the counter is a normal counter name, it is returned as is. |
| func normalizeCounterName(prefix, counter string) string { |
| switch prefix { |
| case versionCounter: |
| if counter == "devel" { |
| return prefix + ":" + counter |
| } |
| if strings.HasPrefix(counter, "go") { |
| return prefix + ":" + goMajorMinor(counter) |
| } |
| return prefix + ":" + semver.MajorMinor(counter) |
| case goosCounter: |
| return prefix + ":" + counter |
| case goarchCounter: |
| return prefix + ":" + counter |
| case goversionCounter: |
| return prefix + ":" + goMajorMinor(counter) |
| } |
| return counter |
| } |
| |
// splitCounterName splits a counter name or bucket name at its first ":"
// into the prefix and the bucket part. For an input without a ":" the
// prefix and the bucket are both the whole input.
func splitCounterName(name string) (prefix, bucket string) {
	sep := strings.IndexByte(name, ':')
	if sep < 0 {
		return name, name
	}
	return name[:sep], name[sep+1:]
}
| |
// goMajorMinor gets the go<Maj>.<Min> version for a given go version.
// For example, go1.20.1 -> go1.20.
//
// It returns "" when v is not of the form "go<Maj>.<Min>...": a missing "go"
// prefix, a missing "." separator, or a missing or malformed number. Short
// or truncated inputs such as "", "g" or "go1" are reported the same way
// instead of causing an out-of-range slice panic.
//
// TODO(hyangah): replace with go/version.Lang (available from go1.22)
// after our builders stop running go1.21.
func goMajorMinor(v string) string {
	if !strings.HasPrefix(v, "go") {
		return ""
	}
	major, rest, ok := cutInt(v[len("go"):])
	if !ok || !strings.HasPrefix(rest, ".") {
		return ""
	}
	minor, _, ok := cutInt(rest[1:])
	if !ok {
		return ""
	}
	return "go" + major + "." + minor
}

// cutInt scans the leading decimal number at the start of x and returns its
// digits and the rest of the string. ok is false, and both strings are
// empty, when x does not start with a digit or when the number has a leading
// zero followed by more digits.
func cutInt(x string) (n, rest string, ok bool) {
	i := 0
	for i < len(x) && '0' <= x[i] && x[i] <= '9' {
		i++
	}
	if i == 0 || (x[0] == '0' && i != 1) {
		return "", "", false
	}
	return x[:i], x[i:], true
}
| |
| func fsys(fromOS bool) fs.FS { |
| var f fs.FS = contentfs.FS |
| if fromOS { |
| f = os.DirFS("internal/content") |
| } |
| f, err := unionfs.Sub(f, "worker", "shared") |
| if err != nil { |
| log.Fatal(err) |
| } |
| return f |
| } |