internal/bigquery: upload analysis results to BigQuery

Support uploading the results of an analysis to BigQuery.

Change-Id: I3b40552ebc28e53f1e0c7f36c63912e8020be0af
Reviewed-on: https://go-review.googlesource.com/c/pkgsite-metrics/+/472655
Run-TryBot: Jonathan Amsterdam <jba@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Zvonimir Pavlinovic <zpavlinovic@google.com>
diff --git a/internal/bigquery/analysis.go b/internal/bigquery/analysis.go
new file mode 100644
index 0000000..6b796aa
--- /dev/null
+++ b/internal/bigquery/analysis.go
@@ -0,0 +1,106 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package bigquery
+
+import (
+	"context"
+	"time"
+
+	bq "cloud.google.com/go/bigquery"
+	"golang.org/x/pkgsite-metrics/internal/derrors"
+)
+
+const AnalysisTableName = "analysis"
+
+// Note: before modifying AnalysisResult, make sure the change
+// is a valid schema modification.
+// The only supported changes are:
+//   - adding a nullable or repeated column
+//   - dropping a column
+//   - changing a column from required to nullable.
+// See https://cloud.google.com/bigquery/docs/managing-table-schemas for details.
+
+// AnalysisResult is a row in the BigQuery analysis table. It corresponds to a
+// result from the output of an analysis.
+type AnalysisResult struct {
+	CreatedAt   time.Time `bigquery:"created_at"`
+	ModulePath  string    `bigquery:"module_path"`
+	Version     string    `bigquery:"version"`
+	SortVersion string    `bigquery:"sort_version"`
+	CommitTime  time.Time `bigquery:"commit_time"`
+	// The name of the analysis binary that was executed.
+	// A single binary may run multiple analyzers.
+	BinaryName          string `bigquery:"binary_name"`
+	Error               string `bigquery:"error"`
+	ErrorCategory       string `bigquery:"error_category"`
+	AnalysisWorkVersion        // InferSchema flattens embedded fields
+
+	Diagnostics []*Diagnostic `bigquery:"diagnostic"`
+}
+
+func (r *AnalysisResult) AddError(err error) {
+	if err == nil {
+		return
+	}
+	r.Error = err.Error()
+	r.ErrorCategory = derrors.CategorizeError(err)
+}
+
+// AnalysisWorkVersion contains information that can be used to avoid duplicate work.
+// Given two AnalysisWorkVersion values v1 and v2 for the same module path and version,
+// if v1 == v2 then it is not necessary to scan the module.
+type AnalysisWorkVersion struct {
+	// A hash of the binary executed.
+	BinaryVersion string `bigquery:"binary_version"`
+	BinaryArgs    string `bigquery:"binary_args"` // args passed to binary
+	// The version of the currently running code. This tracks changes in the
+	// logic of module scanning and processing.
+	WorkerVersion string `bigquery:"worker_version"`
+	// The version of the bigquery schema.
+	SchemaVersion string `bigquery:"schema_version"`
+}
+
+// A Diagnostic is a single analyzer finding.
+type Diagnostic struct {
+	// The package ID as reported by the analysis binary.
+	PackageID    string `bigquery:"package_id"`
+	AnalyzerName string `bigquery:"analyzer_name"`
+	Error        string `bigquery:"error"`
+	// These fields are from internal/worker.JSONDiagnostic.
+	Category string `bigquery:"category"`
+	Position string `bigquery:"position"`
+	Message  string `bigquery:"message"`
+}
+
+// AnalysisSchemaVersion changes whenever the analysis schema changes.
+var AnalysisSchemaVersion string
+
+func init() {
+	s, err := bq.InferSchema(AnalysisResult{})
+	if err != nil {
+		panic(err)
+	}
+	AnalysisSchemaVersion = schemaVersion(s)
+	addTable(AnalysisTableName, s)
+}
+
+// ReadAnalysisWorkVersions reads the most recent AnalysisWorkVersions in the analysis table.
+func ReadAnalysisWorkVersions(ctx context.Context, c *Client) (_ map[[2]string]*AnalysisWorkVersion, err error) {
+	defer derrors.Wrap(&err, "ReadAnalysisWorkVersions")
+	m := map[[2]string]*AnalysisWorkVersion{}
+	query := partitionQuery(c.FullTableName(AnalysisTableName), "module_path, sort_version", "created_at DESC")
+	iter, err := c.Query(ctx, query)
+	if err != nil {
+		return nil, err
+	}
+	err = ForEachRow(iter, func(r *AnalysisResult) bool {
+		m[[2]string{r.ModulePath, r.Version}] = &r.AnalysisWorkVersion
+		return true
+	})
+	if err != nil {
+		return nil, err
+	}
+	return m, nil
+}
diff --git a/internal/worker/analysis.go b/internal/worker/analysis.go
index 54648a7..f4fd42a 100644
--- a/internal/worker/analysis.go
+++ b/internal/worker/analysis.go
@@ -6,22 +6,30 @@
 
 import (
 	"context"
+	"crypto/sha256"
+	"encoding/hex"
 	"encoding/json"
 	"errors"
 	"fmt"
+	"io"
 	"net/http"
 	"os"
 	"os/exec"
 	"path"
+	"path/filepath"
+	"sort"
 	"strings"
 
 	"cloud.google.com/go/storage"
+	"golang.org/x/exp/maps"
+	"golang.org/x/pkgsite-metrics/internal/bigquery"
 	"golang.org/x/pkgsite-metrics/internal/derrors"
 	"golang.org/x/pkgsite-metrics/internal/log"
 	"golang.org/x/pkgsite-metrics/internal/modules"
 	"golang.org/x/pkgsite-metrics/internal/queue"
 	"golang.org/x/pkgsite-metrics/internal/sandbox"
 	"golang.org/x/pkgsite-metrics/internal/scan"
+	"golang.org/x/pkgsite-metrics/internal/version"
 )
 
 type analysisServer struct {
@@ -78,65 +86,101 @@
 	if err != nil {
 		return fmt.Errorf("%w: %v", derrors.InvalidArgument, err)
 	}
-	jsonTree, err := s.scan(ctx, req)
+	jsonTree, binaryHash, err := s.scan(ctx, req)
 	if err != nil {
 		return err
 	}
-	out, err := json.Marshal(jsonTree)
-	if err != nil {
+
+	if req.Serve {
+		out, err := json.Marshal(jsonTree)
+		if err != nil {
+			return err
+		}
+		_, err = w.Write(out)
 		return err
 	}
-	_, err = w.Write(out)
-	return err
+	return s.writeToBigQuery(ctx, req, jsonTree, binaryHash)
 }
 
 const sandboxRoot = "/bundle/rootfs"
 
-func (s *analysisServer) scan(ctx context.Context, req *analysisRequest) (_ JSONTree, err error) {
+func (s *analysisServer) scan(ctx context.Context, req *analysisRequest) (_ JSONTree, binaryHash []byte, err error) {
 	if req.Binary == "" {
-		return nil, fmt.Errorf("%w: analysis: missing binary", derrors.InvalidArgument)
-	}
-	if !req.Serve {
-		return nil, fmt.Errorf("%w: analysis: writing to BigQuery unimplemented", derrors.InvalidArgument)
+		return nil, nil, fmt.Errorf("%w: analysis: missing binary", derrors.InvalidArgument)
 	}
 	if req.Suffix != "" {
-		return nil, fmt.Errorf("%w: analysis: only implemented for whole modules (no suffix)", derrors.InvalidArgument)
+		return nil, nil, fmt.Errorf("%w: analysis: only implemented for whole modules (no suffix)", derrors.InvalidArgument)
 	}
 
-	destPath := path.Join(sandboxRoot, "binaries", path.Base(req.Binary))
-	if err := copyBinary(ctx, destPath, req.Binary, s.cfg.BinaryBucket); err != nil {
-		return nil, err
+	var tempDir string
+	if req.Insecure {
+		tempDir, err = os.MkdirTemp("", "analysis")
+		if err != nil {
+			return nil, nil, err
+		}
+		defer func() {
+			err1 := os.RemoveAll(tempDir)
+			if err == nil {
+				err = err1
+			}
+		}()
 	}
+
+	var destPath string
+	if req.Insecure {
+		destPath = filepath.Join(tempDir, "binary")
+	} else {
+		destPath = path.Join(sandboxRoot, "binaries", path.Base(req.Binary))
+	}
+	if err := copyBinary(ctx, destPath, req.Binary, s.cfg.BinaryBucket); err != nil {
+		return nil, nil, err
+	}
+	binaryHash, err = hashFile(destPath)
+	if err != nil {
+		return nil, nil, err
+	}
+
 	if !req.Insecure {
 		sandboxDir, cleanup, err := downloadModuleSandbox(ctx, req.Module, req.Version, s.proxyClient)
 		if err != nil {
-			return nil, err
+			return nil, nil, err
 		}
 		defer cleanup()
 		log.Infof(ctx, "running %s on %s@%s in sandbox", req.Binary, req.Module, req.Version)
 		sbox := sandbox.New("/bundle")
 		sbox.Runsc = "/usr/local/bin/runsc"
-		return runAnalysisBinary(sbox, strings.TrimPrefix(destPath, sandboxRoot), req.Args, sandboxDir)
+		tree, err := runAnalysisBinary(sbox, strings.TrimPrefix(destPath, sandboxRoot), req.Args, sandboxDir)
+		if err != nil {
+			return nil, nil, err
+		}
+		return tree, binaryHash, nil
 	}
 	// Insecure mode.
 	// Download the module.
-	tempDir, err := os.MkdirTemp("", "analysis")
-	if err != nil {
-		return nil, err
-	}
-	defer func() {
-		err1 := os.RemoveAll(tempDir)
-		if err == nil {
-			err = err1
-		}
-	}()
-
 	log.Debugf(ctx, "fetching module zip: %s@%s", req.Module, req.Version)
 	const stripModulePrefix = true
 	if err := modules.Download(ctx, req.Module, req.Version, tempDir, s.proxyClient, stripModulePrefix); err != nil {
+		return nil, nil, err
+	}
+	tree, err := runAnalysisBinary(nil, destPath, req.Args, tempDir)
+	if err != nil {
+		return nil, nil, err
+	}
+	return tree, binaryHash, nil
+}
+
+func hashFile(filename string) (_ []byte, err error) {
+	defer derrors.Wrap(&err, "hashFile(%q)", filename)
+	f, err := os.Open(filename)
+	if err != nil {
 		return nil, err
 	}
-	return runAnalysisBinary(nil, destPath, req.Args, tempDir)
+	defer f.Close()
+	h := sha256.New()
+	if _, err := io.Copy(h, f); err != nil {
+		return nil, err
+	}
+	return h.Sum(nil), nil
 }
 
 // copyBinary copies a binary from srcPath to destPath.
@@ -233,3 +277,76 @@
 type jsonError struct {
 	Err string `json:"error"`
 }
+
+func (s *analysisServer) writeToBigQuery(ctx context.Context, req *analysisRequest, jsonTree JSONTree, binaryHash []byte) (err error) {
+	defer derrors.Wrap(&err, "analysisServer.writeToBigQuery(%q, %q)", req.Module, req.Version)
+	row := &bigquery.AnalysisResult{
+		ModulePath: req.Module,
+		BinaryName: req.Binary,
+		AnalysisWorkVersion: bigquery.AnalysisWorkVersion{
+			BinaryVersion: hex.EncodeToString(binaryHash),
+			BinaryArgs:    req.Args,
+			WorkerVersion: s.cfg.VersionID,
+			SchemaVersion: bigquery.AnalysisSchemaVersion,
+		},
+	}
+	info, err := s.proxyClient.Info(ctx, req.Module, req.Version)
+	if err != nil {
+		log.Errorf(ctx, err, "proxy error")
+		row.AddError(fmt.Errorf("%v: %w", err, derrors.ProxyError))
+		return nil
+	}
+	row.Version = info.Version
+	row.SortVersion = version.ForSorting(row.Version)
+	row.CommitTime = info.Time
+
+	row.Diagnostics = jsonTreeToDiagnostics(jsonTree)
+	if s.bqClient == nil {
+		log.Infof(ctx, "bigquery disabled, not uploading")
+	} else {
+		log.Infof(ctx, "uploading to bigquery: %s", req.Path())
+		if err := s.bqClient.Upload(ctx, bigquery.AnalysisTableName, row); err != nil {
+			// This is often caused by:
+			// "Upload: googleapi: got HTTP response code 413 with body"
+			// which happens for some modules.
+			row.AddError(fmt.Errorf("%v: %w", err, derrors.BigQueryError))
+			log.Errorf(ctx, err, "bq.Upload for %s", req.Path())
+		}
+	}
+	return nil
+}
+
+// jsonTreeToDiagnostics converts a jsonTree to a list of diagnostics for BigQuery.
+// It ignores the suggested fixes of the diagnostics.
+func jsonTreeToDiagnostics(jsonTree JSONTree) []*bigquery.Diagnostic {
+	var diags []*bigquery.Diagnostic
+	// Sort for determinism.
+	pkgIDs := maps.Keys(jsonTree)
+	sort.Strings(pkgIDs)
+	for _, pkgID := range pkgIDs {
+		amap := jsonTree[pkgID]
+		aNames := maps.Keys(amap)
+		sort.Strings(aNames)
+		for _, aName := range aNames {
+			diagsOrErr := amap[aName]
+			if diagsOrErr.Error != nil {
+				diags = append(diags, &bigquery.Diagnostic{
+					PackageID:    pkgID,
+					AnalyzerName: aName,
+					Error:        diagsOrErr.Error.Err,
+				})
+			} else {
+				for _, jd := range diagsOrErr.Diagnostics {
+					diags = append(diags, &bigquery.Diagnostic{
+						PackageID:    pkgID,
+						AnalyzerName: aName,
+						Category:     jd.Category,
+						Position:     jd.Posn,
+						Message:      jd.Message,
+					})
+				}
+			}
+		}
+	}
+	return diags
+}
diff --git a/internal/worker/analysis_test.go b/internal/worker/analysis_test.go
index 94d8c1c..a9ab991 100644
--- a/internal/worker/analysis_test.go
+++ b/internal/worker/analysis_test.go
@@ -9,6 +9,7 @@
 	"testing"
 
 	"github.com/google/go-cmp/cmp"
+	"golang.org/x/pkgsite-metrics/internal/bigquery"
 	"golang.org/x/pkgsite-metrics/internal/buildtest"
 )
 
@@ -54,3 +55,35 @@
 		t.Errorf("mismatch (-want, +got):\n%s", diff)
 	}
 }
