| // Copyright 2017 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| // Package storage contains a client for the performance data storage server. |
| package storage |
| |
| import ( |
| "encoding/json" |
| "fmt" |
| "io" |
| "io/ioutil" |
| "mime/multipart" |
| "net/http" |
| "net/url" |
| |
| "golang.org/x/net/context" |
| "golang.org/x/net/context/ctxhttp" |
| "golang.org/x/perf/storage/benchfmt" |
| ) |
| |
// Client talks to a performance data storage server over HTTP.
// A single Client is safe to use from multiple goroutines simultaneously.
type Client struct {
	// BaseURL is the base URL of the storage server. Request paths such as
	// "/search" are appended to it verbatim.
	BaseURL string
	// HTTPClient is the HTTP client used to send requests.
	// When it is nil, http.DefaultClient is used instead.
	HTTPClient *http.Client
}
| |
| // httpClient returns the http.Client to use for requests. |
| func (c *Client) httpClient() *http.Client { |
| if c.HTTPClient != nil { |
| return c.HTTPClient |
| } |
| return http.DefaultClient |
| } |
| |
| // Query searches for results matching the given query string. |
| // |
| // The query string is first parsed into quoted words (as in the shell) |
| // and then each word must be formatted as one of the following: |
| // key:value - exact match on label "key" = "value" |
| // key>value - value greater than (useful for dates) |
| // key<value - value less than (also useful for dates) |
| func (c *Client) Query(ctx context.Context, q string) *Query { |
| hc := c.httpClient() |
| |
| resp, err := ctxhttp.Get(ctx, hc, c.BaseURL+"/search?"+url.Values{"q": []string{q}}.Encode()) |
| if err != nil { |
| return &Query{err: err} |
| } |
| if resp.StatusCode != 200 { |
| defer resp.Body.Close() |
| body, err := ioutil.ReadAll(resp.Body) |
| if err != nil { |
| return &Query{err: err} |
| } |
| return &Query{err: fmt.Errorf("%s", body)} |
| } |
| |
| br := benchfmt.NewReader(resp.Body) |
| |
| return &Query{br: br, body: resp.Body} |
| } |
| |
| // A Query allows iteration over the results of a search query. |
| // Use Next to advance through the results, making sure to call Close when done: |
| // |
| // q := client.Query("key:value") |
| // defer q.Close() |
| // for q.Next() { |
| // res := q.Result() |
| // ... |
| // } |
| // if err = q.Err(); err != nil { |
| // // handle error encountered during query |
| // } |
| type Query struct { |
| br *benchfmt.Reader |
| body io.ReadCloser |
| err error |
| } |
| |
| // Next prepares the next result for reading with the Result |
| // method. It returns false when there are no more results, either by |
| // reaching the end of the input or an error. |
| func (q *Query) Next() bool { |
| if q.err != nil { |
| return false |
| } |
| return q.br.Next() |
| } |
| |
| // Result returns the most recent result generated by a call to Next. |
| func (q *Query) Result() *benchfmt.Result { |
| return q.br.Result() |
| } |
| |
| // Err returns the first error encountered during the query. |
| func (q *Query) Err() error { |
| if q.err != nil { |
| return q.err |
| } |
| return q.br.Err() |
| } |
| |
| // Close frees resources associated with the query. |
| func (q *Query) Close() error { |
| if q.body != nil { |
| q.body.Close() |
| q.body = nil |
| } |
| return q.err |
| } |
| |
| // UploadInfo represents an upload summary. |
| type UploadInfo struct { |
| Count int |
| UploadID string |
| LabelValues benchfmt.Labels `json:",omitempty"` |
| } |
| |
| // ListUploads searches for uploads containing results matching the given query string. |
| // The query may be empty, in which case all uploads will be returned. |
| // extraLabels specifies other labels to be retrieved. |
| // If limit is 0, no limit will be provided to the server. |
| // The uploads are returned starting with the most recent upload. |
| func (c *Client) ListUploads(ctx context.Context, q string, extraLabels []string, limit int) *UploadList { |
| hc := c.httpClient() |
| |
| v := url.Values{"extra_label": extraLabels} |
| if q != "" { |
| v["q"] = []string{q} |
| } |
| if limit != 0 { |
| v["limit"] = []string{fmt.Sprintf("%d", limit)} |
| } |
| |
| u := c.BaseURL + "/uploads" |
| if len(v) > 0 { |
| u += "?" + v.Encode() |
| } |
| resp, err := ctxhttp.Get(ctx, hc, u) |
| if err != nil { |
| return &UploadList{err: err} |
| } |
| if resp.StatusCode != 200 { |
| body, err := ioutil.ReadAll(resp.Body) |
| if err != nil { |
| return &UploadList{err: err} |
| } |
| return &UploadList{err: fmt.Errorf("%s", body)} |
| } |
| return &UploadList{body: resp.Body, dec: json.NewDecoder(resp.Body)} |
| } |
| |
| // UploadList is the result of ListUploads. |
| // Use Next to advance through the rows, making sure to call Close when done: |
| // |
| // q := db.ListUploads("key:value") |
| // defer q.Close() |
| // for q.Next() { |
| // id, count := q.Row() |
| // labels := q.LabelValues() |
| // ... |
| // } |
| // err = q.Err() // get any error encountered during iteration |
| // ... |
| type UploadList struct { |
| body io.Closer |
| dec *json.Decoder |
| // from last call to Next |
| ui UploadInfo |
| err error |
| } |
| |
| // Next prepares the next result for reading with the Result |
| // method. It returns false when there are no more results, either by |
| // reaching the end of the input or an error. |
| func (ul *UploadList) Next() bool { |
| if ul.err != nil { |
| return false |
| } |
| |
| // Clear UploadInfo before decoding new value. |
| ul.ui = UploadInfo{} |
| |
| ul.err = ul.dec.Decode(&ul.ui) |
| return ul.err == nil |
| } |
| |
| // Info returns the most recent UploadInfo generated by a call to Next. |
| func (ul *UploadList) Info() UploadInfo { |
| return ul.ui |
| } |
| |
| // Err returns the error state of the query. |
| func (ul *UploadList) Err() error { |
| if ul.err == io.EOF { |
| return nil |
| } |
| return ul.err |
| } |
| |
| // Close frees resources associated with the query. |
| func (ul *UploadList) Close() error { |
| if ul.body != nil { |
| err := ul.body.Close() |
| ul.body = nil |
| return err |
| } |
| return ul.Err() |
| } |
| |
| // NewUpload starts a new upload to the storage server. |
| // The upload must have Abort or Commit called on it. |
| // If the server requires authentication for uploads, c.HTTPClient should be set to the result of oauth2.NewClient. |
| func (c *Client) NewUpload(ctx context.Context) *Upload { |
| hc := c.httpClient() |
| |
| pr, pw := io.Pipe() |
| mpw := multipart.NewWriter(pw) |
| |
| req, err := http.NewRequest("POST", c.BaseURL+"/upload", pr) |
| if err != nil { |
| return &Upload{err: err} |
| } |
| req.Header.Set("Content-Type", mpw.FormDataContentType()) |
| req.Header.Set("User-Agent", "golang.org/x/perf/storage") |
| errCh := make(chan error) |
| u := &Upload{pw: pw, mpw: mpw, errCh: errCh} |
| go func() { |
| resp, err := ctxhttp.Do(ctx, hc, req) |
| if err != nil { |
| errCh <- err |
| return |
| } |
| defer resp.Body.Close() |
| if resp.StatusCode != 200 { |
| body, _ := ioutil.ReadAll(resp.Body) |
| errCh <- fmt.Errorf("upload failed: %v\n%s", resp.Status, body) |
| return |
| } |
| status := &UploadStatus{} |
| if err := json.NewDecoder(resp.Body).Decode(status); err != nil { |
| errCh <- err |
| } |
| u.status = status |
| errCh <- nil |
| }() |
| return u |
| } |
| |
// UploadStatus contains information about a successful upload.
type UploadStatus struct {
	// UploadID is the upload ID assigned to the upload.
	UploadID string `json:"uploadid"`
	// FileIDs is the list of file IDs assigned to the files in the upload.
	FileIDs []string `json:"fileids"`
	// ViewURL is a server-supplied URL to view the results.
	ViewURL string `json:"viewurl"`
}

