blob: 51b58ac3d0e4b3b4d1a3f1ba8f7b6a46f5b1e2b6 [file] [log] [blame]
// Copyright 2022 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package bigquery
import (
"context"
"fmt"
"sort"
"time"
bq "cloud.google.com/go/bigquery"
"cloud.google.com/go/civil"
"golang.org/x/exp/maps"
"golang.org/x/pkgsite-metrics/internal/derrors"
"golang.org/x/pkgsite-metrics/internal/log"
"google.golang.org/api/iterator"
)
// VulncheckTableName is the name of the BigQuery table whose schema is
// inferred from VulnResult and registered by this package's init function.
const VulncheckTableName = "vulncheck"
// Note: before modifying VulnResult or Vuln, make sure the change
// is a valid schema modification.
// The only supported changes are:
// - adding a nullable or repeated column
// - dropping a column
// - changing a column from required to nullable.
// See https://cloud.google.com/bigquery/docs/managing-table-schemas for details.
// VulnResult is a row in the BigQuery vulncheck table. It corresponds to a
// result from the output for vulncheck.Source.
//
// The struct is passed to bq.InferSchema at package initialization, so the
// fields and their bigquery tags define the table's columns.
type VulnResult struct {
// CreatedAt is assigned by SetUploadTime.
CreatedAt time.Time `bigquery:"created_at"`
ModulePath string `bigquery:"module_path"`
Version string `bigquery:"version"`
Suffix string `bigquery:"suffix"`
SortVersion string `bigquery:"sort_version"`
ImportedBy int `bigquery:"imported_by"`
// Error and ErrorCategory are filled in by AddError: the error's message
// and the result of derrors.CategorizeError, respectively.
Error string `bigquery:"error"`
ErrorCategory string `bigquery:"error_category"`
CommitTime time.Time `bigquery:"commit_time"`
ScanSeconds float64 `bigquery:"scan_seconds"`
ScanMemory int64 `bigquery:"scan_memory"`
PkgsMemory int64 `bigquery:"pkgs_memory"`
ScanMode string `bigquery:"scan_mode"`
// Workers is the concurrency limit under which a module is
// analyzed. Useful for interpreting memory measurements when
// there are multiple modules analyzed in the same process.
// 0 if no limit is specified, -1 for potential errors.
Workers int `bigquery:"workers"`
VulncheckWorkVersion // InferSchema flattens embedded fields
// Vulns is a repeated record column; each element is a Vuln.
Vulns []*Vuln `bigquery:"vulns"`
}
// VulncheckWorkVersion contains information that can be used to avoid duplicate work.
// Given two VulncheckWorkVersion values v1 and v2 for the same module path and version,
// if v1.Equal(v2) then it is not necessary to scan the module.
//
// It is embedded in VulnResult, so its fields appear as top-level columns of
// the vulncheck table under the names given by the bigquery tags.
type VulncheckWorkVersion struct {
	// The version of the currently running code. This tracks changes in the
	// logic of module scanning and processing.
	WorkerVersion string `bigquery:"worker_version"`
	// The version of the bigquery schema.
	SchemaVersion string `bigquery:"schema_version"`
	// The version of the golang.org/x/vuln module used by the current module.
	VulnVersion string `bigquery:"x_vuln_version"`
	// When the vuln DB was last modified.
	VulnDBLastModified time.Time `bigquery:"vulndb_last_modified"`
}
// Equal reports whether v1 and v2 describe the same work version.
//
// Two nil pointers are equal; a nil pointer never equals a non-nil one.
// Otherwise the string fields are compared with == and the vuln DB
// modification times with time.Time.Equal.
func (v1 *VulncheckWorkVersion) Equal(v2 *VulncheckWorkVersion) bool {
	switch {
	case v1 == nil && v2 == nil:
		return true
	case v1 == nil || v2 == nil:
		return false
	case v1.WorkerVersion != v2.WorkerVersion:
		return false
	case v1.SchemaVersion != v2.SchemaVersion:
		return false
	case v1.VulnVersion != v2.VulnVersion:
		return false
	}
	return v1.VulnDBLastModified.Equal(v2.VulnDBLastModified)
}
// SetUploadTime records t as the row's creation time (the created_at column).
func (vr *VulnResult) SetUploadTime(t time.Time) {
	vr.CreatedAt = t
}
// AddError stores err on the row: its message goes into Error and the
// category computed by derrors.CategorizeError goes into ErrorCategory.
// A nil err leaves the row untouched.
func (vr *VulnResult) AddError(err error) {
	if err != nil {
		vr.Error = err.Error()
		vr.ErrorCategory = derrors.CategorizeError(err)
	}
}
// Vuln is a record in VulnResult and corresponds to an item in
// vulncheck.Result.Vulns.
//
// It is stored in the repeated "vulns" column of the vulncheck table.
type Vuln struct {
ID string `bigquery:"id"`
Symbol string `bigquery:"symbol"`
PackagePath string `bigquery:"package_path"`
ModulePath string `bigquery:"module_path"`
// The three sink fields use bq.NullInt64, so each may be NULL in a row.
CallSink bq.NullInt64 `bigquery:"call_sink"`
ImportSink bq.NullInt64 `bigquery:"import_sink"`
RequireSink bq.NullInt64 `bigquery:"require_sink"`
}
// VulncheckSchemaVersion changes whenever the vulncheck schema changes.
// It is set during package initialization by applying SchemaVersion to the
// schema that bq.InferSchema derives from VulnResult.
var VulncheckSchemaVersion string
// init infers the BigQuery schema for VulnResult, derives
// VulncheckSchemaVersion from it, and registers the schema under
// VulncheckTableName. It panics if the schema cannot be inferred.
func init() {
	schema, inferErr := bq.InferSchema(VulnResult{})
	if inferErr != nil {
		panic(inferErr)
	}
	VulncheckSchemaVersion = SchemaVersion(schema)
	AddTable(VulncheckTableName, schema)
}
// ReadVulncheckWorkVersions reads the most recent WorkVersions in the vulncheck table.
//
// The query partitions rows by (module_path, sort_version) and orders each
// partition by created_at descending. The returned map is keyed by
// [2]string{module path, version}; each value points at the work version
// embedded in the corresponding row. Any error is wrapped with the function
// name.
func ReadVulncheckWorkVersions(ctx context.Context, c *Client) (_ map[[2]string]*VulncheckWorkVersion, err error) {
	defer derrors.Wrap(&err, "ReadVulncheckWorkVersions")

	versions := make(map[[2]string]*VulncheckWorkVersion)
	table := c.FullTableName(VulncheckTableName)
	rowIter, err := c.Query(ctx, PartitionQuery(table, "module_path, sort_version", "created_at DESC"))
	if err != nil {
		return nil, err
	}
	if err := ForEachRow(rowIter, func(row *VulnResult) bool {
		key := [2]string{row.ModulePath, row.Version}
		versions[key] = &row.VulncheckWorkVersion
		return true // keep iterating over every row
	}); err != nil {
		return nil, err
	}
	return versions, nil
}
// orderByClauses is the ordering that fetchVulncheckResults passes to
// PartitionQuery when selecting rows partitioned by module path and scan mode.
//
// The module path along with the four sort columns should uniquely specify a
// row, because we do not generate a new row for a (module, version) if the
// other three versions are identical. (There is actually a fourth component of
// the work version, the schema version. But since it is represented by a struct
// in the worker code and the worker version captures every change to that code,
// it cannot change independently of worker_version.)
const orderByClauses = `
vulndb_last_modified DESC, -- latest version of database
x_vuln_version DESC, -- latest version of x/vuln
worker_version DESC, -- latest version of x/pkgsite-metrics
sort_version DESC, -- latest version of module
created_at DESC -- latest insertion time
`
// FetchVulncheckResults runs the latest-results query against the table named
// by VulncheckTableName and returns the resulting rows. See
// fetchVulncheckResults for the query and the duplicate check it performs.
func FetchVulncheckResults(ctx context.Context, c *Client) (rows []*VulnResult, err error) {
	rows, err = fetchVulncheckResults(ctx, c, VulncheckTableName)
	return rows, err
}
// fetchVulncheckResults queries tableName, partitioned by module path and scan
// mode and ordered by orderByClauses, and returns all resulting rows.
//
// After reading, it verifies that no (module path, version, scan mode)
// combination occurs more than once. If any does, it returns an error naming
// the offending combination that sorts first, and no rows.
func fetchVulncheckResults(ctx context.Context, c *Client, tableName string) (rows []*VulnResult, err error) {
	fullName := c.FullTableName(tableName)
	q := PartitionQuery(fullName, "module_path, scan_mode", orderByClauses)
	log.Infof(ctx, "running latest query on %s", fullName)
	it, err := c.Query(ctx, q)
	if err != nil {
		return nil, err
	}
	if rows, err = All[VulnResult](it); err != nil {
		return nil, err
	}
	log.Infof(ctx, "got %d rows", len(rows))

	// Check for duplicate rows by counting occurrences of each identifying key.
	counts := make(map[string]int)
	for _, row := range rows {
		key := row.ModulePath + "@" + row.Version + " " + row.ScanMode
		counts[key]++
	}
	// Visit keys in sorted order so the reported duplicate is deterministic.
	ids := maps.Keys(counts)
	sort.Strings(ids)
	for _, id := range ids {
		if counts[id] > 1 {
			return nil, fmt.Errorf("%s has %d rows", id, counts[id])
		}
	}
	return rows, nil
}
// ReportVulnResult is a row in the vulncheck report table. It consists of an
// embedded VulnResult plus the report date and the time of insertion, both of
// which are filled in when results are inserted.
type ReportVulnResult struct {
*VulnResult
ReportDate civil.Date `bigquery:"report_date"` // for reporting (e.g. dashboard)
InsertedAt time.Time `bigquery:"inserted_at"` // to disambiguate if >1 insertion for same date
}
// init infers the BigQuery schema for ReportVulnResult and registers it under
// the report table name (VulncheckTableName + "-report"). It panics if the
// schema cannot be inferred.
func init() {
	reportSchema, inferErr := bq.InferSchema(ReportVulnResult{})
	if inferErr != nil {
		panic(inferErr)
	}
	AddTable(VulncheckTableName+"-report", reportSchema)
}
// InsertVulncheckResults writes results to the vulncheck report table
// (VulncheckTableName + "-report"), stamping each row with date. When
// allowDuplicates is false, the insertion is refused if the table already
// holds rows for that date. See insertVulncheckResults for details.
func InsertVulncheckResults(ctx context.Context, c *Client, results []*VulnResult, date civil.Date, allowDuplicates bool) (err error) {
	const reportTable = VulncheckTableName + "-report"
	err = insertVulncheckResults(ctx, c, reportTable, results, date, allowDuplicates)
	return err
}
// insertVulncheckResults uploads results to reportTableName as
// ReportVulnResult rows, each carrying date as its report date and the
// current time as its insertion time.
//
// The report table is created first if it does not exist. When
// allowDuplicates is false, the table is queried for existing rows with the
// same report date and an error is returned if any are found. The upload is
// chunked and bounded by a five-minute timeout. Any returned error is wrapped
// with "InsertVulncheckResults(date)".
func insertVulncheckResults(ctx context.Context, c *Client, reportTableName string, results []*VulnResult, date civil.Date, allowDuplicates bool) (err error) {
	// The wrap must be deferred: called eagerly, err is still nil and nothing
	// would ever be wrapped.
	defer derrors.Wrap(&err, "InsertVulncheckResults(%s)", date)

	// Create the report table if it doesn't exist.
	if err := c.CreateTable(ctx, reportTableName); err != nil {
		return err
	}
	if !allowDuplicates {
		// Alias the count so it can be matched to a struct field by name.
		query := fmt.Sprintf("SELECT COUNT(*) AS n FROM `%s` WHERE report_date = '%s'",
			c.FullTableName(reportTableName), date)
		iter, err := c.Query(ctx, query)
		if err != nil {
			return err
		}
		// The field must be exported: the BigQuery row loader only populates
		// exported struct fields, matching them to columns by name.
		var count struct {
			N int64 `bigquery:"n"`
		}
		err = iter.Next(&count)
		if err != nil && err != iterator.Done {
			return err
		}
		if count.N > 0 {
			return fmt.Errorf("already have %d rows for %s", count.N, date)
		}
	}
	now := time.Now()
	rows := make([]ReportVulnResult, 0, len(results))
	for _, r := range results {
		rows = append(rows, ReportVulnResult{VulnResult: r, ReportDate: date, InsertedAt: now})
	}
	ctx, cancel := context.WithTimeout(ctx, 5*time.Minute) // to avoid retrying forever on permanent errors
	defer cancel()
	const chunkSize = 1024 // Chunk rows to avoid exceeding the maximum allowable request size.
	return UploadMany(ctx, c, reportTableName, rows, chunkSize)
}