storage: initial perfdata AE app
This creates the skeleton of an AppEngine app for perfdata.golang.org
and adds an initial implementation of /upload that saves the provided
file into Cloud Storage.
Change-Id: I1fe19b27841ab62aad146d1d1019996634012d35
Reviewed-on: https://go-review.googlesource.com/34620
Reviewed-by: Russ Cox <rsc@golang.org>
diff --git a/storage/app/app.go b/storage/app/app.go
new file mode 100644
index 0000000..adcae63
--- /dev/null
+++ b/storage/app/app.go
@@ -0,0 +1,28 @@
+// Copyright 2016 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package app implements the performance data storage server. Combine
+// an App with a database and filesystem to get an HTTP server.
+package app
+
+import (
+ "net/http"
+
+ "golang.org/x/perf/storage/db"
+ "golang.org/x/perf/storage/fs"
+)
+
+// App manages the storage server logic. Construct an App instance
+// using a literal with DB and FS objects and call RegisterOnMux to
+// connect it with an HTTP server.
+type App struct {
+ DB *db.DB
+ FS fs.FS
+}
+
+// RegisterOnMux registers the app's URLs on mux.
+func (a *App) RegisterOnMux(mux *http.ServeMux) {
+ // TODO(quentin): Should we just make the App itself be an http.Handler?
+ mux.HandleFunc("/upload", a.upload)
+}
diff --git a/storage/app/appengine.go b/storage/app/appengine.go
new file mode 100644
index 0000000..b9ff1e6
--- /dev/null
+++ b/storage/app/appengine.go
@@ -0,0 +1,22 @@
+// Copyright 2016 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build appengine
+
+package app
+
+import (
+ "net/http"
+
+ "golang.org/x/net/context"
+ "google.golang.org/appengine"
+ "google.golang.org/appengine/log"
+)
+
+// requestContext returns the Context object for a given request.
+func requestContext(r *http.Request) context.Context {
+ return appengine.NewContext(r)
+}
+
+var errorf = log.Errorf
diff --git a/storage/app/local.go b/storage/app/local.go
new file mode 100644
index 0000000..4a9cf43
--- /dev/null
+++ b/storage/app/local.go
@@ -0,0 +1,23 @@
+// Copyright 2016 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build !appengine
+
+package app
+
+import (
+ "log"
+ "net/http"
+
+ "golang.org/x/net/context"
+)
+
+// requestContext returns the Context object for a given request.
+func requestContext(r *http.Request) context.Context {
+ return r.Context()
+}
+
+func errorf(_ context.Context, format string, args ...interface{}) {
+ log.Printf(format, args...)
+}
diff --git a/storage/app/upload.go b/storage/app/upload.go
new file mode 100644
index 0000000..1f968a2
--- /dev/null
+++ b/storage/app/upload.go
@@ -0,0 +1,133 @@
+// Copyright 2016 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package app
+
+import (
+ "encoding/json"
+ "fmt"
+ "io"
+ "mime/multipart"
+ "net/http"
+
+ "golang.org/x/net/context"
+)
+
+// upload is the handler for the /upload endpoint. It serves a form on
+// GET requests and processes files in a multipart/x-form-data POST
+// request.
+func (a *App) upload(w http.ResponseWriter, r *http.Request) {
+ ctx := requestContext(r)
+
+ // TODO(quentin): Authentication
+
+ if r.Method == http.MethodGet {
+ http.ServeFile(w, r, "static/upload.html")
+ return
+ }
+ if r.Method != http.MethodPost {
+ http.Error(w, "/upload must be called as a POST request", http.StatusMethodNotAllowed)
+ return
+ }
+
+ // We use r.MultipartReader instead of r.ParseForm to avoid
+ // storing uploaded data in memory.
+ mr, err := r.MultipartReader()
+ if err != nil {
+ errorf(ctx, "%v", err)
+ http.Error(w, err.Error(), 500)
+ return
+ }
+
+ result, err := a.processUpload(ctx, mr)
+ if err != nil {
+ errorf(ctx, "%v", err)
+ http.Error(w, err.Error(), 500)
+ return
+ }
+
+ if err := json.NewEncoder(w).Encode(result); err != nil {
+ errorf(ctx, "%v", err)
+ http.Error(w, err.Error(), 500)
+ return
+ }
+}
+
+// uploadStatus is the response to an /upload POST served as JSON.
+type uploadStatus struct {
+ // UploadID is the upload ID assigned to the upload.
+ UploadID string `json:"uploadid"`
+ // FileIDs is the list of file IDs assigned to the files in the upload.
+ FileIDs []string `json:"fileids"`
+}
+
+// processUpload takes one or more files from a multipart.Reader,
+// writes them to the filesystem, and indexes their content.
+func (a *App) processUpload(ctx context.Context, mr *multipart.Reader) (*uploadStatus, error) {
+ var uploadid string
+
+ var status uploadStatus
+
+ for i := 0; ; i++ {
+ p, err := mr.NextPart()
+ if err == io.EOF {
+ break
+ }
+
+ name := p.FormName()
+ if name != "file" {
+ return nil, fmt.Errorf("unexpected field %q", name)
+ }
+
+ if uploadid == "" {
+ var err error
+ uploadid, err = a.DB.ReserveUploadID(ctx)
+ if err != nil {
+ return nil, err
+ }
+ status.UploadID = uploadid
+ }
+
+ // The incoming file needs to be stored in Cloud
+ // Storage and it also needs to be indexed. If the file
+ // is invalid (contains no valid records) it needs to
+ // be rejected and the Cloud Storage upload aborted.
+ // TODO(quentin): We might as well do these in parallel.
+
+ meta := fileMetadata(ctx, uploadid, i)
+
+ fw, err := a.FS.NewWriter(ctx, fmt.Sprintf("uploads/%s.txt", meta["fileid"]), meta)
+ if err != nil {
+ return nil, err
+ }
+
+ // TODO(quentin): Write metadata at top of file
+ if _, err := io.Copy(fw, p); err != nil {
+ fw.CloseWithError(err)
+ return nil, err
+ }
+ // TODO(quentin): Write records to database
+
+ if err := fw.Close(); err != nil {
+ return nil, err
+ }
+
+ status.FileIDs = append(status.FileIDs, meta["fileid"])
+ }
+
+ return &status, nil
+}
+
+// fileMetadata returns the extra metadata fields associated with an
+// uploaded file. It obtains the uploader's e-mail address from the
+// Context.
+func fileMetadata(_ context.Context, uploadid string, filenum int) map[string]string {
+ // TODO(quentin): Add the name of the uploader.
+ // TODO(quentin): Add the upload time.
+ // TODO(quentin): Add other fields?
+ return map[string]string{
+ "uploadid": uploadid,
+ "fileid": fmt.Sprintf("%s/%d", uploadid, filenum),
+ }
+}
diff --git a/storage/app/upload_test.go b/storage/app/upload_test.go
new file mode 100644
index 0000000..972ebdc
--- /dev/null
+++ b/storage/app/upload_test.go
@@ -0,0 +1,63 @@
+// Copyright 2016 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package app
+
+import (
+ "fmt"
+ "io"
+ "io/ioutil"
+ "mime/multipart"
+ "net/http"
+ "net/http/httptest"
+ "testing"
+
+ _ "github.com/mattn/go-sqlite3"
+ "golang.org/x/perf/storage/db"
+ "golang.org/x/perf/storage/fs"
+)
+
+func TestUpload(t *testing.T) {
+ db, err := db.OpenSQL("sqlite3", ":memory:")
+ if err != nil {
+ t.Fatalf("open database: %v", err)
+ }
+ defer db.Close()
+
+ fs := fs.NewMemFS()
+
+ app := &App{DB: db, FS: fs}
+
+ srv := httptest.NewServer(http.HandlerFunc(app.upload))
+ defer srv.Close()
+ pr, pw := io.Pipe()
+ mpw := multipart.NewWriter(pw)
+ go func() {
+ defer pw.Close()
+ defer mpw.Close()
+ // Write the parts here
+ w, err := mpw.CreateFormFile("file", "1.txt")
+ if err != nil {
+ t.Errorf("CreateFormFile: %v", err)
+ }
+ fmt.Fprintf(w, "key: value\nBenchmarkOne 5 ns/op\n")
+ }()
+ resp, err := http.Post(srv.URL, mpw.FormDataContentType(), pr)
+ if err != nil {
+ t.Fatalf("post /upload: %v", err)
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != 200 {
+ t.Errorf("post /upload: %v", resp.Status)
+ }
+ body, err := ioutil.ReadAll(resp.Body)
+ if err != nil {
+ t.Errorf("reading /upload response: %v", err)
+ }
+ t.Logf("/upload response:\n%s", body)
+
+ if len(fs.Files()) != 1 {
+ t.Errorf("/upload wrote %d files, want 1", len(fs.Files()))
+ }
+}
diff --git a/storage/appengine/app.go b/storage/appengine/app.go
new file mode 100644
index 0000000..a3b3007
--- /dev/null
+++ b/storage/appengine/app.go
@@ -0,0 +1,76 @@
+// Copyright 2016 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package appengine contains an AppEngine app for perfdata.golang.org
+package appengine
+
+import (
+ "fmt"
+ "log"
+ "net/http"
+ "os"
+
+ _ "github.com/go-sql-driver/mysql"
+ "golang.org/x/perf/storage/app"
+ "golang.org/x/perf/storage/db"
+ "golang.org/x/perf/storage/fs/gcs"
+ "google.golang.org/appengine"
+ aelog "google.golang.org/appengine/log"
+)
+
+// connectDB returns a DB initialized from the environment variables set in app.yaml. CLOUDSQL_CONNECTION_NAME, CLOUDSQL_USER, and CLOUDSQL_DATABASE must be set to point to the Cloud SQL instance. CLOUDSQL_PASSWORD can be set if needed.
+func connectDB() (*db.DB, error) {
+ var (
+ connectionName = mustGetenv("CLOUDSQL_CONNECTION_NAME")
+ user = mustGetenv("CLOUDSQL_USER")
+ password = os.Getenv("CLOUDSQL_PASSWORD") // NOTE: password may be empty
+ dbName = mustGetenv("CLOUDSQL_DATABASE")
+ )
+
+ return db.OpenSQL("mysql", fmt.Sprintf("%s:%s@cloudsql(%s)/%s", user, password, connectionName, dbName))
+}
+
+func mustGetenv(k string) string {
+ v := os.Getenv(k)
+ if v == "" {
+ log.Panicf("%s environment variable not set.", k)
+ }
+ return v
+}
+
+// appHandler is the default handler, registered to serve "/".
+// It creates a new App instance using the appengine Context and then
+// dispatches the request to the App. The environment variable
+// GCS_BUCKET must be set in app.yaml with the name of the bucket to
+// write to.
+func appHandler(w http.ResponseWriter, r *http.Request) {
+ ctx := appengine.NewContext(r)
+ // GCS clients need to be constructed with an AppEngine
+ // context, so we can't actually make the App until the
+ // request comes in.
+ // TODO(quentin): Figure out if there's a way to construct the
+ // app and clients once, in init(), instead of on every request.
+ db, err := connectDB()
+ if err != nil {
+ aelog.Errorf(ctx, "connectDB: %v", err)
+ http.Error(w, err.Error(), 500)
+ return
+ }
+ defer db.Close()
+
+ fs, err := gcs.NewFS(ctx, mustGetenv("GCS_BUCKET"))
+ if err != nil {
+ aelog.Errorf(ctx, "gcs.NewFS: %v", err)
+ http.Error(w, err.Error(), 500)
+ return
+ }
+ mux := http.NewServeMux()
+ app := &app.App{DB: db, FS: fs}
+ app.RegisterOnMux(mux)
+ mux.ServeHTTP(w, r)
+}
+
+func init() {
+ http.HandleFunc("/", appHandler)
+}
diff --git a/storage/appengine/app.yaml b/storage/appengine/app.yaml
new file mode 100644
index 0000000..50002ed
--- /dev/null
+++ b/storage/appengine/app.yaml
@@ -0,0 +1,23 @@
+# Update with
+# google_appengine/appcfg.py [-V dev-test] update .
+#
+# Using -V dev-test will run as dev-test.perfdata.golang.org.
+
+application: golang-org
+module: perfdata
+version: main
+runtime: go
+api_version: go1
+
+handlers:
+- url: /_ah/remote_api
+ script: _go_app
+- url: /upload
+ script: _go_app
+ secure: always
+env_variables:
+ CLOUDSQL_CONNECTION_NAME: golang-org:us-central1:golang-org
+ CLOUDSQL_USER: root
+ CLOUDSQL_PASSWORD: ''
+ CLOUDSQL_DATABASE: perfdata
+ GCS_BUCKET: golang-perfdata
diff --git a/storage/appengine/static/upload.html b/storage/appengine/static/upload.html
new file mode 100644
index 0000000..57e114b
--- /dev/null
+++ b/storage/appengine/static/upload.html
@@ -0,0 +1,13 @@
+<!DOCTYPE html>
+<html>
+ <head>
+ <title>Upload Performance Results</title>
+ </head>
+ <body>
+ <p>Upload one or more <a href="https://github.com/golang/proposal/blob/master/design/14313-benchmark-format.md">benchmark files</a>.</p>
+ <form method="post" enctype="multipart/form-data">
+ <label>File: <input type="file" name="file" multiple></label><br>
+ <input type="submit" value="Upload">
+ </form>
+ </body>
+</html>
diff --git a/storage/db/db.go b/storage/db/db.go
new file mode 100644
index 0000000..4c84d14
--- /dev/null
+++ b/storage/db/db.go
@@ -0,0 +1,107 @@
+// Copyright 2016 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package db provides the high-level database interface for the
+// storage app.
+package db
+
+import (
+ "database/sql"
+ "fmt"
+ "strings"
+
+ "golang.org/x/net/context"
+)
+
+// DB is a high-level interface to a database for the storage
+// app. It's safe for concurrent use by multiple goroutines.
+type DB struct {
+ sql *sql.DB
+ insertUpload *sql.Stmt
+}
+
+// OpenSQL creates a DB backed by a SQL database. The parameters are
+// the same as the parameters for sql.Open. Only mysql and sqlite3 are
+// explicitly supported; other database engines will receive MySQL
+// query syntax which may or may not be compatible.
+func OpenSQL(driverName, dataSourceName string) (*DB, error) {
+ db, err := sql.Open(driverName, dataSourceName)
+ if err != nil {
+ return nil, err
+ }
+ d := &DB{sql: db}
+ if err := d.createTables(driverName); err != nil {
+ return nil, err
+ }
+ if err := d.prepareStatements(driverName); err != nil {
+ return nil, err
+ }
+ return d, nil
+}
+
+// createTables creates any missing tables on the connection in
+// db.sql. driverName is the same driver name passed to sql.Open and
+// is used to select the correct syntax.
+func (db *DB) createTables(driverName string) error {
+ var schema string
+ switch driverName {
+ case "sqlite3":
+ schema = `
+CREATE TABLE IF NOT EXISTS Uploads (
+ UploadId INTEGER PRIMARY KEY AUTOINCREMENT
+);
+`
+ default: // MySQL syntax
+ schema = `
+CREATE TABLE IF NOT EXISTS Uploads (
+ UploadId SERIAL PRIMARY KEY AUTO_INCREMENT
+);`
+ }
+ for _, q := range strings.Split(schema, ";") {
+ if strings.TrimSpace(q) == "" {
+ continue
+ }
+ if _, err := db.sql.Exec(q); err != nil {
+ return fmt.Errorf("create table: %v", err)
+ }
+ }
+ return nil
+}
+
+// prepareStatements calls db.sql.Prepare on reusable SQL statements.
+func (db *DB) prepareStatements(driverName string) error {
+ var err error
+ q := "INSERT INTO Uploads() VALUES ()"
+ if driverName == "sqlite3" {
+ q = "INSERT INTO Uploads DEFAULT VALUES"
+ }
+ db.insertUpload, err = db.sql.Prepare(q)
+ if err != nil {
+ return err
+ }
+ return nil
+}
+
+// ReserveUploadID returns an upload ID which can be used for storing new files.
+func (db *DB) ReserveUploadID(ctx context.Context) (string, error) {
+ // TODO(quentin): Use a transaction?
+ res, err := db.insertUpload.Exec()
+ if err != nil {
+ return "", err
+ }
+ // TODO(quentin): Use a date-based upload ID (YYYYMMDDnnn)
+ i, err := res.LastInsertId()
+ if err != nil {
+ return "", err
+ }
+ return fmt.Sprint(i), nil
+}
+
+// TODO(quentin): Implement
+// func (db *DB) InsertRecord(uploadid string, fields map[string]string, lines map[int]string) error
+
+// Close closes the database connections, releasing any open resources.
+func (db *DB) Close() error {
+ return db.sql.Close()
+}
diff --git a/storage/db/schema.sql b/storage/db/schema.sql
new file mode 100644
index 0000000..54233ec
--- /dev/null
+++ b/storage/db/schema.sql
@@ -0,0 +1,22 @@
+-- The intended production Cloud SQL schema. Committed here only as a
+-- form of notes (see the actual current schema in
+-- db.go:createTables).
+
+CREATE TABLE Uploads (
+ UploadId SERIAL PRIMARY KEY AUTO_INCREMENT
+);
+CREATE TABLE Records (
+ UploadId BIGINT UNSIGNED,
+ RecordId BIGINT UNSIGNED,
+ Contents BLOB,
+ PRIMARY KEY (UploadId, RecordId),
+ FOREIGN KEY (UploadId) REFERENCES Uploads(UploadId)
+);
+CREATE TABLE RecordLabels (
+ UploadId BIGINT UNSIGNED,
+ RecordId BIGINT UNSIGNED,
+ Name VARCHAR(255),
+ Value VARCHAR(8192),
+ INDEX (Name(100), Value(100)),
+ FOREIGN KEY (UploadId, RecordId) REFERENCES Records(UploadId, RecordId)
+);
diff --git a/storage/fs/fs.go b/storage/fs/fs.go
new file mode 100644
index 0000000..a2639b4
--- /dev/null
+++ b/storage/fs/fs.go
@@ -0,0 +1,98 @@
+// Copyright 2016 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package fs provides a backend-agnostic filesystem layer for storing
+// performance results.
+package fs
+
+import (
+ "errors"
+ "io"
+ "sort"
+ "sync"
+
+ "golang.org/x/net/context"
+)
+
+// An FS stores uploaded benchmark data files.
+type FS interface {
+ // NewWriter returns a Writer for a given file name.
+ // When the Writer is closed, the file will be stored with the
+ // given metadata and the data written to the writer.
+ NewWriter(ctx context.Context, name string, metadata map[string]string) (Writer, error)
+}
+
+// A Writer is an io.Writer that can also be closed with an error.
+type Writer interface {
+ io.WriteCloser
+ // CloseWithError cancels the writing of the file, removing
+ // any partially written data.
+ CloseWithError(error) error
+}
+
+// MemFS is an in-memory filesystem implementing the FS interface.
+type MemFS struct {
+ mu sync.Mutex
+ content map[string]*memFile
+}
+
+// NewMemFS constructs a new, empty MemFS.
+func NewMemFS() *MemFS {
+ return &MemFS{
+ content: make(map[string]*memFile),
+ }
+}
+
+// NewWriter returns a Writer for a given file name. As a side effect,
+// it associates the given metadata with the file.
+func (fs *MemFS) NewWriter(_ context.Context, name string, metadata map[string]string) (Writer, error) {
+ meta := make(map[string]string)
+ for k, v := range metadata {
+ meta[k] = v
+ }
+ return &memFile{fs: fs, name: name, metadata: meta}, nil
+}
+
+// Files returns the names of the files written to fs.
+func (fs *MemFS) Files() []string {
+ fs.mu.Lock()
+ defer fs.mu.Unlock()
+ var files []string
+ for f := range fs.content {
+ files = append(files, f)
+ }
+ sort.Strings(files)
+ return files
+}
+
+// memFile represents a file in a MemFS. While the file is being
+// written, fs points to the filesystem. Close writes the file's
+// content to fs and sets fs to nil.
+type memFile struct {
+ fs *MemFS
+ name string
+ metadata map[string]string
+ content []byte
+}
+
+func (f *memFile) Write(p []byte) (int, error) {
+ f.content = append(f.content, p...)
+ return len(p), nil
+}
+
+func (f *memFile) Close() error {
+ if f.fs == nil {
+ return errors.New("already closed")
+ }
+ f.fs.mu.Lock()
+ defer f.fs.mu.Unlock()
+ f.fs.content[f.name] = f
+ f.fs = nil
+ return nil
+}
+
+func (f *memFile) CloseWithError(error) error {
+ f.fs = nil
+ return nil
+}
diff --git a/storage/fs/gcs/gcs.go b/storage/fs/gcs/gcs.go
new file mode 100644
index 0000000..60a12e4
--- /dev/null
+++ b/storage/fs/gcs/gcs.go
@@ -0,0 +1,34 @@
+// Copyright 2016 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package gcs implements the fs.FS interface using Google Cloud Storage.
+package gcs
+
+import (
+ "cloud.google.com/go/storage"
+ "golang.org/x/net/context"
+ "golang.org/x/perf/storage/fs"
+)
+
+// impl is an fs.FS backed by Google Cloud Storage.
+type impl struct {
+ bucket *storage.BucketHandle
+}
+
+// NewFS constructs an FS that writes to the provided bucket.
+// On AppEngine, ctx must be a request-derived Context.
+func NewFS(ctx context.Context, bucketName string) (fs.FS, error) {
+ client, err := storage.NewClient(ctx)
+ if err != nil {
+ return nil, err
+ }
+ return &impl{client.Bucket(bucketName)}, nil
+}
+
+func (fs *impl) NewWriter(ctx context.Context, name string, metadata map[string]string) (fs.Writer, error) {
+ w := fs.bucket.Object(name).NewWriter(ctx)
+ // TODO(quentin): Do these need "x-goog-meta-" prefixes?
+ w.Metadata = metadata
+ return w, nil
+}
diff --git a/storage/localserver/app.go b/storage/localserver/app.go
new file mode 100644
index 0000000..e832470
--- /dev/null
+++ b/storage/localserver/app.go
@@ -0,0 +1,35 @@
+// Copyright 2016 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package main
+
+import (
+ "flag"
+ "log"
+ "net/http"
+
+ _ "github.com/mattn/go-sqlite3"
+ "golang.org/x/perf/storage/app"
+ "golang.org/x/perf/storage/db"
+ "golang.org/x/perf/storage/fs"
+)
+
+var host = flag.String("port", ":8080", "(host and) port to bind on")
+
+func main() {
+ flag.Parse()
+
+ db, err := db.OpenSQL("sqlite3", ":memory:")
+ if err != nil {
+ log.Fatalf("open database: %v", err)
+ }
+ fs := fs.NewMemFS()
+
+ app := &app.App{DB: db, FS: fs}
+ app.RegisterOnMux(http.DefaultServeMux)
+
+ log.Printf("Listening on %s", *host)
+
+ log.Fatal(http.ListenAndServe(*host, nil))
+}