godev: simplify local development of the worker
To make it easier to develop the worker, add a new
./godev/devtools/cmd/copyuploads command to copy uploaded reports from a
GCS bucket to the local filesystem. After running this script, running
./godev/cmd/worker with no arguments will have access to recent uploaded
reports in the local filesystem environment.
Also update the README for local development, and update the default
config to set the GCP ProjectID (without this setting, I assume it was
using whatever was returned by gcloud config get project).
Finally, remove the unused Config.StorageEmulatorHost. This had confused
me due to the overlap with the STORAGE_EMULATOR_HOST environment
variable read by the cloud.google.com/storage package.
Change-Id: Ia0565f67fa763ac3d74aba0631a92e8f21b506f8
Reviewed-on: https://go-review.googlesource.com/c/telemetry/+/615776
Reviewed-by: Hyang-Ah Hana Kim <hyangah@gmail.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
diff --git a/godev/cmd/telemetrygodev/main_test.go b/godev/cmd/telemetrygodev/main_test.go
index 1ba8f69..769fb7a 100644
--- a/godev/cmd/telemetrygodev/main_test.go
+++ b/godev/cmd/telemetrygodev/main_test.go
@@ -86,6 +86,8 @@
ctx := context.Background()
cfg := config.NewConfig()
cfg.LocalStorage = t.TempDir()
+ cfg.ProjectID = "" // defensive: don't use a real project ID for tests.
+
// NewConfig assumes that the command is run from the repo root, but tests
// run from their test directory. We should fix this, but for now just
// fix up the config path.
diff --git a/godev/cmd/worker/README.md b/godev/cmd/worker/README.md
index 453dfbd..8324625 100644
--- a/godev/cmd/worker/README.md
+++ b/godev/cmd/worker/README.md
@@ -37,8 +37,9 @@
Similar to the /chart endpoint, /copy also supports the following query
parameters:
-- `/?date=<YYYY-MM-DD>``: Copies reports for a specific date.
-- `/?start=<YYYY-MM-DD>&end=<YYYY-MM-DD>``: Copies reports within a specified date range.
+- `/copy/?date=<YYYY-MM-DD>`: Copies reports for a specific date.
+- `/copy/?start=<YYYY-MM-DD>&end=<YYYY-MM-DD>`: Copies reports within a
+ specified date range.
### `/queue-tasks`
@@ -51,17 +52,56 @@
## Local Development
-For local development, simply build and run. It serves on localhost:8082.
+The preferred method of local develoment is to simply build and run the worker
+binary. Use PORT= to customize the default hosting port.
go run ./godev/cmd/worker
-By default, the server will use the filesystem for storage object I/O. Use the
--gcs flag to use the Cloud Storage API.
+By default, the server will use the filesystem for storage object I/O (see
+[`GO_TELEMETRY_LOCAL_STORAGE`](#environment-variables)). Unless you have also
+uploaded reports through a local instance of the telemetry frontend, this local
+storage will be empty. To copy uploads from GCS to the local environment, run:
+
+ go run ./godev/devtools/cmd/copyuploads -v
+
+Note that this command requires read permission to our GCS buckets.
+
+So, this is a complete end-to-end test of the merge endpoint:
+
+1. First, copy data with:
+
+ ```
+ go run ./godev/devtools/cmd/copyuploads -v
+ ```
+
+2. Then, run the worker:
+
+ ```
+ go run ./godev/cmd/worker
+ ```
+
+3. Finally, in a separate terminal, trigger the merge operation:
+
+ ```
+ curl http://localhost:8082/merge/?date=2024-09-26
+ ```
+
+After doing this, you should see the resulting merged reports in the
+`./localstorage/local-telemetry-merged` directory.
+
+Note: the `/queue-tasks/` endpoint does not currently work locally: by default
+it tries to enqueue tasks in the associated GCP project, which will fail unless
+you have escalated permissions on GCP.
+
+### Local development using GCS
+
+Alternatively, you can use the -gcs flag to use the Cloud Storage API:
go run ./godev/cmd/worker --gcs
-Optionally, use the localstorage devtool the emulate the GCS server on your
-machine.
+However, the above command requires write permissions to our public GCS buckets,
+which one should in general not request. Instead, use the localstorage devtool
+the emulate the GCS server on your machine.
./godev/devtools/localstorage.sh
STORAGE_EMULATOR_HOST=localhost:8081 go run ./godev/cmd/worker --gcs
diff --git a/godev/cmd/worker/main.go b/godev/cmd/worker/main.go
index f6395da..e7dd88c 100644
--- a/godev/cmd/worker/main.go
+++ b/godev/cmd/worker/main.go
@@ -133,6 +133,8 @@
// - Weekly chart: encompasses 7 days of data, concluding on the specified date.
// TODO(golang/go#62575): adjust the date range to align with report
// upload cutoff.
+//
+// TODO(rfindley): use a local task queue when not run with -gcs.
func handleTasks(cfg *config.Config) content.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) error {
now := time.Now().UTC()
diff --git a/godev/devtools/cmd/copyuploads/copyuploads.go b/godev/devtools/cmd/copyuploads/copyuploads.go
new file mode 100644
index 0000000..bfc5652
--- /dev/null
+++ b/godev/devtools/cmd/copyuploads/copyuploads.go
@@ -0,0 +1,99 @@
+// Copyright 2024 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// The copyuploads command copies uploads from GCS to the local filesystem
+// storage, for use with local development of the worker.
+//
+// By default, this command copies the last 3 days of uploads from the
+// dev-telemetry-uploaded bucket in GCS to the local filesystem bucket
+// local-telemetry-uploaded, at which point this data will be available when
+// running ./godev/cmd/worker with no arguments.
+//
+// This command requires read permission to the go-telemetry GCS buckets.
+// TODO(rfindley): we could avoid the need for read permission by instead
+// downloading the public merged reports, and reassembling the individual
+// uploads.
+//
+// See --help for more details.
+package main
+
+import (
+ "context"
+ "errors"
+ "flag"
+ "log"
+ "os"
+ "strings"
+ "time"
+
+ "golang.org/x/sync/errgroup"
+ "golang.org/x/telemetry/godev/internal/config"
+ "golang.org/x/telemetry/godev/internal/storage"
+)
+
+var (
+ bucket = flag.String("bucket", "dev-telemetry-uploaded", "The bucket to copy from.")
+ daysBack = flag.Int("days_back", 3, "The number of days back to copy")
+ verbose = flag.Bool("v", false, "If set, enable verbose logging.")
+)
+
+func main() {
+ flag.Parse()
+
+ if !strings.HasSuffix(*bucket, "-uploaded") {
+ log.Fatal("-bucket must end in -uploaded")
+ }
+
+ cfg := config.NewConfig()
+ ctx := context.Background()
+
+ gcs, err := storage.NewGCSBucket(ctx, cfg.ProjectID, *bucket)
+ if err != nil {
+ log.Fatal(err)
+ }
+ fs, err := storage.NewFSBucket(ctx, cfg.LocalStorage, "local-telemetry-uploaded")
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ // Copy files concurrently.
+ const concurrency = 5
+ g, ctx := errgroup.WithContext(ctx)
+ g.SetLimit(concurrency)
+
+ start := time.Now()
+ for dayOffset := range *daysBack {
+ date := start.AddDate(0, 0, -dayOffset)
+ it := gcs.Objects(ctx, date.Format(time.DateOnly))
+ for {
+ name, err := it.Next()
+ if errors.Is(err, storage.ErrObjectIteratorDone) {
+ break
+ }
+
+ // Skip objects that already exist in local storage.
+ dest := fs.Object(name)
+ if _, err := os.Stat(dest.(*storage.FSObject).Filename()); err == nil {
+ if *verbose {
+ log.Printf("Skipping existing object %s", name)
+ }
+ continue
+ }
+ if *verbose {
+ log.Printf("Starting copying object %s", name)
+ }
+
+ g.Go(func() error {
+ if err != nil {
+ return err
+ }
+ return storage.Copy(ctx, dest, gcs.Object(name))
+ })
+ }
+ }
+
+ if err := g.Wait(); err != nil {
+ log.Fatal(err)
+ }
+}
diff --git a/godev/internal/config/config.go b/godev/internal/config/config.go
index fe3f710..b6eb55e 100644
--- a/godev/internal/config/config.go
+++ b/godev/internal/config/config.go
@@ -37,9 +37,6 @@
// ClientID is the OAuth client used in authentication for queue tasks.
ClientID string
- // StorageEmulatorHost is a network address for a Cloud Storage emulator.
- StorageEmulatorHost string
-
// LocalStorage is a directory for storage I/O used when the using the filesystem
// or storage emulator modes.
LocalStorage string
@@ -85,25 +82,24 @@
func NewConfig() *Config {
environment := env("GO_TELEMETRY_ENV", "local")
return &Config{
- ServerPort: env("PORT", "8080"),
- WorkerPort: env("PORT", "8082"),
- WorkerURL: env("GO_TELEMETRY_WORKER_URL", "http://localhost:8082"),
- ProjectID: env("GO_TELEMETRY_PROJECT_ID", ""),
- LocationID: env("GO_TELEMETRY_LOCATION_ID", ""),
- QueueID: environment + "-worker-tasks",
- IAPServiceAccount: env("GO_TELEMETRY_IAP_SERVICE_ACCOUNT", ""),
- ClientID: env("GO_TELEMETRY_CLIENT_ID", ""),
- StorageEmulatorHost: env("GO_TELEMETRY_STORAGE_EMULATOR_HOST", "localhost:8081"),
- LocalStorage: env("GO_TELEMETRY_LOCAL_STORAGE", ".localstorage"),
- ChartDataBucket: environment + "-telemetry-charted",
- Env: environment,
- MergedBucket: environment + "-telemetry-merged",
- UploadBucket: environment + "-telemetry-uploaded",
- UploadConfig: env("GO_TELEMETRY_UPLOAD_CONFIG", "./config/config.json"),
- MaxRequestBytes: env("GO_TELEMETRY_MAX_REQUEST_BYTES", int64(100*1024)),
- RequestTimeout: 10 * time.Duration(time.Minute),
- UseGCS: *useGCS,
- DevMode: *devMode,
+ ServerPort: env("PORT", "8080"),
+ WorkerPort: env("PORT", "8082"),
+ WorkerURL: env("GO_TELEMETRY_WORKER_URL", "http://localhost:8082"),
+ ProjectID: env("GO_TELEMETRY_PROJECT_ID", "go-telemetry"),
+ LocationID: env("GO_TELEMETRY_LOCATION_ID", ""),
+ QueueID: environment + "-worker-tasks",
+ IAPServiceAccount: env("GO_TELEMETRY_IAP_SERVICE_ACCOUNT", ""),
+ ClientID: env("GO_TELEMETRY_CLIENT_ID", ""),
+ LocalStorage: env("GO_TELEMETRY_LOCAL_STORAGE", ".localstorage"),
+ ChartDataBucket: environment + "-telemetry-charted",
+ Env: environment,
+ MergedBucket: environment + "-telemetry-merged",
+ UploadBucket: environment + "-telemetry-uploaded",
+ UploadConfig: env("GO_TELEMETRY_UPLOAD_CONFIG", "./config/config.json"),
+ MaxRequestBytes: env("GO_TELEMETRY_MAX_REQUEST_BYTES", int64(100*1024)),
+ RequestTimeout: 10 * time.Duration(time.Minute),
+ UseGCS: *useGCS,
+ DevMode: *devMode,
}
}
diff --git a/godev/internal/storage/storage.go b/godev/internal/storage/storage.go
index 18e0093..c91d73d 100644
--- a/godev/internal/storage/storage.go
+++ b/godev/internal/storage/storage.go
@@ -169,6 +169,10 @@
return &FSObject{filename}
}
+func (o *FSObject) Filename() string {
+ return o.filename
+}
+
func (o *FSObject) NewReader(ctx context.Context) (io.ReadCloser, error) {
r, err := os.Open(o.filename)
if errors.Is(err, os.ErrNotExist) {