godev/devtools/cmd/copyuploads: download data from the public endpoint

Download and split telemetry reports from the public merged bucket in
the copyuploads tool, so that GCS permissions are not a barrier to local
development.

Change-Id: Ia68231db08556df377c07c1cf1d964a41f0599bd
Reviewed-on: https://go-review.googlesource.com/c/telemetry/+/617176
Reviewed-by: Hyang-Ah Hana Kim <hyangah@gmail.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
diff --git a/godev/devtools/cmd/copyuploads/copyuploads.go b/godev/devtools/cmd/copyuploads/copyuploads.go
index bfc5652..f1e6335 100644
--- a/godev/devtools/cmd/copyuploads/copyuploads.go
+++ b/godev/devtools/cmd/copyuploads/copyuploads.go
@@ -5,35 +5,32 @@
 // The copyuploads command copies uploads from GCS to the local filesystem
 // storage, for use with local development of the worker.
 //
-// By default, this command copies the last 3 days of uploads from the
-// dev-telemetry-uploaded bucket in GCS to the local filesystem bucket
-// local-telemetry-uploaded, at which point this data will be available when
-// running ./godev/cmd/worker with no arguments.
-//
-// This command requires read permission to the go-telemetry GCS buckets.
-// TODO(rfindley): we could avoid the need for read permission by instead
-// downloading the public merged reports, and reassembling the individual
-// uploads.
+// By default, this command copies the last 3 days of uploads from
+// telemetry.go.dev to the local filesystem bucket local-telemetry-uploaded, at
+// which point this data will be available when running ./godev/cmd/worker with
+// no arguments.
 //
 // See --help for more details.
 package main
 
 import (
+	"bytes"
 	"context"
-	"errors"
+	"encoding/json"
 	"flag"
+	"fmt"
+	"io"
 	"log"
+	"net/http"
 	"os"
-	"strings"
+	"path"
 	"time"
 
-	"golang.org/x/sync/errgroup"
 	"golang.org/x/telemetry/godev/internal/config"
 	"golang.org/x/telemetry/godev/internal/storage"
 )
 
 var (
-	bucket   = flag.String("bucket", "dev-telemetry-uploaded", "The bucket to copy from.")
 	daysBack = flag.Int("days_back", 3, "The number of days back to copy")
 	verbose  = flag.Bool("v", false, "If set, enable verbose logging.")
 )
@@ -41,39 +38,25 @@
 func main() {
 	flag.Parse()
 
-	if !strings.HasSuffix(*bucket, "-uploaded") {
-		log.Fatal("-bucket must end in -uploaded")
-	}
-
 	cfg := config.NewConfig()
 	ctx := context.Background()
 
-	gcs, err := storage.NewGCSBucket(ctx, cfg.ProjectID, *bucket)
-	if err != nil {
-		log.Fatal(err)
-	}
 	fs, err := storage.NewFSBucket(ctx, cfg.LocalStorage, "local-telemetry-uploaded")
 	if err != nil {
 		log.Fatal(err)
 	}
 
-	// Copy files concurrently.
-	const concurrency = 5
-	g, ctx := errgroup.WithContext(ctx)
-	g.SetLimit(concurrency)
-
 	start := time.Now()
 	for dayOffset := range *daysBack {
-		date := start.AddDate(0, 0, -dayOffset)
-		it := gcs.Objects(ctx, date.Format(time.DateOnly))
-		for {
-			name, err := it.Next()
-			if errors.Is(err, storage.ErrObjectIteratorDone) {
-				break
-			}
-
+		date := start.AddDate(0, 0, -dayOffset-1) // today's merged reports may not yet be available
+		dateString := date.Format(time.DateOnly)
+		byFile, err := downloadData(dateString)
+		if err != nil {
+			log.Fatalf("Downloading data for %s: %v", dateString, err)
+		}
+		for name, content := range byFile {
 			// Skip objects that already exist in local storage.
-			dest := fs.Object(name)
+			dest := fs.Object(path.Join(dateString, name))
 			if _, err := os.Stat(dest.(*storage.FSObject).Filename()); err == nil {
 				if *verbose {
 					log.Printf("Skipping existing object %s", name)
@@ -83,17 +66,49 @@
 			if *verbose {
 				log.Printf("Starting copying object %s", name)
 			}
-
-			g.Go(func() error {
-				if err != nil {
-					return err
-				}
-				return storage.Copy(ctx, dest, gcs.Object(name))
-			})
+			w, err := dest.NewWriter(ctx)
+			if err != nil {
+				log.Fatal(err)
+			}
+			if _, err := io.Copy(w, bytes.NewReader(content)); err != nil {
+				log.Fatal(err)
+			}
 		}
 	}
+}
 
-	if err := g.Wait(); err != nil {
-		log.Fatal(err)
+// downloadData downloads the merged telemetry data for the given date string
+// (which must be in time.DateOnly format), and splits it back into individual
+// uploaded files, keyed by their original name (<X>.json).
+func downloadData(dateString string) (map[string][]byte, error) {
+	url := fmt.Sprintf("https://storage.googleapis.com/prod-telemetry-merged/%s.json", dateString)
+	resp, err := http.Get(url)
+	if err != nil {
+		return nil, err
 	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != 200 {
+		return nil, fmt.Errorf("downloading %s failed with status %d", url, resp.StatusCode)
+	}
+
+	data, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return nil, err
+	}
+
+	byFile := make(map[string][]byte)
+	for _, line := range bytes.Split(data, []byte("\n")) {
+		line = bytes.TrimSpace(line)
+		if len(line) == 0 {
+			continue // defensive: skip empty lines
+		}
+		var x struct{ X float64 }
+		if err := json.Unmarshal(line, &x); err != nil {
+			return nil, err
+		}
+		file := fmt.Sprintf("%g.json", x.X)
+		byFile[file] = append(line, '\n') // uploaded data is newline terminated
+	}
+	return byFile, nil
 }