// An Upload is an in-progress upload.
// Use CreateFile to upload one or more files, then call Commit or Abort.
//
//	u := client.NewUpload(ctx)
//	w, err := u.CreateFile("")
//	if err != nil {
//		u.Abort()
//		return err
//	}
//	fmt.Fprintf(w, "BenchmarkResult 1 1 ns/op\n")
//	if _, err := u.Commit(); err != nil {
//		return err
//	}
//
// Once Commit or Abort has returned, the upload is finished: Abort may
// still be called any number of times (for example from a defer), while
// CreateFile and Commit return an error.
type Upload struct {
	// pw is the writer that mpw writes into; nil once it has been closed.
	pw io.WriteCloser
	// mpw encodes files and fields as multipart form data; nil once it has been closed.
	mpw *multipart.Writer
	// status is the server's reply; it is set before success is reported on errCh.
	status *UploadStatus
	// errCh is used to report the success/failure of the HTTP request;
	// nil once the outcome has been received.
	errCh chan error
	// err is the first observed error; it is only accessed from user-called methods for thread safety
	err error
}

// CreateFile creates a new file with the given name in the upload.
// The Writer may be used until CreateFile is called again.
// name may be the empty string if the file does not have a name.
// It returns an error if the upload has already failed or finished.
func (u *Upload) CreateFile(name string) (io.Writer, error) {
	if u.err != nil {
		return nil, u.err
	}
	if u.mpw == nil {
		// Commit or Abort already closed the multipart writer.
		return nil, fmt.Errorf("storage: CreateFile called on a finished upload")
	}
	return u.mpw.CreateFormFile("file", name)
}

