godev/devtools/cmd/copyuploads: download data from the public endpoint
Download the public merged telemetry reports and split them back into
individual uploads in the copyuploads tool, so that GCS read permission
is no longer a barrier to local development.
Change-Id: Ia68231db08556df377c07c1cf1d964a41f0599bd
Reviewed-on: https://go-review.googlesource.com/c/telemetry/+/617176
Reviewed-by: Hyang-Ah Hana Kim <hyangah@gmail.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
diff --git a/godev/devtools/cmd/copyuploads/copyuploads.go b/godev/devtools/cmd/copyuploads/copyuploads.go
index bfc5652..f1e6335 100644
--- a/godev/devtools/cmd/copyuploads/copyuploads.go
+++ b/godev/devtools/cmd/copyuploads/copyuploads.go
@@ -5,35 +5,32 @@
// The copyuploads command copies uploads from GCS to the local filesystem
// storage, for use with local development of the worker.
//
-// By default, this command copies the last 3 days of uploads from the
-// dev-telemetry-uploaded bucket in GCS to the local filesystem bucket
-// local-telemetry-uploaded, at which point this data will be available when
-// running ./godev/cmd/worker with no arguments.
-//
-// This command requires read permission to the go-telemetry GCS buckets.
-// TODO(rfindley): we could avoid the need for read permission by instead
-// downloading the public merged reports, and reassembling the individual
-// uploads.
+// By default, this command copies the last 3 days of uploads from
+// telemetry.go.dev to the local filesystem bucket local-telemetry-uploaded, at
+// which point this data will be available when running ./godev/cmd/worker with
+// no arguments.
//
// See --help for more details.
package main
import (
+ "bytes"
"context"
- "errors"
+ "encoding/json"
"flag"
+ "fmt"
+ "io"
"log"
+ "net/http"
"os"
- "strings"
+ "path"
"time"
- "golang.org/x/sync/errgroup"
"golang.org/x/telemetry/godev/internal/config"
"golang.org/x/telemetry/godev/internal/storage"
)
var (
- bucket = flag.String("bucket", "dev-telemetry-uploaded", "The bucket to copy from.")
daysBack = flag.Int("days_back", 3, "The number of days back to copy")
verbose = flag.Bool("v", false, "If set, enable verbose logging.")
)
@@ -41,39 +38,25 @@
func main() {
flag.Parse()
- if !strings.HasSuffix(*bucket, "-uploaded") {
- log.Fatal("-bucket must end in -uploaded")
- }
-
cfg := config.NewConfig()
ctx := context.Background()
- gcs, err := storage.NewGCSBucket(ctx, cfg.ProjectID, *bucket)
- if err != nil {
- log.Fatal(err)
- }
fs, err := storage.NewFSBucket(ctx, cfg.LocalStorage, "local-telemetry-uploaded")
if err != nil {
log.Fatal(err)
}
- // Copy files concurrently.
- const concurrency = 5
- g, ctx := errgroup.WithContext(ctx)
- g.SetLimit(concurrency)
-
start := time.Now()
for dayOffset := range *daysBack {
- date := start.AddDate(0, 0, -dayOffset)
- it := gcs.Objects(ctx, date.Format(time.DateOnly))
- for {
- name, err := it.Next()
- if errors.Is(err, storage.ErrObjectIteratorDone) {
- break
- }
-
+ date := start.AddDate(0, 0, -dayOffset-1) // today's merged reports may not yet be available
+ dateString := date.Format(time.DateOnly)
+ byFile, err := downloadData(dateString)
+ if err != nil {
+ log.Fatalf("Downloading data for %s: %v", dateString, err)
+ }
+ for name, content := range byFile {
// Skip objects that already exist in local storage.
- dest := fs.Object(name)
+ dest := fs.Object(path.Join(dateString, name))
if _, err := os.Stat(dest.(*storage.FSObject).Filename()); err == nil {
if *verbose {
log.Printf("Skipping existing object %s", name)
@@ -83,17 +66,49 @@
if *verbose {
log.Printf("Starting copying object %s", name)
}
-
- g.Go(func() error {
- if err != nil {
- return err
- }
- return storage.Copy(ctx, dest, gcs.Object(name))
- })
+ w, err := dest.NewWriter(ctx)
+ if err != nil {
+ log.Fatal(err)
+ }
+ if _, err := io.Copy(w, bytes.NewReader(content)); err != nil {
+ log.Fatal(err)
+ }
}
}
+}
- if err := g.Wait(); err != nil {
- log.Fatal(err)
+// downloadData downloads the merged telemetry data for the given date string
+// (which must be in time.DateOnly format), and splits it back into individual
+// uploaded files, keyed by their original name (<X>.json).
+func downloadData(dateString string) (map[string][]byte, error) {
+ url := fmt.Sprintf("https://storage.googleapis.com/prod-telemetry-merged/%s.json", dateString)
+ resp, err := http.Get(url)
+ if err != nil {
+ return nil, err
}
+ defer resp.Body.Close()
+
+ if resp.StatusCode != 200 {
+ return nil, fmt.Errorf("downloading %s failed with status %d", url, resp.StatusCode)
+ }
+
+ data, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return nil, err
+ }
+
+ byFile := make(map[string][]byte)
+ for _, line := range bytes.Split(data, []byte("\n")) {
+ line = bytes.TrimSpace(line)
+ if len(line) == 0 {
+ continue // defensive: skip empty lines
+ }
+ var x struct{ X float64 }
+ if err := json.Unmarshal(line, &x); err != nil {
+ return nil, err
+ }
+ file := fmt.Sprintf("%g.json", x.X)
+ byFile[file] = append(line, '\n') // uploaded data is newline terminated
+ }
+ return byFile, nil
}