md/telemetrygodev: add a handler for report uploads

Added a handler for telemetry report uploads that writes files to
disk. An eventual implementation should handle validating data in
the reports.

Change-Id: I05ecb89ebc6fb39f4364b57a59b77c3e297a6ece
Reviewed-on: https://go-review.googlesource.com/c/telemetry/+/498815
Reviewed-by: Hyang-Ah Hana Kim <hyangah@gmail.com>
Reviewed-by: Peter Weinberger <pjw@google.com>
Run-TryBot: Jamal Carvalho <jamal@golang.org>
TryBot-Result: Gopher Robot <gobot@golang.org>
diff --git a/godev/cmd/telemetrygodev/Dockerfile b/godev/cmd/telemetrygodev/Dockerfile
index a7a3f25..0dddf14 100644
--- a/godev/cmd/telemetrygodev/Dockerfile
+++ b/godev/cmd/telemetrygodev/Dockerfile
@@ -54,4 +54,4 @@
 
 WORKDIR /app
 
-CMD ["./telemetrygodev"]
+CMD ["./telemetrygodev", "--gcs"]
diff --git a/godev/cmd/telemetrygodev/config.go b/godev/cmd/telemetrygodev/config.go
index b92929c..c058eaa 100644
--- a/godev/cmd/telemetrygodev/config.go
+++ b/godev/cmd/telemetrygodev/config.go
@@ -35,6 +35,18 @@
 	DevMode bool
 }
 
+// onCloudRun reports whether the current process is running on Cloud Run.
+func (c *config) onCloudRun() bool {
+	// Use the presence of the environment variables provided by Cloud Run.
+	// See https://cloud.google.com/run/docs/reference/container-contract.
+	for _, ev := range []string{"K_SERVICE", "K_REVISION", "K_CONFIGURATION"} {
+		if os.Getenv(ev) == "" {
+			return false
+		}
+	}
+	return true
+}
+
 var (
 	devMode = flag.Bool("dev", false, "load static content and templates from the filesystem")
 	useGCS  = flag.Bool("gcs", false, "use Cloud Storage for reading and writing storage objects")
@@ -49,7 +61,7 @@
 		ProjectID:           env("GO_TELEMETRY_PROJECT_ID", "go-telemetry"),
 		StorageEmulatorHost: env("GO_TELEMETRY_STORAGE_EMULATOR_HOST", "localhost:8081"),
 		LocalStorage:        env("GO_TELEMETRY_LOCAL_STORAGE", ".localstorage"),
-		UploadBucket:        service + "-uploads",
+		UploadBucket:        service + "-uploaded",
 		UseGCS:              *useGCS,
 		DevMode:             *devMode,
 	}
diff --git a/godev/cmd/telemetrygodev/config_test.go b/godev/cmd/telemetrygodev/config_test.go
index b26c7b7..43e7741 100644
--- a/godev/cmd/telemetrygodev/config_test.go
+++ b/godev/cmd/telemetrygodev/config_test.go
@@ -15,7 +15,7 @@
 		ProjectID:           "go-telemetry",
 		StorageEmulatorHost: "localhost:8081",
 		LocalStorage:        ".localstorage",
-		UploadBucket:        "local-telemetry-uploads",
+		UploadBucket:        "local-telemetry-uploaded",
 	}
 	if got := newConfig(); !reflect.DeepEqual(got, want) {
 		t.Errorf("Config() = %v, want %v", got, want)
diff --git a/godev/cmd/telemetrygodev/main.go b/godev/cmd/telemetrygodev/main.go
index 7afad18..eecaa51 100644
--- a/godev/cmd/telemetrygodev/main.go
+++ b/godev/cmd/telemetrygodev/main.go
@@ -6,26 +6,70 @@
 package main
 
 import (
+	"context"
+	"encoding/json"
 	"flag"
 	"fmt"
 	"io/fs"
 	"log"
 	"net/http"
 	"os"
+	"path"
 
+	"golang.org/x/telemetry"
 	"golang.org/x/telemetry/godev"
 	"golang.org/x/telemetry/godev/internal/content"
 	"golang.org/x/telemetry/godev/internal/middleware"
+	"golang.org/x/telemetry/godev/internal/storage"
 	"golang.org/x/telemetry/godev/internal/unionfs"
 )
 
 func main() {
 	flag.Parse()
+	ctx := context.Background()
 	cfg := newConfig()
-	s := content.Server(fsys(cfg.DevMode))
-	mw := middleware.Default
+	store, err := uploadBucket(ctx, cfg)
+	if err != nil {
+		log.Fatal(err)
+	}
+	fsys := fsys(cfg.DevMode)
+	cserv := content.Server(fsys)
+	mux := http.NewServeMux()
+
+	mux.Handle("/", cserv)
+	mux.Handle("/upload/", handleUpload(store))
+
 	fmt.Printf("server listening at http://localhost:%s\n", cfg.Port)
-	log.Fatal(http.ListenAndServe(":"+cfg.Port, mw(s)))
+	log.Fatal(http.ListenAndServe(":"+cfg.Port, middleware.Default(mux)))
+}
+
+func handleUpload(store storage.Store) content.HandlerFunc {
+	return func(w http.ResponseWriter, r *http.Request) error {
+		if r.Method == "POST" {
+			var report telemetry.Report
+			if err := json.NewDecoder(r.Body).Decode(&report); err != nil {
+				return err
+			}
+			// TODO: validate the report, checking that the counters and stacks contained are things
+			// we want to collect and file name is something reasonable.
+			// TODO: capture metrics for collisions.
+			ctx := r.Context()
+			name := fmt.Sprintf("%s/%g.json", report.Week, report.X)
+			f, err := store.Writer(ctx, name)
+			if err != nil {
+				return err
+			}
+			defer f.Close()
+			if err := json.NewEncoder(f).Encode(report); err != nil {
+				return err
+			}
+			if err := f.Close(); err != nil {
+				return err
+			}
+			return content.Text(w, "ok", http.StatusOK)
+		}
+		return content.ErrorStatus(http.StatusMethodNotAllowed)
+	}
 }
 
 func fsys(fromOS bool) fs.FS {
@@ -39,3 +83,16 @@
 	}
 	return f
 }
+
+// uploadBucket returns a telemtry upload bucket for the given config.
+func uploadBucket(ctx context.Context, cfg *config) (storage.Store, error) {
+	if cfg.UseGCS && !cfg.onCloudRun() {
+		if err := os.Setenv("STORAGE_EMULATOR_HOST", cfg.StorageEmulatorHost); err != nil {
+			return nil, err
+		}
+	}
+	if cfg.UseGCS {
+		return storage.NewGCStore(ctx, cfg.ProjectID, cfg.UploadBucket)
+	}
+	return storage.NewFSStore(ctx, path.Join(cfg.LocalStorage, cfg.UploadBucket))
+}