storage: initial perfdata AE app This creates the skeleton of an AppEngine app for perfdata.golang.org and adds an initial implementation of /upload that saves the provided file into Cloud Storage. Change-Id: I1fe19b27841ab62aad146d1d1019996634012d35 Reviewed-on: https://go-review.googlesource.com/34620 Reviewed-by: Russ Cox <rsc@golang.org>
diff --git a/storage/app/app.go b/storage/app/app.go new file mode 100644 index 0000000..adcae63 --- /dev/null +++ b/storage/app/app.go
@@ -0,0 +1,28 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package app implements the performance data storage server. Combine +// an App with a database and filesystem to get an HTTP server. +package app + +import ( + "net/http" + + "golang.org/x/perf/storage/db" + "golang.org/x/perf/storage/fs" +) + +// App manages the storage server logic. Construct an App instance +// using a literal with DB and FS objects and call RegisterOnMux to +// connect it with an HTTP server. +type App struct { + DB *db.DB + FS fs.FS +} + +// RegisterOnMux registers the app's URLs on mux. +func (a *App) RegisterOnMux(mux *http.ServeMux) { + // TODO(quentin): Should we just make the App itself be an http.Handler? + mux.HandleFunc("/upload", a.upload) +}
diff --git a/storage/app/appengine.go b/storage/app/appengine.go new file mode 100644 index 0000000..b9ff1e6 --- /dev/null +++ b/storage/app/appengine.go
@@ -0,0 +1,22 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build appengine + +package app + +import ( + "net/http" + + "golang.org/x/net/context" + "google.golang.org/appengine" + "google.golang.org/appengine/log" +) + +// requestContext returns the Context object for a given request. +func requestContext(r *http.Request) context.Context { + return appengine.NewContext(r) +} + +var errorf = log.Errorf
diff --git a/storage/app/local.go b/storage/app/local.go new file mode 100644 index 0000000..4a9cf43 --- /dev/null +++ b/storage/app/local.go
@@ -0,0 +1,23 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// +build !appengine + +package app + +import ( + "log" + "net/http" + + "golang.org/x/net/context" +) + +// requestContext returns the Context object for a given request. +func requestContext(r *http.Request) context.Context { + return r.Context() +} + +func errorf(_ context.Context, format string, args ...interface{}) { + log.Printf(format, args...) +}
diff --git a/storage/app/upload.go b/storage/app/upload.go new file mode 100644 index 0000000..1f968a2 --- /dev/null +++ b/storage/app/upload.go
@@ -0,0 +1,133 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package app + +import ( + "encoding/json" + "fmt" + "io" + "mime/multipart" + "net/http" + + "golang.org/x/net/context" +) + +// upload is the handler for the /upload endpoint. It serves a form on +// GET requests and processes files in a multipart/x-form-data POST +// request. +func (a *App) upload(w http.ResponseWriter, r *http.Request) { + ctx := requestContext(r) + + // TODO(quentin): Authentication + + if r.Method == http.MethodGet { + http.ServeFile(w, r, "static/upload.html") + return + } + if r.Method != http.MethodPost { + http.Error(w, "/upload must be called as a POST request", http.StatusMethodNotAllowed) + return + } + + // We use r.MultipartReader instead of r.ParseForm to avoid + // storing uploaded data in memory. + mr, err := r.MultipartReader() + if err != nil { + errorf(ctx, "%v", err) + http.Error(w, err.Error(), 500) + return + } + + result, err := a.processUpload(ctx, mr) + if err != nil { + errorf(ctx, "%v", err) + http.Error(w, err.Error(), 500) + return + } + + if err := json.NewEncoder(w).Encode(result); err != nil { + errorf(ctx, "%v", err) + http.Error(w, err.Error(), 500) + return + } +} + +// uploadStatus is the response to an /upload POST served as JSON. +type uploadStatus struct { + // UploadID is the upload ID assigned to the upload. + UploadID string `json:"uploadid"` + // FileIDs is the list of file IDs assigned to the files in the upload. + FileIDs []string `json:"fileids"` +} + +// processUpload takes one or more files from a multipart.Reader, +// writes them to the filesystem, and indexes their content. +func (a *App) processUpload(ctx context.Context, mr *multipart.Reader) (*uploadStatus, error) { + var uploadid string + + var status uploadStatus + + for i := 0; ; i++ { + p, err := mr.NextPart() + if err == io.EOF { + break + } + + name := p.FormName() + if name != "file" { + return nil, fmt.Errorf("unexpected field %q", name) + } + + if uploadid == "" { + var err error + uploadid, err = a.DB.ReserveUploadID(ctx) + if err != nil { + return nil, err + } + status.UploadID = uploadid + } + + // The incoming file needs to be stored in Cloud + // Storage and it also needs to be indexed. If the file + // is invalid (contains no valid records) it needs to + // be rejected and the Cloud Storage upload aborted. + // TODO(quentin): We might as well do these in parallel. + + meta := fileMetadata(ctx, uploadid, i) + + fw, err := a.FS.NewWriter(ctx, fmt.Sprintf("uploads/%s.txt", meta["fileid"]), meta) + if err != nil { + return nil, err + } + + // TODO(quentin): Write metadata at top of file + if _, err := io.Copy(fw, p); err != nil { + fw.CloseWithError(err) + return nil, err + } + // TODO(quentin): Write records to database + + if err := fw.Close(); err != nil { + return nil, err + } + + status.FileIDs = append(status.FileIDs, meta["fileid"]) + } + + return &status, nil +} + +// fileMetadata returns the extra metadata fields associated with an +// uploaded file. It obtains the uploader's e-mail address from the +// Context. +func fileMetadata(_ context.Context, uploadid string, filenum int) map[string]string { + // TODO(quentin): Add the name of the uploader. + // TODO(quentin): Add the upload time. + // TODO(quentin): Add other fields? + return map[string]string{ + "uploadid": uploadid, + "fileid": fmt.Sprintf("%s/%d", uploadid, filenum), + } +}
diff --git a/storage/app/upload_test.go b/storage/app/upload_test.go new file mode 100644 index 0000000..972ebdc --- /dev/null +++ b/storage/app/upload_test.go
@@ -0,0 +1,63 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package app + +import ( + "fmt" + "io" + "io/ioutil" + "mime/multipart" + "net/http" + "net/http/httptest" + "testing" + + _ "github.com/mattn/go-sqlite3" + "golang.org/x/perf/storage/db" + "golang.org/x/perf/storage/fs" +) + +func TestUpload(t *testing.T) { + db, err := db.OpenSQL("sqlite3", ":memory:") + if err != nil { + t.Fatalf("open database: %v", err) + } + defer db.Close() + + fs := fs.NewMemFS() + + app := &App{DB: db, FS: fs} + + srv := httptest.NewServer(http.HandlerFunc(app.upload)) + defer srv.Close() + pr, pw := io.Pipe() + mpw := multipart.NewWriter(pw) + go func() { + defer pw.Close() + defer mpw.Close() + // Write the parts here + w, err := mpw.CreateFormFile("file", "1.txt") + if err != nil { + t.Errorf("CreateFormFile: %v", err) + } + fmt.Fprintf(w, "key: value\nBenchmarkOne 5 ns/op\n") + }() + resp, err := http.Post(srv.URL, mpw.FormDataContentType(), pr) + if err != nil { + t.Fatalf("post /upload: %v", err) + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + t.Errorf("post /upload: %v", resp.Status) + } + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + t.Errorf("reading /upload response: %v", err) + } + t.Logf("/upload response:\n%s", body) + + if len(fs.Files()) != 1 { + t.Errorf("/upload wrote %d files, want 1", len(fs.Files())) + } +}
diff --git a/storage/appengine/app.go b/storage/appengine/app.go new file mode 100644 index 0000000..a3b3007 --- /dev/null +++ b/storage/appengine/app.go
@@ -0,0 +1,76 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package appengine contains an AppEngine app for perfdata.golang.org +package appengine + +import ( + "fmt" + "log" + "net/http" + "os" + + _ "github.com/go-sql-driver/mysql" + "golang.org/x/perf/storage/app" + "golang.org/x/perf/storage/db" + "golang.org/x/perf/storage/fs/gcs" + "google.golang.org/appengine" + aelog "google.golang.org/appengine/log" +) + +// connectDB returns a DB initialized from the environment variables set in app.yaml. CLOUDSQL_CONNECTION_NAME, CLOUDSQL_USER, and CLOUDSQL_DATABASE must be set to point to the Cloud SQL instance. CLOUDSQL_PASSWORD can be set if needed. +func connectDB() (*db.DB, error) { + var ( + connectionName = mustGetenv("CLOUDSQL_CONNECTION_NAME") + user = mustGetenv("CLOUDSQL_USER") + password = os.Getenv("CLOUDSQL_PASSWORD") // NOTE: password may be empty + dbName = mustGetenv("CLOUDSQL_DATABASE") + ) + + return db.OpenSQL("mysql", fmt.Sprintf("%s:%s@cloudsql(%s)/%s", user, password, connectionName, dbName)) +} + +func mustGetenv(k string) string { + v := os.Getenv(k) + if v == "" { + log.Panicf("%s environment variable not set.", k) + } + return v +} + +// appHandler is the default handler, registered to serve "/". +// It creates a new App instance using the appengine Context and then +// dispatches the request to the App. The environment variable +// GCS_BUCKET must be set in app.yaml with the name of the bucket to +// write to. +func appHandler(w http.ResponseWriter, r *http.Request) { + ctx := appengine.NewContext(r) + // GCS clients need to be constructed with an AppEngine + // context, so we can't actually make the App until the + // request comes in. + // TODO(quentin): Figure out if there's a way to construct the + // app and clients once, in init(), instead of on every request. + db, err := connectDB() + if err != nil { + aelog.Errorf(ctx, "connectDB: %v", err) + http.Error(w, err.Error(), 500) + return + } + defer db.Close() + + fs, err := gcs.NewFS(ctx, mustGetenv("GCS_BUCKET")) + if err != nil { + aelog.Errorf(ctx, "gcs.NewFS: %v", err) + http.Error(w, err.Error(), 500) + return + } + mux := http.NewServeMux() + app := &app.App{DB: db, FS: fs} + app.RegisterOnMux(mux) + mux.ServeHTTP(w, r) +} + +func init() { + http.HandleFunc("/", appHandler) +}
diff --git a/storage/appengine/app.yaml b/storage/appengine/app.yaml new file mode 100644 index 0000000..50002ed --- /dev/null +++ b/storage/appengine/app.yaml
@@ -0,0 +1,23 @@ +# Update with +# google_appengine/appcfg.py [-V dev-test] update . +# +# Using -V dev-test will run as dev-test.perfdata.golang.org. + +application: golang-org +module: perfdata +version: main +runtime: go +api_version: go1 + +handlers: +- url: /_ah/remote_api + script: _go_app +- url: /upload + script: _go_app + secure: always +env_variables: + CLOUDSQL_CONNECTION_NAME: golang-org:us-central1:golang-org + CLOUDSQL_USER: root + CLOUDSQL_PASSWORD: '' + CLOUDSQL_DATABASE: perfdata + GCS_BUCKET: golang-perfdata
diff --git a/storage/appengine/static/upload.html b/storage/appengine/static/upload.html new file mode 100644 index 0000000..57e114b --- /dev/null +++ b/storage/appengine/static/upload.html
@@ -0,0 +1,13 @@ +<!DOCTYPE html> +<html> + <head> + <title>Upload Performance Results</title> + </head> + <body> + <p>Upload one or more <a href="https://github.com/golang/proposal/blob/master/design/14313-benchmark-format.md">benchmark files</a>.</p> + <form method="post" enctype="multipart/form-data"> + <label>File: <input type="file" name="file" multiple></label><br> + <input type="submit" value="Upload"> + </form> + </body> +</html>
diff --git a/storage/db/db.go b/storage/db/db.go new file mode 100644 index 0000000..4c84d14 --- /dev/null +++ b/storage/db/db.go
@@ -0,0 +1,107 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package db provides the high-level database interface for the +// storage app. +package db + +import ( + "database/sql" + "fmt" + "strings" + + "golang.org/x/net/context" +) + +// DB is a high-level interface to a database for the storage +// app. It's safe for concurrent use by multiple goroutines. +type DB struct { + sql *sql.DB + insertUpload *sql.Stmt +} + +// OpenSQL creates a DB backed by a SQL database. The parameters are +// the same as the parameters for sql.Open. Only mysql and sqlite3 are +// explicitly supported; other database engines will receive MySQL +// query syntax which may or may not be compatible. +func OpenSQL(driverName, dataSourceName string) (*DB, error) { + db, err := sql.Open(driverName, dataSourceName) + if err != nil { + return nil, err + } + d := &DB{sql: db} + if err := d.createTables(driverName); err != nil { + return nil, err + } + if err := d.prepareStatements(driverName); err != nil { + return nil, err + } + return d, nil +} + +// createTables creates any missing tables on the connection in +// db.sql. driverName is the same driver name passed to sql.Open and +// is used to select the correct syntax. +func (db *DB) createTables(driverName string) error { + var schema string + switch driverName { + case "sqlite3": + schema = ` +CREATE TABLE IF NOT EXISTS Uploads ( + UploadId INTEGER PRIMARY KEY AUTOINCREMENT +); +` + default: // MySQL syntax + schema = ` +CREATE TABLE IF NOT EXISTS Uploads ( + UploadId SERIAL PRIMARY KEY AUTO_INCREMENT +);` + } + for _, q := range strings.Split(schema, ";") { + if strings.TrimSpace(q) == "" { + continue + } + if _, err := db.sql.Exec(q); err != nil { + return fmt.Errorf("create table: %v", err) + } + } + return nil +} + +// prepareStatements calls db.sql.Prepare on reusable SQL statements. +func (db *DB) prepareStatements(driverName string) error { + var err error + q := "INSERT INTO Uploads() VALUES ()" + if driverName == "sqlite3" { + q = "INSERT INTO Uploads DEFAULT VALUES" + } + db.insertUpload, err = db.sql.Prepare(q) + if err != nil { + return err + } + return nil +} + +// ReserveUploadID returns an upload ID which can be used for storing new files. +func (db *DB) ReserveUploadID(ctx context.Context) (string, error) { + // TODO(quentin): Use a transaction? + res, err := db.insertUpload.Exec() + if err != nil { + return "", err + } + // TODO(quentin): Use a date-based upload ID (YYYYMMDDnnn) + i, err := res.LastInsertId() + if err != nil { + return "", err + } + return fmt.Sprint(i), nil +} + +// TODO(quentin): Implement +// func (db *DB) InsertRecord(uploadid string, fields map[string]string, lines map[int]string) error + +// Close closes the database connections, releasing any open resources. +func (db *DB) Close() error { + return db.sql.Close() +}
diff --git a/storage/db/schema.sql b/storage/db/schema.sql new file mode 100644 index 0000000..54233ec --- /dev/null +++ b/storage/db/schema.sql
@@ -0,0 +1,22 @@ +-- The intended production Cloud SQL schema. Committed here only as a +-- form of notes (see the actual current schema in +-- db.go:createTables). + +CREATE TABLE Uploads ( + UploadId SERIAL PRIMARY KEY AUTO_INCREMENT +); +CREATE TABLE Records ( + UploadId BIGINT UNSIGNED, + RecordId BIGINT UNSIGNED, + Contents BLOB, + PRIMARY KEY (UploadId, RecordId), + FOREIGN KEY (UploadId) REFERENCES Uploads(UploadId) +); +CREATE TABLE RecordLabels ( + UploadId BIGINT UNSIGNED, + RecordId BIGINT UNSIGNED, + Name VARCHAR(255), + Value VARCHAR(8192), + INDEX (Name(100), Value(100)), + FOREIGN KEY (UploadId, RecordId) REFERENCES Records(UploadId, RecordId) +);
diff --git a/storage/fs/fs.go b/storage/fs/fs.go new file mode 100644 index 0000000..a2639b4 --- /dev/null +++ b/storage/fs/fs.go
@@ -0,0 +1,98 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package fs provides a backend-agnostic filesystem layer for storing +// performance results. +package fs + +import ( + "errors" + "io" + "sort" + "sync" + + "golang.org/x/net/context" +) + +// An FS stores uploaded benchmark data files. +type FS interface { + // NewWriter returns a Writer for a given file name. + // When the Writer is closed, the file will be stored with the + // given metadata and the data written to the writer. + NewWriter(ctx context.Context, name string, metadata map[string]string) (Writer, error) +} + +// A Writer is an io.Writer that can also be closed with an error. +type Writer interface { + io.WriteCloser + // CloseWithError cancels the writing of the file, removing + // any partially written data. + CloseWithError(error) error +} + +// MemFS is an in-memory filesystem implementing the FS interface. +type MemFS struct { + mu sync.Mutex + content map[string]*memFile +} + +// NewMemFS constructs a new, empty MemFS. +func NewMemFS() *MemFS { + return &MemFS{ + content: make(map[string]*memFile), + } +} + +// NewWriter returns a Writer for a given file name. As a side effect, +// it associates the given metadata with the file. +func (fs *MemFS) NewWriter(_ context.Context, name string, metadata map[string]string) (Writer, error) { + meta := make(map[string]string) + for k, v := range metadata { + meta[k] = v + } + return &memFile{fs: fs, name: name, metadata: meta}, nil +} + +// Files returns the names of the files written to fs. +func (fs *MemFS) Files() []string { + fs.mu.Lock() + defer fs.mu.Unlock() + var files []string + for f := range fs.content { + files = append(files, f) + } + sort.Strings(files) + return files +} + +// memFile represents a file in a MemFS. While the file is being +// written, fs points to the filesystem. Close writes the file's +// content to fs and sets fs to nil. +type memFile struct { + fs *MemFS + name string + metadata map[string]string + content []byte +} + +func (f *memFile) Write(p []byte) (int, error) { + f.content = append(f.content, p...) + return len(p), nil +} + +func (f *memFile) Close() error { + if f.fs == nil { + return errors.New("already closed") + } + f.fs.mu.Lock() + defer f.fs.mu.Unlock() + f.fs.content[f.name] = f + f.fs = nil + return nil +} + +func (f *memFile) CloseWithError(error) error { + f.fs = nil + return nil +}
diff --git a/storage/fs/gcs/gcs.go b/storage/fs/gcs/gcs.go new file mode 100644 index 0000000..60a12e4 --- /dev/null +++ b/storage/fs/gcs/gcs.go
@@ -0,0 +1,34 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +// Package gcs implements the fs.FS interface using Google Cloud Storage. +package gcs + +import ( + "cloud.google.com/go/storage" + "golang.org/x/net/context" + "golang.org/x/perf/storage/fs" +) + +// impl is an fs.FS backed by Google Cloud Storage. +type impl struct { + bucket *storage.BucketHandle +} + +// NewFS constructs an FS that writes to the provided bucket. +// On AppEngine, ctx must be a request-derived Context. +func NewFS(ctx context.Context, bucketName string) (fs.FS, error) { + client, err := storage.NewClient(ctx) + if err != nil { + return nil, err + } + return &impl{client.Bucket(bucketName)}, nil +} + +func (fs *impl) NewWriter(ctx context.Context, name string, metadata map[string]string) (fs.Writer, error) { + w := fs.bucket.Object(name).NewWriter(ctx) + // TODO(quentin): Do these need "x-goog-meta-" prefixes? + w.Metadata = metadata + return w, nil +}
diff --git a/storage/localserver/app.go b/storage/localserver/app.go new file mode 100644 index 0000000..e832470 --- /dev/null +++ b/storage/localserver/app.go
@@ -0,0 +1,35 @@ +// Copyright 2016 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package main + +import ( + "flag" + "log" + "net/http" + + _ "github.com/mattn/go-sqlite3" + "golang.org/x/perf/storage/app" + "golang.org/x/perf/storage/db" + "golang.org/x/perf/storage/fs" +) + +var host = flag.String("port", ":8080", "(host and) port to bind on") + +func main() { + flag.Parse() + + db, err := db.OpenSQL("sqlite3", ":memory:") + if err != nil { + log.Fatalf("open database: %v", err) + } + fs := fs.NewMemFS() + + app := &app.App{DB: db, FS: fs} + app.RegisterOnMux(http.DefaultServeMux) + + log.Printf("Listening on %s", *host) + + log.Fatal(http.ListenAndServe(*host, nil)) +}