internal/vulndb,internal/worker: add only modified or new entries

Skip adding an entry to the vuln db table if the same entry has been
added before and corresponding vulnerability in the db has not been
modified.

Change-Id: I9fc76cb9d74d03f0722a5523b0b1ab78541fbb5e
Reviewed-on: https://go-review.googlesource.com/c/pkgsite-metrics/+/488177
TryBot-Result: Gopher Robot <gobot@golang.org>
Run-TryBot: Zvonimir Pavlinovic <zpavlinovic@google.com>
Reviewed-by: Maceo Thompson <maceothompson@google.com>
diff --git a/go.mod b/go.mod
index 8a40c46..f088985 100644
--- a/go.mod
+++ b/go.mod
@@ -25,7 +25,6 @@
 	golang.org/x/exp/event v0.0.0-20220218215828-6cf2b201936e
 	golang.org/x/mod v0.7.0
 	golang.org/x/net v0.7.0
-	golang.org/x/sync v0.1.0
 	golang.org/x/tools v0.5.1-0.20230117180257-8aba49bb5ea2
 	golang.org/x/vuln v0.0.0-20230201222900-4c848edceff1
 	google.golang.org/api v0.110.0
@@ -58,6 +57,7 @@
 	go.opentelemetry.io/otel/trace v1.11.2 // indirect
 	golang.org/x/exp/typeparams v0.0.0-20221208152030-732eee02a75a // indirect
 	golang.org/x/oauth2 v0.5.0 // indirect
+	golang.org/x/sync v0.1.0 // indirect
 	golang.org/x/sys v0.5.0 // indirect
 	golang.org/x/text v0.7.0 // indirect
 	golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
diff --git a/internal/vulndb/vulndb.go b/internal/vulndb/vulndb.go
index 6fa62fd..9f9d1df 100644
--- a/internal/vulndb/vulndb.go
+++ b/internal/vulndb/vulndb.go
@@ -7,9 +7,11 @@
 package vulndb
 
 import (
+	"context"
 	"time"
 
 	"golang.org/x/pkgsite-metrics/internal/bigquery"
+	"golang.org/x/pkgsite-metrics/internal/derrors"
 	"golang.org/x/vuln/osv"
 )
 
@@ -107,3 +109,26 @@
 	}
 	return rs
 }
+
+// ReadMostRecentDB returns entries from the table that reflect the
+// most recent state of the vulnerability database at c.
+func ReadMostRecentDB(ctx context.Context, c *bigquery.Client) (entries []*Entry, err error) {
+	defer derrors.Wrap(&err, "ReadMostRecentDB")
+	query := bigquery.PartitionQuery{
+		Table:       c.FullTableName(TableName),
+		PartitionOn: "ID",
+		OrderBy:     "modified_time DESC",
+	}.String()
+	iter, err := c.Query(ctx, query)
+	if err != nil {
+		return nil, err
+	}
+	err = bigquery.ForEachRow(iter, func(e *Entry) bool {
+		entries = append(entries, e)
+		return true
+	})
+	if err != nil {
+		return nil, err
+	}
+	return entries, nil
+}
diff --git a/internal/vulndb/vulndb_test.go b/internal/vulndb/vulndb_test.go
index 6c86f73..adc07df 100644
--- a/internal/vulndb/vulndb_test.go
+++ b/internal/vulndb/vulndb_test.go
@@ -5,9 +5,13 @@
 package vulndb
 
 import (
+	"context"
 	"testing"
+	"time"
 
 	"github.com/google/go-cmp/cmp"
+	"golang.org/x/pkgsite-metrics/internal/bigquery"
+	test "golang.org/x/pkgsite-metrics/internal/testing"
 	"golang.org/x/vuln/osv"
 )
 
@@ -36,3 +40,46 @@
 		t.Fatalf("mismatch (-want, +got):\n%s", diff)
 	}
 }
+
+func TestReadMostRecentDB(t *testing.T) {
+	test.NeedsIntegrationEnv(t)
+
+	ctx := context.Background()
+	const projectID = "go-ecosystem"
+
+	client, err := bigquery.NewClientForTesting(ctx, projectID)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer client.Close()
+
+	writeToBigQuery := func(es []*Entry) {
+		if err := client.CreateTable(ctx, TableName); err != nil {
+			t.Fatal(err)
+		}
+		if err := bigquery.UploadMany(ctx, client, TableName, es, 0); err != nil {
+			t.Fatal(err)
+		}
+	}
+
+	lmt := time.Now()
+	es := []*Entry{
+		{ID: "A"},
+		{ID: "A", ModifiedTime: lmt},
+		{ID: "B", ModifiedTime: lmt},
+	}
+	writeToBigQuery(es)
+
+	got, err := ReadMostRecentDB(ctx, client)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if len(got) != 2 {
+		t.Fatalf("want 2 rows; got %d", len(got))
+	}
+	for _, e := range got {
+		if e.ModifiedTime != lmt {
+			t.Fatalf("want last modified time %v; got %v", lmt, e.ModifiedTime)
+		}
+	}
+}
diff --git a/internal/worker/vulndb.go b/internal/worker/vulndb.go
index 47a1b01..0ee62e2 100644
--- a/internal/worker/vulndb.go
+++ b/internal/worker/vulndb.go
@@ -11,6 +11,7 @@
 	"os"
 	"path/filepath"
 	"strings"
+	"time"
 
 	"fmt"
 	"net/http"
@@ -22,6 +23,7 @@
 	"golang.org/x/pkgsite-metrics/internal"
 	"golang.org/x/pkgsite-metrics/internal/bigquery"
 	"golang.org/x/pkgsite-metrics/internal/derrors"
+	"golang.org/x/pkgsite-metrics/internal/log"
 	"golang.org/x/pkgsite-metrics/internal/vulndb"
 	"golang.org/x/pkgsite-metrics/internal/vulndbreqs"
 )
@@ -66,12 +68,28 @@
 	if bucket == nil {
 		return errors.New("failed to create go-vulndb bucket")
 	}
+
+	lmts, err := lastModified(ctx, dbClient)
+	if err != nil {
+		return err
+	}
 	entries, err := vulndbEntries(ctx, bucket)
 	if err != nil {
 		return err
 	}
+	for _, e := range entries {
+		lmt, ok := lmts[e.ID]
+		if ok && e.ModifiedTime.Equal(lmt) {
+			// Skip adding the entry if nothing has changed in the meantime.
+			log.Infof(ctx, "skipping entry %s, it has not been modified", e.ID)
+			continue
+		}
+		if err = writeResult(ctx, false, w, dbClient, vulndb.TableName, e); err != nil {
+			return err
+		}
+	}
 
-	return bigquery.UploadMany(ctx, dbClient, vulndb.TableName, entries, 10000)
+	return nil
 }
 
 func vulndbEntries(ctx context.Context, bucket *storage.BucketHandle) ([]*vulndb.Entry, error) {
@@ -132,3 +150,15 @@
 	}
 	return &entry, nil
 }
+
+func lastModified(ctx context.Context, c *bigquery.Client) (map[string]time.Time, error) {
+	es, err := vulndb.ReadMostRecentDB(ctx, c)
+	if err != nil {
+		return nil, err
+	}
+	m := make(map[string]time.Time)
+	for _, e := range es {
+		m[e.ID] = e.ModifiedTime
+	}
+	return m, nil
+}