internal/bigquery: upload analysis results to BigQuery
Support uploading the results of an analysis to BigQuery.
Change-Id: I3b40552ebc28e53f1e0c7f36c63912e8020be0af
Reviewed-on: https://go-review.googlesource.com/c/pkgsite-metrics/+/472655
Run-TryBot: Jonathan Amsterdam <jba@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Zvonimir Pavlinovic <zpavlinovic@google.com>
diff --git a/internal/bigquery/analysis.go b/internal/bigquery/analysis.go
new file mode 100644
index 0000000..6b796aa
--- /dev/null
+++ b/internal/bigquery/analysis.go
@@ -0,0 +1,106 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package bigquery
+
+import (
+ "context"
+ "time"
+
+ bq "cloud.google.com/go/bigquery"
+ "golang.org/x/pkgsite-metrics/internal/derrors"
+)
+
+const AnalysisTableName = "analysis"
+
+// Note: before modifying AnalysisResult, make sure the change
+// is a valid schema modification.
+// The only supported changes are:
+// - adding a nullable or repeated column
+// - dropping a column
+// - changing a column from required to nullable.
+// See https://cloud.google.com/bigquery/docs/managing-table-schemas for details.
+
+// AnalysisResult is a row in the BigQuery analysis table. It corresponds to
+// one result in the output of an analysis.
+type AnalysisResult struct {
+ CreatedAt time.Time `bigquery:"created_at"`
+ ModulePath string `bigquery:"module_path"`
+ Version string `bigquery:"version"`
+ SortVersion string `bigquery:"sort_version"`
+ CommitTime time.Time `bigquery:"commit_time"`
+ // The name of the analysis binary that was executed.
+ // A single binary may run multiple analyzers.
+ BinaryName string `bigquery:"binary_name"`
+ Error string `bigquery:"error"`
+ ErrorCategory string `bigquery:"error_category"`
+ AnalysisWorkVersion // InferSchema flattens embedded fields
+
+ Diagnostics []*Diagnostic `bigquery:"diagnostic"`
+}
+
+func (r *AnalysisResult) AddError(err error) {
+ if err == nil {
+ return
+ }
+ r.Error = err.Error()
+ r.ErrorCategory = derrors.CategorizeError(err)
+}
+
+// AnalysisWorkVersion contains information that can be used to avoid duplicate work.
+// Given two AnalysisWorkVersion values v1 and v2 for the same module path and version,
+// if v1 == v2 then it is not necessary to scan the module.
+type AnalysisWorkVersion struct {
+ // A hash of the binary executed.
+ BinaryVersion string `bigquery:"binary_version"`
+ BinaryArgs string `bigquery:"binary_args"` // args passed to binary
+ // The version of the currently running code. This tracks changes in the
+ // logic of module scanning and processing.
+ WorkerVersion string `bigquery:"worker_version"`
+ // The version of the bigquery schema.
+ SchemaVersion string `bigquery:"schema_version"`
+}
+
+// A Diagnostic is a single analyzer finding.
+type Diagnostic struct {
+ // The package ID as reported by the analysis binary.
+ PackageID string `bigquery:"package_id"`
+ AnalyzerName string `bigquery:"analyzer_name"`
+ Error string `bigquery:"error"`
+ // These fields are from internal/worker.JSONDiagnostic.
+ Category string `bigquery:"category"`
+ Position string `bigquery:"position"`
+ Message string `bigquery:"message"`
+}
+
+// AnalysisSchemaVersion changes whenever the analysis schema changes.
+var AnalysisSchemaVersion string
+
+func init() {
+ s, err := bq.InferSchema(AnalysisResult{})
+ if err != nil {
+ panic(err)
+ }
+ AnalysisSchemaVersion = schemaVersion(s)
+ addTable(AnalysisTableName, s)
+}
+
+// ReadAnalysisWorkVersions reads the most recent AnalysisWorkVersion for each module version in the analysis table.
+func ReadAnalysisWorkVersions(ctx context.Context, c *Client) (_ map[[2]string]*AnalysisWorkVersion, err error) {
+ defer derrors.Wrap(&err, "ReadAnalysisWorkVersions")
+ m := map[[2]string]*AnalysisWorkVersion{}
+ query := partitionQuery(c.FullTableName(AnalysisTableName), "module_path, sort_version", "created_at DESC")
+ iter, err := c.Query(ctx, query)
+ if err != nil {
+ return nil, err
+ }
+ err = ForEachRow(iter, func(r *AnalysisResult) bool {
+ m[[2]string{r.ModulePath, r.Version}] = &r.AnalysisWorkVersion
+ return true
+ })
+ if err != nil {
+ return nil, err
+ }
+ return m, nil
+}
diff --git a/internal/worker/analysis.go b/internal/worker/analysis.go
index 54648a7..f4fd42a 100644
--- a/internal/worker/analysis.go
+++ b/internal/worker/analysis.go
@@ -6,22 +6,30 @@
import (
"context"
+ "crypto/sha256"
+ "encoding/hex"
"encoding/json"
"errors"
"fmt"
+ "io"
"net/http"
"os"
"os/exec"
"path"
+ "path/filepath"
+ "sort"
"strings"
"cloud.google.com/go/storage"
+ "golang.org/x/exp/maps"
+ "golang.org/x/pkgsite-metrics/internal/bigquery"
"golang.org/x/pkgsite-metrics/internal/derrors"
"golang.org/x/pkgsite-metrics/internal/log"
"golang.org/x/pkgsite-metrics/internal/modules"
"golang.org/x/pkgsite-metrics/internal/queue"
"golang.org/x/pkgsite-metrics/internal/sandbox"
"golang.org/x/pkgsite-metrics/internal/scan"
+ "golang.org/x/pkgsite-metrics/internal/version"
)
type analysisServer struct {
@@ -78,65 +86,101 @@
if err != nil {
return fmt.Errorf("%w: %v", derrors.InvalidArgument, err)
}
- jsonTree, err := s.scan(ctx, req)
+ jsonTree, binaryHash, err := s.scan(ctx, req)
if err != nil {
return err
}
- out, err := json.Marshal(jsonTree)
- if err != nil {
+
+ if req.Serve {
+ out, err := json.Marshal(jsonTree)
+ if err != nil {
+ return err
+ }
+ _, err = w.Write(out)
return err
}
- _, err = w.Write(out)
- return err
+ return s.writeToBigQuery(ctx, req, jsonTree, binaryHash)
}
const sandboxRoot = "/bundle/rootfs"
-func (s *analysisServer) scan(ctx context.Context, req *analysisRequest) (_ JSONTree, err error) {
+func (s *analysisServer) scan(ctx context.Context, req *analysisRequest) (_ JSONTree, binaryHash []byte, err error) {
if req.Binary == "" {
- return nil, fmt.Errorf("%w: analysis: missing binary", derrors.InvalidArgument)
- }
- if !req.Serve {
- return nil, fmt.Errorf("%w: analysis: writing to BigQuery unimplemented", derrors.InvalidArgument)
+ return nil, nil, fmt.Errorf("%w: analysis: missing binary", derrors.InvalidArgument)
}
if req.Suffix != "" {
- return nil, fmt.Errorf("%w: analysis: only implemented for whole modules (no suffix)", derrors.InvalidArgument)
+ return nil, nil, fmt.Errorf("%w: analysis: only implemented for whole modules (no suffix)", derrors.InvalidArgument)
}
- destPath := path.Join(sandboxRoot, "binaries", path.Base(req.Binary))
- if err := copyBinary(ctx, destPath, req.Binary, s.cfg.BinaryBucket); err != nil {
- return nil, err
+ var tempDir string
+ if req.Insecure {
+ tempDir, err = os.MkdirTemp("", "analysis")
+ if err != nil {
+ return nil, nil, err
+ }
+ defer func() {
+ err1 := os.RemoveAll(tempDir)
+ if err == nil {
+ err = err1
+ }
+ }()
}
+
+ var destPath string
+ if req.Insecure {
+ destPath = filepath.Join(tempDir, "binary")
+ } else {
+ destPath = path.Join(sandboxRoot, "binaries", path.Base(req.Binary))
+ }
+ if err := copyBinary(ctx, destPath, req.Binary, s.cfg.BinaryBucket); err != nil {
+ return nil, nil, err
+ }
+ binaryHash, err = hashFile(destPath)
+ if err != nil {
+ return nil, nil, err
+ }
+
if !req.Insecure {
sandboxDir, cleanup, err := downloadModuleSandbox(ctx, req.Module, req.Version, s.proxyClient)
if err != nil {
- return nil, err
+ return nil, nil, err
}
defer cleanup()
log.Infof(ctx, "running %s on %s@%s in sandbox", req.Binary, req.Module, req.Version)
sbox := sandbox.New("/bundle")
sbox.Runsc = "/usr/local/bin/runsc"
- return runAnalysisBinary(sbox, strings.TrimPrefix(destPath, sandboxRoot), req.Args, sandboxDir)
+ tree, err := runAnalysisBinary(sbox, strings.TrimPrefix(destPath, sandboxRoot), req.Args, sandboxDir)
+ if err != nil {
+ return nil, nil, err
+ }
+ return tree, binaryHash, nil
}
// Insecure mode.
// Download the module.
- tempDir, err := os.MkdirTemp("", "analysis")
- if err != nil {
- return nil, err
- }
- defer func() {
- err1 := os.RemoveAll(tempDir)
- if err == nil {
- err = err1
- }
- }()
-
log.Debugf(ctx, "fetching module zip: %s@%s", req.Module, req.Version)
const stripModulePrefix = true
if err := modules.Download(ctx, req.Module, req.Version, tempDir, s.proxyClient, stripModulePrefix); err != nil {
+ return nil, nil, err
+ }
+ tree, err := runAnalysisBinary(nil, destPath, req.Args, tempDir)
+ if err != nil {
+ return nil, nil, err
+ }
+ return tree, binaryHash, nil
+}
+
+func hashFile(filename string) (_ []byte, err error) {
+ defer derrors.Wrap(&err, "hashFile(%q)", filename)
+ f, err := os.Open(filename)
+ if err != nil {
return nil, err
}
- return runAnalysisBinary(nil, destPath, req.Args, tempDir)
+ defer f.Close()
+ h := sha256.New()
+ if _, err := io.Copy(h, f); err != nil {
+ return nil, err
+ }
+ return h.Sum(nil), nil
}
// copyBinary copies a binary from srcPath to destPath.
@@ -233,3 +277,76 @@
type jsonError struct {
Err string `json:"error"`
}
+
+func (s *analysisServer) writeToBigQuery(ctx context.Context, req *analysisRequest, jsonTree JSONTree, binaryHash []byte) (err error) {
+ defer derrors.Wrap(&err, "analysisServer.writeToBigQuery(%q, %q)", req.Module, req.Version)
+ row := &bigquery.AnalysisResult{
+ ModulePath: req.Module,
+ BinaryName: req.Binary,
+ AnalysisWorkVersion: bigquery.AnalysisWorkVersion{
+ BinaryVersion: hex.EncodeToString(binaryHash),
+ BinaryArgs: req.Args,
+ WorkerVersion: s.cfg.VersionID,
+ SchemaVersion: bigquery.AnalysisSchemaVersion,
+ },
+ }
+ info, err := s.proxyClient.Info(ctx, req.Module, req.Version)
+ if err != nil {
+ log.Errorf(ctx, err, "proxy error")
+ row.AddError(fmt.Errorf("%v: %w", err, derrors.ProxyError))
+ return nil
+ }
+ row.Version = info.Version
+ row.SortVersion = version.ForSorting(row.Version)
+ row.CommitTime = info.Time
+
+ row.Diagnostics = jsonTreeToDiagnostics(jsonTree)
+ if s.bqClient == nil {
+ log.Infof(ctx, "bigquery disabled, not uploading")
+ } else {
+ log.Infof(ctx, "uploading to bigquery: %s", req.Path())
+ if err := s.bqClient.Upload(ctx, bigquery.AnalysisTableName, row); err != nil {
+ // This is often caused by:
+ // "Upload: googleapi: got HTTP response code 413 with body"
+ // which happens for some modules.
+ row.AddError(fmt.Errorf("%v: %w", err, derrors.BigQueryError))
+ log.Errorf(ctx, err, "bq.Upload for %s", req.Path())
+ }
+ }
+ return nil
+}
+
+// jsonTreeToDiagnostics converts a JSONTree to a list of diagnostics for BigQuery.
+// It ignores the suggested fixes of the diagnostics.
+func jsonTreeToDiagnostics(jsonTree JSONTree) []*bigquery.Diagnostic {
+ var diags []*bigquery.Diagnostic
+ // Sort for determinism.
+ pkgIDs := maps.Keys(jsonTree)
+ sort.Strings(pkgIDs)
+ for _, pkgID := range pkgIDs {
+ amap := jsonTree[pkgID]
+ aNames := maps.Keys(amap)
+ sort.Strings(aNames)
+ for _, aName := range aNames {
+ diagsOrErr := amap[aName]
+ if diagsOrErr.Error != nil {
+ diags = append(diags, &bigquery.Diagnostic{
+ PackageID: pkgID,
+ AnalyzerName: aName,
+ Error: diagsOrErr.Error.Err,
+ })
+ } else {
+ for _, jd := range diagsOrErr.Diagnostics {
+ diags = append(diags, &bigquery.Diagnostic{
+ PackageID: pkgID,
+ AnalyzerName: aName,
+ Category: jd.Category,
+ Position: jd.Posn,
+ Message: jd.Message,
+ })
+ }
+ }
+ }
+ }
+ return diags
+}
diff --git a/internal/worker/analysis_test.go b/internal/worker/analysis_test.go
index 94d8c1c..a9ab991 100644
--- a/internal/worker/analysis_test.go
+++ b/internal/worker/analysis_test.go
@@ -9,6 +9,7 @@
"testing"
"github.com/google/go-cmp/cmp"
+ "golang.org/x/pkgsite-metrics/internal/bigquery"
"golang.org/x/pkgsite-metrics/internal/buildtest"
)
@@ -54,3 +55,35 @@
t.Errorf("mismatch (-want, +got):\n%s", diff)
}
}
+
+func TestJSONTreeToDiagnostics(t *testing.T) {
+ in := JSONTree{
+ "pkg1": {
+ "a": {
+ Diagnostics: []JSONDiagnostic{
+ {Category: "c1", Posn: "pos1", Message: "m1"},
+ {Category: "c2", Posn: "pos2", Message: "m2"},
+ },
+ },
+ "b": {
+ Diagnostics: []JSONDiagnostic{{Category: "c3", Posn: "pos3", Message: "m3"}},
+ },
+ },
+ "pkg2": {
+ "c": {
+ Error: &jsonError{Err: "fail"},
+ },
+ },
+ }
+ got := jsonTreeToDiagnostics(in)
+ want := []*bigquery.Diagnostic{
+ {PackageID: "pkg1", AnalyzerName: "a", Category: "c1", Position: "pos1", Message: "m1"},
+ {PackageID: "pkg1", AnalyzerName: "a", Category: "c2", Position: "pos2", Message: "m2"},
+ {PackageID: "pkg1", AnalyzerName: "b", Category: "c3", Position: "pos3", Message: "m3"},
+ {PackageID: "pkg2", AnalyzerName: "c", Error: "fail"},
+ }
+ if diff := cmp.Diff(want, got); diff != "" {
+ t.Errorf("mismatch (-want, +got)\n%s", diff)
+ }
+
+}