internal/worker: server improvements
Reorganize handlUpdateAndIssues so that update doesn't write to the
ResponseWriter before issues can set a status.
Add a "force" query parameter to update.
Throttle issue creation speed and number.
Improve issue logging.
Get GitHub access token from environment variable.
Change-Id: I8e1ef8444bbad8535f99e2ae3bda34d1b62fbfd2
Reviewed-on: https://go-review.googlesource.com/c/vuln/+/372180
Trust: Jonathan Amsterdam <jba@google.com>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Julie Qiu <julie@golang.org>
diff --git a/cmd/worker/main.go b/cmd/worker/main.go
index 7b1c193..b10f177 100644
--- a/cmd/worker/main.go
+++ b/cmd/worker/main.go
@@ -77,6 +77,8 @@
die("%v", err)
}
cfg.GitHubAccessToken = strings.TrimSpace(string(data))
+ } else {
+ cfg.GitHubAccessToken = os.Getenv("VULN_GITHUB_ACCESS_TOKEN")
}
if err := cfg.Validate(); err != nil {
dieWithUsage("%v", err)
diff --git a/internal/worker/server.go b/internal/worker/server.go
index 9effe89..ddb8b3d 100644
--- a/internal/worker/server.go
+++ b/internal/worker/server.go
@@ -235,18 +235,27 @@
}
func (s *Server) handleUpdate(w http.ResponseWriter, r *http.Request) error {
+ err := s.doUpdate(r)
+ if err == nil {
+ fmt.Fprintf(w, "Update succeeded.\n")
+ }
+ return err
+}
+
+func (s *Server) doUpdate(r *http.Request) error {
if r.Method != http.MethodPost {
return &serverError{
status: http.StatusMethodNotAllowed,
err: fmt.Errorf("%s required", http.MethodPost),
}
}
- err := UpdateCommit(r.Context(), gitrepo.CVEListRepoURL, "HEAD", s.cfg.Store, pkgsiteURL, false /* force */)
- if cerr := new(CheckUpdateError); errors.As(err, &cerr) {
- return fmt.Errorf("%w; use -force on the command line to override", cerr)
+ force := false
+ if f := r.FormValue("force"); f == "true" {
+ force = true
}
- if err == nil {
- fmt.Fprintf(w, "Update succeeded.\n")
+ err := UpdateCommit(r.Context(), gitrepo.CVEListRepoURL, "HEAD", s.cfg.Store, pkgsiteURL, force)
+ if cerr := new(CheckUpdateError); errors.As(err, &cerr) {
+ return fmt.Errorf("%w; use /update?force=true to override", cerr)
}
return err
}
@@ -264,7 +273,8 @@
err: errors.New("issue creation disabled"),
}
}
- limit := 0
+ // Unless explicitly asked to, don't create more than a few issues.
+ limit := 10
if sl := r.FormValue("limit"); sl != "" {
var err error
limit, err = strconv.Atoi(sl)
@@ -275,11 +285,12 @@
}
}
}
+ log.Info(r.Context(), "creating issues", event.Int64("limit", int64(limit)))
return CreateIssues(r.Context(), s.cfg.Store, s.issueClient, limit)
}
func (s *Server) handleUpdateAndIssues(w http.ResponseWriter, r *http.Request) error {
- if err := s.handleUpdate(w, r); err != nil {
+ if err := s.doUpdate(r); err != nil {
return err
}
return s.handleIssues(w, r)
diff --git a/internal/worker/update.go b/internal/worker/update.go
index 072120d..b0f6d0e 100644
--- a/internal/worker/update.go
+++ b/internal/worker/update.go
@@ -76,11 +76,17 @@
if err != nil {
log.Error(ctx, "update failed", event.Value("error", err))
} else {
- nProcessed := int64(0)
+ var nProcessed, nAdded, nModified int64
if ur != nil {
nProcessed = int64(ur.NumProcessed)
+ nAdded = int64(ur.NumAdded)
+ nModified = int64(ur.NumModified)
}
- log.Info(ctx, "update succeeded", event.Int64("numProcessed", nProcessed))
+ log.Info(ctx, "update succeeded",
+ event.String("commit", u.commitHash.String()),
+ event.Int64("processed", nProcessed),
+ event.Int64("added", nAdded),
+ event.Int64("modified", nModified))
}
}()
diff --git a/internal/worker/worker.go b/internal/worker/worker.go
index be29848..3c24a27 100644
--- a/internal/worker/worker.go
+++ b/internal/worker/worker.go
@@ -20,6 +20,7 @@
"github.com/go-git/go-git/v5/plumbing"
"golang.org/x/exp/event"
"golang.org/x/sync/errgroup"
+ "golang.org/x/time/rate"
vulnc "golang.org/x/vuln/client"
"golang.org/x/vuln/internal/cveschema"
"golang.org/x/vuln/internal/derrors"
@@ -172,14 +173,24 @@
return cveIDs, nil
}
+// Limit GitHub issue creation requests to this many per second.
+const issueQPS = 1
+
+// The limiter used to throttle pkgsite requests.
+// The second argument to rate.NewLimiter is the burst, which
+// basically lets you exceed the rate briefly.
+var issueRateLimiter = rate.NewLimiter(rate.Every(time.Duration(1000/float64(issueQPS))*time.Millisecond), 1)
+
func CreateIssues(ctx context.Context, st store.Store, ic IssueClient, limit int) (err error) {
derrors.Wrap(&err, "CreateIssues(destination: %s)", ic.Destination())
- log.Info(ctx, "CreateIssues starting", event.String("destination", ic.Destination()))
needsIssue, err := st.ListCVERecordsWithTriageState(ctx, store.TriageStateNeedsIssue)
if err != nil {
return err
}
+ log.Info(ctx, "CreateIssues starting",
+ event.String("destination", ic.Destination()),
+ event.Int64("needsIssue", int64(len(needsIssue))))
numCreated := int64(0)
for _, r := range needsIssue {
if limit > 0 && int(numCreated) >= limit {
@@ -203,6 +214,9 @@
Body: body,
Labels: []string{"Needs Triage"},
}
+ if err := issueRateLimiter.Wait(ctx); err != nil {
+ return err
+ }
num, err := ic.CreateIssue(ctx, iss)
if err != nil {
return fmt.Errorf("creating issue for %s: %w", r.ID, err)