internal/goreviews: do not collect rejected changes

This should in principle get rid of the OOM for large repositories.

The change requires a refactoring where application of predicates and
collection of predicate changes now explicitly takes predicates as
inputs.

Fixes golang/oscar#40

Change-Id: Idb5f4de9d8acc8fdf7314d39b87e09cfd5520777
Reviewed-on: https://go-review.googlesource.com/c/oscar/+/622915
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Ian Lance Taylor <iant@google.com>
diff --git a/internal/goreviews/changes.go b/internal/goreviews/changes.go
index 76f0e29..2863c98 100644
--- a/internal/goreviews/changes.go
+++ b/internal/goreviews/changes.go
@@ -27,8 +27,6 @@
 
 // New returns a new Changes to manage the Go dashboard.
 func New(lg *slog.Logger, client *gerrit.Client, projects []string) *Changes {
-	addPredicatesOnce()
-
 	return &Changes{
 		slog:     lg,
 		client:   client,
diff --git a/internal/goreviews/collect.go b/internal/goreviews/collect.go
index 618435d..b3fe060 100644
--- a/internal/goreviews/collect.go
+++ b/internal/goreviews/collect.go
@@ -35,15 +35,11 @@
 		collectAccounts(client, accounts, chAccount)
 	}()
 
-	var changes []*gerrit.Change
+	// Collect account data first.
 	for _, project := range projects {
 		for _, changeFn := range client.ChangeNumbers(project) {
-			gchange := changeFn()
-
-			changes = append(changes, gchange)
-
 			select {
-			case chAccount <- gchange:
+			case chAccount <- changeFn():
 			case <-ctx.Done():
 				close(chAccount)
 				wg.Wait()
@@ -57,6 +53,33 @@
 
 	// We have all the account data, so we can now apply predicates.
 
+	// We collect all the changes but try to skip those that will be
+	// rejected. Otherwise, the number of changes to which we apply
+	// the predicates can get too big, causing OOM for large projects.
+	// Note that we cannot do this in the above loop since rejection
+	// might depend on the account data.
+	var changes []*gerrit.Change
+	rejs := append(reviews.Rejects(), rejects...)
+	grc := &reviews.GerritReviewClient{GClient: client, Accounts: accounts}
+	for _, project := range projects {
+		for _, changeFn := range client.ChangeNumbers(project) {
+			gchange := changeFn()
+
+			grchange := &reviews.GerritChange{
+				Client: grc,
+				Change: gchange,
+			}
+			_, ok, err := reviews.ApplyPredicates(goChange{grchange}, nil, rejs)
+			if err != nil {
+				lg.Error("error applying rejections", "change", grchange.ID(), "err", err)
+				continue
+			}
+			if ok {
+				changes = append(changes, gchange)
+			}
+		}
+	}
+
 	// Convert to reviews.GerritChange, which implements review.Change.
 	// We wrap GerritChange ourselves to reimplement the Author method.
 	// Create an iterator we can pass to reviews.CollectChangePreds.
@@ -68,7 +91,8 @@
 		}
 	}
 
-	cps := reviews.CollectChangePreds(ctx, lg, toReviews)
+	preds := append(reviews.Predicates(), predicates...)
+	cps := reviews.CollectChangePreds(ctx, lg, toReviews, preds, nil) // we already applied rejections
 
 	lg.Info("finished gathering change information", "duration", time.Since(start))
 
diff --git a/internal/goreviews/collect_test.go b/internal/goreviews/collect_test.go
index a68da51..d305cf6 100644
--- a/internal/goreviews/collect_test.go
+++ b/internal/goreviews/collect_test.go
@@ -16,8 +16,6 @@
 )
 
 func TestCollectChanges(t *testing.T) {
-	addPredicatesOnce()
-
 	check := testutil.Checker(t)
 
 	lg := testutil.Slogger(t)
diff --git a/internal/goreviews/pred.go b/internal/goreviews/pred.go
index a02084d..7a6fb06 100644
--- a/internal/goreviews/pred.go
+++ b/internal/goreviews/pred.go
@@ -7,7 +7,6 @@
 import (
 	"slices"
 	"strings"
-	"sync"
 
 	"golang.org/x/oscar/internal/gerrit"
 	"golang.org/x/oscar/internal/reviews"
@@ -170,10 +169,3 @@
 	}
 	return false, nil
 }
-
-// addPredicatesOnce adds the Go project predicates to the
-// general predicates, once.
-var addPredicatesOnce = sync.OnceFunc(func() {
-	reviews.AddPredicates(predicates)
-	reviews.AddRejects(rejects)
-})
diff --git a/internal/reviews/collect.go b/internal/reviews/collect.go
index 81d1eeb..e3a9fd2 100644
--- a/internal/reviews/collect.go
+++ b/internal/reviews/collect.go
@@ -16,10 +16,12 @@
 	"sync"
 )
 
-// CollectChangePreds reads [Change] values from an iterator and
-// collects them into a slice of [ChangePreds] values.
+// CollectChangePreds reads [Change] values from an iterator,
+// applies predicates to the values, and collects the results
+// into a slice of [ChangePreds] values, skipping changes that
+// are rejected.
 // The slice is sorted by the predicate default values.
