// blob: 91ccf095ac9814063027a8086bb6dcf6b7df0f6b [file] [log] [blame]
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package app
import (
"encoding/json"
"errors"
"fmt"
"io"
"mime/multipart"
"net/http"
"net/url"
"path/filepath"
"sort"
"strings"
"time"
"golang.org/x/net/context"
"golang.org/x/perf/storage/benchfmt"
"golang.org/x/perf/storage/db"
)
// upload handles the /upload endpoint. After authenticating the
// request, a GET is answered with the static upload form and a POST
// carrying a multipart/form-data body is handed to processUpload; the
// resulting status is written back as JSON. Any other method is
// rejected with 405 Method Not Allowed.
func (a *App) upload(w http.ResponseWriter, r *http.Request) {
	ctx := requestContext(r)

	// fail logs err and reports it to the client as a 500.
	fail := func(err error) {
		errorf(ctx, "%v", err)
		http.Error(w, err.Error(), http.StatusInternalServerError)
	}

	user, err := a.Auth(w, r)
	if err != nil {
		// ErrResponseWritten means Auth has already produced the
		// response, so there is nothing further to send.
		if err != ErrResponseWritten {
			fail(err)
		}
		return
	}

	switch r.Method {
	case http.MethodGet:
		http.ServeFile(w, r, filepath.Join(a.BaseDir, "static/upload.html"))
		return
	case http.MethodPost:
		// Processed below.
	default:
		http.Error(w, "/upload must be called as a POST request", http.StatusMethodNotAllowed)
		return
	}

	// Read the body through r.MultipartReader rather than r.ParseForm
	// so the uploaded data is streamed instead of being held in memory.
	mr, err := r.MultipartReader()
	if err != nil {
		fail(err)
		return
	}

	result, err := a.processUpload(ctx, user, mr)
	if err != nil {
		fail(err)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(result); err != nil {
		fail(err)
	}
}
// uploadStatus is the JSON document returned in response to a
// successful POST to /upload.
type uploadStatus struct {
	// UploadID identifies the upload as a whole.
	UploadID string `json:"uploadid"`

	// FileIDs holds one ID per file stored as part of the upload.
	FileIDs []string `json:"fileids"`

	// ViewURL, when non-empty, is a URL at which the upload can be
	// viewed interactively. It is omitted from the JSON when empty.
	ViewURL string `json:"viewurl,omitempty"`
}
// processUpload reads every part from mr. Parts whose form name is
// "commit" are skipped, parts named "file" are stored and indexed via
// indexFile, and any other form name is an error. All files belong to
// a single db.Upload that is created lazily on the first file and
// committed once the reader is exhausted; if anything fails after the
// upload was created, it is aborted instead. An error is returned when
// the request contained no files at all.
func (a *App) processUpload(ctx context.Context, user string, mr *multipart.Reader) (*uploadStatus, error) {
	var (
		upload *db.Upload
		ids    []string
	)
	stamp := time.Now().UTC().Format(time.RFC3339)

	// partNum counts every part seen, including skipped "commit"
	// parts, so file IDs reflect a part's position in the request.
	for partNum := 0; ; partNum++ {
		part, err := mr.NextPart()
		if err != nil {
			if err == io.EOF {
				break
			}
			return nil, err
		}

		switch field := part.FormName(); field {
		case "commit":
			continue
		case "file":
			// Handled below.
		default:
			return nil, fmt.Errorf("unexpected field %q", field)
		}

		if upload == nil {
			if upload, err = a.DB.NewUpload(ctx); err != nil {
				return nil, err
			}
			// Registered at most once: abort the upload on any
			// return path that has not cleared the variable.
			defer func() {
				if upload != nil {
					upload.Abort()
				}
			}()
		}

		// Metadata recorded alongside the file and attached to each
		// of its index records.
		meta := map[string]string{
			"upload":      upload.ID,
			"upload-part": fmt.Sprintf("%s/%d", upload.ID, partNum),
			"upload-time": stamp,
		}

		// Keep only the final path element of the client-supplied
		// file name, treating both '/' and '\' as separators.
		base := part.FileName()
		if cut := strings.LastIndexAny(base, `/\`); cut >= 0 {
			base = base[cut+1:]
		}
		if base != "" {
			meta["upload-file"] = base
		}
		if user != "" {
			meta["by"] = user
		}

		// indexFile both stores the data via a.FS and writes the
		// index records to the upload.
		if err := a.indexFile(ctx, upload, part, meta); err != nil {
			return nil, err
		}
		ids = append(ids, meta["upload-part"])
	}

	if upload == nil {
		return nil, errors.New("no files processed")
	}
	if err := upload.Commit(); err != nil {
		return nil, err
	}

	result := &uploadStatus{
		UploadID: upload.ID,
		FileIDs:  ids,
	}
	if base := a.ViewURLBase; base != "" {
		result.ViewURL = base + url.QueryEscape(upload.ID)
	}

	// The commit succeeded; clearing upload disarms the deferred Abort.
	upload = nil
	return result, nil
}
// indexFile stores the contents of p under "uploads/<upload-part>.txt"
// via a.FS and inserts every benchmark record parsed from p into
// upload.
//
// The stored file starts with the entries of meta written as sorted
// "key: value" lines, followed by a blank line and then the data read
// from p. The entries of meta are also added as labels to the reader
// that parses the records.
//
// It returns an error if the writer cannot be created or written to,
// if parsing or inserting a record fails, if p contains no valid
// benchmark lines, or if closing the writer fails.
func (a *App) indexFile(ctx context.Context, upload *db.Upload, p io.Reader, meta map[string]string) (err error) {
	path := fmt.Sprintf("uploads/%s.txt", meta["upload-part"])
	fw, err := a.FS.NewWriter(ctx, path, meta)
	if err != nil {
		return err
	}
	// err is a named result so that this deferred function can see how
	// the function is returning: on failure the error is passed on to
	// the writer, and on success the result of Close becomes the return
	// value.
	defer func() {
		start := time.Now()
		if err != nil {
			// Best effort: the original error is the one reported.
			fw.CloseWithError(err)
		} else {
			err = fw.Close()
		}
		infof(ctx, "Close(%q) took %.2f seconds", path, time.Since(start).Seconds())
	}()

	// Emit the metadata header in sorted key order so the stored file
	// is deterministic regardless of map iteration order.
	keys := make([]string, 0, len(meta))
	for k := range meta {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		if _, err := fmt.Fprintf(fw, "%s: %s\n", k, meta[k]); err != nil {
			return err
		}
	}
	// Write a blank line to separate metadata from user-generated
	// content. A failure here is reported like any other header write
	// failure rather than being silently dropped.
	if _, err := io.WriteString(fw, "\n"); err != nil {
		return err
	}

	// TODO(quentin): Add a separate goroutine and buffer for writes to fw?
	// Everything the benchmark reader consumes from p is also copied
	// to fw by the TeeReader.
	tr := io.TeeReader(p, fw)
	br := benchfmt.NewReader(tr)
	br.AddLabels(meta)

	records := 0
	for br.Next() {
		records++
		if err := upload.InsertRecord(br.Result()); err != nil {
			return err
		}
	}
	if err := br.Err(); err != nil {
		return err
	}
	if records == 0 {
		return errors.New("no valid benchmark lines found")
	}
	return nil
}