internal/worker: server improvements

Reorganize handlUpdateAndIssues so that update doesn't write to the
ResponseWriter before issues can set a status.

Add a "force" query parameter to update.

Throttle issue creation speed and number.

Improve issue logging.

Get GitHub access token from environment variable.

Change-Id: I8e1ef8444bbad8535f99e2ae3bda34d1b62fbfd2
Reviewed-on: https://go-review.googlesource.com/c/vuln/+/372180
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Julie Qiu <julie@golang.org>
diff --git a/cmd/worker/main.go b/cmd/worker/main.go
index 7b1c193..b10f177 100644
--- a/cmd/worker/main.go
+++ b/cmd/worker/main.go
@@ -77,6 +77,8 @@
 			die("%v", err)
 		}
 		cfg.GitHubAccessToken = strings.TrimSpace(string(data))
+	} else {
+		cfg.GitHubAccessToken = os.Getenv("VULN_GITHUB_ACCESS_TOKEN")
 	}
 	if err := cfg.Validate(); err != nil {
 		dieWithUsage("%v", err)
diff --git a/internal/worker/server.go b/internal/worker/server.go
index 9effe89..ddb8b3d 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -235,18 +235,27 @@
 }
 
 func (s *Server) handleUpdate(w http.ResponseWriter, r *http.Request) error {
+	err := s.doUpdate(r)
+	if err == nil {
+		fmt.Fprintf(w, "Update succeeded.\n")
+	}
+	return err
+}
+
+func (s *Server) doUpdate(r *http.Request) error {
 	if r.Method != http.MethodPost {
 		return &serverError{
 			status: http.StatusMethodNotAllowed,
 			err:    fmt.Errorf("%s required", http.MethodPost),
 		}
 	}
-	err := UpdateCommit(r.Context(), gitrepo.CVEListRepoURL, "HEAD", s.cfg.Store, pkgsiteURL, false /* force */)
-	if cerr := new(CheckUpdateError); errors.As(err, &cerr) {
-		return fmt.Errorf("%w; use -force on the command line to override", cerr)
+	force := false
+	if f := r.FormValue("force"); f == "true" {
+		force = true
 	}
-	if err == nil {
-		fmt.Fprintf(w, "Update succeeded.\n")
+	err := UpdateCommit(r.Context(), gitrepo.CVEListRepoURL, "HEAD", s.cfg.Store, pkgsiteURL, force)
+	if cerr := new(CheckUpdateError); errors.As(err, &cerr) {
+		return fmt.Errorf("%w; use /update?force=true to override", cerr)
 	}
 	return err
 }
@@ -264,7 +273,8 @@
 			err:    errors.New("issue creation disabled"),
 		}
 	}
-	limit := 0
+	// Unless explicitly asked to, don't create more than a few issues.
+	limit := 10
 	if sl := r.FormValue("limit"); sl != "" {
 		var err error
 		limit, err = strconv.Atoi(sl)
@@ -275,11 +285,12 @@
 			}
 		}
 	}
+	log.Info(r.Context(), "creating issues", event.Int64("limit", int64(limit)))
 	return CreateIssues(r.Context(), s.cfg.Store, s.issueClient, limit)
 }
 
 func (s *Server) handleUpdateAndIssues(w http.ResponseWriter, r *http.Request) error {
-	if err := s.handleUpdate(w, r); err != nil {
+	if err := s.doUpdate(r); err != nil {
 		return err
 	}
 	return s.handleIssues(w, r)
diff --git a/internal/worker/update.go b/internal/worker/update.go
index 072120d..b0f6d0e 100644
--- a/internal/worker/update.go
+++ b/internal/worker/update.go
@@ -76,11 +76,17 @@
 		if err != nil {
 			log.Error(ctx, "update failed", event.Value("error", err))
 		} else {
-			nProcessed := int64(0)
+			var nProcessed, nAdded, nModified int64
 			if ur != nil {
 				nProcessed = int64(ur.NumProcessed)
+				nAdded = int64(ur.NumAdded)
+				nModified = int64(ur.NumModified)
 			}
-			log.Info(ctx, "update succeeded", event.Int64("numProcessed", nProcessed))
+			log.Info(ctx, "update succeeded",
+				event.String("commit", u.commitHash.String()),
+				event.Int64("processed", nProcessed),
+				event.Int64("added", nAdded),
+				event.Int64("modified", nModified))
 		}
 	}()
 
diff --git a/internal/worker/worker.go b/internal/worker/worker.go
index be29848..3c24a27 100644
--- a/internal/worker/worker.go
+++ b/internal/worker/worker.go
@@ -20,6 +20,7 @@
 	"github.com/go-git/go-git/v5/plumbing"
 	"golang.org/x/exp/event"
 	"golang.org/x/sync/errgroup"
+	"golang.org/x/time/rate"
 	vulnc "golang.org/x/vuln/client"
 	"golang.org/x/vuln/internal/cveschema"
 	"golang.org/x/vuln/internal/derrors"
@@ -172,14 +173,24 @@
 	return cveIDs, nil
 }
 
+// Limit GitHub issue creation requests to this many per second.
+const issueQPS = 1
+
+// The limiter used to throttle pkgsite requests.
+// The second argument to rate.NewLimiter is the burst, which
+// basically lets you exceed the rate briefly.
+var issueRateLimiter = rate.NewLimiter(rate.Every(time.Duration(1000/float64(issueQPS))*time.Millisecond), 1)
+
 func CreateIssues(ctx context.Context, st store.Store, ic IssueClient, limit int) (err error) {
 	derrors.Wrap(&err, "CreateIssues(destination: %s)", ic.Destination())
 
-	log.Info(ctx, "CreateIssues starting", event.String("destination", ic.Destination()))
 	needsIssue, err := st.ListCVERecordsWithTriageState(ctx, store.TriageStateNeedsIssue)
 	if err != nil {
 		return err
 	}
+	log.Info(ctx, "CreateIssues starting",
+		event.String("destination", ic.Destination()),
+		event.Int64("needsIssue", int64(len(needsIssue))))
 	numCreated := int64(0)
 	for _, r := range needsIssue {
 		if limit > 0 && int(numCreated) >= limit {
@@ -203,6 +214,9 @@
 			Body:   body,
 			Labels: []string{"Needs Triage"},
 		}
+		if err := issueRateLimiter.Wait(ctx); err != nil {
+			return err
+		}
 		num, err := ic.CreateIssue(ctx, iss)
 		if err != nil {
 			return fmt.Errorf("creating issue for %s: %w", r.ID, err)