cmd/ejobs, internal/analysis, etc.: support fetching BQ results

`ejobs results` will download the results of a job to JSON.

That is supported by a new worker route, "/jobs/results?jobid=J",
which reads the analysis table in BigQuery using the binary info
now stored in the job.

Change-Id: Iecbf61fba8f841574e7b5cb570dbbe02e6b706fd
Reviewed-on: https://go-review.googlesource.com/c/pkgsite-metrics/+/512135
Reviewed-by: Maceo Thompson <maceothompson@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
diff --git a/cmd/ejobs/main.go b/cmd/ejobs/main.go
index 519b7bf..18c748c 100644
--- a/cmd/ejobs/main.go
+++ b/cmd/ejobs/main.go
@@ -28,6 +28,7 @@
 
 	"cloud.google.com/go/storage"
 	"golang.org/x/oauth2"
+	"golang.org/x/pkgsite-metrics/internal/analysis"
 	"golang.org/x/pkgsite-metrics/internal/jobs"
 	"google.golang.org/api/impersonate"
 	"google.golang.org/api/option"
@@ -467,7 +468,7 @@
 	if !force && done < job.NumEnqueued {
 		return fmt.Errorf("job not finished (%d/%d completed); use -f for partial results", done, job.NumEnqueued)
 	}
-	results, err := requestJSON[jobs.Results](ctx, "jobs/results?jobid="+jobID, ts)
+	results, err := requestJSON[[]*analysis.Result](ctx, "jobs/results?jobid="+jobID, ts)
 	if err != nil {
 		return err
 	}
diff --git a/internal/analysis/analysis.go b/internal/analysis/analysis.go
index 6c3fbf3..1a80752 100644
--- a/internal/analysis/analysis.go
+++ b/internal/analysis/analysis.go
@@ -271,3 +271,27 @@
 	}
 	return diags
 }
+
+func ReadResults(ctx context.Context, c *bigquery.Client, binaryName, binaryVersion, binaryArgs string) (_ []*Result, err error) {
+	defer derrors.Wrap(&err, "ReadResults")
+	q := bigquery.PartitionQuery{
+		From:        c.FullTableName(TableName),
+		PartitionOn: "module_path, version",
+		Where: fmt.Sprintf("binary_name='%s' AND binary_version='%s' AND binary_args='%s'",
+			binaryName, binaryVersion, binaryArgs),
+		OrderBy: "created_at DESC",
+	}
+	iter, err := c.Query(ctx, q.String())
+	if err != nil {
+		return nil, err
+	}
+	var res []*Result
+	err = bigquery.ForEachRow(iter, func(r *Result) bool {
+		res = append(res, r)
+		return true
+	})
+	if err != nil {
+		return nil, err
+	}
+	return res, nil
+}
diff --git a/internal/analysis/analysis_test.go b/internal/analysis/analysis_test.go
index dd4dcdc..fc67459 100644
--- a/internal/analysis/analysis_test.go
+++ b/internal/analysis/analysis_test.go
@@ -39,5 +39,4 @@
 	if diff := cmp.Diff(want, got); diff != "" {
 		t.Errorf("mismatch (-want, +got)\n%s", diff)
 	}
-
 }
diff --git a/internal/bigquery/bigquery.go b/internal/bigquery/bigquery.go
index d5ff368..5f51c6f 100644
--- a/internal/bigquery/bigquery.go
+++ b/internal/bigquery/bigquery.go
@@ -343,6 +343,7 @@
 	From        string // should use full table name
 	Columns     string // comma-separated columns to select, or "*" ("" => "*")
 	PartitionOn string // comma-separated columns defining the partition
+	Where       string // WHERE clause
 	OrderBy     string // text after ORDER BY: comma-separated columns, each
 	// optionally followed by DESC or ASC
 }
@@ -363,13 +364,18 @@
 				ORDER BY %s
 			) AS rownum
 			FROM %s
