internal/vulndbreqs: restart iterator when out of quota

When we run out of quota for for our requests to the logging API,
wait a little and then try again.

The Compute function is exported in anticipation of the next CL.

Change-Id: I84a3a01d38d49bfd6aa5aa8a55ea16e33a3ecf37
Reviewed-on: https://go-review.googlesource.com/c/pkgsite-metrics/+/482158
Reviewed-by: Zvonimir Pavlinovic <zpavlinovic@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Run-TryBot: Jonathan Amsterdam <jba@google.com>
diff --git a/internal/vulndbreqs/compute.go b/internal/vulndbreqs/compute.go
index 6e80cfb..7a54a71 100644
--- a/internal/vulndbreqs/compute.go
+++ b/internal/vulndbreqs/compute.go
@@ -17,7 +17,6 @@
 	"golang.org/x/exp/slices"
 	"golang.org/x/pkgsite-metrics/internal/bigquery"
 	"golang.org/x/pkgsite-metrics/internal/log"
-	"golang.org/x/time/rate"
 	"google.golang.org/api/iterator"
 )
 
@@ -41,7 +40,7 @@
 	for d := startDate; d.Before(today); d = d.AddDays(1) {
 		if !have[d] {
 			// compute excludes both the start and end dates.
-			rcs, err := compute(ctx, vulndbBucketProjectID, d.AddDays(-1), d.AddDays(1), 0)
+			rcs, err := Compute(ctx, vulndbBucketProjectID, d.AddDays(-1), d.AddDays(1), 0)
 			if err != nil {
 				return err
 			}
@@ -61,11 +60,11 @@
 	return nil
 }
 
-// compute queries the vulndb load balancer logs for all
+// Compute queries the vulndb load balancer logs for all
 // vuln DB requests between the given dates, exclusive of both.
 // It returns request counts for each date, sorted from newest to oldest.
 // If limit is positive, it reads no more than limit entries from the log (for testing only).
-func compute(ctx context.Context, vulndbBucketProjectID string, fromDate, toDate civil.Date, limit int) ([]*RequestCount, error) {
+func Compute(ctx context.Context, vulndbBucketProjectID string, fromDate, toDate civil.Date, limit int) ([]*RequestCount, error) {
 	log.Infof(ctx, "computing request counts from %s to %s", fromDate, toDate)
 	client, err := logadmin.NewClient(ctx, vulndbBucketProjectID)
 	if err != nil {
@@ -75,7 +74,7 @@
 
 	counts := map[civil.Date]int{}
 
-	it := client.Entries(ctx,
+	it := newEntryIterator(ctx, client,
 		// This filter has three sections, marked with blank lines. It is more
 		// efficient to do as much filtering as possible in the logging API
 		// query, rather than in code.
@@ -94,7 +93,7 @@
 		// times. It formats the times as dates like "2022-08-10". We want
 		// the filter to be exclusive on both ends, so we use "<" for the end date,
 		// and add one day to the start date.
-		logadmin.Filter(`
+		`
 		resource.type=http_load_balancer
 		resource.labels.forwarding_rule_name=go-vulndb-lb-forwarding-rule
 		resource.labels.url_map_name=go-vulndb-lb
@@ -107,18 +106,12 @@
 		-httpRequest.requestUrl:"https://vuln.go.dev/ID/"
 
 		timestamp>=`+fromDate.AddDays(1).String()+`
-		timestamp<`+toDate.String()))
-	// Using a large page size results in fewer requests to the logging API.
-	// 1000 is the maximum allowed.
-	const pageSize = 1000
-	it.PageInfo().MaxSize = pageSize
+		timestamp<`+toDate.String())
 	// Count each log entry we see, bucketing by date.
 	// The timestamps are in order from oldest to newest
 	// (https://cloud.google.com/logging/docs/reference/v2/rpc/google.logging.v2#google.logging.v2.ListLogEntriesRequest).
 	var logErr error
 	n := 1
-	const requestsPerMinuteQuota = 60 // estimated log read quota
-	lim := rate.NewLimiter(requestsPerMinuteQuota/60.0, 1)
 	for {
 		entry, err := it.Next()
 		if err != nil {
@@ -128,17 +121,10 @@
 			break
 		}
 		counts[civil.DateOf(entry.Timestamp)]++
-		// Assume one request per pageSize items.
-		// Throttle to avoid exceeding quota.
 		n++
 		if limit > 0 && n > limit {
 			break
 		}
-		if n%pageSize == 0 {
-			if err := lim.Wait(ctx); err != nil {
-				return nil, err
-			}
-		}
 	}
 
 	// Convert the counts map to a slice of VulnDBRquestCounts.
diff --git a/internal/vulndbreqs/compute_test.go b/internal/vulndbreqs/compute_test.go
index 28eca59..40c2f5b 100644
--- a/internal/vulndbreqs/compute_test.go
+++ b/internal/vulndbreqs/compute_test.go
@@ -25,7 +25,7 @@
 	today := civil.DateOf(time.Now())
 	// Compute yesterday's counts, up to 10 log entries.
 	// Assume there are more than ten requests a day.
-	got, err := compute(context.Background(), projID, today.AddDays(-2), today, 10)
+	got, err := Compute(context.Background(), projID, today.AddDays(-2), today, 10)
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/internal/vulndbreqs/iterator.go b/internal/vulndbreqs/iterator.go
new file mode 100644
index 0000000..8175a8d
--- /dev/null
+++ b/internal/vulndbreqs/iterator.go
@@ -0,0 +1,65 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package vulndbreqs
+
+import (
+	"context"
+	"time"
+
+	"cloud.google.com/go/logging"
+	"cloud.google.com/go/logging/logadmin"
+	"golang.org/x/pkgsite-metrics/internal/log"
+	"google.golang.org/api/iterator"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+// entryIterator wraps a logadmin.EntryIterator to handle quota limits.
+// When it sees a ResourceExhausted error, it waits a few seconds to
+// get more quota.
+type entryIterator struct {
+	ctx    context.Context
+	client *logadmin.Client
+	filter string
+	it     *logadmin.EntryIterator
+	token  string
+}
+
+func newEntryIterator(ctx context.Context, client *logadmin.Client, filter string) *entryIterator {
+	return &entryIterator{ctx: ctx, client: client, filter: filter}
+}
+
+func (it *entryIterator) Next() (*logging.Entry, error) {
+	for {
+		if it.it == nil {
+			it.it = it.client.Entries(it.ctx, logadmin.Filter(it.filter))
+			pi := it.it.PageInfo()
+			// Using a large page size results in fewer requests to the logging API.
+			// 1000 is the maximum allowed.
+			pi.MaxSize = 1000
+			// If we remembered a page token, start the iterator with it.
+			// See [google.golang.org/api/iterator.PageInfo].
+			if it.token != "" {
+				pi.Token = it.token
+			}
+		}
+		entry, err := it.it.Next()
+		if err == iterator.Done {
+			return nil, err
+		}
+		if s, ok := status.FromError(err); ok && s.Code() == codes.ResourceExhausted {
+			// We ran out of quota. Wait a little and try again.
+			log.Infof(it.ctx, "entryIterator: got ResourceExhausted, sleeping...")
+			time.Sleep(10 * time.Second)
+			log.Infof(it.ctx, "entryIterator: retrying")
+			it.token = it.it.PageInfo().Token
+			// We can't continue with this iterator, so create a new one at the
+			// top of the loop.
+			it.it = nil
+			continue
+		}
+		return entry, nil
+	}
+}