storage, cmd/benchsave: move upload logic into Client
Change-Id: Ie2cafb6ee0fe30860803cc332133fc1922857831
Reviewed-on: https://go-review.googlesource.com/38304
Run-TryBot: Quentin Smith <quentin@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Russ Cox <rsc@golang.org>
diff --git a/cmd/benchsave/benchsave.go b/cmd/benchsave/benchsave.go
index 50c70f0..e5f857e 100644
--- a/cmd/benchsave/benchsave.go
+++ b/cmd/benchsave/benchsave.go
@@ -18,20 +18,19 @@
import (
"bytes"
"context"
- "encoding/json"
"flag"
"fmt"
"io"
"io/ioutil"
"log"
"mime"
- "mime/multipart"
"net/http"
"os"
"path/filepath"
"time"
"golang.org/x/oauth2"
+ "golang.org/x/perf/storage"
)
var (
@@ -42,18 +41,9 @@
const userAgent = "Benchsave/1.0"
-type uploadStatus struct {
- // UploadID is the upload ID assigned to the upload.
- UploadID string `json:"uploadid"`
- // FileIDs is the list of file IDs assigned to the files in the upload.
- FileIDs []string `json:"fileids"`
- // ViewURL is a server-supplied URL to view the results.
- ViewURL string `json:"viewurl"`
-}
-
-// writeOneFile reads name and writes it to mpw.
-func writeOneFile(mpw *multipart.Writer, name string, header []byte) error {
- w, err := mpw.CreateFormFile("file", filepath.Base(name))
+// writeOneFile reads name and writes it to u.
+func writeOneFile(u *storage.Upload, name string, header []byte) error {
+ w, err := u.CreateFile(filepath.Base(name))
if err != nil {
return err
}
@@ -108,50 +98,24 @@
// Or they might need non-Google authentication.
hc := oauth2.NewClient(context.Background(), newTokenSource())
- pr, pw := io.Pipe()
- mpw := multipart.NewWriter(pw)
-
- go func() {
- defer pw.Close()
- defer mpw.Close()
-
- for _, name := range files {
- if err := writeOneFile(mpw, name, headerData); err != nil {
- log.Print(err)
- mpw.WriteField("abort", "1")
- // Writing the 'abort' field will cause the server to send back an error response,
- // which will cause the main goroutine to below.
- return
- }
- }
-
- mpw.WriteField("commit", "1")
- }()
+ client := &storage.Client{BaseURL: *server, HTTPClient: hc}
start := time.Now()
- req, err := http.NewRequest("POST", *server+"/upload", pr)
- if err != nil {
- log.Fatalf("NewRequest failed: %v\n", err)
+ u := client.NewUpload()
+
+ for _, name := range files {
+ if err := writeOneFile(u, name, headerData); err != nil {
+ log.Print(err)
+ u.Abort()
+ return
+ }
}
- req.Header.Set("Content-Type", mpw.FormDataContentType())
- req.Header.Set("User-Agent", userAgent)
- resp, err := hc.Do(req)
+
+ status, err := u.Commit()
if err != nil {
log.Fatalf("upload failed: %v\n", err)
}
- defer resp.Body.Close()
-
- if resp.StatusCode != 200 {
- log.Printf("upload failed: %v\n", resp.Status)
- io.Copy(os.Stderr, resp.Body)
- os.Exit(1)
- }
-
- status := &uploadStatus{}
- if err := json.NewDecoder(resp.Body).Decode(status); err != nil {
- log.Fatalf("cannot parse upload response: %v\n", err)
- }
if *verbose {
s := ""
diff --git a/storage/client.go b/storage/client.go
index 03749f0..c734153 100644
--- a/storage/client.go
+++ b/storage/client.go
@@ -10,6 +10,7 @@
"fmt"
"io"
"io/ioutil"
+ "mime/multipart"
"net/http"
"net/url"
@@ -47,7 +48,6 @@
if err != nil {
return &Query{err: err}
}
-
if resp.StatusCode != 200 {
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
@@ -210,4 +210,131 @@
return ul.Err()
}
-// TODO(quentin): Move upload code here from cmd/benchsave?
+// NewUpload starts a new upload to the storage server.
+// The upload must have Abort or Commit called on it.
+// If the server requires authentication for uploads, c.HTTPClient should be set to the result of oauth2.NewClient.
+func (c *Client) NewUpload() *Upload {
+ hc := c.httpClient()
+
+ pr, pw := io.Pipe()
+ mpw := multipart.NewWriter(pw)
+
+ req, err := http.NewRequest("POST", c.BaseURL+"/upload", pr)
+ if err != nil {
+ return &Upload{err: err}
+ }
+ req.Header.Set("Content-Type", mpw.FormDataContentType())
+ req.Header.Set("User-Agent", "golang.org/x/perf/storage")
+ errCh := make(chan error)
+ u := &Upload{pw: pw, mpw: mpw, errCh: errCh}
+ go func() {
+ resp, err := hc.Do(req)
+ if err != nil {
+ errCh <- err
+ return
+ }
+ defer resp.Body.Close()
+ if resp.StatusCode != 200 {
+ body, _ := ioutil.ReadAll(resp.Body)
+ errCh <- fmt.Errorf("upload failed: %v\n%s", resp.Status, body)
+ return
+ }
+ status := &UploadStatus{}
+ if err := json.NewDecoder(resp.Body).Decode(status); err != nil {
+ errCh <- err
+ }
+ u.status = status
+ errCh <- nil
+ }()
+ return u
+}
+
+// UploadStatus contains information about a successful upload.
+type UploadStatus struct {
+ // UploadID is the upload ID assigned to the upload.
+ UploadID string `json:"uploadid"`
+ // FileIDs is the list of file IDs assigned to the files in the upload.
+ FileIDs []string `json:"fileids"`
+ // ViewURL is a server-supplied URL to view the results.
+ ViewURL string `json:"viewurl"`
+}
+
+// An Upload is an in-progress upload.
+// Use CreateFile to upload one or more files, then call Commit or Abort.
+//
+// u := client.NewUpload()
+// w, err := u.CreateFile()
+// if err != nil {
+// u.Abort()
+// return err
+// }
+// fmt.Fprintf(w, "BenchmarkResult 1 1 ns/op\n")
+// if err := u.Commit(); err != nil {
+// return err
+// }
+type Upload struct {
+ pw io.WriteCloser
+ mpw *multipart.Writer
+ status *UploadStatus
+ // errCh is used to report the success/failure of the HTTP request
+ errCh chan error
+ // err is the first observed error; it is only accessed from user-called methods for thread safety
+ err error
+}
+
+// CreateFile creates a new upload with the given name.
+// The Writer may be used until CreateFile is called again.
+// name may be the empty string if the file does not have a name.
+func (u *Upload) CreateFile(name string) (io.Writer, error) {
+ if u.err != nil {
+ return nil, u.err
+ }
+ return u.mpw.CreateFormFile("file", name)
+}
+
+// Commit attempts to commit the upload.
+func (u *Upload) Commit() (*UploadStatus, error) {
+ if u.err != nil {
+ return nil, u.err
+ }
+ if u.err = u.mpw.WriteField("commit", "1"); u.err != nil {
+ u.Abort()
+ return nil, u.err
+ }
+ if u.err = u.mpw.Close(); u.err != nil {
+ u.Abort()
+ return nil, u.err
+ }
+ u.mpw = nil
+ if u.err = u.pw.Close(); u.err != nil {
+ u.Abort()
+ return nil, u.err
+ }
+ u.pw = nil
+ u.err = <-u.errCh
+ u.errCh = nil
+ if u.err != nil {
+ return nil, u.err
+ }
+ return u.status, nil
+}
+
+// Abort attempts to cancel the in-progress upload.
+func (u *Upload) Abort() error {
+ if u.mpw != nil {
+ u.mpw.WriteField("abort", "1")
+ // Writing the 'abort' field will cause the server to send back an error response.
+ u.mpw.Close()
+ u.mpw = nil
+ }
+ if u.pw != nil {
+ u.pw.Close()
+ u.pw = nil
+ }
+ err := <-u.errCh
+ u.errCh = nil
+ if u.err == nil {
+ u.err = err
+ }
+ return u.err
+}
diff --git a/storage/client_test.go b/storage/client_test.go
index 230d800..ee327a7 100644
--- a/storage/client_test.go
+++ b/storage/client_test.go
@@ -7,6 +7,8 @@
import (
"bytes"
"fmt"
+ "io"
+ "io/ioutil"
"net/http"
"net/http/httptest"
"reflect"
@@ -90,3 +92,115 @@
t.Fatalf("Err: %v", err)
}
}
+
+func TestNewUpload(t *testing.T) {
+ ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ if have, want := r.URL.RequestURI(), "/upload"; have != want {
+ t.Errorf("RequestURI = %q, want %q", have, want)
+ }
+ mr, err := r.MultipartReader()
+ if err != nil {
+ t.Error(err)
+ }
+ i := 0
+ for i = 0; ; i++ {
+ p, err := mr.NextPart()
+ if err == io.EOF {
+ break
+ }
+ name := p.FormName()
+ if name == "commit" {
+ continue
+ }
+ if name != "file" {
+ t.Errorf("unexpected field %q, want file", name)
+ }
+ if have, want := p.FileName(), fmt.Sprintf("want%d.txt", i); have != want {
+ t.Errorf("file name = %q, want %q", have, want)
+ }
+ content, _ := ioutil.ReadAll(p)
+ if have, want := string(content), "content"; have != want {
+ t.Errorf("unexpected content %q, want %q", have, want)
+ }
+ }
+ if i != 3 {
+ t.Errorf("number of files = %d, want %d", i, 3)
+ }
+ fmt.Fprintf(w, "%s\n", `{"uploadid": "id", "fileids": ["id/1", "id/2"]}`)
+ }))
+ defer ts.Close()
+
+ c := &Client{BaseURL: ts.URL}
+
+ u := c.NewUpload()
+ for i := 0; i < 2; i++ {
+ w, err := u.CreateFile(fmt.Sprintf("want%d.txt", i))
+ if err != nil {
+ t.Fatalf("CreateFile = %v", err)
+ }
+ if _, err := fmt.Fprintf(w, "content"); err != nil {
+ t.Fatalf("Write returned %v", err)
+ }
+ }
+ status, err := u.Commit()
+ if err != nil {
+ t.Errorf("Commit = %v", err)
+ }
+ if status.UploadID != "id" {
+ t.Errorf("status.UploadID = %q, want %q", status.UploadID, "id")
+ }
+}
+
+func TestNewUploadAbort(t *testing.T) {
+ ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ if have, want := r.URL.RequestURI(), "/upload"; have != want {
+ t.Errorf("RequestURI = %q, want %q", have, want)
+ }
+ mr, err := r.MultipartReader()
+ if err != nil {
+ t.Error(err)
+ }
+ i := 0
+ for i = 0; ; i++ {
+ p, err := mr.NextPart()
+ if err == io.EOF {
+ break
+ }
+ name := p.FormName()
+ if name == "abort" {
+ continue
+ }
+ if name != "file" {
+ t.Errorf("unexpected field %q, want file or abort", name)
+ }
+ if have, want := p.FileName(), fmt.Sprintf("want%d.txt", i); have != want {
+ t.Errorf("file name = %q, want %q", have, want)
+ }
+ content, _ := ioutil.ReadAll(p)
+ if have, want := string(content), "content"; have != want {
+ t.Errorf("unexpected content %q, want %q", have, want)
+ }
+ }
+ if i != 3 {
+ t.Errorf("number of files = %d, want %d", i, 3)
+ }
+ fmt.Fprintf(w, "%s\n", `{"uploadid": "id", "fileids": ["id/1", "id/2"]}`)
+ }))
+ defer ts.Close()
+
+ c := &Client{BaseURL: ts.URL}
+
+ u := c.NewUpload()
+ for i := 0; i < 2; i++ {
+ w, err := u.CreateFile(fmt.Sprintf("want%d.txt", i))
+ if err != nil {
+ t.Fatalf("CreateFile = %v", err)
+ }
+ if _, err := fmt.Fprintf(w, "content"); err != nil {
+ t.Fatalf("Write returned %v", err)
+ }
+ }
+ if err := u.Abort(); err != nil {
+ t.Errorf("Abort = %v", err)
+ }
+}