+			%s
 		) WHERE rownum = 1
 	`
 	cols := q.Columns
 	if cols == "" {
 		cols = "*"
 	}
-	return fmt.Sprintf(qf, cols, q.PartitionOn, q.OrderBy, q.From)
+	where := q.Where
+	if where != "" {
+		where = "WHERE " + where
+	}
+	return fmt.Sprintf(qf, cols, q.PartitionOn, q.OrderBy, q.From, where)
 }
 
 // InferSchema is a copy of cloud.google.com/go/bigquery.InferSchema so
diff --git a/internal/bigquery/bigquery_test.go b/internal/bigquery/bigquery_test.go
index ac5df97..d3e846e 100644
--- a/internal/bigquery/bigquery_test.go
+++ b/internal/bigquery/bigquery_test.go
@@ -6,6 +6,7 @@
 
 import (
 	"context"
+	"strings"
 	"testing"
 
 	bq "cloud.google.com/go/bigquery"
@@ -25,3 +26,47 @@
 		t.Errorf("got false, want true for %v", err)
 	}
 }
+
+func TestPartitionQuery(t *testing.T) {
+	// Remove newlines and extra white
+	clean := func(s string) string {
+		return strings.Join(strings.Fields(s), " ")
+	}
+
+	for i, test := range []struct {
+		q    PartitionQuery
+		want string
+	}{
+		{
+			PartitionQuery{
+				From:        "full.table",
+				Columns:     "*",
+				PartitionOn: "p",
+				OrderBy:     "o",
+			},
+			`SELECT * EXCEPT (rownum)
+				 FROM ( SELECT *, ROW_NUMBER() OVER ( PARTITION BY p ORDER BY o ) AS rownum
+				 FROM full.table ) WHERE rownum  = 1`,
+		},
+		{
+			PartitionQuery{
+				From:        "full.table",
+				Columns:     "a, b, c",
+				PartitionOn: "p",
+				OrderBy:     "o",
+				Where:       "name = 'foo' AND args = 'bar baz'",
+			},
+			`SELECT * EXCEPT (rownum)
+				 FROM ( SELECT a, b, c, ROW_NUMBER() OVER ( PARTITION BY p ORDER BY o ) AS rownum
+				 FROM full.table
+				 WHERE name = 'foo' AND args = 'bar baz'
+				) WHERE rownum  = 1`,
+		},
+	} {
+		got := clean(test.q.String())
+		want := clean(test.want)
+		if got != want {
+			t.Errorf("#%d:\ngot  %s\nwant %s", i, got, want)
+		}
+	}
+}
diff --git a/internal/jobs/job.go b/internal/jobs/job.go
index 5e09912..dd07e36 100644
--- a/internal/jobs/job.go
+++ b/internal/jobs/job.go
@@ -7,8 +7,6 @@
 
 import (
 	"time"
-
-	"golang.org/x/pkgsite-metrics/internal/analysis"
 )
 
 // A Job is a set of related scan tasks enqueued at the same time.
@@ -51,10 +49,3 @@
 func (j *Job) NumFinished() int {
 	return j.NumSkipped + j.NumFailed + j.NumErrored + j.NumSucceeded
 }
-
-// Results hold the results of a job.
-type Results struct {
-	JobID   string
-	Table   string // bigquery table containing results
-	Results []*analysis.Result
-}
diff --git a/internal/worker/jobs.go b/internal/worker/jobs.go
index 7c56abb..fe0e30e 100644
--- a/internal/worker/jobs.go
+++ b/internal/worker/jobs.go
@@ -23,6 +23,7 @@
 	"strings"
 	"time"
 
+	"golang.org/x/pkgsite-metrics/internal/analysis"
 	"golang.org/x/pkgsite-metrics/internal/derrors"
 	"golang.org/x/pkgsite-metrics/internal/jobs"
 )
@@ -36,7 +37,7 @@
 	}
 
 	jobID := r.FormValue("jobid")
-	return processJobRequest(ctx, w, r.URL.Path, jobID, s.jobDB)
+	return s.processJobRequest(ctx, w, r.URL.Path, jobID, s.jobDB)
 }
 
 type jobDB interface {
@@ -46,7 +47,7 @@
 	ListJobs(context.Context, func(*jobs.Job, time.Time) error) error
 }
 
-func processJobRequest(ctx context.Context, w io.Writer, path, jobID string, db jobDB) error {
+func (s *Server) processJobRequest(ctx context.Context, w io.Writer, path, jobID string, db jobDB) error {
 	path = strings.TrimPrefix(path, "/jobs/")
 	switch path {
 	case "describe": // describe one job
@@ -79,6 +80,23 @@
 		}
 		return writeJSON(w, joblist)
 
+	case "results":
+		if jobID == "" {
+			return fmt.Errorf("missing jobid: %w", derrors.InvalidArgument)
+		}
+		job, err := db.GetJob(ctx, jobID)
+		if err != nil {
+			return err
+		}
+		if s.bqClient == nil {
+			return errors.New("bq client is nil")
+		}
+		results, err := analysis.ReadResults(ctx, s.bqClient, job.Binary, job.BinaryVersion, job.BinaryArgs)
+		if err != nil {
+			return err
+		}
+		return writeJSON(w, results)
+
 	default:
 		return fmt.Errorf("unknown path %q: %w", path, derrors.InvalidArgument)
 	}
diff --git a/internal/worker/jobs_test.go b/internal/worker/jobs_test.go
index d5a5ceb..f474f6c 100644
--- a/internal/worker/jobs_test.go
+++ b/internal/worker/jobs_test.go
@@ -28,8 +28,9 @@
 	if err := db.CreateJob(ctx, job); err != nil {
 		t.Fatal(err)
 	}
+	s := &Server{}
 	var buf bytes.Buffer
-	if err := processJobRequest(ctx, &buf, "/jobs/describe", job.ID(), db); err != nil {
+	if err := s.processJobRequest(ctx, &buf, "/jobs/describe", job.ID(), db); err != nil {
 		t.Fatal(err)
 	}
 
@@ -41,7 +42,7 @@
 		t.Errorf("got\n%+v\nwant\n%+v", got, job)
 	}
 
-	if err := processJobRequest(ctx, &buf, "/jobs/cancel", job.ID(), db); err != nil {
+	if err := s.processJobRequest(ctx, &buf, "/jobs/cancel", job.ID(), db); err != nil {
 		t.Fatal(err)
 	}
 
@@ -54,7 +55,7 @@
 	}
 
 	buf.Reset()
-	if err := processJobRequest(ctx, &buf, "/jobs/list", "", db); err != nil {
+	if err := s.processJobRequest(ctx, &buf, "/jobs/list", "", db); err != nil {
 		t.Fatal(err)
 	}
 	// Don't check for specific output, just make sure there's something