md/telemetrygodev: add a handler for report uploads
Added a handler for telemetry report uploads that writes files to
disk. An eventual implementation should handle validating data in
the reports.
Change-Id: I05ecb89ebc6fb39f4364b57a59b77c3e297a6ece
Reviewed-on: https://go-review.googlesource.com/c/telemetry/+/498815
Reviewed-by: Hyang-Ah Hana Kim <hyangah@gmail.com>
Reviewed-by: Peter Weinberger <pjw@google.com>
Run-TryBot: Jamal Carvalho <jamal@golang.org>
TryBot-Result: Gopher Robot <gobot@golang.org>
diff --git a/godev/cmd/telemetrygodev/Dockerfile b/godev/cmd/telemetrygodev/Dockerfile
index a7a3f25..0dddf14 100644
--- a/godev/cmd/telemetrygodev/Dockerfile
+++ b/godev/cmd/telemetrygodev/Dockerfile
@@ -54,4 +54,4 @@
WORKDIR /app
-CMD ["./telemetrygodev"]
+CMD ["./telemetrygodev", "--gcs"]
diff --git a/godev/cmd/telemetrygodev/config.go b/godev/cmd/telemetrygodev/config.go
index b92929c..c058eaa 100644
--- a/godev/cmd/telemetrygodev/config.go
+++ b/godev/cmd/telemetrygodev/config.go
@@ -35,6 +35,18 @@
DevMode bool
}
+// onCloudRun reports whether the current process is running on Cloud Run.
+func (c *config) onCloudRun() bool {
+ // Use the presence of the environment variables provided by Cloud Run.
+ // See https://cloud.google.com/run/docs/reference/container-contract.
+ for _, ev := range []string{"K_SERVICE", "K_REVISION", "K_CONFIGURATION"} {
+ if os.Getenv(ev) == "" {
+ return false
+ }
+ }
+ return true
+}
+
var (
devMode = flag.Bool("dev", false, "load static content and templates from the filesystem")
useGCS = flag.Bool("gcs", false, "use Cloud Storage for reading and writing storage objects")
@@ -49,7 +61,7 @@
ProjectID: env("GO_TELEMETRY_PROJECT_ID", "go-telemetry"),
StorageEmulatorHost: env("GO_TELEMETRY_STORAGE_EMULATOR_HOST", "localhost:8081"),
LocalStorage: env("GO_TELEMETRY_LOCAL_STORAGE", ".localstorage"),
- UploadBucket: service + "-uploads",
+ UploadBucket: service + "-uploaded",
UseGCS: *useGCS,
DevMode: *devMode,
}
diff --git a/godev/cmd/telemetrygodev/config_test.go b/godev/cmd/telemetrygodev/config_test.go
index b26c7b7..43e7741 100644
--- a/godev/cmd/telemetrygodev/config_test.go
+++ b/godev/cmd/telemetrygodev/config_test.go
@@ -15,7 +15,7 @@
ProjectID: "go-telemetry",
StorageEmulatorHost: "localhost:8081",
LocalStorage: ".localstorage",
- UploadBucket: "local-telemetry-uploads",
+ UploadBucket: "local-telemetry-uploaded",
}
if got := newConfig(); !reflect.DeepEqual(got, want) {
t.Errorf("Config() = %v, want %v", got, want)
diff --git a/godev/cmd/telemetrygodev/main.go b/godev/cmd/telemetrygodev/main.go
index 7afad18..eecaa51 100644
--- a/godev/cmd/telemetrygodev/main.go
+++ b/godev/cmd/telemetrygodev/main.go
@@ -6,26 +6,70 @@
package main
import (
+ "context"
+ "encoding/json"
"flag"
"fmt"
"io/fs"
"log"
"net/http"
"os"
+ "path"
+ "golang.org/x/telemetry"
"golang.org/x/telemetry/godev"
"golang.org/x/telemetry/godev/internal/content"
"golang.org/x/telemetry/godev/internal/middleware"
+ "golang.org/x/telemetry/godev/internal/storage"
"golang.org/x/telemetry/godev/internal/unionfs"
)
func main() {
flag.Parse()
+ ctx := context.Background()
cfg := newConfig()
- s := content.Server(fsys(cfg.DevMode))
- mw := middleware.Default
+ store, err := uploadBucket(ctx, cfg)
+ if err != nil {
+ log.Fatal(err)
+ }
+ fsys := fsys(cfg.DevMode)
+ cserv := content.Server(fsys)
+ mux := http.NewServeMux()
+
+ mux.Handle("/", cserv)
+ mux.Handle("/upload/", handleUpload(store))
+
fmt.Printf("server listening at http://localhost:%s\n", cfg.Port)
- log.Fatal(http.ListenAndServe(":"+cfg.Port, mw(s)))
+ log.Fatal(http.ListenAndServe(":"+cfg.Port, middleware.Default(mux)))
+}
+
+func handleUpload(store storage.Store) content.HandlerFunc {
+ return func(w http.ResponseWriter, r *http.Request) error {
+ if r.Method == "POST" {
+ var report telemetry.Report
+ if err := json.NewDecoder(r.Body).Decode(&report); err != nil {
+ return err
+ }
+ // TODO: validate the report, checking that the counters and stacks contained are things
+ // we want to collect and file name is something reasonable.
+ // TODO: capture metrics for collisions.
+ ctx := r.Context()
+ name := fmt.Sprintf("%s/%g.json", report.Week, report.X)
+ f, err := store.Writer(ctx, name)
+ if err != nil {
+ return err
+ }
+ defer f.Close()
+ if err := json.NewEncoder(f).Encode(report); err != nil {
+ return err
+ }
+ if err := f.Close(); err != nil {
+ return err
+ }
+ return content.Text(w, "ok", http.StatusOK)
+ }
+ return content.ErrorStatus(http.StatusMethodNotAllowed)
+ }
}
func fsys(fromOS bool) fs.FS {
@@ -39,3 +83,16 @@
}
return f
}
+
+// uploadBucket returns a telemtry upload bucket for the given config.
+func uploadBucket(ctx context.Context, cfg *config) (storage.Store, error) {
+ if cfg.UseGCS && !cfg.onCloudRun() {
+ if err := os.Setenv("STORAGE_EMULATOR_HOST", cfg.StorageEmulatorHost); err != nil {
+ return nil, err
+ }
+ }
+ if cfg.UseGCS {
+ return storage.NewGCStore(ctx, cfg.ProjectID, cfg.UploadBucket)
+ }
+ return storage.NewFSStore(ctx, path.Join(cfg.LocalStorage, cfg.UploadBucket))
+}