-func CollectChangePreds(ctx context.Context, lg *slog.Logger, it iter.Seq[Change]) []ChangePreds {
+func CollectChangePreds(ctx context.Context, lg *slog.Logger, it iter.Seq[Change], predicates []Predicate, rejects []Reject) []ChangePreds {
 	// Applying predicates can be time consuming,
 	// and there can be a lot of changes,
 	// so shard the work.
@@ -46,7 +48,7 @@
 		go func() {
 			defer wg.Done()
 			for change := range chIn {
-				cp, ok, err := ApplyPredicates(change)
+				cp, ok, err := ApplyPredicates(change, predicates, rejects)
 				if err != nil {
 					// Errors are assumed to be
 					// non-critical. Just log them.
diff --git a/internal/reviews/collect_test.go b/internal/reviews/collect_test.go
index 6507d92..1b352d5 100644
--- a/internal/reviews/collect_test.go
+++ b/internal/reviews/collect_test.go
@@ -36,7 +36,7 @@
 	}
 
 	ctx := context.Background()
-	cps := CollectChangePreds(ctx, testutil.Slogger(t), changes)
+	cps := CollectChangePreds(ctx, testutil.Slogger(t), changes, Predicates(), Rejects())
 	if len(cps) != 1 {
 		t.Errorf("CollectChangePreds returned %d entries, want 1", len(cps))
 	}
diff --git a/internal/reviews/pred.go b/internal/reviews/pred.go
index 9e610e0..c80489c 100644
--- a/internal/reviews/pred.go
+++ b/internal/reviews/pred.go
@@ -6,7 +6,7 @@
 
 import (
 	"fmt"
-	"sync"
+	"slices"
 )
 
 // A Predicate is a categorization of a [Change].
@@ -39,19 +39,12 @@
 type Reject Predicate
 
 // ApplyPredicates takes a [Change] and applies predicates to it.
-// The bool result reports whether the change is reviewable;
-// this will be false if the change should not be reviewed,
-// for example because it has already been committed.
-func ApplyPredicates(change Change) (ChangePreds, bool, error) {
-	predicatesLock.Lock()
-	// The predicates and rejects slices are append-only,
-	// so a top-level copy is safe to use.
-	p := predicates
-	r := rejects
-	predicatesLock.Unlock()
-
-	for i := range r {
-		applies, err := r[i].Applies(change)
+// The reject predicates are used to determine if the change is
+// reviewable; the bool result will be false if the change should
+// not be reviewed, for example because it has already been committed.
+func ApplyPredicates(change Change, predicates []Predicate, rejects []Reject) (ChangePreds, bool, error) {
+	for i := range rejects {
+		applies, err := rejects[i].Applies(change)
 		if err != nil {
 			return ChangePreds{}, false, err
 		}
@@ -61,8 +54,8 @@
 	}
 
 	var preds []*Predicate
-	for i := range p {
-		pred := &p[i]
+	for i := range predicates {
+		pred := &predicates[i]
 		applies, err := pred.Applies(change)
 		if err != nil {
 			return ChangePreds{}, false, err
@@ -80,20 +73,6 @@
 	return cp, true, nil
 }
 
-// AddPredicates adds more [Predicate] values for a project.
-func AddPredicates(newPreds []Predicate) {
-	predicatesLock.Lock()
-	defer predicatesLock.Unlock()
-	predicates = append(predicates, newPreds...)
-}
-
-// AddRejects adds more [Reject] values for a project.
-func AddRejects(newRejects []Reject) {
-	predicatesLock.Lock()
-	defer predicatesLock.Unlock()
-	rejects = append(rejects, newRejects...)
-}
-
 // Some [Predicate] default scores.
 const (
 	ScoreImportant     = 10  // change is important
@@ -102,8 +81,11 @@
 	ScoreUnimportant   = -10 // change is not important
 )
 
-// predicatesLock protects predicates and rejectors.
-var predicatesLock sync.Mutex
+// Predicates returns a list of non-reject predicates
+// that apply to a change.
+func Predicates() []Predicate {
+	return slices.Clone(predicates)
+}
 
 // predicates is the list of [Predicate] values that we apply to a change.
 var predicates = []Predicate{
@@ -185,6 +167,12 @@
 	return true, nil
 }
 
+// Rejects returns a list of reject predicates
+// that apply to a change.
+func Rejects() []Reject {
+	return slices.Clone(rejects)
+}
+
 // rejects is the list of Reject values that we apply to a change.
 var rejects = []Reject{
 	{
diff --git a/internal/reviews/pred_test.go b/internal/reviews/pred_test.go
index 5c13a0c..b74adf2 100644
--- a/internal/reviews/pred_test.go
+++ b/internal/reviews/pred_test.go
@@ -14,7 +14,7 @@
 	const file = "testdata/gerritchange.txt"
 	const num = 1
 	change := loadTestChange(t, gc, file, 1)
-	sc, ok, err := ApplyPredicates(change)
+	sc, ok, err := ApplyPredicates(change, Predicates(), Rejects())
 	if err != nil {
 		t.Fatalf("%s: %d: ApplyPredicates returned unexpected error %v", file, num, err)
 	} else if !ok {
@@ -35,7 +35,7 @@
 	const file = "testdata/gerritchange.txt"
 	const num = 2
 	change := loadTestChange(t, gc, file, num)
-	_, ok, err := ApplyPredicates(change)
+	_, ok, err := ApplyPredicates(change, Predicates(), Rejects())
 	if err != nil {
 		t.Fatalf("%s: %d: ApplyPredicates returned unexpected error %v", file, num, err)
 	} else if ok {