internal/worker: read analysis work version per <module,version,binary> Change-Id: Ic5aa4b3bbd9c4e59a64d75a4ed2a544fa9f2c8ed Reviewed-on: https://go-review.googlesource.com/c/pkgsite-metrics/+/502995 Reviewed-by: Maceo Thompson <maceothompson@google.com> Run-TryBot: Zvonimir Pavlinovic <zpavlinovic@google.com> TryBot-Result: Gopher Robot <gobot@golang.org>
diff --git a/internal/analysis/analysis.go b/internal/analysis/analysis.go index cac987a..5603df6 100644 --- a/internal/analysis/analysis.go +++ b/internal/analysis/analysis.go
@@ -211,44 +211,29 @@ Binary string } -// ReadWorkVersions reads the most recent WorkVersions in the analysis table. -func ReadWorkVersions(ctx context.Context, c *bigquery.Client) (_ map[WorkVersionKey]WorkVersion, err error) { - defer derrors.Wrap(&err, "ReadWorkVersions") +// ReadWorkVersion reads the most recent WorkVersion in the analysis table +// for module_path at version for binary. +func ReadWorkVersion(ctx context.Context, c *bigquery.Client, module_path, version, binary string) (wv *WorkVersion, err error) { + defer derrors.Wrap(&err, "ReadWorkVersion") - // Preamble defines an auxiliary table that remembers the - // latest version, defined by sort_version, for each module - // and analysis. - const preamble = "WITH latest AS (SELECT module_path AS module, binary_name as binary, MAX(sort_version) as max_version FROM `%s` GROUP BY module_path, binary_name)" - latest := fmt.Sprintf(preamble, c.FullTableName(TableName)) - // Partition the table by module, analysis, and version while - // only considering the `latest` version. This is accomplished - // by joining govulncheck table with latest. - partition := bigquery.PartitionQuery{ - From: fmt.Sprintf("`%s` JOIN latest ON module_path=module AND sort_version=max_version AND binary_name=binary", c.FullTableName(TableName)), - Columns: "module_path, version, binary_name, binary_version, binary_args, worker_version, schema_version", - PartitionOn: "module_path, sort_version, binary_name", - OrderBy: "created_at DESC", - }.String() - // Create the final query that gets only one work version - // for each module and analysis. The returned work version is - // the latest one, defined by max of sort_version. Note that - // this will not match the latest version of a module, in the - // strict Go sense, if the module has non-linear tagging (which - // should not happen too often). - query := fmt.Sprintf("%s\n%s", latest, partition) + const qf = ` + SELECT module_path, version, binary_name, binary_version, binary_args, worker_version, schema_version + FROM %s WHERE module_path="%s" AND version="%s" AND binary_name="%s" ORDER BY created_at DESC LIMIT 1 + ` + query := fmt.Sprintf(qf, "`"+c.FullTableName(TableName)+"`", module_path, version, binary) iter, err := c.Query(ctx, query) if err != nil { return nil, err } - m := map[WorkVersionKey]WorkVersion{} err = bigquery.ForEachRow(iter, func(r *Result) bool { - m[WorkVersionKey{r.ModulePath, r.Version, r.BinaryName}] = r.WorkVersion + // Should be reached at most once. + wv = &r.WorkVersion return true }) if err != nil { return nil, err } - return m, nil + return wv, nil } // JSONTreeToDiagnostics converts a jsonTree to a list of diagnostics for BigQuery.
diff --git a/internal/worker/analysis.go b/internal/worker/analysis.go index 449cffe..ea480d3 100644 --- a/internal/worker/analysis.go +++ b/internal/worker/analysis.go
@@ -49,18 +49,10 @@ return nil, err } bucket := c.Bucket(s.cfg.BinaryBucket) - var wvs map[analysis.WorkVersionKey]analysis.WorkVersion - if s.bqClient != nil { - wvs, err = analysis.ReadWorkVersions(ctx, s.bqClient) - if err != nil { - return nil, err - } - log.Infof(ctx, "read %d work versions", len(wvs)) - } return &analysisServer{ Server: s, openFile: gcsOpenFileFunc(ctx, bucket), - storedWorkVersions: wvs, + storedWorkVersions: make(map[analysis.WorkVersionKey]analysis.WorkVersion), }, nil } @@ -132,12 +124,17 @@ SchemaVersion: analysis.SchemaVersion, BinaryVersion: hex.EncodeToString(binaryHash), } + + if err := s.readWorkVersion(ctx, req.Module, req.Version, req.Binary); err != nil { + return err + } key := analysis.WorkVersionKey{Module: req.Module, Version: req.Version, Binary: req.Binary} if wv == s.storedWorkVersions[key] { log.Infof(ctx, "skipping (work version unchanged): %+v", key) incrementJob("NumSkipped") return nil } + row := s.scan(ctx, req, localBinaryPath, wv) if err := writeResult(ctx, req.Serve, w, s.bqClient, analysis.TableName, row); err != nil { return err @@ -150,6 +147,31 @@ return nil } +func (s *analysisServer) readWorkVersion(ctx context.Context, module_path, version, binary string) error { + s.mu.Lock() + defer s.mu.Unlock() + key := analysis.WorkVersionKey{Module: module_path, Version: version, Binary: binary} + if _, ok := s.storedWorkVersions[key]; ok { + return nil + } + if s.bqClient == nil { + return nil + } + wv, err := analysis.ReadWorkVersion(ctx, s.bqClient, module_path, version, binary) + if err != nil { + if isReadPreviousWorkQuotaError(err) { + log.Info(ctx, "hit bigquery list quota when reading work version, sleeping 1 minute...") + // Sleep a minute to allow quota limitations to clear up. + time.Sleep(60 * time.Second) + } + return err + } + if wv != nil { + s.storedWorkVersions[key] = *wv + } + return nil +} + func (s *analysisServer) scan(ctx context.Context, req *analysis.ScanRequest, localBinaryPath string, wv analysis.WorkVersion) *analysis.Result { row := &analysis.Result{ ModulePath: req.Module,
diff --git a/internal/worker/govulncheck_scan.go b/internal/worker/govulncheck_scan.go index 38e5fee..fa65480 100644 --- a/internal/worker/govulncheck_scan.go +++ b/internal/worker/govulncheck_scan.go
@@ -25,7 +25,6 @@ "golang.org/x/pkgsite-metrics/internal/proxy" "golang.org/x/pkgsite-metrics/internal/sandbox" "golang.org/x/pkgsite-metrics/internal/version" - "google.golang.org/api/googleapi" ) const ( @@ -142,7 +141,7 @@ } ws, err := govulncheck.ReadWorkState(ctx, h.bqClient, module_path, version) if err != nil { - if isReadWorkStatesQuotaError(err) { + if isReadPreviousWorkQuotaError(err) { log.Info(ctx, "hit bigquery list quota when reading work version, sleeping 1 minute...") // Sleep a minute to allow quota limitations to clear up. time.Sleep(60 * time.Second) @@ -156,15 +155,6 @@ return nil } -func isReadWorkStatesQuotaError(err error) bool { - var gerr *googleapi.Error - if !errors.As(err, &gerr) { - return false - } - // BigQuery uses 403 for quota exceeded. - return gerr.Code == 403 -} - // A scanner holds state for scanning modules. type scanner struct { proxyClient *proxy.Client
diff --git a/internal/worker/scan.go b/internal/worker/scan.go index b686bd8..6728ff0 100644 --- a/internal/worker/scan.go +++ b/internal/worker/scan.go
@@ -26,6 +26,7 @@ "golang.org/x/pkgsite-metrics/internal/log" "golang.org/x/pkgsite-metrics/internal/modules" "golang.org/x/pkgsite-metrics/internal/proxy" + "google.golang.org/api/googleapi" ) const ( @@ -305,3 +306,12 @@ func isSyntheticLoad(err error) bool { return strings.Contains(err.Error(), "synthetic") } + +func isReadPreviousWorkQuotaError(err error) bool { + var gerr *googleapi.Error + if !errors.As(err, &gerr) { + return false + } + // BigQuery uses 403 for quota exceeded. + return gerr.Code == 403 +}