+
+func TestJSONTreeToDiagnostics(t *testing.T) {
+	in := JSONTree{
+		"pkg1": {
+			"a": {
+				Diagnostics: []JSONDiagnostic{
+					{Category: "c1", Posn: "pos1", Message: "m1"},
+					{Category: "c2", Posn: "pos2", Message: "m2"},
+				},
+			},
+			"b": {
+				Diagnostics: []JSONDiagnostic{{Category: "c3", Posn: "pos3", Message: "m3"}},
+			},
+		},
+		"pkg2": {
+			"c": {
+				Error: &jsonError{Err: "fail"},
+			},
+		},
+	}
+	got := jsonTreeToDiagnostics(in)
+	want := []*bigquery.Diagnostic{
+		{PackageID: "pkg1", AnalyzerName: "a", Category: "c1", Position: "pos1", Message: "m1"},
+		{PackageID: "pkg1", AnalyzerName: "a", Category: "c2", Position: "pos2", Message: "m2"},
+		{PackageID: "pkg1", AnalyzerName: "b", Category: "c3", Position: "pos3", Message: "m3"},
+		{PackageID: "pkg2", AnalyzerName: "c", Error: "fail"},
+	}
+	if diff := cmp.Diff(want, got); diff != "" {
+		t.Errorf("mismatch (-want, +got)\n%s", diff)
+	}
+
+}