blob: f6395da54a5385192ffa9445fc65558d184fc3a2 [file] [log] [blame]
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"bufio"
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"go/version"
"io/fs"
"log"
"net/http"
"net/url"
"os"
"sort"
"strings"
"time"
cloudtasks "cloud.google.com/go/cloudtasks/apiv2"
taskspb "cloud.google.com/go/cloudtasks/apiv2/cloudtaskspb"
"golang.org/x/exp/slog"
"golang.org/x/mod/semver"
"golang.org/x/sync/errgroup"
"golang.org/x/telemetry/godev/internal/config"
"golang.org/x/telemetry/godev/internal/content"
ilog "golang.org/x/telemetry/godev/internal/log"
"golang.org/x/telemetry/godev/internal/middleware"
"golang.org/x/telemetry/godev/internal/storage"
tconfig "golang.org/x/telemetry/internal/config"
contentfs "golang.org/x/telemetry/internal/content"
"golang.org/x/telemetry/internal/telemetry"
"golang.org/x/telemetry/internal/unionfs"
)
func main() {
flag.Parse()
ctx := context.Background()
cfg := config.NewConfig()
if cfg.UseGCS {
slog.SetDefault(slog.New(ilog.NewGCPLogHandler()))
}
buckets, err := storage.NewAPI(ctx, cfg)
if err != nil {
log.Fatal(err)
}
ucfg, err := tconfig.ReadConfig(cfg.UploadConfig)
if err != nil {
log.Fatal(err)
}
fsys := fsys(cfg.DevMode)
cserv := content.Server(fsys)
mux := http.NewServeMux()
mux.Handle("/", cserv)
mux.Handle("/merge/", handleMerge(buckets))
mux.Handle("/chart/", handleChart(ucfg, buckets))
mux.Handle("/queue-tasks/", handleTasks(cfg))
mux.Handle("/copy/", handleCopy(cfg, buckets))
mw := middleware.Chain(
middleware.Log(slog.Default()),
middleware.Timeout(cfg.RequestTimeout),
middleware.RequestSize(cfg.MaxRequestBytes),
middleware.Recover(),
)
fmt.Printf("server listening at http://localhost:%s\n", cfg.WorkerPort)
log.Fatal(http.ListenAndServe(":"+cfg.WorkerPort, mw(mux)))
}
// handleCopy copies uploaded reports from prod gcs bucket to dev gcs buckets.
func handleCopy(cfg *config.Config, dest *storage.API) content.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) error {
const prodBucket = "prod-telemetry-uploaded"
if cfg.UploadBucket == prodBucket {
return content.Text(w, fmt.Sprintf("do not need to copy from %s to %s", prodBucket, cfg.UploadBucket), http.StatusOK)
}
ctx := r.Context()
sourceBucket, err := storage.NewBucket(ctx, cfg, "prod-telemetry-uploaded")
if err != nil {
return err
}
destBucket := dest.Upload
start, end, err := parseDateRange(r.URL)
if err != nil {
return err
}
// Copy files concurrently.
const concurrency = 10
g, ctx := errgroup.WithContext(ctx)
g.SetLimit(concurrency)
for date := start; !date.After(end); date = date.AddDate(0, 0, 1) {
it := sourceBucket.Objects(ctx, date.Format(telemetry.DateOnly))
for {
fileName, err := it.Next()
if errors.Is(err, storage.ErrObjectIteratorDone) {
break
}
g.Go(func() error {
if err != nil {
return err
}
return storage.Copy(ctx, destBucket.Object(fileName), sourceBucket.Object(fileName))
})
}
}
return g.Wait()
}
}
// handleTasks will populate the task queue that processes report
// data. Cloud Scheduler will be instrumented to call this endpoint
// daily to copy uploaded reports, merge reports and generate chart data.
// The copy tasks will copy uploaded data from prod to dev.
// The merge tasks will merge the previous 7 days reports.
// The chart tasks generate daily and weekly charts for the 7 days preceding
// today.
// - Daily chart: utilizes data exclusively from the specific date.
// - Weekly chart: encompasses 7 days of data, concluding on the specified date.
// TODO(golang/go#62575): adjust the date range to align with report
// upload cutoff.
func handleTasks(cfg *config.Config) content.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) error {
now := time.Now().UTC()
// Copy the past 20 days uploaded reports from prod to dev gcs bucket.
if cfg.Env != "prod" {
url := cfg.WorkerURL + "/copy/?start=" + now.AddDate(0, 0, -1*20).Format(telemetry.DateOnly) + "&end=" + now.Format(telemetry.DateOnly)
if _, err := createHTTPTask(cfg, url); err != nil {
return err
}
}
for i := 7; i > 0; i-- {
date := now.AddDate(0, 0, -1*i).Format(telemetry.DateOnly)
url := cfg.WorkerURL + "/merge/?date=" + date
if _, err := createHTTPTask(cfg, url); err != nil {
return err
}
}
// TODO(hxjiang): have an endpoint to produce all the json instead of a hard
// coded one day delay.
for i := 8; i > 1; i-- {
// Daily chart: generate chart using one day's data.
date := now.AddDate(0, 0, -1*i).Format(telemetry.DateOnly)
url := cfg.WorkerURL + "/chart/?date=" + date
if _, err := createHTTPTask(cfg, url); err != nil {
return err
}
// Weekly chart: generate chart using past 7 days' data.
end := now.AddDate(0, 0, -1*i)
start := end.AddDate(0, 0, -6)
url = cfg.WorkerURL + "/chart/?start=" + start.Format(telemetry.DateOnly) + "&end=" + end.Format(telemetry.DateOnly)
if _, err := createHTTPTask(cfg, url); err != nil {
return err
}
}
return nil
}
}
// createHTTPTask constructs a task with a authorization token
// and HTTP target then adds it to a Queue.
func createHTTPTask(cfg *config.Config, url string) (*taskspb.Task, error) {
ctx := context.Background()
client, err := cloudtasks.NewClient(ctx)
if err != nil {
return nil, fmt.Errorf("cloudtasks.NewClient: %w", err)
}
defer client.Close()
queuePath := fmt.Sprintf("projects/%s/locations/%s/queues/%s", cfg.ProjectID, cfg.LocationID, cfg.QueueID)
req := &taskspb.CreateTaskRequest{
Parent: queuePath,
Task: &taskspb.Task{
MessageType: &taskspb.Task_HttpRequest{
HttpRequest: &taskspb.HttpRequest{
HttpMethod: taskspb.HttpMethod_POST,
Url: url,
AuthorizationHeader: &taskspb.HttpRequest_OidcToken{
OidcToken: &taskspb.OidcToken{
ServiceAccountEmail: cfg.IAPServiceAccount,
Audience: cfg.ClientID,
},
},
},
},
},
}
createdTask, err := client.CreateTask(ctx, req)
if err != nil {
return nil, fmt.Errorf("cloudtasks.CreateTask: %w", err)
}
return createdTask, nil
}
// TODO: monitor duration and processed data volume.
func handleMerge(s *storage.API) content.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) error {
ctx := r.Context()
date := r.URL.Query().Get("date")
if _, err := time.Parse(telemetry.DateOnly, date); err != nil {
return content.Error(err, http.StatusBadRequest)
}
it := s.Upload.Objects(ctx, date)
mergeWriter, err := s.Merge.Object(date + ".json").NewWriter(ctx)
if err != nil {
return err
}
defer mergeWriter.Close()
encoder := json.NewEncoder(mergeWriter)
var count int
for {
obj, err := it.Next()
if errors.Is(err, storage.ErrObjectIteratorDone) {
break
}
if err != nil {
return err
}
count++
reader, err := s.Upload.Object(obj).NewReader(ctx)
if err != nil {
return err
}
defer reader.Close()
var report telemetry.Report
if err := json.NewDecoder(reader).Decode(&report); err != nil {
return err
}
if err := encoder.Encode(report); err != nil {
return err
}
if err := reader.Close(); err != nil {
return err
}
}
if err := mergeWriter.Close(); err != nil {
return err
}
msg := fmt.Sprintf("merged %d reports into %s/%s", count, s.Merge.URI(), date)
return content.Text(w, msg, http.StatusOK)
}
}
func fileName(start, end time.Time) string {
if start.Equal(end) {
return end.Format(telemetry.DateOnly) + ".json"
}
return start.Format(telemetry.DateOnly) + "_" + end.Format(telemetry.DateOnly) + ".json"
}
// parseDateRange returns the start and end date from the given url.
func parseDateRange(url *url.URL) (start, end time.Time, _ error) {
if dateString := url.Query().Get("date"); dateString != "" {
if url.Query().Get("start") != "" || url.Query().Get("end") != "" {
return time.Time{}, time.Time{}, content.Error(fmt.Errorf("start or end key should be empty when date key is being used"), http.StatusBadRequest)
}
date, err := time.Parse(telemetry.DateOnly, dateString)
if err != nil {
return time.Time{}, time.Time{}, content.Error(err, http.StatusBadRequest)
}
return date, date, nil
}
var err error
startString := url.Query().Get("start")
start, err = time.Parse(telemetry.DateOnly, startString)
if err != nil {
return time.Time{}, time.Time{}, content.Error(err, http.StatusBadRequest)
}
endString := url.Query().Get("end")
end, err = time.Parse(telemetry.DateOnly, endString)
if err != nil {
return time.Time{}, time.Time{}, content.Error(err, http.StatusBadRequest)
}
if end.Before(start) {
return time.Time{}, time.Time{}, content.Error(fmt.Errorf("end date is earlier than start"), http.StatusBadRequest)
}
return start, end, nil
}
func readMergedReports(ctx context.Context, fileName string, s *storage.API) ([]telemetry.Report, error) {
in, err := s.Merge.Object(fileName).NewReader(ctx)
if errors.Is(err, storage.ErrObjectNotExist) {
return nil, content.Error(fmt.Errorf("merge file %s not found", fileName), http.StatusNotFound)
}
if err != nil {
return nil, err
}
defer in.Close()
var reports []telemetry.Report
scanner := bufio.NewScanner(in)
for scanner.Scan() {
var report telemetry.Report
if err := json.Unmarshal(scanner.Bytes(), &report); err != nil {
return nil, err
}
reports = append(reports, report)
}
return reports, nil
}
func handleChart(cfg *tconfig.Config, s *storage.API) content.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) error {
ctx := r.Context()
start, end, err := parseDateRange(r.URL)
if err != nil {
return err
}
var reports []telemetry.Report
var xs []float64
for date := start; !date.After(end); date = date.AddDate(0, 0, 1) {
dailyReports, err := readMergedReports(ctx, date.Format(telemetry.DateOnly)+".json", s)
if err != nil {
return err
}
for _, r := range dailyReports {
reports = append(reports, r)
xs = append(xs, r.X)
}
}
data := group(reports)
charts := charts(cfg, start.Format(telemetry.DateOnly), end.Format(telemetry.DateOnly), data, xs)
obj := fileName(start, end)
out, err := s.Chart.Object(obj).NewWriter(ctx)
if err != nil {
return err
}
defer out.Close()
if err := json.NewEncoder(out).Encode(charts); err != nil {
return err
}
if err := out.Close(); err != nil {
return err
}
msg := fmt.Sprintf("processed %d reports from date %s to %s into %s", len(reports), start.Format(telemetry.DateOnly), end.Format(telemetry.DateOnly), s.Chart.URI()+"/"+obj)
return content.Text(w, msg, http.StatusOK)
}
}
type chartdata struct {
DateRange [2]string
Programs []*program
NumReports int
}
type program struct {
ID string
Name string
Charts []*chart
}
type chart struct {
ID string
Name string
Type string
Data []*datum
}
func (c *chart) String() string {
bytes, _ := json.Marshal(c)
return string(bytes)
}
type datum struct {
Week string
Key string
Value float64
}
func charts(cfg *tconfig.Config, start, end string, d data, xs []float64) *chartdata {
result := &chartdata{DateRange: [2]string{start, end}, NumReports: len(xs)}
for _, p := range cfg.Programs {
prog := &program{ID: "charts:" + p.Name, Name: p.Name}
result.Programs = append(result.Programs, prog)
var charts []*chart
program := programName(p.Name)
if !telemetry.IsToolchainProgram(p.Name) {
charts = append(charts, d.partition(program, versionCounter, toSliceOf[bucketName](p.Versions), partitionOptions{
ignoreEmptyBuckets: true,
// Don't normalize buckets: we want to see counts for all versions.
compareBuckets: compareSemver,
}))
}
charts = append(charts,
d.partition(program, goosCounter, toSliceOf[bucketName](cfg.GOOS), partitionOptions{}),
d.partition(program, goarchCounter, toSliceOf[bucketName](cfg.GOARCH), partitionOptions{}),
d.partition(program, goversionCounter, toSliceOf[bucketName](cfg.GoVersion), partitionOptions{
ignoreEmptyBuckets: true,
normalizeBucket: func(b bucketName) bucketName {
// map go1.2.3 -> go1.2
return bucketName(goMajorMinor(string(b)))
},
compareBuckets: version.Compare,
}))
for _, c := range p.Counters {
// TODO: add support for histogram counters by getting the counter type
// from the chart config.
chart, _ := splitCounterName(c.Name)
var buckets []bucketName
for _, counter := range tconfig.Expand(c.Name) {
_, bucket := splitCounterName(counter)
buckets = append(buckets, bucket)
}
charts = append(charts, d.partition(program, chart, buckets, partitionOptions{}))
}
for _, p := range charts {
if p != nil {
prog.Charts = append(prog.Charts, p)
}
}
}
return result
}
// toSliceOf converts a slice of once string type to another.
func toSliceOf[To, From ~string](s []From) []To {
var s2 []To
for _, v := range s {
s2 = append(s2, To(v))
}
return s2
}
// compareSemver wraps semver.Compare, to differentiate equivalent semver
// lexically, as we want all sorting to be stable.
func compareSemver(x, y string) int {
c := semver.Compare(x, y)
if c != 0 {
return c
}
return compareLexically(x, y)
}
func compareLexically(x, y string) int {
switch {
case x < y:
return -1
case x == y:
return 0
default:
return 1
}
}
// partitionOptions controls the behavior of partition charts
type partitionOptions struct {
// If ignoreEmptyBuckets is set, don't include data points with 0 count.
ignoreEmptyBuckets bool
// If normalizeBuckets is provided, it is used to map bucket names to new
// values. Buckets that map to the same value will be merged.
normalizeBucket func(bucketName) bucketName // if set, used to
// If compareBuckets is provided, it is used to sort the buckets, where
// compareBuckets returns -1, 0, or +1 if x < y, x == y, or x > y.
// Otherwise, buckets are sorted lexically.
compareBuckets func(x, y string) int
}
// partition builds a chart for the program and the counter. It can return nil
// if there is no data for the counter in d.
func (d data) partition(program programName, chartName graphName, buckets []bucketName, opts partitionOptions) *chart {
chart := &chart{
ID: fmt.Sprintf("charts:%s:%s", program, chartName),
Name: string(chartName),
Type: "partition",
}
pk := programName(program)
var (
merged = make(map[bucketName]map[reportID]struct{}) // normalized bucket name -> merged report IDs
empty = true // keep track of empty reports, so they can be skipped
end weekName // latest week observed
)
for wk := range d {
if wk >= end {
end = wk
}
// We group versions into major minor buckets, we must skip
// major minor versions we've already added to the dataset.
seen := make(map[bucketName]bool)
for _, bucket := range buckets {
if seen[bucket] {
continue
}
seen[bucket] = true
key := bucket
if opts.normalizeBucket != nil {
key = opts.normalizeBucket(bucket)
}
if _, ok := merged[key]; !ok {
merged[key] = make(map[reportID]struct{})
}
for id := range d[wk][pk][chartName][bucket] {
empty = false
merged[key][id] = struct{}{}
}
}
}
if empty {
return nil
}
// datum.Week always points to the end date
for bucket, v := range merged {
if len(v) > 0 || !opts.ignoreEmptyBuckets {
d := &datum{
Week: string(end),
Key: string(bucket),
Value: float64(len(v)),
}
chart.Data = append(chart.Data, d)
}
}
compareBuckets := compareLexically
if opts.compareBuckets != nil {
compareBuckets = opts.compareBuckets
}
// Sort the data based on bucket name to ensure deterministic output.
sort.Slice(chart.Data, func(i, j int) bool {
return compareBuckets(chart.Data[i].Key, chart.Data[j].Key) < 0
})
return chart
}
// weekName is the date of the report week in the format "YYYY-MM-DD".
type weekName string
// programName is the package path of the program, as used in
// telemetry.ProgramReport and chartconfig.Program.
// e.g. golang.org/x/tools/gopls, cmd/go.
type programName string
// graphName is the graph name.
// A graph plots distribution or timeseries of related counters.
//
// TODO(rfindley): rename to chartName.
type graphName string
// counterName is the counter name. counterName is graphName:bucketName.
type counterName string
// bucketName is the bucket name.
type bucketName string
// reportID is the upload report ID.
// The current implementation uses telemetry.Report.X,
// a random number, computed by the uploader when creating a Report object.
// See x/telemetry/internal/upload.(*uploader).createReport.
type reportID float64
type data map[weekName]map[programName]map[graphName]map[bucketName]map[reportID]int64
// Names of special counters.
// Unlike other counters, these are constructed from the metadata in the report.
const (
versionCounter = "Version"
goosCounter = "GOOS"
goarchCounter = "GOARCH"
goversionCounter = "GoVersion"
)
// group groups the report data by week, program, prefix, counter, and x value
// summing together counter values for each program report in a report.
func group(reports []telemetry.Report) data {
result := make(data)
for _, r := range reports {
var (
week = weekName(r.Week)
// x is a random number sent with each upload report.
// Since there is no identifier for the uploader, we use x as the uploader ID
// to approximate the number of unique uploader.
//
// Multiple uploads with the same x will overwrite each other, so we set the
// value, rather than add it to the existing value.
id = reportID(r.X)
)
for _, p := range r.Programs {
program := programName(p.Program)
result.writeCount(week, program, versionCounter, bucketName(p.Version), id, 1)
result.writeCount(week, program, goosCounter, bucketName(p.GOOS), id, 1)
result.writeCount(week, program, goarchCounter, bucketName(p.GOARCH), id, 1)
result.writeCount(week, program, goversionCounter, bucketName(p.GoVersion), id, 1)
for c, value := range p.Counters {
chart, bucket := splitCounterName(c)
result.writeCount(week, program, chart, bucket, id, value)
}
}
}
return result
}
// writeCount writes the counter values to the result. When a report contains
// multiple program reports for the same program, the value of the counters
// in that report are summed together.
func (d data) writeCount(week weekName, program programName, chart graphName, bucket bucketName, id reportID, value int64) {
if _, ok := d[week]; !ok {
d[week] = make(map[programName]map[graphName]map[bucketName]map[reportID]int64)
}
if _, ok := d[week][program]; !ok {
d[week][program] = make(map[graphName]map[bucketName]map[reportID]int64)
}
if _, ok := d[week][program][chart]; !ok {
d[week][program][chart] = make(map[bucketName]map[reportID]int64)
}
if _, ok := d[week][program][chart][bucket]; !ok {
d[week][program][chart][bucket] = make(map[reportID]int64)
}
d[week][program][chart][bucket][id] = value
}
// splitCounterName gets splits the prefix and bucket splitCounterName of a counter name
// or a bucket name. For an input with no bucket part prefix and bucket
// are the same.
func splitCounterName(name string) (graphName, bucketName) {
prefix, bucket, found := strings.Cut(name, ":")
if !found {
bucket = prefix
}
return graphName(prefix), bucketName(bucket)
}
// goMajorMinor gets the go<Maj>,<Min> version for a given go version.
// For example, go1.20.1 -> go1.20.
// TODO(hyangah): replace with go/version.Lang (available from go1.22)
// after our builders stop running go1.21.
func goMajorMinor(v string) string {
v = v[2:]
maj, x, ok := cutInt(v)
if !ok {
return ""
}
x = x[1:]
min, _, ok := cutInt(x)
if !ok {
return ""
}
return fmt.Sprintf("go%s.%s", maj, min)
}
// cutInt scans the leading decimal number at the start of x to an integer
// and returns that value and the rest of the string.
func cutInt(x string) (n, rest string, ok bool) {
i := 0
for i < len(x) && '0' <= x[i] && x[i] <= '9' {
i++
}
if i == 0 || x[0] == '0' && i != 1 {
return "", "", false
}
return x[:i], x[i:], true
}
func fsys(fromOS bool) fs.FS {
var f fs.FS = contentfs.FS
if fromOS {
f = os.DirFS("internal/content")
}
f, err := unionfs.Sub(f, "worker", "shared")
if err != nil {
log.Fatal(err)
}
return f
}