// Commit attempts to commit the upload.
// It writes the "commit" field, closes the request body, and waits for the
// outcome of the HTTP request. On success it returns the status decoded
// from the server's response. If any step fails the upload is aborted and
// the first error observed is returned.
func (u *Upload) Commit() (*UploadStatus, error) {
	if u.err != nil {
		return nil, u.err
	}
	if u.mpw == nil || u.pw == nil || u.errCh == nil {
		// A previous Commit or Abort already finished this upload.
		return nil, fmt.Errorf("storage: Commit called on a finished upload")
	}
	if u.err = u.mpw.WriteField("commit", "1"); u.err != nil {
		u.Abort()
		return nil, u.err
	}
	if u.err = u.mpw.Close(); u.err != nil {
		u.Abort()
		return nil, u.err
	}
	u.mpw = nil
	if u.err = u.pw.Close(); u.err != nil {
		u.Abort()
		return nil, u.err
	}
	u.pw = nil
	// The body is complete; wait for the outcome of the request.
	u.err = <-u.errCh
	u.errCh = nil
	if u.err != nil {
		return nil, u.err
	}
	return u.status, nil
}

// Abort attempts to cancel the in-progress upload.
// It returns the first error observed on the upload, if any. Abort is safe
// to call on an upload that has already been committed or aborted; in that
// case it changes nothing and returns immediately.
func (u *Upload) Abort() error {
	if u.mpw != nil {
		// Errors are ignored here: the upload is being discarded anyway.
		u.mpw.WriteField("abort", "1")
		// Writing the 'abort' field will cause the server to send back an error response.
		u.mpw.Close()
		u.mpw = nil
	}
	if u.pw != nil {
		u.pw.Close()
		u.pw = nil
	}
	if u.errCh != nil {
		// Wait for the outcome of the request. The channel is cleared
		// afterwards because only one value is ever delivered on it, and
		// receiving from a nil channel would block forever.
		err := <-u.errCh
		u.errCh = nil
		if u.err == nil {
			u.err = err
		}
	}
	return u.err
}