blob: 591d8117d727730a2db7ac68955df999487fe79f [file] [log] [blame]
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package storage contains a client for the performance data storage server.
package storage
import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"mime/multipart"
"net/http"
"net/url"
"golang.org/x/net/context"
"golang.org/x/net/context/ctxhttp"
"golang.org/x/perf/storage/benchfmt"
)
// A Client issues queries to a performance data storage server.
// It is safe to use from multiple goroutines simultaneously.
type Client struct {
// BaseURL is the base URL of the storage server.
// Request paths such as "/search", "/uploads" and "/upload" are appended
// to it verbatim, so a trailing slash here yields a double slash in the
// request URL.
BaseURL string
// HTTPClient is the HTTP client for sending requests. If nil, http.DefaultClient will be used.
HTTPClient *http.Client
}
// httpClient picks the HTTP client for outgoing requests: the
// caller-supplied HTTPClient when one is set, http.DefaultClient otherwise.
func (c *Client) httpClient() *http.Client {
	hc := c.HTTPClient
	if hc == nil {
		hc = http.DefaultClient
	}
	return hc
}
// Query runs a search against the storage server and returns an iterator
// over the matching results.
//
// The query string is first split into quoted words (as in the shell),
// and every word must take one of these forms:
//
//	key:value - exact match on label "key" = "value"
//	key>value - value greater than (useful for dates)
//	key<value - value less than (also useful for dates)
//
// Query never returns nil. A failed request, or a response whose status
// is not 200 (in which case the response body becomes the error text),
// is reported through the returned Query's Err method. The caller should
// Close the returned Query when done with it.
func (c *Client) Query(ctx context.Context, q string) *Query {
	params := url.Values{"q": []string{q}}
	searchURL := c.BaseURL + "/search?" + params.Encode()

	resp, err := ctxhttp.Get(ctx, c.httpClient(), searchURL)
	if err != nil {
		return &Query{err: err}
	}
	if resp.StatusCode == 200 {
		// The body is handed to the Query, whose Close method releases it.
		return &Query{br: benchfmt.NewReader(resp.Body), body: resp.Body}
	}

	// Any other status: the body carries the server's error message.
	defer resp.Body.Close()
	msg, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return &Query{err: err}
	}
	return &Query{err: fmt.Errorf("%s", msg)}
}
// A Query allows iteration over the results of a search query.
// Use Next to advance through the results, making sure to call Close when done:
//
//	q := client.Query(ctx, "key:value")
//	defer q.Close()
//	for q.Next() {
//		res := q.Result()
//		...
//	}
//	if err = q.Err(); err != nil {
//		// handle error encountered during query
//	}
type Query struct {
// br parses results out of the response body. It is left nil when the
// request itself failed.
br *benchfmt.Reader
// body is the HTTP response body that br reads from. Close closes it
// and resets it to nil.
body io.ReadCloser
// err is the error from issuing the request, if any. While it is set,
// Next returns false and Err reports it.
err error
}
// Next advances to the next result, which is then available through the
// Result method. It reports false once there are no more results, either
// because the input is exhausted or because an error occurred; use Err
// to tell the two apart.
func (q *Query) Next() bool {
	// A query that failed up front has no reader, so the error check must come first.
	return q.err == nil && q.br.Next()
}
// Result returns the most recent result generated by a call to Next.
//
// It returns nil when the query failed before a reader was set up (the
// request error is available from Err). Next and Err already tolerate
// that state; without this guard Result would dereference a nil reader.
func (q *Query) Result() *benchfmt.Result {
	if q.br == nil {
		return nil
	}
	return q.br.Result()
}
// Err returns the first error encountered during the query: the error
// from issuing the request if there was one, otherwise whatever the
// result reader reports.
func (q *Query) Err() error {
	if q.err == nil {
		return q.br.Err()
	}
	return q.err
}
// Close releases the response body held by the query, if any. It may be
// called more than once; later calls have nothing left to release. The
// error from closing the body is discarded: the value returned is the
// error recorded when the query was issued, or nil.
func (q *Query) Close() error {
	if q.body == nil {
		return q.err
	}
	q.body.Close()
	q.body = nil
	return q.err
}
// UploadInfo represents an upload summary.
// UploadList.Next decodes one UploadInfo per row from the server's JSON response.
type UploadInfo struct {
// Count is presumably the number of results in the upload that matched
// the query — confirm against the server implementation.
Count int
// UploadID identifies the upload.
UploadID string
// LabelValues presumably holds the values of the extra labels requested
// through ListUploads — confirm against the server. It is omitted from
// the JSON encoding when empty.
LabelValues benchfmt.Labels `json:",omitempty"`
}
// ListUploads searches for uploads containing results matching the given query string.
// The query may be empty, in which case all uploads will be returned.
// extraLabels specifies other labels to be retrieved.
// If limit is 0, no limit will be provided to the server.
// The uploads are returned starting with the most recent upload.
//
// ListUploads never returns nil. A failed request, or a response whose
// status is not 200 (in which case the response body becomes the error
// text), is reported through the returned UploadList's Err method.
// On success the caller must Close the returned UploadList to release
// the response body.
func (c *Client) ListUploads(ctx context.Context, q string, extraLabels []string, limit int) *UploadList {
	hc := c.httpClient()

	v := url.Values{"extra_label": extraLabels}
	if q != "" {
		v["q"] = []string{q}
	}
	if limit != 0 {
		v["limit"] = []string{fmt.Sprintf("%d", limit)}
	}

	u := c.BaseURL + "/uploads"
	// v always contains the "extra_label" key, so len(v) cannot tell whether
	// there is anything to send; test the encoded form instead so that no
	// bare trailing "?" is emitted.
	if enc := v.Encode(); enc != "" {
		u += "?" + enc
	}

	resp, err := ctxhttp.Get(ctx, hc, u)
	if err != nil {
		return &UploadList{err: err}
	}
	if resp.StatusCode != 200 {
		// The body is not handed to an UploadList on this path, so it has
		// to be closed here or the connection leaks.
		defer resp.Body.Close()
		body, err := ioutil.ReadAll(resp.Body)
		if err != nil {
			return &UploadList{err: err}
		}
		return &UploadList{err: fmt.Errorf("%s", body)}
	}
	// Ownership of the body passes to the UploadList; its Close releases it.
	return &UploadList{body: resp.Body, dec: json.NewDecoder(resp.Body)}
}
// UploadList is the result of ListUploads.
// Use Next to advance through the rows, making sure to call Close when done:
//
//	ul := client.ListUploads(ctx, "key:value", nil, 0)
//	defer ul.Close()
//	for ul.Next() {
//		info := ul.Info()
//		...
//	}
//	err = ul.Err() // get any error encountered during iteration
//	...
type UploadList struct {
// body is the HTTP response body; Close closes it.
body io.Closer
// dec decodes successive UploadInfo values from the response body.
dec *json.Decoder
// from last call to Next
ui UploadInfo
// err is the first error seen, either from the request or from decoding.
// Once set, Next keeps returning false. It is io.EOF after the last row,
// which Err reports as nil.
err error
}
// Next decodes the next row so that it can be read with the Info method.
// It returns false when there are no more rows, either by reaching the
// end of the input or because of an error; once it has returned false it
// keeps doing so without reading further.
func (ul *UploadList) Next() bool {
	if ul.err != nil {
		return false
	}
	// Start from a zero UploadInfo so fields absent from this row do not
	// keep values left over from the previous one.
	ul.ui = UploadInfo{}
	if err := ul.dec.Decode(&ul.ui); err != nil {
		ul.err = err
		return false
	}
	return true
}
// Info returns the most recent UploadInfo generated by a call to Next.
// The value is returned by copy; before the first call to Next it is the
// zero UploadInfo.
func (ul *UploadList) Info() UploadInfo {
return ul.ui
}
// Err returns the error state of the query. Reaching the end of the
// input (io.EOF) is the normal way for iteration to finish and is
// reported as nil.
func (ul *UploadList) Err() error {
	if err := ul.err; err != io.EOF {
		return err
	}
	return nil
}
// Close frees resources associated with the query.
//
// The first call closes the response body and returns the error from
// doing so. The body is then cleared, so later calls do not close it a
// second time and instead return Err(). A list that never had a body
// (the request failed) also returns Err().
func (ul *UploadList) Close() error {
	if ul.body == nil {
		return ul.Err()
	}
	err := ul.body.Close()
	// Clear the body before returning so a repeated Close is a no-op.
	ul.body = nil
	return err
}
// NewUpload starts a new upload to the storage server.
// The upload must have Abort or Commit called on it.
// If the server requires authentication for uploads, c.HTTPClient should be set to the result of oauth2.NewClient.
//
// The request body is a multipart form streamed through a pipe: files
// created on the returned Upload are written to the pipe while a
// background goroutine sends the POST request and reads the response.
// NewUpload never returns nil; if the request cannot be constructed the
// returned Upload carries the error and its methods report it.
func (c *Client) NewUpload(ctx context.Context) *Upload {
	hc := c.httpClient()

	pr, pw := io.Pipe()
	mpw := multipart.NewWriter(pw)

	req, err := http.NewRequest("POST", c.BaseURL+"/upload", pr)
	if err != nil {
		return &Upload{err: err}
	}
	req.Header.Set("Content-Type", mpw.FormDataContentType())
	req.Header.Set("User-Agent", "golang.org/x/perf/storage")

	// The goroutine below sends exactly one value on errCh. The buffer lets
	// it deliver that value and exit even when nobody is receiving yet.
	errCh := make(chan error, 1)
	u := &Upload{pw: pw, mpw: mpw, errCh: errCh}
	go func() {
		resp, err := ctxhttp.Do(ctx, hc, req)
		if err != nil {
			errCh <- err
			return
		}
		defer resp.Body.Close()
		if resp.StatusCode != 200 {
			// Best effort: if the body cannot be read, the status line
			// alone still makes a useful error.
			body, _ := ioutil.ReadAll(resp.Body)
			errCh <- fmt.Errorf("upload failed: %v\n%s", resp.Status, body)
			return
		}
		status := &UploadStatus{}
		if err := json.NewDecoder(resp.Body).Decode(status); err != nil {
			// Return after reporting the failure: falling through would
			// attempt a second send and publish a bogus status.
			errCh <- err
			return
		}
		// Publish the status before signalling success; the receiver reads
		// u.status only after it has received from errCh.
		u.status = status
		errCh <- nil
	}()
	return u
}
// UploadStatus contains information about a successful upload.
// It is decoded from the JSON body of the server's 200 response to the
// upload request and returned by Upload.Commit.
type UploadStatus struct {
// UploadID is the upload ID assigned to the upload.
UploadID string `json:"uploadid"`
// FileIDs is the list of file IDs assigned to the files in the upload.
FileIDs []string `json:"fileids"`
// ViewURL is a server-supplied URL to view the results.
ViewURL string `json:"viewurl"`
}
// An Upload is an in-progress upload.
// Use CreateFile to upload one or more files, then call Commit or Abort.
//
//	u := client.NewUpload(ctx)
//	w, err := u.CreateFile("")
//	if err != nil {
//		u.Abort()
//		return err
//	}
//	fmt.Fprintf(w, "BenchmarkResult 1 1 ns/op\n")
//	if _, err := u.Commit(); err != nil {
//		return err
//	}
type Upload struct {
// pw is the write end of the pipe whose read end is the HTTP request
// body. Commit and Abort close it and reset it to nil.
pw io.WriteCloser
// mpw writes the multipart form to pw. Commit and Abort close it and
// reset it to nil.
mpw *multipart.Writer
// status is set by the request goroutine once the server's response has
// been decoded, before it reports success on errCh.
status *UploadStatus
// errCh is used to report the success/failure of the HTTP request
errCh chan error
// err is the first observed error; it is only accessed from user-called methods for thread safety
err error
}
// CreateFile starts a new file with the given name within the upload and
// returns a Writer for its contents.
// The Writer may be used until CreateFile is called again.
// name may be the empty string if the file does not have a name.
// If the upload has already recorded an error, that error is returned
// and no file is created.
func (u *Upload) CreateFile(name string) (io.Writer, error) {
	if err := u.err; err != nil {
		return nil, err
	}
	// Every file is sent as a form part under the field name "file".
	w, err := u.mpw.CreateFormFile("file", name)
	return w, err
}
// Commit attempts to commit the upload.
//
// It appends a "commit" form field, finishes the multipart body, closes
// the request pipe and then waits for the outcome of the HTTP request.
// On success it returns the status reported by the server. If the upload
// already carries an error, that error is returned and nothing is sent.
// If writing or closing fails locally, the upload is aborted and the
// error is returned.
func (u *Upload) Commit() (*UploadStatus, error) {
	if u.err != nil {
		return nil, u.err
	}

	// fail tears the upload down after a local write/close error and
	// reports the error held in u.err.
	fail := func() (*UploadStatus, error) {
		u.Abort()
		return nil, u.err
	}

	u.err = u.mpw.WriteField("commit", "1")
	if u.err != nil {
		return fail()
	}
	u.err = u.mpw.Close()
	if u.err != nil {
		return fail()
	}
	u.mpw = nil

	// Closing the pipe ends the request body.
	u.err = u.pw.Close()
	if u.err != nil {
		return fail()
	}
	u.pw = nil

	// Wait for the request goroutine to report the outcome.
	err := <-u.errCh
	u.errCh = nil
	u.err = err
	if err != nil {
		return nil, err
	}
	return u.status, nil
}
// Abort attempts to cancel the in-progress upload.
//
// It writes an "abort" form field, closes the multipart writer and the
// request pipe, and waits for the HTTP request to finish. It returns the
// first error observed on the upload, which after a successful abort is
// normally the server's error response.
//
// Abort is safe to call on an Upload whose request was never started,
// after Commit, and more than once: in those cases there is no pending
// request result to wait for and the recorded error (possibly nil) is
// returned immediately.
func (u *Upload) Abort() error {
	if u.mpw != nil {
		// Writing the 'abort' field will cause the server to send back an error response.
		// Errors from these writes are deliberately ignored: this is a
		// best-effort teardown and the request result below is what counts.
		u.mpw.WriteField("abort", "1")
		u.mpw.Close()
		u.mpw = nil
	}
	if u.pw != nil {
		u.pw.Close()
		u.pw = nil
	}
	// errCh is nil when no request is running or when its result has
	// already been consumed. Receiving from a nil channel blocks forever,
	// so there is nothing to wait for in that case.
	if u.errCh == nil {
		return u.err
	}
	err := <-u.errCh
	u.errCh = nil
	if u.err == nil {
		u.err = err
	}
	return u.err
}