internal/vulndbreqs: restart iterator when out of quota
When we run out of quota for our requests to the logging API,
wait a little and then try again.
The Compute function is exported in anticipation of the next CL.
Change-Id: I84a3a01d38d49bfd6aa5aa8a55ea16e33a3ecf37
Reviewed-on: https://go-review.googlesource.com/c/pkgsite-metrics/+/482158
Reviewed-by: Zvonimir Pavlinovic <zpavlinovic@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
diff --git a/internal/vulndbreqs/compute.go b/internal/vulndbreqs/compute.go
index 6e80cfb..7a54a71 100644
--- a/internal/vulndbreqs/compute.go
+++ b/internal/vulndbreqs/compute.go
@@ -17,7 +17,6 @@
"golang.org/x/exp/slices"
"golang.org/x/pkgsite-metrics/internal/bigquery"
"golang.org/x/pkgsite-metrics/internal/log"
- "golang.org/x/time/rate"
"google.golang.org/api/iterator"
)
@@ -41,7 +40,7 @@
for d := startDate; d.Before(today); d = d.AddDays(1) {
if !have[d] {
// compute excludes both the start and end dates.
- rcs, err := compute(ctx, vulndbBucketProjectID, d.AddDays(-1), d.AddDays(1), 0)
+ rcs, err := Compute(ctx, vulndbBucketProjectID, d.AddDays(-1), d.AddDays(1), 0)
if err != nil {
return err
}
@@ -61,11 +60,11 @@
return nil
}
-// compute queries the vulndb load balancer logs for all
+// Compute queries the vulndb load balancer logs for all
// vuln DB requests between the given dates, exclusive of both.
// It returns request counts for each date, sorted from newest to oldest.
// If limit is positive, it reads no more than limit entries from the log (for testing only).
-func compute(ctx context.Context, vulndbBucketProjectID string, fromDate, toDate civil.Date, limit int) ([]*RequestCount, error) {
+func Compute(ctx context.Context, vulndbBucketProjectID string, fromDate, toDate civil.Date, limit int) ([]*RequestCount, error) {
log.Infof(ctx, "computing request counts from %s to %s", fromDate, toDate)
client, err := logadmin.NewClient(ctx, vulndbBucketProjectID)
if err != nil {
@@ -75,7 +74,7 @@
counts := map[civil.Date]int{}
- it := client.Entries(ctx,
+ it := newEntryIterator(ctx, client,
// This filter has three sections, marked with blank lines. It is more
// efficient to do as much filtering as possible in the logging API
// query, rather than in code.
@@ -94,7 +93,7 @@
// times. It formats the times as dates like "2022-08-10". We want
// the filter to be exclusive on both ends, so we use "<" for the end date,
// and add one day to the start date.
- logadmin.Filter(`
+ `
resource.type=http_load_balancer
resource.labels.forwarding_rule_name=go-vulndb-lb-forwarding-rule
resource.labels.url_map_name=go-vulndb-lb
@@ -107,18 +106,12 @@
-httpRequest.requestUrl:"https://vuln.go.dev/ID/"
timestamp>=`+fromDate.AddDays(1).String()+`
- timestamp<`+toDate.String()))
- // Using a large page size results in fewer requests to the logging API.
- // 1000 is the maximum allowed.
- const pageSize = 1000
- it.PageInfo().MaxSize = pageSize
+ timestamp<`+toDate.String())
// Count each log entry we see, bucketing by date.
// The timestamps are in order from oldest to newest
// (https://cloud.google.com/logging/docs/reference/v2/rpc/google.logging.v2#google.logging.v2.ListLogEntriesRequest).
var logErr error
n := 1
- const requestsPerMinuteQuota = 60 // estimated log read quota
- lim := rate.NewLimiter(requestsPerMinuteQuota/60.0, 1)
for {
entry, err := it.Next()
if err != nil {
@@ -128,17 +121,10 @@
break
}
counts[civil.DateOf(entry.Timestamp)]++
- // Assume one request per pageSize items.
- // Throttle to avoid exceeding quota.
n++
if limit > 0 && n > limit {
break
}
- if n%pageSize == 0 {
- if err := lim.Wait(ctx); err != nil {
- return nil, err
- }
- }
}
// Convert the counts map to a slice of VulnDBRquestCounts.
diff --git a/internal/vulndbreqs/compute_test.go b/internal/vulndbreqs/compute_test.go
index 28eca59..40c2f5b 100644
--- a/internal/vulndbreqs/compute_test.go
+++ b/internal/vulndbreqs/compute_test.go
@@ -25,7 +25,7 @@
today := civil.DateOf(time.Now())
// Compute yesterday's counts, up to 10 log entries.
// Assume there are more than ten requests a day.
- got, err := compute(context.Background(), projID, today.AddDays(-2), today, 10)
+ got, err := Compute(context.Background(), projID, today.AddDays(-2), today, 10)
if err != nil {
t.Fatal(err)
}
diff --git a/internal/vulndbreqs/iterator.go b/internal/vulndbreqs/iterator.go
new file mode 100644
index 0000000..8175a8d
--- /dev/null
+++ b/internal/vulndbreqs/iterator.go
@@ -0,0 +1,65 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vulndbreqs
+
+import (
+ "context"
+ "time"
+
+ "cloud.google.com/go/logging"
+ "cloud.google.com/go/logging/logadmin"
+ "golang.org/x/pkgsite-metrics/internal/log"
+ "google.golang.org/api/iterator"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// entryIterator is a logadmin.EntryIterator wrapper that copes with quota
+// limits: on a ResourceExhausted error it pauses, then resumes the listing.
+type entryIterator struct {
+	ctx    context.Context
+	client *logadmin.Client
+	filter string                  // logging API filter expression
+	it     *logadmin.EntryIterator // current underlying iterator; nil until Next creates one
+	token  string                  // page token to resume from after a quota error
+}
+
+// newEntryIterator returns an iterator over client's entries matching filter.
+func newEntryIterator(ctx context.Context, client *logadmin.Client, filter string) *entryIterator {
+	return &entryIterator{filter: filter, client: client, ctx: ctx}
+}
+
+// Next returns the next log entry, iterator.Done at the end of the log, or
+// the first error that is not a quota (ResourceExhausted) error.
+func (it *entryIterator) Next() (*logging.Entry, error) {
+	for {
+		if it.it == nil {
+			it.it = it.client.Entries(it.ctx, logadmin.Filter(it.filter))
+			pi := it.it.PageInfo()
+			// 1000 is the maximum page size; larger pages mean fewer API requests.
+			pi.MaxSize = 1000
+			// Resume from the remembered page token, if any (see iterator.PageInfo).
+			pi.Token = it.token
+		}
+		entry, err := it.it.Next()
+		if err == nil || err == iterator.Done {
+			return entry, err
+		}
+		if s, ok := status.FromError(err); !ok || s.Code() != codes.ResourceExhausted {
+			// Not a quota error: report it rather than returning (nil, nil).
+			return nil, err
+		}
+		// Out of quota: wait (unless the context ends), then retry with a new iterator.
+		log.Infof(it.ctx, "entryIterator: got ResourceExhausted, sleeping...")
+		select {
+		case <-it.ctx.Done():
+			return nil, it.ctx.Err()
+		case <-time.After(10 * time.Second):
+		}
+		log.Infof(it.ctx, "entryIterator: retrying")
+		it.token = it.it.PageInfo().Token
+		it.it = nil
+	}
+}