internal/worker: read analysis work version per <module,version,binary>
Change-Id: Ic5aa4b3bbd9c4e59a64d75a4ed2a544fa9f2c8ed
Reviewed-on: https://go-review.googlesource.com/c/pkgsite-metrics/+/502995
Reviewed-by: Maceo Thompson <maceothompson@google.com>
Run-TryBot: Zvonimir Pavlinovic <zpavlinovic@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
diff --git a/internal/analysis/analysis.go b/internal/analysis/analysis.go
index cac987a..5603df6 100644
--- a/internal/analysis/analysis.go
+++ b/internal/analysis/analysis.go
@@ -211,44 +211,29 @@
Binary string
}
-// ReadWorkVersions reads the most recent WorkVersions in the analysis table.
-func ReadWorkVersions(ctx context.Context, c *bigquery.Client) (_ map[WorkVersionKey]WorkVersion, err error) {
- defer derrors.Wrap(&err, "ReadWorkVersions")
+// ReadWorkVersion reads the most recent WorkVersion in the analysis table
+// for module_path at version for binary. "Most recent" means the matching
+// row with the greatest created_at. If the row callback is never invoked
+// (no matching row), it returns a nil WorkVersion and a nil error.
+//
+// module_path, version and binary are embedded in the query text with %q,
+// i.e. as double-quoted literals with quotes, backslashes and control
+// characters escaped, so a value containing a quote cannot terminate the
+// literal and change the query.
+func ReadWorkVersion(ctx context.Context, c *bigquery.Client, module_path, version, binary string) (wv *WorkVersion, err error) {
+	defer derrors.Wrap(&err, "ReadWorkVersion")
- // Preamble defines an auxiliary table that remembers the
- // latest version, defined by sort_version, for each module
- // and analysis.
- const preamble = "WITH latest AS (SELECT module_path AS module, binary_name as binary, MAX(sort_version) as max_version FROM `%s` GROUP BY module_path, binary_name)"
- latest := fmt.Sprintf(preamble, c.FullTableName(TableName))
- // Partition the table by module, analysis, and version while
- // only considering the `latest` version. This is accomplished
- // by joining govulncheck table with latest.
- partition := bigquery.PartitionQuery{
- From: fmt.Sprintf("`%s` JOIN latest ON module_path=module AND sort_version=max_version AND binary_name=binary", c.FullTableName(TableName)),
- Columns: "module_path, version, binary_name, binary_version, binary_args, worker_version, schema_version",
- PartitionOn: "module_path, sort_version, binary_name",
- OrderBy: "created_at DESC",
- }.String()
- // Create the final query that gets only one work version
- // for each module and analysis. The returned work version is
- // the latest one, defined by max of sort_version. Note that
- // this will not match the latest version of a module, in the
- // strict Go sense, if the module has non-linear tagging (which
- // should not happen too often).
- query := fmt.Sprintf("%s\n%s", latest, partition)
+	// The %q verbs supply the surrounding double quotes themselves.
+	const qf = `
+		SELECT module_path, version, binary_name, binary_version, binary_args, worker_version, schema_version
+		FROM %s WHERE module_path=%q AND version=%q AND binary_name=%q ORDER BY created_at DESC LIMIT 1
+	`
+	query := fmt.Sprintf(qf, "`"+c.FullTableName(TableName)+"`", module_path, version, binary)
iter, err := c.Query(ctx, query)
if err != nil {
return nil, err
}
- m := map[WorkVersionKey]WorkVersion{}
err = bigquery.ForEachRow(iter, func(r *Result) bool {
- m[WorkVersionKey{r.ModulePath, r.Version, r.BinaryName}] = r.WorkVersion
+	// Should be reached at most once.
+	wv = &r.WorkVersion
return true
})
if err != nil {
return nil, err
}
- return m, nil
+	return wv, nil
}
// JSONTreeToDiagnostics converts a jsonTree to a list of diagnostics for BigQuery.
diff --git a/internal/worker/analysis.go b/internal/worker/analysis.go
index 449cffe..ea480d3 100644
--- a/internal/worker/analysis.go
+++ b/internal/worker/analysis.go
@@ -49,18 +49,10 @@
return nil, err
}
bucket := c.Bucket(s.cfg.BinaryBucket)
- var wvs map[analysis.WorkVersionKey]analysis.WorkVersion
- if s.bqClient != nil {
- wvs, err = analysis.ReadWorkVersions(ctx, s.bqClient)
- if err != nil {
- return nil, err
- }
- log.Infof(ctx, "read %d work versions", len(wvs))
- }
return &analysisServer{
Server: s,
openFile: gcsOpenFileFunc(ctx, bucket),
- storedWorkVersions: wvs,
+ storedWorkVersions: make(map[analysis.WorkVersionKey]analysis.WorkVersion),
}, nil
}
@@ -132,12 +124,17 @@
SchemaVersion: analysis.SchemaVersion,
BinaryVersion: hex.EncodeToString(binaryHash),
}
+
+ if err := s.readWorkVersion(ctx, req.Module, req.Version, req.Binary); err != nil {
+ return err
+ }
key := analysis.WorkVersionKey{Module: req.Module, Version: req.Version, Binary: req.Binary}
if wv == s.storedWorkVersions[key] {
log.Infof(ctx, "skipping (work version unchanged): %+v", key)
incrementJob("NumSkipped")
return nil
}
+
row := s.scan(ctx, req, localBinaryPath, wv)
if err := writeResult(ctx, req.Serve, w, s.bqClient, analysis.TableName, row); err != nil {
return err
@@ -150,6 +147,31 @@
return nil
}
+// readWorkVersion makes sure the work version previously stored in BigQuery
+// for the given module path, version and binary has been looked up, caching
+// it in s.storedWorkVersions. s.mu is held for the whole call.
+//
+// It does nothing when the key is already cached or when s.bqClient is nil.
+// When analysis.ReadWorkVersion returns a nil work version, nothing is
+// cached and nil is returned. When the read fails with what
+// isReadPreviousWorkQuotaError classifies as a quota error, the call waits
+// up to one minute, or until ctx is done, and then returns that error.
+func (s *analysisServer) readWorkVersion(ctx context.Context, module_path, version, binary string) error {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	key := analysis.WorkVersionKey{Module: module_path, Version: version, Binary: binary}
+	if _, ok := s.storedWorkVersions[key]; ok {
+		return nil
+	}
+	if s.bqClient == nil {
+		return nil
+	}
+
+	wv, err := analysis.ReadWorkVersion(ctx, s.bqClient, module_path, version, binary)
+	if err != nil {
+		if isReadPreviousWorkQuotaError(err) {
+			log.Info(ctx, "hit bigquery list quota when reading work version, sleeping 1 minute...")
+			// Back off for a minute to allow quota limitations to clear
+			// up, but stop waiting as soon as the request is canceled.
+			// The lock stays held during the wait, so other lookups on
+			// this server are paused as well.
+			timer := time.NewTimer(60 * time.Second)
+			defer timer.Stop()
+			select {
+			case <-timer.C:
+			case <-ctx.Done():
+			}
+		}
+		return err
+	}
+	if wv != nil {
+		s.storedWorkVersions[key] = *wv
+	}
+	return nil
+}
+
func (s *analysisServer) scan(ctx context.Context, req *analysis.ScanRequest, localBinaryPath string, wv analysis.WorkVersion) *analysis.Result {
row := &analysis.Result{
ModulePath: req.Module,
diff --git a/internal/worker/govulncheck_scan.go b/internal/worker/govulncheck_scan.go
index 38e5fee..fa65480 100644
--- a/internal/worker/govulncheck_scan.go
+++ b/internal/worker/govulncheck_scan.go
@@ -25,7 +25,6 @@
"golang.org/x/pkgsite-metrics/internal/proxy"
"golang.org/x/pkgsite-metrics/internal/sandbox"
"golang.org/x/pkgsite-metrics/internal/version"
- "google.golang.org/api/googleapi"
)
const (
@@ -142,7 +141,7 @@
}
ws, err := govulncheck.ReadWorkState(ctx, h.bqClient, module_path, version)
if err != nil {
- if isReadWorkStatesQuotaError(err) {
+ if isReadPreviousWorkQuotaError(err) {
log.Info(ctx, "hit bigquery list quota when reading work version, sleeping 1 minute...")
// Sleep a minute to allow quota limitations to clear up.
time.Sleep(60 * time.Second)
@@ -156,15 +155,6 @@
return nil
}
-func isReadWorkStatesQuotaError(err error) bool {
- var gerr *googleapi.Error
- if !errors.As(err, &gerr) {
- return false
- }
- // BigQuery uses 403 for quota exceeded.
- return gerr.Code == 403
-}
-
// A scanner holds state for scanning modules.
type scanner struct {
proxyClient *proxy.Client
diff --git a/internal/worker/scan.go b/internal/worker/scan.go
index b686bd8..6728ff0 100644
--- a/internal/worker/scan.go
+++ b/internal/worker/scan.go
@@ -26,6 +26,7 @@
"golang.org/x/pkgsite-metrics/internal/log"
"golang.org/x/pkgsite-metrics/internal/modules"
"golang.org/x/pkgsite-metrics/internal/proxy"
+ "google.golang.org/api/googleapi"
)
const (
@@ -305,3 +306,12 @@
func isSyntheticLoad(err error) bool {
return strings.Contains(err.Error(), "synthetic")
}
+
+// isReadPreviousWorkQuotaError reports whether err is, or wraps, a
+// *googleapi.Error whose Code is 403.
+func isReadPreviousWorkQuotaError(err error) bool {
+	// BigQuery uses 403 for quota exceeded.
+	const quotaExceededCode = 403
+
+	var apiErr *googleapi.Error
+	return errors.As(err, &apiErr) && apiErr.Code == quotaExceededCode
+}