storage: initial perfdata AE app

This creates the skeleton of an AppEngine app for perfdata.golang.org
and adds an initial implementation of /upload that saves the provided
file into Cloud Storage.

Change-Id: I1fe19b27841ab62aad146d1d1019996634012d35
Reviewed-on: https://go-review.googlesource.com/34620
Reviewed-by: Russ Cox <rsc@golang.org>
diff --git a/storage/app/app.go b/storage/app/app.go
new file mode 100644
index 0000000..adcae63
--- /dev/null
+++ b/storage/app/app.go
@@ -0,0 +1,28 @@
+// Copyright 2016 The Go Authors.  All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package app implements the performance data storage server. Combine
+// an App with a database and filesystem to get an HTTP server.
+package app
+
+import (
+	"net/http"
+
+	"golang.org/x/perf/storage/db"
+	"golang.org/x/perf/storage/fs"
+)
+
+// App manages the storage server logic. Construct an App instance
+// using a literal with DB and FS objects and call RegisterOnMux to
+// connect it with an HTTP server.
+type App struct {
+	DB *db.DB
+	FS fs.FS
+}
+
+// RegisterOnMux registers the app's URLs on mux.
+func (a *App) RegisterOnMux(mux *http.ServeMux) {
+	// TODO(quentin): Should we just make the App itself be an http.Handler?
+	mux.HandleFunc("/upload", a.upload)
+}
diff --git a/storage/app/appengine.go b/storage/app/appengine.go
new file mode 100644
index 0000000..b9ff1e6
--- /dev/null
+++ b/storage/app/appengine.go
@@ -0,0 +1,22 @@
+// Copyright 2016 The Go Authors.  All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build appengine
+
+package app
+
+import (
+	"net/http"
+
+	"golang.org/x/net/context"
+	"google.golang.org/appengine"
+	"google.golang.org/appengine/log"
+)
+
+// requestContext returns the Context object for a given request.
+func requestContext(r *http.Request) context.Context {
+	return appengine.NewContext(r)
+}
+
+var errorf = log.Errorf
diff --git a/storage/app/local.go b/storage/app/local.go
new file mode 100644
index 0000000..4a9cf43
--- /dev/null
+++ b/storage/app/local.go
@@ -0,0 +1,23 @@
+// Copyright 2016 The Go Authors.  All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build !appengine
+
+package app
+
+import (
+	"log"
+	"net/http"
+
+	"golang.org/x/net/context"
+)
+
+// requestContext returns the Context object for a given request.
+func requestContext(r *http.Request) context.Context {
+	return r.Context()
+}
+
+func errorf(_ context.Context, format string, args ...interface{}) {
+	log.Printf(format, args...)
+}
diff --git a/storage/app/upload.go b/storage/app/upload.go
new file mode 100644
index 0000000..1f968a2
--- /dev/null
+++ b/storage/app/upload.go
@@ -0,0 +1,133 @@
+// Copyright 2016 The Go Authors.  All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package app
+
+import (
+	"encoding/json"
+	"fmt"
+	"io"
+	"mime/multipart"
+	"net/http"
+
+	"golang.org/x/net/context"
+)
+
+// upload is the handler for the /upload endpoint. It serves a form on
+// GET requests and processes files in a multipart/x-form-data POST
+// request.
+func (a *App) upload(w http.ResponseWriter, r *http.Request) {
+	ctx := requestContext(r)
+
+	// TODO(quentin): Authentication
+
+	if r.Method == http.MethodGet {
+		http.ServeFile(w, r, "static/upload.html")
+		return
+	}
+	if r.Method != http.MethodPost {
+		http.Error(w, "/upload must be called as a POST request", http.StatusMethodNotAllowed)
+		return
+	}
+
+	// We use r.MultipartReader instead of r.ParseForm to avoid
+	// storing uploaded data in memory.
+	mr, err := r.MultipartReader()
+	if err != nil {
+		errorf(ctx, "%v", err)
+		http.Error(w, err.Error(), 500)
+		return
+	}
+
+	result, err := a.processUpload(ctx, mr)
+	if err != nil {
+		errorf(ctx, "%v", err)
+		http.Error(w, err.Error(), 500)
+		return
+	}
+
+	if err := json.NewEncoder(w).Encode(result); err != nil {
+		errorf(ctx, "%v", err)
+		http.Error(w, err.Error(), 500)
+		return
+	}
+}
+
+// uploadStatus is the response to an /upload POST served as JSON.
+type uploadStatus struct {
+	// UploadID is the upload ID assigned to the upload.
+	UploadID string `json:"uploadid"`
+	// FileIDs is the list of file IDs assigned to the files in the upload.
+	FileIDs []string `json:"fileids"`
+}
+
+// processUpload takes one or more files from a multipart.Reader,
+// writes them to the filesystem, and indexes their content.
+func (a *App) processUpload(ctx context.Context, mr *multipart.Reader) (*uploadStatus, error) {
+	var uploadid string
+
+	var status uploadStatus
+
+	for i := 0; ; i++ {
+		p, err := mr.NextPart()
+		if err == io.EOF {
+			break
+		}
+
+		name := p.FormName()
+		if name != "file" {
+			return nil, fmt.Errorf("unexpected field %q", name)
+		}
+
+		if uploadid == "" {
+			var err error
+			uploadid, err = a.DB.ReserveUploadID(ctx)
+			if err != nil {
+				return nil, err
+			}
+			status.UploadID = uploadid
+		}
+
+		// The incoming file needs to be stored in Cloud
+		// Storage and it also needs to be indexed. If the file
+		// is invalid (contains no valid records) it needs to
+		// be rejected and the Cloud Storage upload aborted.
+		// TODO(quentin): We might as well do these in parallel.
+
+		meta := fileMetadata(ctx, uploadid, i)
+
+		fw, err := a.FS.NewWriter(ctx, fmt.Sprintf("uploads/%s.txt", meta["fileid"]), meta)
+		if err != nil {
+			return nil, err
+		}
+
+		// TODO(quentin): Write metadata at top of file
+		if _, err := io.Copy(fw, p); err != nil {
+			fw.CloseWithError(err)
+			return nil, err
+		}
+		// TODO(quentin): Write records to database
+
+		if err := fw.Close(); err != nil {
+			return nil, err
+		}
+
+		status.FileIDs = append(status.FileIDs, meta["fileid"])
+	}
+
+	return &status, nil
+}
+
+// fileMetadata returns the extra metadata fields associated with an
+// uploaded file. It obtains the uploader's e-mail address from the
+// Context.
+func fileMetadata(_ context.Context, uploadid string, filenum int) map[string]string {
+	// TODO(quentin): Add the name of the uploader.
+	// TODO(quentin): Add the upload time.
+	// TODO(quentin): Add other fields?
+	return map[string]string{
+		"uploadid": uploadid,
+		"fileid":   fmt.Sprintf("%s/%d", uploadid, filenum),
+	}
+}
diff --git a/storage/app/upload_test.go b/storage/app/upload_test.go
new file mode 100644
index 0000000..972ebdc
--- /dev/null
+++ b/storage/app/upload_test.go
@@ -0,0 +1,63 @@
+// Copyright 2016 The Go Authors.  All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package app
+
+import (
+	"fmt"
+	"io"
+	"io/ioutil"
+	"mime/multipart"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+
+	_ "github.com/mattn/go-sqlite3"
+	"golang.org/x/perf/storage/db"
+	"golang.org/x/perf/storage/fs"
+)
+
+func TestUpload(t *testing.T) {
+	db, err := db.OpenSQL("sqlite3", ":memory:")
+	if err != nil {
+		t.Fatalf("open database: %v", err)
+	}
+	defer db.Close()
+
+	fs := fs.NewMemFS()
+
+	app := &App{DB: db, FS: fs}
+
+	srv := httptest.NewServer(http.HandlerFunc(app.upload))
+	defer srv.Close()
+	pr, pw := io.Pipe()
+	mpw := multipart.NewWriter(pw)
+	go func() {
+		defer pw.Close()
+		defer mpw.Close()
+		// Write the parts here
+		w, err := mpw.CreateFormFile("file", "1.txt")
+		if err != nil {
+			t.Errorf("CreateFormFile: %v", err)
+		}
+		fmt.Fprintf(w, "key: value\nBenchmarkOne 5 ns/op\n")
+	}()
+	resp, err := http.Post(srv.URL, mpw.FormDataContentType(), pr)
+	if err != nil {
+		t.Fatalf("post /upload: %v", err)
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != 200 {
+		t.Errorf("post /upload: %v", resp.Status)
+	}
+	body, err := ioutil.ReadAll(resp.Body)
+	if err != nil {
+		t.Errorf("reading /upload response: %v", err)
+	}
+	t.Logf("/upload response:\n%s", body)
+
+	if len(fs.Files()) != 1 {
+		t.Errorf("/upload wrote %d files, want 1", len(fs.Files()))
+	}
+}
diff --git a/storage/appengine/app.go b/storage/appengine/app.go
new file mode 100644
index 0000000..a3b3007
--- /dev/null
+++ b/storage/appengine/app.go
@@ -0,0 +1,76 @@
+// Copyright 2016 The Go Authors.  All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package appengine contains an AppEngine app for perfdata.golang.org
+package appengine
+
+import (
+	"fmt"
+	"log"
+	"net/http"
+	"os"
+
+	_ "github.com/go-sql-driver/mysql"
+	"golang.org/x/perf/storage/app"
+	"golang.org/x/perf/storage/db"
+	"golang.org/x/perf/storage/fs/gcs"
+	"google.golang.org/appengine"
+	aelog "google.golang.org/appengine/log"
+)
+
+// connectDB returns a DB initialized from the environment variables set in app.yaml. CLOUDSQL_CONNECTION_NAME, CLOUDSQL_USER, and CLOUDSQL_DATABASE must be set to point to the Cloud SQL instance. CLOUDSQL_PASSWORD can be set if needed.
+func connectDB() (*db.DB, error) {
+	var (
+		connectionName = mustGetenv("CLOUDSQL_CONNECTION_NAME")
+		user           = mustGetenv("CLOUDSQL_USER")
+		password       = os.Getenv("CLOUDSQL_PASSWORD") // NOTE: password may be empty
+		dbName         = mustGetenv("CLOUDSQL_DATABASE")
+	)
+
+	return db.OpenSQL("mysql", fmt.Sprintf("%s:%s@cloudsql(%s)/%s", user, password, connectionName, dbName))
+}
+
+func mustGetenv(k string) string {
+	v := os.Getenv(k)
+	if v == "" {
+		log.Panicf("%s environment variable not set.", k)
+	}
+	return v
+}
+
+// appHandler is the default handler, registered to serve "/".
+// It creates a new App instance using the appengine Context and then
+// dispatches the request to the App. The environment variable
+// GCS_BUCKET must be set in app.yaml with the name of the bucket to
+// write to.
+func appHandler(w http.ResponseWriter, r *http.Request) {
+	ctx := appengine.NewContext(r)
+	// GCS clients need to be constructed with an AppEngine
+	// context, so we can't actually make the App until the
+	// request comes in.
+	// TODO(quentin): Figure out if there's a way to construct the
+	// app and clients once, in init(), instead of on every request.
+	db, err := connectDB()
+	if err != nil {
+		aelog.Errorf(ctx, "connectDB: %v", err)
+		http.Error(w, err.Error(), 500)
+		return
+	}
+	defer db.Close()
+
+	fs, err := gcs.NewFS(ctx, mustGetenv("GCS_BUCKET"))
+	if err != nil {
+		aelog.Errorf(ctx, "gcs.NewFS: %v", err)
+		http.Error(w, err.Error(), 500)
+		return
+	}
+	mux := http.NewServeMux()
+	app := &app.App{DB: db, FS: fs}
+	app.RegisterOnMux(mux)
+	mux.ServeHTTP(w, r)
+}
+
+func init() {
+	http.HandleFunc("/", appHandler)
+}
diff --git a/storage/appengine/app.yaml b/storage/appengine/app.yaml
new file mode 100644
index 0000000..50002ed
--- /dev/null
+++ b/storage/appengine/app.yaml
@@ -0,0 +1,23 @@
+# Update with
+#	google_appengine/appcfg.py [-V dev-test] update .
+#
+# Using -V dev-test will run as dev-test.perfdata.golang.org.
+
+application: golang-org
+module: perfdata
+version: main
+runtime: go
+api_version: go1
+
+handlers:
+- url: /_ah/remote_api
+  script: _go_app
+- url: /upload
+  script: _go_app
+  secure: always
+env_variables:
+  CLOUDSQL_CONNECTION_NAME: golang-org:us-central1:golang-org
+  CLOUDSQL_USER: root
+  CLOUDSQL_PASSWORD: ''
+  CLOUDSQL_DATABASE: perfdata
+  GCS_BUCKET: golang-perfdata
diff --git a/storage/appengine/static/upload.html b/storage/appengine/static/upload.html
new file mode 100644
index 0000000..57e114b
--- /dev/null
+++ b/storage/appengine/static/upload.html
@@ -0,0 +1,13 @@
+<!DOCTYPE html>
+<html>
+  <head>
+    <title>Upload Performance Results</title>
+  </head>
+  <body>
+    <p>Upload one or more <a href="https://github.com/golang/proposal/blob/master/design/14313-benchmark-format.md">benchmark files</a>.</p>
+    <form method="post" enctype="multipart/form-data">
+      <label>File: <input type="file" name="file" multiple></label><br>
+      <input type="submit" value="Upload">
+    </form>
+  </body>
+</html>
diff --git a/storage/db/db.go b/storage/db/db.go
new file mode 100644
index 0000000..4c84d14
--- /dev/null
+++ b/storage/db/db.go
@@ -0,0 +1,107 @@
+// Copyright 2016 The Go Authors.  All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package db provides the high-level database interface for the
+// storage app.
+package db
+
+import (
+	"database/sql"
+	"fmt"
+	"strings"
+
+	"golang.org/x/net/context"
+)
+
+// DB is a high-level interface to a database for the storage
+// app. It's safe for concurrent use by multiple goroutines.
+type DB struct {
+	sql          *sql.DB
+	insertUpload *sql.Stmt
+}
+
+// OpenSQL creates a DB backed by a SQL database. The parameters are
+// the same as the parameters for sql.Open. Only mysql and sqlite3 are
+// explicitly supported; other database engines will receive MySQL
+// query syntax which may or may not be compatible.
+func OpenSQL(driverName, dataSourceName string) (*DB, error) {
+	db, err := sql.Open(driverName, dataSourceName)
+	if err != nil {
+		return nil, err
+	}
+	d := &DB{sql: db}
+	if err := d.createTables(driverName); err != nil {
+		return nil, err
+	}
+	if err := d.prepareStatements(driverName); err != nil {
+		return nil, err
+	}
+	return d, nil
+}
+
+// createTables creates any missing tables on the connection in
+// db.sql. driverName is the same driver name passed to sql.Open and
+// is used to select the correct syntax.
+func (db *DB) createTables(driverName string) error {
+	var schema string
+	switch driverName {
+	case "sqlite3":
+		schema = `
+CREATE TABLE IF NOT EXISTS Uploads (
+       UploadId INTEGER PRIMARY KEY AUTOINCREMENT
+);
+`
+	default: // MySQL syntax
+		schema = `
+CREATE TABLE IF NOT EXISTS Uploads (
+       UploadId SERIAL PRIMARY KEY AUTO_INCREMENT
+);`
+	}
+	for _, q := range strings.Split(schema, ";") {
+		if strings.TrimSpace(q) == "" {
+			continue
+		}
+		if _, err := db.sql.Exec(q); err != nil {
+			return fmt.Errorf("create table: %v", err)
+		}
+	}
+	return nil
+}
+
+// prepareStatements calls db.sql.Prepare on reusable SQL statements.
+func (db *DB) prepareStatements(driverName string) error {
+	var err error
+	q := "INSERT INTO Uploads() VALUES ()"
+	if driverName == "sqlite3" {
+		q = "INSERT INTO Uploads DEFAULT VALUES"
+	}
+	db.insertUpload, err = db.sql.Prepare(q)
+	if err != nil {
+		return err
+	}
+	return nil
+}
+
+// ReserveUploadID returns an upload ID which can be used for storing new files.
+func (db *DB) ReserveUploadID(ctx context.Context) (string, error) {
+	// TODO(quentin): Use a transaction?
+	res, err := db.insertUpload.Exec()
+	if err != nil {
+		return "", err
+	}
+	// TODO(quentin): Use a date-based upload ID (YYYYMMDDnnn)
+	i, err := res.LastInsertId()
+	if err != nil {
+		return "", err
+	}
+	return fmt.Sprint(i), nil
+}
+
+// TODO(quentin): Implement
+// func (db *DB) InsertRecord(uploadid string, fields map[string]string, lines map[int]string) error
+
+// Close closes the database connections, releasing any open resources.
+func (db *DB) Close() error {
+	return db.sql.Close()
+}
diff --git a/storage/db/schema.sql b/storage/db/schema.sql
new file mode 100644
index 0000000..54233ec
--- /dev/null
+++ b/storage/db/schema.sql
@@ -0,0 +1,22 @@
+-- The intended production Cloud SQL schema. Committed here only as a
+-- form of notes (see the actual current schema in
+-- db.go:createTables).
+
+CREATE TABLE Uploads (
+       UploadId SERIAL PRIMARY KEY AUTO_INCREMENT
+);
+CREATE TABLE Records (
+       UploadId BIGINT UNSIGNED,
+       RecordId BIGINT UNSIGNED,
+       Contents BLOB,
+       PRIMARY KEY (UploadId, RecordId),
+       FOREIGN KEY (UploadId) REFERENCES Uploads(UploadId)
+);
+CREATE TABLE RecordLabels (
+       UploadId BIGINT UNSIGNED,
+       RecordId BIGINT UNSIGNED,
+       Name VARCHAR(255),
+       Value VARCHAR(8192),
+       INDEX (Name(100), Value(100)),
+       FOREIGN KEY (UploadId, RecordId) REFERENCES Records(UploadId, RecordId)
+);
diff --git a/storage/fs/fs.go b/storage/fs/fs.go
new file mode 100644
index 0000000..a2639b4
--- /dev/null
+++ b/storage/fs/fs.go
@@ -0,0 +1,98 @@
+// Copyright 2016 The Go Authors.  All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package fs provides a backend-agnostic filesystem layer for storing
+// performance results.
+package fs
+
+import (
+	"errors"
+	"io"
+	"sort"
+	"sync"
+
+	"golang.org/x/net/context"
+)
+
+// An FS stores uploaded benchmark data files.
+type FS interface {
+	// NewWriter returns a Writer for a given file name.
+	// When the Writer is closed, the file will be stored with the
+	// given metadata and the data written to the writer.
+	NewWriter(ctx context.Context, name string, metadata map[string]string) (Writer, error)
+}
+
+// A Writer is an io.Writer that can also be closed with an error.
+type Writer interface {
+	io.WriteCloser
+	// CloseWithError cancels the writing of the file, removing
+	// any partially written data.
+	CloseWithError(error) error
+}
+
+// MemFS is an in-memory filesystem implementing the FS interface.
+type MemFS struct {
+	mu      sync.Mutex
+	content map[string]*memFile
+}
+
+// NewMemFS constructs a new, empty MemFS.
+func NewMemFS() *MemFS {
+	return &MemFS{
+		content: make(map[string]*memFile),
+	}
+}
+
+// NewWriter returns a Writer for a given file name. As a side effect,
+// it associates the given metadata with the file.
+func (fs *MemFS) NewWriter(_ context.Context, name string, metadata map[string]string) (Writer, error) {
+	meta := make(map[string]string)
+	for k, v := range metadata {
+		meta[k] = v
+	}
+	return &memFile{fs: fs, name: name, metadata: meta}, nil
+}
+
+// Files returns the names of the files written to fs.
+func (fs *MemFS) Files() []string {
+	fs.mu.Lock()
+	defer fs.mu.Unlock()
+	var files []string
+	for f := range fs.content {
+		files = append(files, f)
+	}
+	sort.Strings(files)
+	return files
+}
+
+// memFile represents a file in a MemFS. While the file is being
+// written, fs points to the filesystem. Close writes the file's
+// content to fs and sets fs to nil.
+type memFile struct {
+	fs       *MemFS
+	name     string
+	metadata map[string]string
+	content  []byte
+}
+
+func (f *memFile) Write(p []byte) (int, error) {
+	f.content = append(f.content, p...)
+	return len(p), nil
+}
+
+func (f *memFile) Close() error {
+	if f.fs == nil {
+		return errors.New("already closed")
+	}
+	f.fs.mu.Lock()
+	defer f.fs.mu.Unlock()
+	f.fs.content[f.name] = f
+	f.fs = nil
+	return nil
+}
+
+func (f *memFile) CloseWithError(error) error {
+	f.fs = nil
+	return nil
+}
diff --git a/storage/fs/gcs/gcs.go b/storage/fs/gcs/gcs.go
new file mode 100644
index 0000000..60a12e4
--- /dev/null
+++ b/storage/fs/gcs/gcs.go
@@ -0,0 +1,34 @@
+// Copyright 2016 The Go Authors.  All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package gcs implements the fs.FS interface using Google Cloud Storage.
+package gcs
+
+import (
+	"cloud.google.com/go/storage"
+	"golang.org/x/net/context"
+	"golang.org/x/perf/storage/fs"
+)
+
+// impl is an fs.FS backed by Google Cloud Storage.
+type impl struct {
+	bucket *storage.BucketHandle
+}
+
+// NewFS constructs an FS that writes to the provided bucket.
+// On AppEngine, ctx must be a request-derived Context.
+func NewFS(ctx context.Context, bucketName string) (fs.FS, error) {
+	client, err := storage.NewClient(ctx)
+	if err != nil {
+		return nil, err
+	}
+	return &impl{client.Bucket(bucketName)}, nil
+}
+
+func (fs *impl) NewWriter(ctx context.Context, name string, metadata map[string]string) (fs.Writer, error) {
+	w := fs.bucket.Object(name).NewWriter(ctx)
+	// TODO(quentin): Do these need "x-goog-meta-" prefixes?
+	w.Metadata = metadata
+	return w, nil
+}
diff --git a/storage/localserver/app.go b/storage/localserver/app.go
new file mode 100644
index 0000000..e832470
--- /dev/null
+++ b/storage/localserver/app.go
@@ -0,0 +1,35 @@
+// Copyright 2016 The Go Authors.  All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+	"flag"
+	"log"
+	"net/http"
+
+	_ "github.com/mattn/go-sqlite3"
+	"golang.org/x/perf/storage/app"
+	"golang.org/x/perf/storage/db"
+	"golang.org/x/perf/storage/fs"
+)
+
+var host = flag.String("port", ":8080", "(host and) port to bind on")
+
+func main() {
+	flag.Parse()
+
+	db, err := db.OpenSQL("sqlite3", ":memory:")
+	if err != nil {
+		log.Fatalf("open database: %v", err)
+	}
+	fs := fs.NewMemFS()
+
+	app := &app.App{DB: db, FS: fs}
+	app.RegisterOnMux(http.DefaultServeMux)
+
+	log.Printf("Listening on %s", *host)
+
+	log.Fatal(http.ListenAndServe(*host, nil))
+}