internal/worker: read analysis work version per <module,version,binary>

Change-Id: Ic5aa4b3bbd9c4e59a64d75a4ed2a544fa9f2c8ed
Reviewed-on: https://go-review.googlesource.com/c/pkgsite-metrics/+/502995
Reviewed-by: Maceo Thompson <maceothompson@google.com>
Run-TryBot: Zvonimir Pavlinovic <zpavlinovic@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
diff --git a/internal/analysis/analysis.go b/internal/analysis/analysis.go
index cac987a..5603df6 100644
--- a/internal/analysis/analysis.go
+++ b/internal/analysis/analysis.go
@@ -211,44 +211,29 @@
 	Binary  string
 }
 
-// ReadWorkVersions reads the most recent WorkVersions in the analysis table.
-func ReadWorkVersions(ctx context.Context, c *bigquery.Client) (_ map[WorkVersionKey]WorkVersion, err error) {
-	defer derrors.Wrap(&err, "ReadWorkVersions")
+// ReadWorkVersion reads the most recent WorkVersion in the analysis table
+// for module_path at version for binary.
+func ReadWorkVersion(ctx context.Context, c *bigquery.Client, module_path, version, binary string) (wv *WorkVersion, err error) {
+	defer derrors.Wrap(&err, "ReadWorkVersion")
 
-	// Preamble defines an auxiliary table that remembers the
-	// latest version, defined by sort_version, for each module
-	// and analysis.
-	const preamble = "WITH latest AS (SELECT module_path AS module, binary_name as binary, MAX(sort_version) as max_version FROM `%s` GROUP BY module_path, binary_name)"
-	latest := fmt.Sprintf(preamble, c.FullTableName(TableName))
-	// Partition the table by module, analysis, and version while
-	// only considering the `latest` version. This is accomplished
-	// by joining govulncheck table with latest.
-	partition := bigquery.PartitionQuery{
-		From:        fmt.Sprintf("`%s` JOIN latest ON module_path=module AND sort_version=max_version AND binary_name=binary", c.FullTableName(TableName)),
-		Columns:     "module_path, version, binary_name, binary_version, binary_args, worker_version, schema_version",
-		PartitionOn: "module_path, sort_version, binary_name",
-		OrderBy:     "created_at DESC",
-	}.String()
-	// Create the final query that gets only one work version
-	// for each module and analysis. The returned work version is
-	// the latest one, defined by max of sort_version. Note that
-	// this will not match the latest version of a module, in the
-	// strict Go sense, if the module has non-linear tagging (which
-	// should not happen too often).
-	query := fmt.Sprintf("%s\n%s", latest, partition)
+	const qf = `
+                SELECT module_path, version, binary_name, binary_version, binary_args, worker_version, schema_version
+                FROM %s WHERE module_path="%s" AND version="%s" AND binary_name="%s" ORDER BY created_at DESC LIMIT 1
+        `
+	query := fmt.Sprintf(qf, "`"+c.FullTableName(TableName)+"`", module_path, version, binary)
 	iter, err := c.Query(ctx, query)
 	if err != nil {
 		return nil, err
 	}
-	m := map[WorkVersionKey]WorkVersion{}
 	err = bigquery.ForEachRow(iter, func(r *Result) bool {
-		m[WorkVersionKey{r.ModulePath, r.Version, r.BinaryName}] = r.WorkVersion
+		// Should be reached at most once.
+		wv = &r.WorkVersion
 		return true
 	})
 	if err != nil {
 		return nil, err
 	}
-	return m, nil
+	return wv, nil
 }
 
 // JSONTreeToDiagnostics converts a jsonTree to a list of diagnostics for BigQuery.
diff --git a/internal/worker/analysis.go b/internal/worker/analysis.go
index 449cffe..ea480d3 100644
--- a/internal/worker/analysis.go
+++ b/internal/worker/analysis.go
@@ -49,18 +49,10 @@
 		return nil, err
 	}
 	bucket := c.Bucket(s.cfg.BinaryBucket)
-	var wvs map[analysis.WorkVersionKey]analysis.WorkVersion
-	if s.bqClient != nil {
-		wvs, err = analysis.ReadWorkVersions(ctx, s.bqClient)
-		if err != nil {
-			return nil, err
-		}
-		log.Infof(ctx, "read %d work versions", len(wvs))
-	}
 	return &analysisServer{
 		Server:             s,
 		openFile:           gcsOpenFileFunc(ctx, bucket),
-		storedWorkVersions: wvs,
+		storedWorkVersions: make(map[analysis.WorkVersionKey]analysis.WorkVersion),
 	}, nil
 }
 
