storage, cmd/benchsave: move upload logic into Client Change-Id: Ie2cafb6ee0fe30860803cc332133fc1922857831 Reviewed-on: https://go-review.googlesource.com/38304 Run-TryBot: Quentin Smith <quentin@golang.org> TryBot-Result: Gobot Gobot <gobot@golang.org> Reviewed-by: Russ Cox <rsc@golang.org>
diff --git a/cmd/benchsave/benchsave.go b/cmd/benchsave/benchsave.go index 50c70f0..e5f857e 100644 --- a/cmd/benchsave/benchsave.go +++ b/cmd/benchsave/benchsave.go
@@ -18,20 +18,19 @@ import ( "bytes" "context" - "encoding/json" "flag" "fmt" "io" "io/ioutil" "log" "mime" - "mime/multipart" "net/http" "os" "path/filepath" "time" "golang.org/x/oauth2" + "golang.org/x/perf/storage" ) var ( @@ -42,18 +41,9 @@ const userAgent = "Benchsave/1.0" -type uploadStatus struct { - // UploadID is the upload ID assigned to the upload. - UploadID string `json:"uploadid"` - // FileIDs is the list of file IDs assigned to the files in the upload. - FileIDs []string `json:"fileids"` - // ViewURL is a server-supplied URL to view the results. - ViewURL string `json:"viewurl"` -} - -// writeOneFile reads name and writes it to mpw. -func writeOneFile(mpw *multipart.Writer, name string, header []byte) error { - w, err := mpw.CreateFormFile("file", filepath.Base(name)) +// writeOneFile reads name and writes it to u. +func writeOneFile(u *storage.Upload, name string, header []byte) error { + w, err := u.CreateFile(filepath.Base(name)) if err != nil { return err } @@ -108,50 +98,24 @@ // Or they might need non-Google authentication. hc := oauth2.NewClient(context.Background(), newTokenSource()) - pr, pw := io.Pipe() - mpw := multipart.NewWriter(pw) - - go func() { - defer pw.Close() - defer mpw.Close() - - for _, name := range files { - if err := writeOneFile(mpw, name, headerData); err != nil { - log.Print(err) - mpw.WriteField("abort", "1") - // Writing the 'abort' field will cause the server to send back an error response, - // which will cause the main goroutine to below. - return - } - } - - mpw.WriteField("commit", "1") - }() + client := &storage.Client{BaseURL: *server, HTTPClient: hc} start := time.Now() - req, err := http.NewRequest("POST", *server+"/upload", pr) - if err != nil { - log.Fatalf("NewRequest failed: %v\n", err) + u := client.NewUpload() + + for _, name := range files { + if err := writeOneFile(u, name, headerData); err != nil { + log.Print(err) + u.Abort() + return + } } - req.Header.Set("Content-Type", mpw.FormDataContentType()) - req.Header.Set("User-Agent", userAgent) - resp, err := hc.Do(req) + + status, err := u.Commit() if err != nil { log.Fatalf("upload failed: %v\n", err) } - defer resp.Body.Close() - - if resp.StatusCode != 200 { - log.Printf("upload failed: %v\n", resp.Status) - io.Copy(os.Stderr, resp.Body) - os.Exit(1) - } - - status := &uploadStatus{} - if err := json.NewDecoder(resp.Body).Decode(status); err != nil { - log.Fatalf("cannot parse upload response: %v\n", err) - } if *verbose { s := ""
diff --git a/storage/client.go b/storage/client.go index 03749f0..c734153 100644 --- a/storage/client.go +++ b/storage/client.go
@@ -10,6 +10,7 @@ "fmt" "io" "io/ioutil" + "mime/multipart" "net/http" "net/url" @@ -47,7 +48,6 @@ if err != nil { return &Query{err: err} } - if resp.StatusCode != 200 { defer resp.Body.Close() body, err := ioutil.ReadAll(resp.Body) @@ -210,4 +210,131 @@ return ul.Err() } -// TODO(quentin): Move upload code here from cmd/benchsave? +// NewUpload starts a new upload to the storage server. +// The upload must have Abort or Commit called on it. +// If the server requires authentication for uploads, c.HTTPClient should be set to the result of oauth2.NewClient. +func (c *Client) NewUpload() *Upload { + hc := c.httpClient() + + pr, pw := io.Pipe() + mpw := multipart.NewWriter(pw) + + req, err := http.NewRequest("POST", c.BaseURL+"/upload", pr) + if err != nil { + return &Upload{err: err} + } + req.Header.Set("Content-Type", mpw.FormDataContentType()) + req.Header.Set("User-Agent", "golang.org/x/perf/storage") + errCh := make(chan error) + u := &Upload{pw: pw, mpw: mpw, errCh: errCh} + go func() { + resp, err := hc.Do(req) + if err != nil { + errCh <- err + return + } + defer resp.Body.Close() + if resp.StatusCode != 200 { + body, _ := ioutil.ReadAll(resp.Body) + errCh <- fmt.Errorf("upload failed: %v\n%s", resp.Status, body) + return + } + status := &UploadStatus{} + if err := json.NewDecoder(resp.Body).Decode(status); err != nil { + errCh <- err + } + u.status = status + errCh <- nil + }() + return u +} + +// UploadStatus contains information about a successful upload. +type UploadStatus struct { + // UploadID is the upload ID assigned to the upload. + UploadID string `json:"uploadid"` + // FileIDs is the list of file IDs assigned to the files in the upload. + FileIDs []string `json:"fileids"` + // ViewURL is a server-supplied URL to view the results. + ViewURL string `json:"viewurl"` +} + +// An Upload is an in-progress upload. +// Use CreateFile to upload one or more files, then call Commit or Abort. +// +// u := client.NewUpload() +// w, err := u.CreateFile() +// if err != nil { +// u.Abort() +// return err +// } +// fmt.Fprintf(w, "BenchmarkResult 1 1 ns/op\n") +// if err := u.Commit(); err != nil { +// return err +// } +type Upload struct { + pw io.WriteCloser + mpw *multipart.Writer + status *UploadStatus + // errCh is used to report the success/failure of the HTTP request + errCh chan error + // err is the first observed error; it is only accessed from user-called methods for thread safety + err error +} + +// CreateFile creates a new upload with the given name. +// The Writer may be used until CreateFile is called again. +// name may be the empty string if the file does not have a name. +func (u *Upload) CreateFile(name string) (io.Writer, error) { + if u.err != nil { + return nil, u.err + } + return u.mpw.CreateFormFile("file", name) +} + +// Commit attempts to commit the upload. +func (u *Upload) Commit() (*UploadStatus, error) { + if u.err != nil { + return nil, u.err + } + if u.err = u.mpw.WriteField("commit", "1"); u.err != nil { + u.Abort() + return nil, u.err + } + if u.err = u.mpw.Close(); u.err != nil { + u.Abort() + return nil, u.err + } + u.mpw = nil + if u.err = u.pw.Close(); u.err != nil { + u.Abort() + return nil, u.err + } + u.pw = nil + u.err = <-u.errCh + u.errCh = nil + if u.err != nil { + return nil, u.err + } + return u.status, nil +} + +// Abort attempts to cancel the in-progress upload. +func (u *Upload) Abort() error { + if u.mpw != nil { + u.mpw.WriteField("abort", "1") + // Writing the 'abort' field will cause the server to send back an error response. + u.mpw.Close() + u.mpw = nil + } + if u.pw != nil { + u.pw.Close() + u.pw = nil + } + err := <-u.errCh + u.errCh = nil + if u.err == nil { + u.err = err + } + return u.err +}
diff --git a/storage/client_test.go b/storage/client_test.go index 230d800..ee327a7 100644 --- a/storage/client_test.go +++ b/storage/client_test.go
@@ -7,6 +7,8 @@ import ( "bytes" "fmt" + "io" + "io/ioutil" "net/http" "net/http/httptest" "reflect" @@ -90,3 +92,115 @@ t.Fatalf("Err: %v", err) } } + +func TestNewUpload(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if have, want := r.URL.RequestURI(), "/upload"; have != want { + t.Errorf("RequestURI = %q, want %q", have, want) + } + mr, err := r.MultipartReader() + if err != nil { + t.Error(err) + } + i := 0 + for i = 0; ; i++ { + p, err := mr.NextPart() + if err == io.EOF { + break + } + name := p.FormName() + if name == "commit" { + continue + } + if name != "file" { + t.Errorf("unexpected field %q, want file", name) + } + if have, want := p.FileName(), fmt.Sprintf("want%d.txt", i); have != want { + t.Errorf("file name = %q, want %q", have, want) + } + content, _ := ioutil.ReadAll(p) + if have, want := string(content), "content"; have != want { + t.Errorf("unexpected content %q, want %q", have, want) + } + } + if i != 3 { + t.Errorf("number of files = %d, want %d", i, 3) + } + fmt.Fprintf(w, "%s\n", `{"uploadid": "id", "fileids": ["id/1", "id/2"]}`) + })) + defer ts.Close() + + c := &Client{BaseURL: ts.URL} + + u := c.NewUpload() + for i := 0; i < 2; i++ { + w, err := u.CreateFile(fmt.Sprintf("want%d.txt", i)) + if err != nil { + t.Fatalf("CreateFile = %v", err) + } + if _, err := fmt.Fprintf(w, "content"); err != nil { + t.Fatalf("Write returned %v", err) + } + } + status, err := u.Commit() + if err != nil { + t.Errorf("Commit = %v", err) + } + if status.UploadID != "id" { + t.Errorf("status.UploadID = %q, want %q", status.UploadID, "id") + } +} + +func TestNewUploadAbort(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if have, want := r.URL.RequestURI(), "/upload"; have != want { + t.Errorf("RequestURI = %q, want %q", have, want) + } + mr, err := r.MultipartReader() + if err != nil { + t.Error(err) + } + i := 0 + for i = 0; ; i++ { + p, err := mr.NextPart() + if err == io.EOF { + break + } + name := p.FormName() + if name == "abort" { + continue + } + if name != "file" { + t.Errorf("unexpected field %q, want file or abort", name) + } + if have, want := p.FileName(), fmt.Sprintf("want%d.txt", i); have != want { + t.Errorf("file name = %q, want %q", have, want) + } + content, _ := ioutil.ReadAll(p) + if have, want := string(content), "content"; have != want { + t.Errorf("unexpected content %q, want %q", have, want) + } + } + if i != 3 { + t.Errorf("number of files = %d, want %d", i, 3) + } + fmt.Fprintf(w, "%s\n", `{"uploadid": "id", "fileids": ["id/1", "id/2"]}`) + })) + defer ts.Close() + + c := &Client{BaseURL: ts.URL} + + u := c.NewUpload() + for i := 0; i < 2; i++ { + w, err := u.CreateFile(fmt.Sprintf("want%d.txt", i)) + if err != nil { + t.Fatalf("CreateFile = %v", err) + } + if _, err := fmt.Fprintf(w, "content"); err != nil { + t.Fatalf("Write returned %v", err) + } + } + if err := u.Abort(); err != nil { + t.Errorf("Abort = %v", err) + } +}