// Copyright 2024 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// The copyuploads command copies uploads from GCS to the local filesystem
// storage, for use with local development of the worker.
//
// By default, this command copies the uploads for the 3 days preceding today
// (configurable with -days_back) from telemetry.go.dev to the local filesystem
// bucket local-telemetry-uploaded, at which point this data will be available
// when running ./godev/cmd/worker with no arguments.
//
// See --help for more details.
package main
import (
"bytes"
"context"
"encoding/json"
"flag"
"fmt"
"io"
"log"
"net/http"
"os"
"path"
"time"
"golang.org/x/telemetry/godev/internal/config"
"golang.org/x/telemetry/godev/internal/storage"
)
// Command-line flags.
var (
	// verbose turns on per-object progress logging in main.
	verbose = flag.Bool("v", false, "If set, enable verbose logging.")

	// daysBack is how many days of merged uploads, counting backwards from
	// yesterday, are copied.
	daysBack = flag.Int("days_back", 3, "The number of days back to copy")
)
// main copies the merged telemetry uploads for each of the -days_back days
// preceding today into the local filesystem bucket "local-telemetry-uploaded",
// writing one object per upload at <date>/<name>. Objects that already exist
// locally are left untouched. Any failure terminates the program.
func main() {
	flag.Parse()

	cfg := config.NewConfig()
	ctx := context.Background()
	fs, err := storage.NewFSBucket(ctx, cfg.LocalStorage, "local-telemetry-uploaded")
	if err != nil {
		log.Fatal(err)
	}

	// NOTE(review): time.Now() is in the local time zone; presumably the merged
	// files are named by UTC date — confirm this cannot select a day that has
	// not been merged yet.
	start := time.Now()
	for dayOffset := range *daysBack {
		date := start.AddDate(0, 0, -dayOffset-1) // today's merged reports may not yet be available
		dateString := date.Format(time.DateOnly)
		byFile, err := downloadData(dateString)
		if err != nil {
			log.Fatalf("Downloading data for %s: %v", dateString, err)
		}
		for name, content := range byFile {
			// Skip objects that already exist in local storage.
			dest := fs.Object(path.Join(dateString, name))
			if _, err := os.Stat(dest.(*storage.FSObject).Filename()); err == nil {
				if *verbose {
					log.Printf("Skipping existing object %s", name)
				}
				continue
			}
			if *verbose {
				log.Printf("Starting copying object %s", name)
			}
			w, err := dest.NewWriter(ctx)
			if err != nil {
				log.Fatal(err)
			}
			if _, err := io.Copy(w, bytes.NewReader(content)); err != nil {
				log.Fatal(err)
			}
			// Close every writer: otherwise one handle leaks per object, and any
			// write error that is only reported at close time goes unnoticed.
			if err := w.Close(); err != nil {
				log.Fatal(err)
			}
		}
	}
}
// downloadData downloads the merged telemetry data for the given date string
// (which must be in time.DateOnly format), and splits it back into individual
// uploaded files, keyed by their original name (<X>.json).
//
// The merged file is read as one JSON object per line; blank lines are
// ignored. Each line's numeric "X" field, formatted with %g, names the file,
// and the stored content is the whitespace-trimmed line followed by a single
// newline. If two lines share the same X, the later one wins.
//
// An error is returned if the request fails, the response status is not
// 200 OK, the body cannot be read, or a line is not valid JSON. Read and parse
// errors wrap the underlying error and identify the URL (and line number).
func downloadData(dateString string) (map[string][]byte, error) {
	url := fmt.Sprintf("https://storage.googleapis.com/prod-telemetry-merged/%s.json", dateString)
	resp, err := http.Get(url)
	if err != nil {
		return nil, err // the client's error already names the method and URL
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("downloading %s failed with status %d", url, resp.StatusCode)
	}
	data, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("reading %s: %w", url, err)
	}

	byFile := make(map[string][]byte)
	for i, line := range bytes.Split(data, []byte("\n")) {
		line = bytes.TrimSpace(line)
		if len(line) == 0 {
			continue // defensive: skip empty lines
		}
		var x struct{ X float64 }
		if err := json.Unmarshal(line, &x); err != nil {
			return nil, fmt.Errorf("parsing line %d of %s: %w", i+1, url, err)
		}
		file := fmt.Sprintf("%g.json", x.X)
		// Give each record its own backing array instead of appending to a
		// sub-slice of the download buffer.
		content := make([]byte, 0, len(line)+1)
		content = append(content, line...)
		byFile[file] = append(content, '\n') // uploaded data is newline terminated
	}
	return byFile, nil
}