@@ -132,12 +124,17 @@
 		SchemaVersion: analysis.SchemaVersion,
 		BinaryVersion: hex.EncodeToString(binaryHash),
 	}
+
+	if err := s.readWorkVersion(ctx, req.Module, req.Version, req.Binary); err != nil {
+		return err
+	}
 	key := analysis.WorkVersionKey{Module: req.Module, Version: req.Version, Binary: req.Binary}
 	if wv == s.storedWorkVersions[key] {
 		log.Infof(ctx, "skipping (work version unchanged): %+v", key)
 		incrementJob("NumSkipped")
 		return nil
 	}
+
 	row := s.scan(ctx, req, localBinaryPath, wv)
 	if err := writeResult(ctx, req.Serve, w, s.bqClient, analysis.TableName, row); err != nil {
 		return err
@@ -150,6 +147,31 @@
 	return nil
 }
 
+func (s *analysisServer) readWorkVersion(ctx context.Context, module_path, version, binary string) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	key := analysis.WorkVersionKey{Module: module_path, Version: version, Binary: binary}
+	if _, ok := s.storedWorkVersions[key]; ok {
+		return nil
+	}
+	if s.bqClient == nil {
+		return nil
+	}
+	wv, err := analysis.ReadWorkVersion(ctx, s.bqClient, module_path, version, binary)
+	if err != nil {
+		if isReadPreviousWorkQuotaError(err) {
+			log.Info(ctx, "hit bigquery list quota when reading work version, sleeping 1 minute...")
+			// Sleep a minute to allow quota limitations to clear up.
+			time.Sleep(60 * time.Second)
+		}
+		return err
+	}
+	if wv != nil {
+		s.storedWorkVersions[key] = *wv
+	}
+	return nil
+}
+
 func (s *analysisServer) scan(ctx context.Context, req *analysis.ScanRequest, localBinaryPath string, wv analysis.WorkVersion) *analysis.Result {
 	row := &analysis.Result{
 		ModulePath:  req.Module,
diff --git a/internal/worker/govulncheck_scan.go b/internal/worker/govulncheck_scan.go
index 38e5fee..fa65480 100644
--- a/internal/worker/govulncheck_scan.go
+++ b/internal/worker/govulncheck_scan.go
@@ -25,7 +25,6 @@
 	"golang.org/x/pkgsite-metrics/internal/proxy"
 	"golang.org/x/pkgsite-metrics/internal/sandbox"
 	"golang.org/x/pkgsite-metrics/internal/version"
-	"google.golang.org/api/googleapi"
 )
 
 const (
@@ -142,7 +141,7 @@
 	}
 	ws, err := govulncheck.ReadWorkState(ctx, h.bqClient, module_path, version)
 	if err != nil {
-		if isReadWorkStatesQuotaError(err) {
+		if isReadPreviousWorkQuotaError(err) {
 			log.Info(ctx, "hit bigquery list quota when reading work version, sleeping 1 minute...")
 			// Sleep a minute to allow quota limitations to clear up.
 			time.Sleep(60 * time.Second)
@@ -156,15 +155,6 @@
 	return nil
 }
 
-func isReadWorkStatesQuotaError(err error) bool {
-	var gerr *googleapi.Error
-	if !errors.As(err, &gerr) {
-		return false
-	}
-	// BigQuery uses 403 for quota exceeded.
-	return gerr.Code == 403
-}
-
 // A scanner holds state for scanning modules.
 type scanner struct {
 	proxyClient *proxy.Client
diff --git a/internal/worker/scan.go b/internal/worker/scan.go
index b686bd8..6728ff0 100644
--- a/internal/worker/scan.go
+++ b/internal/worker/scan.go
@@ -26,6 +26,7 @@
 	"golang.org/x/pkgsite-metrics/internal/log"
 	"golang.org/x/pkgsite-metrics/internal/modules"
 	"golang.org/x/pkgsite-metrics/internal/proxy"
+	"google.golang.org/api/googleapi"
 )
 
 const (
@@ -305,3 +306,12 @@
 func isSyntheticLoad(err error) bool {
 	return strings.Contains(err.Error(), "synthetic")
 }
+
+func isReadPreviousWorkQuotaError(err error) bool {
+	var gerr *googleapi.Error
+	if !errors.As(err, &gerr) {
+		return false
+	}
+	// BigQuery uses 403 for quota exceeded.
+	return gerr.Code == 403
+}