internal/worker: check analysis work version

Before running an analysis scan, check if the result is already
in BigQuery.

As with govulncheck, we do this by storing a WorkVersion for
each analysis, keyed by module path, version, and binary. If the work
version of the incoming work matches a stored one that is read at
startup, then the scan can be skipped.

This required some refactoring to download the binary earlier,
so its hash can be computed before scanning starts.

Change-Id: I24506c7669ed282224c987d0700123820dd64ab4
Reviewed-on: https://go-review.googlesource.com/c/pkgsite-metrics/+/476955
Run-TryBot: Jonathan Amsterdam <jba@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Zvonimir Pavlinovic <zpavlinovic@google.com>
diff --git a/internal/analysis/analysis.go b/internal/analysis/analysis.go
index 3c2130e..9a86823 100644
--- a/internal/analysis/analysis.go
+++ b/internal/analysis/analysis.go
@@ -196,14 +196,22 @@
 	bigquery.AddTable(TableName, s)
 }
 
+// WorkVersionKey is the key for a WorkVersion.
+// Always compare two WorkVersions with the same key.
+type WorkVersionKey struct {
+	Module  string
+	Version string
+	Binary  string
+}
+
 // ReadWorkVersions reads the most recent WorkVersions in the analysis table.
-func ReadWorkVersions(ctx context.Context, c *bigquery.Client) (_ map[[2]string]*WorkVersion, err error) {
+func ReadWorkVersions(ctx context.Context, c *bigquery.Client) (_ map[WorkVersionKey]WorkVersion, err error) {
 	defer derrors.Wrap(&err, "ReadWorkVersions")
-	m := map[[2]string]*WorkVersion{}
+	m := map[WorkVersionKey]WorkVersion{}
 	query := bigquery.PartitionQuery{
 		Table:       c.FullTableName(TableName),
-		Columns:     "module_path, version, binary_version, binary_args, worker_version, schema_version",
-		PartitionOn: "module_path, sort_version",
+		Columns:     "module_path, version, binary_name, binary_version, binary_args, worker_version, schema_version",
+		PartitionOn: "module_path, sort_version, binary_name",
 		OrderBy:     "created_at DESC",
 	}.String()
 	iter, err := c.Query(ctx, query)
@@ -211,7 +219,7 @@
 		return nil, err
 	}
 	err = bigquery.ForEachRow(iter, func(r *Result) bool {
-		m[[2]string{r.ModulePath, r.Version}] = &r.WorkVersion
+		m[WorkVersionKey{r.ModulePath, r.Version, r.BinaryName}] = r.WorkVersion
 		return true
 	})
 	if err != nil {
diff --git a/internal/worker/analysis.go b/internal/worker/analysis.go
index 5b991de..df25c77 100644
--- a/internal/worker/analysis.go
+++ b/internal/worker/analysis.go
@@ -18,8 +18,10 @@
 	"path"
 	"strings"
 
+	"cloud.google.com/go/storage"
 	"golang.org/x/pkgsite-metrics/internal/analysis"
 	"golang.org/x/pkgsite-metrics/internal/derrors"
+	"golang.org/x/pkgsite-metrics/internal/log"
 	"golang.org/x/pkgsite-metrics/internal/queue"
 	"golang.org/x/pkgsite-metrics/internal/sandbox"
 	"golang.org/x/pkgsite-metrics/internal/scan"
@@ -28,7 +30,32 @@
 
 type analysisServer struct {
 	*Server
-	openFile openFileFunc // Used to open binary files from GCS, except for testing.
+	openFile           openFileFunc // Used to open binary files from GCS, except for testing.
+	storedWorkVersions map[analysis.WorkVersionKey]analysis.WorkVersion
+}
+
+func newAnalysisServer(ctx context.Context, s *Server) (*analysisServer, error) {
+	if s.cfg.BinaryBucket == "" {
+		return nil, errors.New("missing binary bucket (define GO_ECOSYSTEM_BINARY_BUCKET)")
+	}
+	c, err := storage.NewClient(ctx)
+	if err != nil {
+		return nil, err
+	}
+	bucket := c.Bucket(s.cfg.BinaryBucket)
+	var wvs map[analysis.WorkVersionKey]analysis.WorkVersion
+	if s.bqClient != nil {
+		wvs, err = analysis.ReadWorkVersions(ctx, s.bqClient)
+		if err != nil {
+			return nil, err
+		}
+		log.Infof(ctx, "read %d work versions", len(wvs))
+	}
+	return &analysisServer{
+		Server:             s,
+		openFile:           gcsOpenFileFunc(ctx, bucket),
+		storedWorkVersions: wvs,
+	}, nil
 }
 
 const analysisBinariesBucketDir = "analysis-binaries"
@@ -41,35 +68,54 @@
 	if err != nil {
 		return fmt.Errorf("%w: %v", derrors.InvalidArgument, err)
 	}
-	if req.Binary == "" {
-		return fmt.Errorf("%w: analysis: missing binary", derrors.InvalidArgument)
-	}
 	if req.Suffix != "" {
 		return fmt.Errorf("%w: analysis: only implemented for whole modules (no suffix)", derrors.InvalidArgument)
 	}
+	if req.Binary == "" {
+		return fmt.Errorf("%w: analysis: missing binary", derrors.InvalidArgument)
+	}
+	if req.Binary != path.Base(req.Binary) {
+		return fmt.Errorf("%w: analysis: binary name contains slashes (must be a basename)", derrors.InvalidArgument)
+	}
+	localBinaryPath := path.Join(binaryDir, req.Binary)
+	srcPath := path.Join(analysisBinariesBucketDir, req.Binary)
+	const executable = true
+	if err := copyToLocalFile(localBinaryPath, executable, srcPath, s.openFile); err != nil {
+		return err
+	}
+	defer cleanup(&err, func() error { return os.Remove(localBinaryPath) })
 
-	row := s.scan(ctx, req)
+	binaryHash, err := hashFile(localBinaryPath)
+	if err != nil {
+		return err
+	}
+	wv := analysis.WorkVersion{
+		BinaryArgs:    req.Args,
+		WorkerVersion: s.cfg.VersionID,
+		SchemaVersion: analysis.SchemaVersion,
+		BinaryVersion: hex.EncodeToString(binaryHash),
+	}
+	key := analysis.WorkVersionKey{Module: req.Module, Version: req.Version, Binary: req.Binary}
+	if wv == s.storedWorkVersions[key] {
+		log.Infof(ctx, "skipping (work version unchanged): %+v", key)
+		return nil
+	}
+	row := s.scan(ctx, req, localBinaryPath, wv)
 	return writeResult(ctx, req.Serve, w, s.bqClient, analysis.TableName, row)
 }
 
-func (s *analysisServer) scan(ctx context.Context, req *analysis.ScanRequest) *analysis.Result {
+func (s *analysisServer) scan(ctx context.Context, req *analysis.ScanRequest, localBinaryPath string, wv analysis.WorkVersion) *analysis.Result {
 	row := &analysis.Result{
-		ModulePath: req.Module,
-		Version:    req.Version,
-		BinaryName: req.Binary,
-		WorkVersion: analysis.WorkVersion{
-			BinaryArgs:    req.Args,
-			WorkerVersion: s.cfg.VersionID,
-			SchemaVersion: analysis.SchemaVersion,
-		},
+		ModulePath:  req.Module,
+		Version:     req.Version,
+		BinaryName:  req.Binary,
+		WorkVersion: wv,
 	}
-
 	err := doScan(ctx, req.Module, req.Version, req.Insecure, func() error {
-		jsonTree, binaryHash, err := s.scanInternal(ctx, req)
+		jsonTree, err := s.scanInternal(ctx, req, localBinaryPath)
 		if err != nil {
 			return err
 		}
-		row.WorkVersion.BinaryVersion = hex.EncodeToString(binaryHash)
 		info, err := s.proxyClient.Info(ctx, req.Module, req.Version)
 		if err != nil {
 			return fmt.Errorf("%w: %v", derrors.ProxyError, err)
@@ -86,38 +132,22 @@
 	return row
 }
 
-func (s *analysisServer) scanInternal(ctx context.Context, req *analysis.ScanRequest) (jt analysis.JSONTree, binaryHash []byte, err error) {
-	if s.cfg.BinaryBucket == "" {
-		return nil, nil, errors.New("missing binary bucket (define GO_ECOSYSTEM_BINARY_BUCKET)")
-	}
-	destPath := path.Join(binaryDir, path.Base(req.Binary))
-	srcPath := path.Join(analysisBinariesBucketDir, req.Binary)
-	if err := copyToLocalFile(destPath, true, srcPath, s.openFile); err != nil {
-		return nil, nil, err
-	}
-	defer cleanup(&err, func() error { return os.Remove(destPath) })
-
-	binaryHash, err = hashFile(destPath)
-	if err != nil {
-		return nil, nil, err
-	}
-
+func (s *analysisServer) scanInternal(ctx context.Context, req *analysis.ScanRequest, binaryPath string) (jt analysis.JSONTree, err error) {
 	mdir := moduleDir(req.Module, req.Version)
 	defer cleanup(&err, func() error { return os.RemoveAll(mdir) })
 	if err := prepareModule(ctx, req.Module, req.Version, mdir, s.proxyClient, req.Insecure); err != nil {
-		return nil, nil, err
+		return nil, err
 	}
 	var sbox *sandbox.Sandbox
 	if !req.Insecure {
 		sbox = sandbox.New("/bundle")
 		sbox.Runsc = "/usr/local/bin/runsc"
-		destPath = strings.TrimPrefix(destPath, sandboxRoot)
 	}
-	tree, err := runAnalysisBinary(sbox, destPath, req.Args, mdir)
+	tree, err := runAnalysisBinary(sbox, binaryPath, req.Args, mdir)
 	if err != nil {
-		return nil, nil, err
+		return nil, err
 	}
-	return tree, binaryHash, nil
+	return tree, nil
 }
 
 func hashFile(filename string) (_ []byte, err error) {
diff --git a/internal/worker/analysis_test.go b/internal/worker/analysis_test.go
index a0b77ed..9a965da 100644
--- a/internal/worker/analysis_test.go
+++ b/internal/worker/analysis_test.go
@@ -6,8 +6,6 @@
 
 import (
 	"context"
-	"errors"
-	"io"
 	"os"
 	"path/filepath"
 	"strings"
@@ -122,7 +120,6 @@
 	diff := func(want, got *analysis.Result) {
 		t.Helper()
 		d := cmp.Diff(want, got,
-			cmpopts.IgnoreFields(analysis.WorkVersion{}, "BinaryVersion", "SchemaVersion"),
 			cmpopts.IgnoreFields(analysis.Diagnostic{}, "Position"))
 		if d != "" {
 			t.Errorf("mismatch (-want, +got)\n%s", d)
@@ -136,12 +133,6 @@
 				BinaryBucket: "unused",
 			},
 		},
-		openFile: func(name string) (io.ReadCloser, error) {
-			if name == "analysis-binaries/analyzer" {
-				return os.Open(binaryPath)
-			}
-			return nil, errors.New("bad name")
-		},
 	}
 	req := &analysis.ScanRequest{
 		ModuleURLPath: scan.ModuleURLPath{Module: modulePath, Version: version},
@@ -151,14 +142,15 @@
 			Insecure: true,
 		},
 	}
-	got := s.scan(context.Background(), req)
+	wv := analysis.WorkVersion{BinaryArgs: "-name G", BinaryVersion: "bv", SchemaVersion: "sv"}
+	got := s.scan(context.Background(), req, binaryPath, wv)
 	want := &analysis.Result{
 		ModulePath:    modulePath,
 		Version:       version,
 		SortVersion:   "1,2,3~",
 		CommitTime:    proxytest.CommitTime,
 		BinaryName:    "analyzer",
-		WorkVersion:   analysis.WorkVersion{BinaryArgs: "-name G"},
+		WorkVersion:   wv,
 		Error:         "",
 		ErrorCategory: "",
 		Diagnostics: []*analysis.Diagnostic{
@@ -173,7 +165,7 @@
 
 	// Test that errors are put into the Result.
 	req.Binary = "bad"
-	got = s.scan(context.Background(), req)
+	got = s.scan(context.Background(), req, "yyy", wv)
 	// Trim varying part of error.
 	if i := strings.LastIndexByte(got.Error, ':'); i > 0 {
 		got.Error = got.Error[i+2:]
@@ -183,9 +175,9 @@
 		Version:       version,
 		SortVersion:   "1,2,3~",
 		BinaryName:    "bad",
-		WorkVersion:   analysis.WorkVersion{BinaryArgs: "-name G"},
+		WorkVersion:   wv,
 		ErrorCategory: "MISC",
-		Error:         "bad name",
+		Error:         "executable file not found in $PATH",
 	}
 	diff(want, got)
 }
diff --git a/internal/worker/server.go b/internal/worker/server.go
index 57b0adc..cfaea7e 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -16,7 +16,6 @@
 	"time"
 
 	"cloud.google.com/go/errorreporting"
-	"cloud.google.com/go/storage"
 	"github.com/google/safehtml/template"
 	"golang.org/x/pkgsite-metrics/internal/bigquery"
 	"golang.org/x/pkgsite-metrics/internal/config"
@@ -179,19 +178,10 @@
 }
 
 func (s *Server) registerAnalysisHandlers(ctx context.Context) error {
-	if s.cfg.BinaryBucket == "" {
-		return errors.New("missing binary bucket (define GO_ECOSYSTEM_BINARY_BUCKET)")
-	}
-	c, err := storage.NewClient(ctx)
+	h, err := newAnalysisServer(ctx, s)
 	if err != nil {
 		return err
 	}
-	bucket := c.Bucket(s.cfg.BinaryBucket)
-
-	h := &analysisServer{
-		Server:   s,
-		openFile: gcsOpenFileFunc(ctx, bucket),
-	}
 	s.handle("/analysis/scan/", h.handleScan)
 	s.handle("/analysis/enqueue", h.handleEnqueue)
 	return nil