storage, cmd/benchsave: move upload logic into Client

Change-Id: Ie2cafb6ee0fe30860803cc332133fc1922857831
Reviewed-on: https://go-review.googlesource.com/38304
Run-TryBot: Quentin Smith <quentin@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Russ Cox <rsc@golang.org>
diff --git a/cmd/benchsave/benchsave.go b/cmd/benchsave/benchsave.go
index 50c70f0..e5f857e 100644
--- a/cmd/benchsave/benchsave.go
+++ b/cmd/benchsave/benchsave.go
@@ -18,20 +18,19 @@
 import (
 	"bytes"
 	"context"
-	"encoding/json"
 	"flag"
 	"fmt"
 	"io"
 	"io/ioutil"
 	"log"
 	"mime"
-	"mime/multipart"
 	"net/http"
 	"os"
 	"path/filepath"
 	"time"
 
 	"golang.org/x/oauth2"
+	"golang.org/x/perf/storage"
 )
 
 var (
@@ -42,18 +41,9 @@
 
 const userAgent = "Benchsave/1.0"
 
-type uploadStatus struct {
-	// UploadID is the upload ID assigned to the upload.
-	UploadID string `json:"uploadid"`
-	// FileIDs is the list of file IDs assigned to the files in the upload.
-	FileIDs []string `json:"fileids"`
-	// ViewURL is a server-supplied URL to view the results.
-	ViewURL string `json:"viewurl"`
-}
-
-// writeOneFile reads name and writes it to mpw.
-func writeOneFile(mpw *multipart.Writer, name string, header []byte) error {
-	w, err := mpw.CreateFormFile("file", filepath.Base(name))
+// writeOneFile reads name and writes it to u.
+func writeOneFile(u *storage.Upload, name string, header []byte) error {
+	w, err := u.CreateFile(filepath.Base(name))
 	if err != nil {
 		return err
 	}
@@ -108,50 +98,24 @@
 	// Or they might need non-Google authentication.
 	hc := oauth2.NewClient(context.Background(), newTokenSource())
 
-	pr, pw := io.Pipe()
-	mpw := multipart.NewWriter(pw)
-
-	go func() {
-		defer pw.Close()
-		defer mpw.Close()
-
-		for _, name := range files {
-			if err := writeOneFile(mpw, name, headerData); err != nil {
-				log.Print(err)
-				mpw.WriteField("abort", "1")
-				// Writing the 'abort' field will cause the server to send back an error response,
-				// which will cause the main goroutine to  below.
-				return
-			}
-		}
-
-		mpw.WriteField("commit", "1")
-	}()
+	client := &storage.Client{BaseURL: *server, HTTPClient: hc}
 
 	start := time.Now()
 
-	req, err := http.NewRequest("POST", *server+"/upload", pr)
-	if err != nil {
-		log.Fatalf("NewRequest failed: %v\n", err)
+	u := client.NewUpload()
+
+	for _, name := range files {
+		if err := writeOneFile(u, name, headerData); err != nil {
+			log.Print(err)
+			u.Abort()
+			return
+		}
 	}
-	req.Header.Set("Content-Type", mpw.FormDataContentType())
-	req.Header.Set("User-Agent", userAgent)
-	resp, err := hc.Do(req)
+
+	status, err := u.Commit()
 	if err != nil {
 		log.Fatalf("upload failed: %v\n", err)
 	}
-	defer resp.Body.Close()
-
-	if resp.StatusCode != 200 {
-		log.Printf("upload failed: %v\n", resp.Status)
-		io.Copy(os.Stderr, resp.Body)
-		os.Exit(1)
-	}
-
-	status := &uploadStatus{}
-	if err := json.NewDecoder(resp.Body).Decode(status); err != nil {
-		log.Fatalf("cannot parse upload response: %v\n", err)
-	}
 
 	if *verbose {
 		s := ""
diff --git a/storage/client.go b/storage/client.go
index 03749f0..c734153 100644
--- a/storage/client.go
+++ b/storage/client.go
@@ -10,6 +10,7 @@
 	"fmt"
 	"io"
 	"io/ioutil"
+	"mime/multipart"
 	"net/http"
 	"net/url"
 
@@ -47,7 +48,6 @@
 	if err != nil {
 		return &Query{err: err}
 	}
-
 	if resp.StatusCode != 200 {
 		defer resp.Body.Close()
 		body, err := ioutil.ReadAll(resp.Body)
@@ -210,4 +210,131 @@
 	return ul.Err()
 }
 
-// TODO(quentin): Move upload code here from cmd/benchsave?
+// NewUpload starts a new upload to the storage server.
+// The upload must have Abort or Commit called on it.
+// If the server requires authentication for uploads, c.HTTPClient should be set to the result of oauth2.NewClient.
+func (c *Client) NewUpload() *Upload {
+	hc := c.httpClient()
+
+	pr, pw := io.Pipe()
+	mpw := multipart.NewWriter(pw)
+
+	req, err := http.NewRequest("POST", c.BaseURL+"/upload", pr)
+	if err != nil {
+		return &Upload{err: err}
+	}
+	req.Header.Set("Content-Type", mpw.FormDataContentType())
+	req.Header.Set("User-Agent", "golang.org/x/perf/storage")
+	errCh := make(chan error)
+	u := &Upload{pw: pw, mpw: mpw, errCh: errCh}
+	go func() {
+		resp, err := hc.Do(req)
+		if err != nil {
+			errCh <- err
+			return
+		}
+		defer resp.Body.Close()
+		if resp.StatusCode != 200 {
+			body, _ := ioutil.ReadAll(resp.Body)
+			errCh <- fmt.Errorf("upload failed: %v\n%s", resp.Status, body)
+			return
+		}
+		status := &UploadStatus{}
+		if err := json.NewDecoder(resp.Body).Decode(status); err != nil {
+			errCh <- err
+		}
+		u.status = status
+		errCh <- nil
+	}()
+	return u
+}
+
+// UploadStatus contains information about a successful upload.
+type UploadStatus struct {
+	// UploadID is the upload ID assigned to the upload.
+	UploadID string `json:"uploadid"`
+	// FileIDs is the list of file IDs assigned to the files in the upload.
+	FileIDs []string `json:"fileids"`
+	// ViewURL is a server-supplied URL to view the results.
+	ViewURL string `json:"viewurl"`
+}
+
+// An Upload is an in-progress upload.
+// Use CreateFile to upload one or more files, then call Commit or Abort.
+//
+//   u := client.NewUpload()
+//   w, err := u.CreateFile()
+//   if err != nil {
+//     u.Abort()
+//     return err
+//   }
+//   fmt.Fprintf(w, "BenchmarkResult 1 1 ns/op\n")
+//   if err := u.Commit(); err != nil {
+//     return err
+//   }
+type Upload struct {
+	pw     io.WriteCloser
+	mpw    *multipart.Writer
+	status *UploadStatus
+	// errCh is used to report the success/failure of the HTTP request
+	errCh chan error
+	// err is the first observed error; it is only accessed from user-called methods for thread safety
+	err error
+}
+
+// CreateFile creates a new upload with the given name.
+// The Writer may be used until CreateFile is called again.
+// name may be the empty string if the file does not have a name.
+func (u *Upload) CreateFile(name string) (io.Writer, error) {
+	if u.err != nil {
+		return nil, u.err
+	}
+	return u.mpw.CreateFormFile("file", name)
+}
+
+// Commit attempts to commit the upload.
+func (u *Upload) Commit() (*UploadStatus, error) {
+	if u.err != nil {
+		return nil, u.err
+	}
+	if u.err = u.mpw.WriteField("commit", "1"); u.err != nil {
+		u.Abort()
+		return nil, u.err
+	}
+	if u.err = u.mpw.Close(); u.err != nil {
+		u.Abort()
+		return nil, u.err
+	}
+	u.mpw = nil
+	if u.err = u.pw.Close(); u.err != nil {
+		u.Abort()
+		return nil, u.err
+	}
+	u.pw = nil
+	u.err = <-u.errCh
+	u.errCh = nil
+	if u.err != nil {
+		return nil, u.err
+	}
+	return u.status, nil
+}
+
+// Abort attempts to cancel the in-progress upload.
+func (u *Upload) Abort() error {
+	if u.mpw != nil {
+		u.mpw.WriteField("abort", "1")
+		// Writing the 'abort' field will cause the server to send back an error response.
+		u.mpw.Close()
+		u.mpw = nil
+	}
+	if u.pw != nil {
+		u.pw.Close()
+		u.pw = nil
+	}
+	err := <-u.errCh
+	u.errCh = nil
+	if u.err == nil {
+		u.err = err
+	}
+	return u.err
+}
diff --git a/storage/client_test.go b/storage/client_test.go
index 230d800..ee327a7 100644
--- a/storage/client_test.go
+++ b/storage/client_test.go
@@ -7,6 +7,8 @@
 import (
 	"bytes"
 	"fmt"
+	"io"
+	"io/ioutil"
 	"net/http"
 	"net/http/httptest"
 	"reflect"
@@ -90,3 +92,115 @@
 		t.Fatalf("Err: %v", err)
 	}
 }
+
+func TestNewUpload(t *testing.T) {
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if have, want := r.URL.RequestURI(), "/upload"; have != want {
+			t.Errorf("RequestURI = %q, want %q", have, want)
+		}
+		mr, err := r.MultipartReader()
+		if err != nil {
+			t.Error(err)
+		}
+		i := 0
+		for i = 0; ; i++ {
+			p, err := mr.NextPart()
+			if err == io.EOF {
+				break
+			}
+			name := p.FormName()
+			if name == "commit" {
+				continue
+			}
+			if name != "file" {
+				t.Errorf("unexpected field %q, want file", name)
+			}
+			if have, want := p.FileName(), fmt.Sprintf("want%d.txt", i); have != want {
+				t.Errorf("file name = %q, want %q", have, want)
+			}
+			content, _ := ioutil.ReadAll(p)
+			if have, want := string(content), "content"; have != want {
+				t.Errorf("unexpected content %q, want %q", have, want)
+			}
+		}
+		if i != 3 {
+			t.Errorf("number of files = %d, want %d", i, 3)
+		}
+		fmt.Fprintf(w, "%s\n", `{"uploadid": "id", "fileids": ["id/1", "id/2"]}`)
+	}))
+	defer ts.Close()
+
+	c := &Client{BaseURL: ts.URL}
+
+	u := c.NewUpload()
+	for i := 0; i < 2; i++ {
+		w, err := u.CreateFile(fmt.Sprintf("want%d.txt", i))
+		if err != nil {
+			t.Fatalf("CreateFile = %v", err)
+		}
+		if _, err := fmt.Fprintf(w, "content"); err != nil {
+			t.Fatalf("Write returned %v", err)
+		}
+	}
+	status, err := u.Commit()
+	if err != nil {
+		t.Errorf("Commit = %v", err)
+	}
+	if status.UploadID != "id" {
+		t.Errorf("status.UploadID = %q, want %q", status.UploadID, "id")
+	}
+}
+
+func TestNewUploadAbort(t *testing.T) {
+	ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		if have, want := r.URL.RequestURI(), "/upload"; have != want {
+			t.Errorf("RequestURI = %q, want %q", have, want)
+		}
+		mr, err := r.MultipartReader()
+		if err != nil {
+			t.Error(err)
+		}
+		i := 0
+		for i = 0; ; i++ {
+			p, err := mr.NextPart()
+			if err == io.EOF {
+				break
+			}
+			name := p.FormName()
+			if name == "abort" {
+				continue
+			}
+			if name != "file" {
+				t.Errorf("unexpected field %q, want file or abort", name)
+			}
+			if have, want := p.FileName(), fmt.Sprintf("want%d.txt", i); have != want {
+				t.Errorf("file name = %q, want %q", have, want)
+			}
+			content, _ := ioutil.ReadAll(p)
+			if have, want := string(content), "content"; have != want {
+				t.Errorf("unexpected content %q, want %q", have, want)
+			}
+		}
+		if i != 3 {
+			t.Errorf("number of files = %d, want %d", i, 3)
+		}
+		fmt.Fprintf(w, "%s\n", `{"uploadid": "id", "fileids": ["id/1", "id/2"]}`)
+	}))
+	defer ts.Close()
+
+	c := &Client{BaseURL: ts.URL}
+
+	u := c.NewUpload()
+	for i := 0; i < 2; i++ {
+		w, err := u.CreateFile(fmt.Sprintf("want%d.txt", i))
+		if err != nil {
+			t.Fatalf("CreateFile = %v", err)
+		}
+		if _, err := fmt.Fprintf(w, "content"); err != nil {
+			t.Fatalf("Write returned %v", err)
+		}
+	}
+	if err := u.Abort(); err != nil {
+		t.Errorf("Abort = %v", err)
+	}
+}