cmd/ejobs, internal/analysis, etc.: support fetching BQ results
`ejobs results` will download the results of a job to JSON.
That is supported by a new worker route, "/jobs/results?jobid=J",
which reads the analysis table in BigQuery using the binary info
now stored in the job.
Change-Id: Iecbf61fba8f841574e7b5cb570dbbe02e6b706fd
Reviewed-on: https://go-review.googlesource.com/c/pkgsite-metrics/+/512135
Reviewed-by: Maceo Thompson <maceothompson@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
diff --git a/cmd/ejobs/main.go b/cmd/ejobs/main.go
index 519b7bf..18c748c 100644
--- a/cmd/ejobs/main.go
+++ b/cmd/ejobs/main.go
@@ -28,6 +28,7 @@
"cloud.google.com/go/storage"
"golang.org/x/oauth2"
+ "golang.org/x/pkgsite-metrics/internal/analysis"
"golang.org/x/pkgsite-metrics/internal/jobs"
"google.golang.org/api/impersonate"
"google.golang.org/api/option"
@@ -467,7 +468,7 @@
if !force && done < job.NumEnqueued {
return fmt.Errorf("job not finished (%d/%d completed); use -f for partial results", done, job.NumEnqueued)
}
- results, err := requestJSON[jobs.Results](ctx, "jobs/results?jobid="+jobID, ts)
+ results, err := requestJSON[[]*analysis.Result](ctx, "jobs/results?jobid="+jobID, ts)
if err != nil {
return err
}
diff --git a/internal/analysis/analysis.go b/internal/analysis/analysis.go
index 6c3fbf3..1a80752 100644
--- a/internal/analysis/analysis.go
+++ b/internal/analysis/analysis.go
@@ -271,3 +271,27 @@
}
return diags
}
+
+func ReadResults(ctx context.Context, c *bigquery.Client, binaryName, binaryVersion, binaryArgs string) (_ []*Result, err error) {
+ defer derrors.Wrap(&err, "ReadResults")
+ q := bigquery.PartitionQuery{
+ From: c.FullTableName(TableName),
+ PartitionOn: "module_path, version",
+ Where: fmt.Sprintf("binary_name='%s' AND binary_version='%s' AND binary_args='%s'",
+ binaryName, binaryVersion, binaryArgs),
+ OrderBy: "created_at DESC",
+ }
+ iter, err := c.Query(ctx, q.String())
+ if err != nil {
+ return nil, err
+ }
+ var res []*Result
+ err = bigquery.ForEachRow(iter, func(r *Result) bool {
+ res = append(res, r)
+ return true
+ })
+ if err != nil {
+ return nil, err
+ }
+ return res, nil
+}
diff --git a/internal/analysis/analysis_test.go b/internal/analysis/analysis_test.go
index dd4dcdc..fc67459 100644
--- a/internal/analysis/analysis_test.go
+++ b/internal/analysis/analysis_test.go
@@ -39,5 +39,4 @@
if diff := cmp.Diff(want, got); diff != "" {
t.Errorf("mismatch (-want, +got)\n%s", diff)
}
-
}
diff --git a/internal/bigquery/bigquery.go b/internal/bigquery/bigquery.go
index d5ff368..5f51c6f 100644
--- a/internal/bigquery/bigquery.go
+++ b/internal/bigquery/bigquery.go
@@ -343,6 +343,7 @@
From string // should use full table name
Columns string // comma-separated columns to select, or "*" ("" => "*")
PartitionOn string // comma-separated columns defining the partition
+ Where string // WHERE clause
OrderBy string // text after ORDER BY: comma-separated columns, each
// optionally followed by DESC or ASC
}
@@ -363,13 +364,18 @@
ORDER BY %s
) AS rownum
FROM %s
+ %s
) WHERE rownum = 1
`
cols := q.Columns
if cols == "" {
cols = "*"
}
- return fmt.Sprintf(qf, cols, q.PartitionOn, q.OrderBy, q.From)
+ where := q.Where
+ if where != "" {
+ where = "WHERE " + where
+ }
+ return fmt.Sprintf(qf, cols, q.PartitionOn, q.OrderBy, q.From, where)
}
// InferSchema is a copy of cloud.google.com/go/bigquery.InferSchema so
diff --git a/internal/bigquery/bigquery_test.go b/internal/bigquery/bigquery_test.go
index ac5df97..d3e846e 100644
--- a/internal/bigquery/bigquery_test.go
+++ b/internal/bigquery/bigquery_test.go
@@ -6,6 +6,7 @@
import (
"context"
+ "strings"
"testing"
bq "cloud.google.com/go/bigquery"
@@ -25,3 +26,47 @@
t.Errorf("got false, want true for %v", err)
}
}
+
+func TestPartitionQuery(t *testing.T) {
+ // Remove newlines and extra white
+ clean := func(s string) string {
+ return strings.Join(strings.Fields(s), " ")
+ }
+
+ for i, test := range []struct {
+ q PartitionQuery
+ want string
+ }{
+ {
+ PartitionQuery{
+ From: "full.table",
+ Columns: "*",
+ PartitionOn: "p",
+ OrderBy: "o",
+ },
+ `SELECT * EXCEPT (rownum)
+ FROM ( SELECT *, ROW_NUMBER() OVER ( PARTITION BY p ORDER BY o ) AS rownum
+ FROM full.table ) WHERE rownum = 1`,
+ },
+ {
+ PartitionQuery{
+ From: "full.table",
+ Columns: "a, b, c",
+ PartitionOn: "p",
+ OrderBy: "o",
+ Where: "name = 'foo' AND args = 'bar baz'",
+ },
+ `SELECT * EXCEPT (rownum)
+ FROM ( SELECT a, b, c, ROW_NUMBER() OVER ( PARTITION BY p ORDER BY o ) AS rownum
+ FROM full.table
+ WHERE name = 'foo' AND args = 'bar baz'
+ ) WHERE rownum = 1`,
+ },
+ } {
+ got := clean(test.q.String())
+ want := clean(test.want)
+ if got != want {
+ t.Errorf("#%d:\ngot %s\nwant %s", i, got, want)
+ }
+ }
+}
diff --git a/internal/jobs/job.go b/internal/jobs/job.go
index 5e09912..dd07e36 100644
--- a/internal/jobs/job.go
+++ b/internal/jobs/job.go
@@ -7,8 +7,6 @@
import (
"time"
-
- "golang.org/x/pkgsite-metrics/internal/analysis"
)
// A Job is a set of related scan tasks enqueued at the same time.
@@ -51,10 +49,3 @@
func (j *Job) NumFinished() int {
return j.NumSkipped + j.NumFailed + j.NumErrored + j.NumSucceeded
}
-
-// Results hold the results of a job.
-type Results struct {
- JobID string
- Table string // bigquery table containing results
- Results []*analysis.Result
-}
diff --git a/internal/worker/jobs.go b/internal/worker/jobs.go
index 7c56abb..fe0e30e 100644
--- a/internal/worker/jobs.go
+++ b/internal/worker/jobs.go
@@ -23,6 +23,7 @@
"strings"
"time"
+ "golang.org/x/pkgsite-metrics/internal/analysis"
"golang.org/x/pkgsite-metrics/internal/derrors"
"golang.org/x/pkgsite-metrics/internal/jobs"
)
@@ -36,7 +37,7 @@
}
jobID := r.FormValue("jobid")
- return processJobRequest(ctx, w, r.URL.Path, jobID, s.jobDB)
+ return s.processJobRequest(ctx, w, r.URL.Path, jobID, s.jobDB)
}
type jobDB interface {
@@ -46,7 +47,7 @@
ListJobs(context.Context, func(*jobs.Job, time.Time) error) error
}
-func processJobRequest(ctx context.Context, w io.Writer, path, jobID string, db jobDB) error {
+func (s *Server) processJobRequest(ctx context.Context, w io.Writer, path, jobID string, db jobDB) error {
path = strings.TrimPrefix(path, "/jobs/")
switch path {
case "describe": // describe one job
@@ -79,6 +80,23 @@
}
return writeJSON(w, joblist)
+ case "results":
+ if jobID == "" {
+ return fmt.Errorf("missing jobid: %w", derrors.InvalidArgument)
+ }
+ job, err := db.GetJob(ctx, jobID)
+ if err != nil {
+ return err
+ }
+ if s.bqClient == nil {
+ return errors.New("bq client is nil")
+ }
+ results, err := analysis.ReadResults(ctx, s.bqClient, job.Binary, job.BinaryVersion, job.BinaryArgs)
+ if err != nil {
+ return err
+ }
+ return writeJSON(w, results)
+
default:
return fmt.Errorf("unknown path %q: %w", path, derrors.InvalidArgument)
}
diff --git a/internal/worker/jobs_test.go b/internal/worker/jobs_test.go
index d5a5ceb..f474f6c 100644
--- a/internal/worker/jobs_test.go
+++ b/internal/worker/jobs_test.go
@@ -28,8 +28,9 @@
if err := db.CreateJob(ctx, job); err != nil {
t.Fatal(err)
}
+ s := &Server{}
var buf bytes.Buffer
- if err := processJobRequest(ctx, &buf, "/jobs/describe", job.ID(), db); err != nil {
+ if err := s.processJobRequest(ctx, &buf, "/jobs/describe", job.ID(), db); err != nil {
t.Fatal(err)
}
@@ -41,7 +42,7 @@
t.Errorf("got\n%+v\nwant\n%+v", got, job)
}
- if err := processJobRequest(ctx, &buf, "/jobs/cancel", job.ID(), db); err != nil {
+ if err := s.processJobRequest(ctx, &buf, "/jobs/cancel", job.ID(), db); err != nil {
t.Fatal(err)
}
@@ -54,7 +55,7 @@
}
buf.Reset()
- if err := processJobRequest(ctx, &buf, "/jobs/list", "", db); err != nil {
+ if err := s.processJobRequest(ctx, &buf, "/jobs/list", "", db); err != nil {
t.Fatal(err)
}
// Don't check for specific output, just make sure there's something