internal/worker: doUpdate function

This CL contains the bulk of the update operation: everything
but the logic for handling modified CVEs.

For golang/go#49733

Change-Id: I742d19bb0553256c9cc81df0c4a930eea9d73a98
Reviewed-on: https://go-review.googlesource.com/c/vuln/+/366474
Run-TryBot: Jonathan Amsterdam <jba@google.com>
TryBot-Result: Go Bot <gobot@golang.org>
Reviewed-by: Julie Qiu <julie@golang.org>
Trust: Jonathan Amsterdam <jba@google.com>
diff --git a/go.mod b/go.mod
index d7aa886..169aab2 100644
--- a/go.mod
+++ b/go.mod
@@ -3,12 +3,13 @@
 go 1.17
 
 require (
+	cloud.google.com/go/firestore v1.6.1
 	github.com/Microsoft/go-winio v0.4.16 // indirect
 	github.com/ProtonMail/go-crypto v0.0.0-20210428141323-04723f9f07d7 // indirect
 	github.com/acomagu/bufpipe v1.0.3 // indirect
 	github.com/emirpasic/gods v1.12.0 // indirect
 	github.com/go-git/gcfg v1.5.0 // indirect
-	github.com/go-git/go-billy/v5 v5.3.1 // indirect
+	github.com/go-git/go-billy/v5 v5.3.1
 	github.com/go-git/go-git/v5 v5.4.2
 	github.com/google/go-cmp v0.5.6
 	github.com/imdario/mergo v0.3.12 // indirect
@@ -21,14 +22,15 @@
 	golang.org/x/mod v0.4.2
 	golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420 // indirect
 	golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359 // indirect
+	golang.org/x/tools v0.1.5
 	golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
+	google.golang.org/api v0.60.0
 	gopkg.in/warnings.v0 v0.1.2 // indirect
 	gopkg.in/yaml.v2 v2.4.0
 )
 
 require (
 	cloud.google.com/go v0.97.0 // indirect
-	cloud.google.com/go/firestore v1.6.1 // indirect
 	github.com/census-instrumentation/opencensus-proto v0.2.1 // indirect
 	github.com/cespare/xxhash v1.1.0 // indirect
 	github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403 // indirect
@@ -41,7 +43,6 @@
 	go.opencensus.io v0.23.0 // indirect
 	golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1 // indirect
 	golang.org/x/text v0.3.6 // indirect
-	google.golang.org/api v0.60.0 // indirect
 	google.golang.org/appengine v1.6.7 // indirect
 	google.golang.org/genproto v0.0.0-20211028162531-8db9c33dc351 // indirect
 	google.golang.org/grpc v1.40.0 // indirect
diff --git a/go.sum b/go.sum
index bb4035c..76e2445 100644
--- a/go.sum
+++ b/go.sum
@@ -51,6 +51,7 @@
 github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA=
 github.com/Microsoft/go-winio v0.4.16 h1:FtSW/jqD+l4ba5iPBj9CODVtgfYAD8w2wS923g/cFDk=
 github.com/Microsoft/go-winio v0.4.16/go.mod h1:XB6nPKklQyQ7GC9LdcBEcBl8PF76WugXOPRXwdLnMv0=
+github.com/OneOfOne/xxhash v1.2.2 h1:KMrpdQIwFcEqXDklaen+P1axHaj9BSKzvpUUfnHldSE=
 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
 github.com/ProtonMail/go-crypto v0.0.0-20210428141323-04723f9f07d7 h1:YoJbenK9C67SkzkDfmQuVln04ygHj3vjZfd9FL+GmQQ=
 github.com/ProtonMail/go-crypto v0.0.0-20210428141323-04723f9f07d7/go.mod h1:z4/9nQmJSSwwds7ejkxaJwO37dru3geImFUdJlaLzQo=
@@ -174,6 +175,7 @@
 github.com/google/pprof v0.0.0-20210609004039-a478d1d731e9/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
 github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
 github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
+github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y=
 github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
 github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
@@ -219,6 +221,7 @@
 github.com/sergi/go-diff v1.1.0 h1:we8PVUC3FE2uYfodKH/nBHMSetSfHDR6scGdBi+erh0=
 github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
 github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
+github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
 github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
@@ -405,7 +408,6 @@
 golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac h1:oN6lz7iLW/YC7un8pq+9bOLyXrprv2+DKfkJY+2LJJw=
 golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359 h1:2B5p2L5IfGiD7+b9BOoRMC6DgObAVZV+Fsp050NqXik=
 golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@@ -473,6 +475,7 @@
 golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
 golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
 golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
+golang.org/x/tools v0.1.5 h1:ouewzE6p+/VEB31YYnTbEJdi8pFqKp4P4n85vwo3DHA=
 golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@@ -508,7 +511,6 @@
 google.golang.org/api v0.55.0/go.mod h1:38yMfeP1kfjsl8isn0tliTjIb1rJXcQi4UXlbqivdVE=
 google.golang.org/api v0.56.0/go.mod h1:38yMfeP1kfjsl8isn0tliTjIb1rJXcQi4UXlbqivdVE=
 google.golang.org/api v0.57.0/go.mod h1:dVPlbZyBo2/OjBpmvNdpn2GRm6rPy75jyU7bmhdrMgI=
-google.golang.org/api v0.59.0 h1:fPfFO7gttlXYo2ALuD3HxJzh8vaF++4youI0BkFL6GE=
 google.golang.org/api v0.59.0/go.mod h1:sT2boj7M9YJxZzgeZqXogmhfmRWDtPzT31xkieUbuZU=
 google.golang.org/api v0.60.0 h1:eq/zs5WPH4J9undYM9IP1O7dSr7Yh8Y0GtSCpzGzIUk=
 google.golang.org/api v0.60.0/go.mod h1:d7rl65NZAkEQ90JFzqBjcRq1TVeG5ZoGV3sSpEnnVb4=
diff --git a/internal/worker/repo_test.go b/internal/worker/repo_test.go
new file mode 100644
index 0000000..04a958c
--- /dev/null
+++ b/internal/worker/repo_test.go
@@ -0,0 +1,139 @@
+// Copyright 2021 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package worker
+
+import (
+	"io/ioutil"
+	"path"
+	"testing"
+	"time"
+
+	"github.com/go-git/go-billy/v5/memfs"
+	"github.com/go-git/go-git/v5"
+	"github.com/go-git/go-git/v5/plumbing"
+	"github.com/go-git/go-git/v5/plumbing/object"
+	"github.com/go-git/go-git/v5/storage/memory"
+	"golang.org/x/tools/txtar"
+	"golang.org/x/vuln/internal/derrors"
+)
+
+// readTxtarRepo converts a txtar file to a single-commit
+// repo.
+func readTxtarRepo(filename string) (_ *git.Repository, err error) {
+	defer derrors.Wrap(&err, "readTxtarRepo(%q)", filename)
+
+	mfs := memfs.New()
+	ar, err := txtar.ParseFile(filename)
+	if err != nil {
+		return nil, err
+	}
+	for _, f := range ar.Files {
+		file, err := mfs.Create(f.Name)
+		if err != nil {
+			return nil, err
+		}
+		if _, err := file.Write(f.Data); err != nil {
+			return nil, err
+		}
+		if err := file.Close(); err != nil {
+			return nil, err
+		}
+	}
+
+	repo, err := git.Init(memory.NewStorage(), mfs)
+	if err != nil {
+		return nil, err
+	}
+	wt, err := repo.Worktree()
+	if err != nil {
+		return nil, err
+	}
+	for _, f := range ar.Files {
+		if _, err := wt.Add(f.Name); err != nil {
+			return nil, err
+		}
+	}
+	_, err = wt.Commit("", &git.CommitOptions{All: true, Author: &object.Signature{
+		Name:  "Joe Random",
+		Email: "joe@example.com",
+		When:  time.Now(),
+	}})
+	if err != nil {
+		return nil, err
+	}
+	return repo, nil
+}
+
+// headCommit returns the commit at the repo HEAD.
+func headCommit(t *testing.T, repo *git.Repository) *object.Commit {
+	h, err := headHash(repo)
+	if err != nil {
+		t.Fatal(err)
+	}
+	commit, err := repo.CommitObject(h)
+	if err != nil {
+		t.Fatal(err)
+	}
+	return commit
+}
+
+// headHash returns the hash of the repo's HEAD.
+func headHash(repo *git.Repository) (plumbing.Hash, error) {
+	ref, err := repo.Reference(plumbing.HEAD, true)
+	if err != nil {
+		return plumbing.ZeroHash, err
+	}
+	return ref.Hash(), nil
+}
+
+// findBlob returns the blob at filename in repo.
+// It fails the test if it doesn't exist.
+func findBlob(t *testing.T, repo *git.Repository, filename string) *object.Blob {
+	c := headCommit(t, repo)
+	tree, err := repo.TreeObject(c.TreeHash)
+	if err != nil {
+		t.Fatal(err)
+	}
+	e := findEntry(t, repo, tree, filename)
+	blob, err := repo.BlobObject(e.Hash)
+	if err != nil {
+		t.Fatal(err)
+	}
+	return blob
+}
+
+// readBlob reads the contents of a blob.
+func readBlob(t *testing.T, blob *object.Blob) []byte {
+	r, err := blob.Reader()
+	if err != nil {
+		t.Fatal(err)
+	}
+	data, err := ioutil.ReadAll(r)
+	if err != nil {
+		t.Fatal(err)
+	}
+	return data
+}
+
+// findEntry returns the TreeEntry at filename. It fails the test if
+// it doesn't exist.
+func findEntry(t *testing.T, repo *git.Repository, tree *object.Tree, filename string) object.TreeEntry {
+	dir, base := path.Split(filename)
+	if dir != "" {
+		te := findEntry(t, repo, tree, dir[:len(dir)-1])
+		var err error
+		tree, err = repo.TreeObject(te.Hash)
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+	for _, e := range tree.Entries {
+		if e.Name == base {
+			return e
+		}
+	}
+	t.Fatalf("could not find %q in repo", filename)
+	return object.TreeEntry{}
+}
diff --git a/internal/worker/testdata/basic.txtar b/internal/worker/testdata/basic.txtar
new file mode 100644
index 0000000..ea782fa
--- /dev/null
+++ b/internal/worker/testdata/basic.txtar
@@ -0,0 +1,108 @@
+Repo in the shape of github.com/CVEProject/cvelist, with
+some actual data.
+
+-- README.md --
+ignore me please
+
+-- 2021/0xxx/CVE-2021-0001.json --
+{
+    "data_type": "CVE",
+    "data_format": "MITRE",
+    "data_version": "4.0",
+    "CVE_data_meta": {
+        "ID": "CVE-2021-0001",
+        "ASSIGNER": "secure@intel.com",
+        "STATE": "PUBLIC"
+    },
+    "affects": {
+        "vendor": {
+            "vendor_data": [
+                {
+                    "vendor_name": "n/a",
+                    "product": {
+                        "product_data": [
+                            {
+                                "product_name": "Intel(R) IPP",
+                                "version": {
+                                    "version_data": [
+                                        {
+                                            "version_value": "before version 2020 update 1"
+                                        }
+                                    ]
+                                }
+                            }
+                        ]
+                    }
+                }
+            ]
+        }
+    },
+    "problemtype": {
+        "problemtype_data": [
+            {
+                "description": [
+                    {
+                        "lang": "eng",
+                        "value": "information disclosure"
+                    }
+                ]
+            }
+        ]
+    },
+    "references": {
+        "reference_data": [
+            {
+                "refsource": "MISC",
+                "name": "https://www.intel.com/content/www/us/en/security-center/advisory/intel-sa-00477.html",
+                "url": "https://www.intel.com/content/www/us/en/security-center/advisory/intel-sa-00477.html"
+            }
+        ]
+    },
+    "description": {
+        "description_data": [
+            {
+                "lang": "eng",
+                "value": "Observable timing discrepancy in Intel(R) IPP before version 2020 update 1 may allow authorized user to potentially enable information disclosure via local access."
+            }
+        ]
+    }
+}
+-- 2021/0xxx/CVE-2021-0010.json --
+{
+    "data_type": "CVE",
+    "data_format": "MITRE",
+    "data_version": "4.0",
+    "CVE_data_meta": {
+        "ID": "CVE-2021-0010",
+        "ASSIGNER": "cve@mitre.org",
+        "STATE": "RESERVED"
+    },
+    "description": {
+        "description_data": [
+            {
+                "lang": "eng",
+                "value": "** RESERVED ** This candidate has been reserved by an organization or individual that will use it when announcing a new security problem. When the candidate has been publicized, the details for this candidate will be provided."
+            }
+        ]
+    }
+}
+-- 2021/1xxx/CVE-2021-1384.json --
+{
+    "data_type": "CVE",
+    "data_format": "MITRE",
+    "data_version": "4.0",
+    "CVE_data_meta": {
+        "ID": "CVE-2021-1384",
+        "ASSIGNER": "cve@mitre.org",
+        "STATE": "REJECT"
+    },
+    "description": {
+        "description_data": [
+            {
+                "lang": "eng",
+                "value": "** REJECT ** DO NOT USE THIS CANDIDATE NUMBER. ConsultIDs: none. Reason: This candidate was withdrawn by its CNA. Further investigation showed that it was not a security issue. Notes: none."
+            }
+        ]
+    }
+}
+
diff --git a/internal/worker/update.go b/internal/worker/update.go
new file mode 100644
index 0000000..63fd8f0
--- /dev/null
+++ b/internal/worker/update.go
@@ -0,0 +1,280 @@
+// Copyright 2021 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package worker
+
+import (
+	"context"
+	"encoding/hex"
+	"encoding/json"
+	"fmt"
+	"io"
+	"log"
+	"path"
+	"sort"
+	"strings"
+	"time"
+
+	"github.com/go-git/go-git/v5"
+	"github.com/go-git/go-git/v5/plumbing"
+	"github.com/go-git/go-git/v5/plumbing/filemode"
+	"github.com/go-git/go-git/v5/plumbing/object"
+	"golang.org/x/vuln/internal/cveschema"
+	"golang.org/x/vuln/internal/derrors"
+	"golang.org/x/vuln/internal/worker/store"
+)
+
+// A triageFunc triages a CVE: it decides whether an issue needs to be filed.
+type triageFunc func(*cveschema.CVE) (bool, error)
+
+// doUpdate compares the repo at the given commit with the state
+// of the DB and updates the DB to match.
+//
+// needsIssue determines whether a CVE needs an issue to be filed for it.
+func doUpdate(ctx context.Context, repo *git.Repository, commitHash plumbing.Hash, st store.Store, needsIssue triageFunc) (err error) {
+	// We want the action of reading the old DB record, updating it and
+	// writing it back to be atomic. It would be too expensive to do that one
+	// record at a time. Ideally we'd process the whole repo commit in one
+	// transaction, but Firestore has a limit on how many writes one
+	// transaction can do, so the CVE files in the repo are processed in
+	// batches, one transaction per batch.
+	defer derrors.Wrap(&err, "doUpdate(%s)", commitHash)
+
+	defer func() {
+		log.Printf("doUpdate finished with error %v", err)
+	}()
+
+	log.Printf("starting update of %s", commitHash)
+
+	// Get all the CVE files.
+	// It is cheaper to read all the files from the repo and compare
+	// them to the DB in bulk, than to walk the repo and process
+	// each file individually.
+	files, err := repoCVEFiles(repo, commitHash)
+	if err != nil {
+		return err
+	}
+	// Create a new UpdateRecord to describe this run of doUpdate.
+	ur := &store.UpdateRecord{
+		StartedAt:  time.Now(),
+		CommitHash: commitHash.String(),
+		NumTotal:   len(files),
+	}
+	if err := st.CreateUpdateRecord(ctx, ur); err != nil {
+		return err
+	}
+
+	// Update files in batches.
+
+	// Max Firestore writes per transaction.
+	// See https://cloud.google.com/firestore/quotas.
+	const batchSize = 500
+
+	for i := 0; i < len(files); i += batchSize {
+		j := i + batchSize
+		if j > len(files) {
+			j = len(files)
+		}
+		numAdds, numMods, err := updateBatch(ctx, files[i:j], st, repo, commitHash, needsIssue)
+
+		// Change the UpdateRecord in the Store to reflect the results of the transaction.
+		if err != nil {
+			ur.Error = err.Error()
+			if err2 := st.SetUpdateRecord(ctx, ur); err2 != nil {
+				return fmt.Errorf("update failed with %w, could not set update record: %v", err, err2)
+			}
+			return err
+		}
+		ur.NumProcessed += j - i
+		// Add in these two numbers here, instead of in the function passed to
+		// RunTransaction, because that function may be executed multiple times.
+		ur.NumAdded += numAdds
+		ur.NumModified += numMods
+		if err := st.SetUpdateRecord(ctx, ur); err != nil {
+			return err
+		}
+	} // end loop
+
+	ur.EndedAt = time.Now()
+	return st.SetUpdateRecord(ctx, ur)
+}
+
+// Action performed by handleCVE.
+type action int
+
+const (
+	nothing action = iota
+	add
+	mod
+)
+
+func updateBatch(ctx context.Context, batch []repoFile, st store.Store, repo *git.Repository, commitHash plumbing.Hash, needsIssue triageFunc) (numAdds, numMods int, err error) {
+	startID := idFromFilename(batch[0].filename)
+	endID := idFromFilename(batch[len(batch)-1].filename)
+	defer derrors.Wrap(&err, "updateBatch(%s-%s)", startID, endID)
+
+	err = st.RunTransaction(ctx, func(ctx context.Context, tx store.Transaction) error {
+		numAdds = 0
+		numMods = 0
+		// Read information about the existing state in the store that's
+		// relevant to this batch. Since the entries are sorted, we can read
+		// a range of IDs.
+		crs, err := tx.GetCVERecords(startID, endID)
+		if err != nil {
+			return err
+		}
+		idToRecord := map[string]*store.CVERecord{}
+		for _, cr := range crs {
+			idToRecord[cr.ID] = cr
+		}
+		// Determine what needs to be added and modified.
+		for _, f := range batch {
+			id := idFromFilename(f.filename)
+			act, err := handleCVE(ctx, repo, f, idToRecord[id], commitHash, needsIssue, tx)
+			if err != nil {
+				return err
+			}
+			switch act {
+			case add:
+				numAdds++
+			case mod:
+				numMods++
+			}
+		}
+		return nil
+	})
+	if err != nil {
+		return 0, 0, err
+	}
+	log.Printf("%s - %s: applied %d additions, %d modifications", startID, endID, numAdds, numMods)
+	return numAdds, numMods, nil
+}
+
+// handleCVE determines how to change the store for a single CVE.
+func handleCVE(ctx context.Context, repo *git.Repository, f repoFile, old *store.CVERecord, commitHash plumbing.Hash, needsIssue triageFunc, tx store.Transaction) (_ action, err error) {
+	defer derrors.Wrap(&err, "handleCVE(%s)", f.filename)
+
+	if old != nil && old.BlobHash == f.hash.String() {
+		// No change; do nothing.
+		return nothing, nil
+	}
+	// Read the CVE from the repo; an undecodable file is logged and skipped.
+	r, err := blobReader(repo, f.hash)
+	if err != nil {
+		return nothing, err
+	}
+	cve := &cveschema.CVE{}
+	if err := json.NewDecoder(r).Decode(cve); err != nil {
+		log.Printf("ERROR decoding %s: %v", f.filename, err)
+		return nothing, nil
+	}
+
+	// If the CVE is not in the database, add it.
+	if old == nil {
+		cr := store.NewCVERecord(cve, path.Join(f.dirpath, f.filename), f.hash.String())
+		cr.CommitHash = commitHash.String()
+		needs := false
+		if cve.State == cveschema.StatePublic {
+			needs, err = needsIssue(cve)
+			if err != nil {
+				return nothing, err
+			}
+		}
+		if needs {
+			cr.TriageState = store.TriageStateNeedsIssue
+		} else {
+			cr.TriageState = store.TriageStateNoActionNeeded
+		}
+		if err := tx.CreateCVERecord(cr); err != nil {
+			return nothing, err
+		}
+		return add, nil
+	}
+	// The CVE is in the database but its blob hash differs from the repo's.
+	// TODO(golang/go#49733): handle changes to CVEs.
+	return nothing, nil
+}
+
+type repoFile struct {
+	dirpath  string
+	filename string
+	hash     plumbing.Hash
+}
+
+// repoCVEFiles returns all the CVE files in the tree of the given repo
+// commit, sorted by base filename; lookup failures are wrapped with %w.
+func repoCVEFiles(repo *git.Repository, commitHash plumbing.Hash) (_ []repoFile, err error) {
+	defer derrors.Wrap(&err, "repoCVEFiles(%s)", commitHash)
+
+	commit, err := repo.CommitObject(commitHash)
+	if err != nil {
+		return nil, fmt.Errorf("CommitObject: %w", err)
+	}
+	root, err := repo.TreeObject(commit.TreeHash)
+	if err != nil {
+		return nil, fmt.Errorf("TreeObject: %w", err)
+	}
+	files, err := walkFiles(repo, root, "", nil)
+	if err != nil {
+		return nil, err
+	}
+	sort.Slice(files, func(i, j int) bool {
+		return files[i].filename < files[j].filename
+	})
+	return files, nil
+}
+
+// walkFiles collects CVE files from a repo tree.
+func walkFiles(repo *git.Repository, tree *object.Tree, dirpath string, files []repoFile) ([]repoFile, error) {
+	for _, e := range tree.Entries {
+		if e.Mode == filemode.Dir {
+			dir, err := repo.TreeObject(e.Hash)
+			if err != nil {
+				return nil, err
+			}
+			files, err = walkFiles(repo, dir, path.Join(dirpath, e.Name), files)
+			if err != nil {
+				return nil, err
+			}
+		} else if isCVEFilename(e.Name) {
+			files = append(files, repoFile{
+				dirpath:  dirpath,
+				filename: e.Name,
+				hash:     e.Hash,
+			})
+		}
+	}
+	return files, nil
+}
+
+// blobReader returns a reader to the blob with the given hash.
+func blobReader(repo *git.Repository, hash plumbing.Hash) (io.Reader, error) {
+	blob, err := repo.BlobObject(hash)
+	if err != nil {
+		return nil, err
+	}
+	return blob.Reader()
+}
+
+// hashFromString converts a hex string into a Hash.
+// Unlike plumbing.NewHash, it reports errors.
+func hashFromString(s string) (plumbing.Hash, error) {
+	b, err := hex.DecodeString(s)
+	if err != nil {
+		return plumbing.ZeroHash, err
+	}
+	var h plumbing.Hash
+	copy(h[:], b)
+	return h, nil
+}
+
+// idFromFilename extracts the CVE ID from its filename.
+func idFromFilename(name string) string {
+	return strings.TrimSuffix(path.Base(name), path.Ext(name))
+}
+
+// isCVEFilename reports whether name is the basename of a CVE file.
+func isCVEFilename(name string) bool {
+	return strings.HasPrefix(name, "CVE-") && path.Ext(name) == ".json"
+}
diff --git a/internal/worker/update_test.go b/internal/worker/update_test.go
new file mode 100644
index 0000000..f68f91c
--- /dev/null
+++ b/internal/worker/update_test.go
@@ -0,0 +1,95 @@
+// Copyright 2021 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package worker
+
+import (
+	"context"
+	"encoding/json"
+	"strings"
+	"testing"
+
+	"github.com/go-git/go-git/v5"
+	"github.com/go-git/go-git/v5/plumbing"
+	"github.com/go-git/go-git/v5/plumbing/object"
+	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
+	"golang.org/x/vuln/internal/cveschema"
+	"golang.org/x/vuln/internal/worker/store"
+)
+
+func TestRepoCVEFiles(t *testing.T) {
+	repo, err := readTxtarRepo("testdata/basic.txtar")
+	if err != nil {
+		t.Fatal(err)
+	}
+	h, err := headHash(repo)
+	if err != nil {
+		t.Fatal(err)
+	}
+	got, err := repoCVEFiles(repo, h)
+	if err != nil {
+		t.Fatal(err)
+	}
+	want := []repoFile{
+		{dirpath: "2021/0xxx", filename: "CVE-2021-0001.json"},
+		{dirpath: "2021/0xxx", filename: "CVE-2021-0010.json"},
+		{dirpath: "2021/1xxx", filename: "CVE-2021-1384.json"},
+	}
+
+	if diff := cmp.Diff(want, got, cmp.AllowUnexported(repoFile{}), cmpopts.IgnoreFields(repoFile{}, "hash")); diff != "" {
+		t.Errorf("mismatch (-want, +got):\n%s", diff)
+	}
+}
+
+func TestDoUpdate(t *testing.T) {
+	ctx := context.Background()
+	repo, err := readTxtarRepo("testdata/basic.txtar")
+	if err != nil {
+		t.Fatal(err)
+	}
+	mstore := store.NewMemStore()
+	h, err := headHash(repo)
+	if err != nil {
+		t.Fatal(err)
+	}
+	needsIssue := func(cve *cveschema.CVE) (bool, error) {
+		return strings.HasSuffix(cve.ID, "0001"), nil
+	}
+	if err := doUpdate(ctx, repo, h, mstore, needsIssue); err != nil {
+		t.Fatal(err)
+	}
+	ref, err := repo.Reference(plumbing.HEAD, true)
+	if err != nil {
+		t.Fatal(err)
+	}
+	r1 := newTestCVERecord(t, repo, ref, "2021/0xxx/CVE-2021-0001.json", store.TriageStateNeedsIssue)
+	r10 := newTestCVERecord(t, repo, ref, "2021/0xxx/CVE-2021-0010.json", store.TriageStateNoActionNeeded)
+	r384 := newTestCVERecord(t, repo, ref, "2021/1xxx/CVE-2021-1384.json", store.TriageStateNoActionNeeded)
+	wantRecords := map[string]*store.CVERecord{
+		"CVE-2021-0001": r1,
+		"CVE-2021-0010": r10,
+		"CVE-2021-1384": r384,
+	}
+	diff := cmp.Diff(wantRecords, mstore.CVERecords())
+	if diff != "" {
+		t.Errorf("mismatch (-want, +got):\n%s", diff)
+	}
+}
+
+func newTestCVERecord(t *testing.T, repo *git.Repository, ref *plumbing.Reference, path string, ts store.TriageState) *store.CVERecord {
+	blob := findBlob(t, repo, path)
+	r := store.NewCVERecord(readCVE(t, blob), path, blob.Hash.String())
+	r.CommitHash = ref.Hash().String()
+	r.TriageState = ts
+	return r
+}
+
+func readCVE(t *testing.T, blob *object.Blob) *cveschema.CVE {
+	var cve cveschema.CVE
+	if err := json.Unmarshal(readBlob(t, blob), &cve); err != nil {
+		t.Fatal(err)
+	}
+	return &cve
+}