blob: 8c5d643e17fa4c0e5887cefc7be3a0b28061c304 [file] [log] [blame]
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Telemetrygodev serves the telemetry.go.dev website.
package main
import (
"context"
_ "embed"
"encoding/json"
"errors"
"flag"
"fmt"
"io"
"io/fs"
"log"
"net/http"
"os"
"path"
"strings"
"time"
"golang.org/x/mod/semver"
"golang.org/x/telemetry"
"golang.org/x/telemetry/godev"
"golang.org/x/telemetry/godev/internal/content"
"golang.org/x/telemetry/godev/internal/middleware"
"golang.org/x/telemetry/godev/internal/storage"
"golang.org/x/telemetry/godev/internal/unionfs"
tconfig "golang.org/x/telemetry/internal/config"
)
func main() {
flag.Parse()
ctx := context.Background()
cfg := newConfig()
buckets, err := buckets(ctx, cfg)
if err != nil {
log.Fatal(err)
}
ucfg, err := tconfig.ReadConfig(cfg.UploadConfig)
if err != nil {
log.Fatal(err)
}
fsys := fsys(cfg.DevMode)
mux := http.NewServeMux()
mux.Handle("/", handleRoot(fsys, ucfg, buckets))
mux.Handle("/upload/", handleUpload(ucfg, buckets))
mux.Handle("/charts/", handleChart(fsys, ucfg, buckets))
mw := middleware.Chain(
middleware.Log,
middleware.Timeout(cfg.RequestTimeout),
middleware.RequestSize(cfg.MaxRequestBytes),
middleware.Recover,
)
fmt.Printf("server listening at http://localhost:%s\n", cfg.Port)
log.Fatal(http.ListenAndServe(":"+cfg.Port, mw(mux)))
}
type link struct {
Text, URL string
}
type indexPage struct {
Charts []*link
Reports []*link
}
func handleRoot(fsys fs.FS, ucfg *tconfig.Config, buckets *stores) content.HandlerFunc {
cserv := content.Server(fsys)
return func(w http.ResponseWriter, r *http.Request) error {
if r.URL.Path != "/" {
cserv.ServeHTTP(w, r)
return nil
}
page := indexPage{}
ctx := r.Context()
it, err := buckets.chart.List(ctx, "")
if err != nil {
return err
}
for {
obj, err := it.Next()
if errors.Is(err, storage.ErrObjectIteratorDone) {
break
}
date := strings.TrimSuffix(obj, ".json")
page.Charts = append(page.Charts, &link{Text: date, URL: "/charts/" + date})
}
it, err = buckets.merge.List(ctx, "")
if err != nil {
return err
}
for {
obj, err := it.Next()
if errors.Is(err, storage.ErrObjectIteratorDone) {
break
}
page.Reports = append(page.Reports, &link{
Text: strings.TrimSuffix(obj, ".json"),
URL: buckets.merge.Location() + "/" + obj,
})
}
return content.Template(w, fsys, "index.html", page, http.StatusOK)
}
}
type chartPage struct {
Charts map[string]any
}
func handleChart(fsys fs.FS, ucfg *tconfig.Config, buckets *stores) content.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) error {
ctx := r.Context()
p := strings.TrimPrefix(path.Clean(r.URL.Path), "/charts")
reader, err := buckets.chart.Reader(ctx, p+".json")
if errors.Is(err, storage.ErrObjectNotExist) {
return content.Status(w, http.StatusNotFound)
} else if err != nil {
return err
}
defer reader.Close()
data, err := io.ReadAll(reader)
if err != nil {
return err
}
var charts map[string]any
if err := json.Unmarshal(data, &charts); err != nil {
return err
}
page := chartPage{
Charts: charts,
}
return content.Template(w, fsys, "charts.html", page, http.StatusOK)
}
}
func handleUpload(ucfg *tconfig.Config, buckets *stores) content.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) error {
if r.Method == "POST" {
var report telemetry.Report
if err := json.NewDecoder(r.Body).Decode(&report); err != nil {
return content.Error(err, http.StatusBadRequest)
}
if err := validate(&report, ucfg); err != nil {
return content.Error(err, http.StatusBadRequest)
}
// TODO: capture metrics for collisions.
ctx := r.Context()
name := fmt.Sprintf("%s/%g.json", report.Week, report.X)
f, err := buckets.upload.Writer(ctx, name)
if err != nil {
return err
}
defer f.Close()
if err := json.NewEncoder(f).Encode(report); err != nil {
return err
}
if err := f.Close(); err != nil {
return err
}
return content.Status(w, http.StatusOK)
}
return content.Status(w, http.StatusMethodNotAllowed)
}
}
// validate validates the telemetry report data against the latest config.
func validate(r *telemetry.Report, cfg *tconfig.Config) error {
// TODO: reject/drop data arrived too early or too late.
if _, err := time.Parse("2006-01-02", r.Week); err != nil {
return fmt.Errorf("invalid week %s", r.Week)
}
if !semver.IsValid(r.Config) {
return fmt.Errorf("invalid config %s", r.Config)
}
if r.X == 0 {
return fmt.Errorf("invalid X %g", r.X)
}
// TODO: We can probably keep known programs and counters even when a report
// includes something that has been removed from the latest config.
for _, p := range r.Programs {
if !cfg.HasGOARCH(p.GOARCH) ||
!cfg.HasGOOS(p.GOOS) ||
!cfg.HasGoVersion(p.GoVersion) ||
!cfg.HasProgram(p.Program) ||
!cfg.HasVersion(p.Program, p.Version) {
return fmt.Errorf("unknown program build %s@%s %s %s/%s", p.Program, p.Version, p.GoVersion, p.GOOS, p.GOARCH)
}
for c := range p.Counters {
if !cfg.HasCounter(p.Program, c) {
return fmt.Errorf("unknown counter %s", c)
}
}
for s := range p.Stacks {
if !cfg.HasStack(p.Program, s) {
return fmt.Errorf("unknown stack %s", s)
}
}
}
return nil
}
func fsys(fromOS bool) fs.FS {
var f fs.FS = godev.FS
if fromOS {
f = os.DirFS(".")
}
f, err := unionfs.Sub(f, "content/telemetrygodev", "content/shared", "third_party")
if err != nil {
log.Fatal(err)
}
return f
}
type stores struct {
upload storage.Store
merge storage.Store
chart storage.Store
}
func buckets(ctx context.Context, cfg *config) (*stores, error) {
if cfg.UseGCS && !cfg.onCloudRun() {
if err := os.Setenv("STORAGE_EMULATOR_HOST", cfg.StorageEmulatorHost); err != nil {
return nil, err
}
}
var upload storage.Store
var merge storage.Store
var chart storage.Store
var err error
if cfg.UseGCS {
upload, err = storage.NewGCStore(ctx, cfg.ProjectID, cfg.UploadBucket)
if err != nil {
return nil, err
}
merge, err = storage.NewGCStore(ctx, cfg.ProjectID, cfg.MergedBucket)
if err != nil {
return nil, err
}
chart, err = storage.NewGCStore(ctx, cfg.ProjectID, cfg.ChartDataBucket)
if err != nil {
return nil, err
}
} else {
upload, err = storage.NewFSStore(ctx, cfg.LocalStorage, cfg.UploadBucket)
if err != nil {
return nil, err
}
merge, err = storage.NewFSStore(ctx, cfg.LocalStorage, cfg.MergedBucket)
if err != nil {
return nil, err
}
chart, err = storage.NewFSStore(ctx, cfg.LocalStorage, cfg.ChartDataBucket)
if err != nil {
return nil, err
}
}
return &stores{upload, merge, chart}, nil
}