internal/commentfix: add function FixGitHubIssue

Add a function, [Fixer.FixGitHubIssue], which applies
fixes to the body and comments of a given GitHub issue
without affecting any future runs of [Fixer.Run].

It is used by the GitHub webhook response function
to immediately take action on a single issue.

This CL also makes a few other changes to support
this new function:
    - Refactor [Fixer.Run] and its tests to reuse code
    - Add a db to Fixer to allow it to use distributed locks

Change-Id: I26d3167f45b1656778551994f57d17ef66303c12
Reviewed-on: https://go-review.googlesource.com/c/oscar/+/613177
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
diff --git a/internal/commentfix/fix.go b/internal/commentfix/fix.go
index 514301f..186f31b 100644
--- a/internal/commentfix/fix.go
+++ b/internal/commentfix/fix.go
@@ -20,8 +20,10 @@
 
 	"golang.org/x/oscar/internal/diff"
 	"golang.org/x/oscar/internal/github"
+	"golang.org/x/oscar/internal/storage"
 	"golang.org/x/oscar/internal/storage/timed"
 	"rsc.io/markdown"
+	"rsc.io/ordered"
 )
 
 // A Fixer rewrites issue texts and issue comments using a set of rules.
@@ -34,6 +36,7 @@
 //
 // TODO(rsc): Separate the GitHub logic more cleanly from the rewrite logic.
 type Fixer struct {
+	name      string
 	slog      *slog.Logger
 	github    *github.Client
 	watcher   *timed.Watcher[*github.Event]
@@ -41,6 +44,7 @@
 	projects  map[string]bool
 	edit      bool
 	timeLimit time.Time
+	db        storage.DB
 
 	stderrw io.Writer
 }
@@ -70,15 +74,19 @@
 // configured and applied to Markdown using [Fixer.Fix], but calling
 // [Fixer.Run] will panic.
 //
+// The db is the database used to store locks.
+//
 // The name is the handle by which the Fixer's “last position” is retrieved
 // across multiple program invocations; each differently configured
 // Fixer needs a different name.
-func New(lg *slog.Logger, gh *github.Client, name string) *Fixer {
+func New(lg *slog.Logger, gh *github.Client, db storage.DB, name string) *Fixer {
 	f := &Fixer{
+		name:      name,
 		slog:      lg,
 		github:    gh,
 		projects:  make(map[string]bool),
 		timeLimit: time.Now().Add(-30 * 24 * time.Hour),
+		db:        db,
 	}
 	f.init() // set f.slog if lg==nil
 	if gh != nil {
@@ -304,14 +312,15 @@
 		}
 		last = e.DBTime
 
-		a := f.newAction(e)
-		if a == nil {
-			continue
-		}
-		if err := f.runAction(ctx, a); err != nil {
+		applied, err := f.fix(ctx, e)
+		if err != nil {
 			// unreachable unless github error
 			return err
 		}
+		if !applied {
+			continue
+		}
+
 		if f.edit {
 			// Mark this one old right now, so that we don't consider editing it again.
 			f.watcher.MarkOld(e.DBTime)
@@ -320,7 +329,7 @@
 
 			if !testing.Testing() {
 				// unreachable in tests
-				time.Sleep(1 * time.Second)
+				time.Sleep(sleep)
 			}
 		}
 	}
@@ -338,6 +347,78 @@
 	return nil
 }
 
+// FixGitHubIssue applies rewrites to the issue body and comments of the
+// specified GitHub issue, following the same logic as [Fixer.Run].
+//
+// It requires that the Fixer's [github.Client] contain one or more events
+// for the issue.
+//
+// It does not affect the watcher used by [Fixer.Run] and can be run
+// concurrently with [Fixer.Run].
+//
+// However, any issues or comments for which fixes were applied will not
+// be fixed again by subsequent calls to [Fixer.Run] or [Fixer.FixGitHubIssue]
+// for a [Fixer] with the same name as this one. This is true even if the
+// issue or comment body has changed since the fix was applied, in order
+// to a prevent a non-idempotent fix from being applied multiple times.
+//
+// It returns an error if any of the fixes cannot be applied or if
+// no events are found for the issue.
+func (f *Fixer) FixGitHubIssue(ctx context.Context, project string, issue int64) error {
+	events := 0
+	for event := range f.github.Events(project, issue, issue) {
+		events++
+		applied, err := f.fix(ctx, event)
+		if err != nil {
+			return err
+		}
+		if applied && !testing.Testing() {
+			time.Sleep(sleep)
+		}
+	}
+	if events == 0 {
+		return fmt.Errorf("%w for project=%s issue=%d", errNoGitHubEvents, project, issue)
+	}
+	return nil
+}
+
+var (
+	sleep             = 1 * time.Second
+	errNoGitHubEvents = errors.New("no GitHub events")
+)
+
+// fix creates and runs the action for the specified event.
+// applied is true if the action was successfully applied by this run.
+// fix returns an error if the action was attempted but could not be applied.
+func (f *Fixer) fix(ctx context.Context, e *github.Event) (applied bool, err error) {
+	a := f.newAction(e)
+	if a == nil {
+		return false, nil
+	}
+	return f.runAction(ctx, a)
+}
+
+// appliedKey returns the db key used to indicate a fix has been
+// applied by this Fixer (identified by [Fixer.name]) for this issue or
+// comment (identified by the URL of the issue/comment).
+func (f *Fixer) appliedKey(a *action) []byte {
+	return ordered.Encode("commentfix.Fixer", f.name, a.ic.url())
+}
+
+// markApplied marks this action as applied by this Fixer
+// (identified by [Fixer.name]).
+func (f *Fixer) markApplied(a *action) {
+	f.db.Set(f.appliedKey(a), nil)
+	f.db.Flush()
+}
+
+// isApplied reports whether the action has been applied by a Fixer
+// with the same name as this one.
+func (f *Fixer) isApplied(a *action) bool {
+	_, ok := f.db.Get(f.appliedKey(a))
+	return ok
+}
+
 // newAction returns an newAction to take on the issue or comment of the event,
 // or nil if there is nothing to do.
 func (f *Fixer) newAction(e *github.Event) *action {
@@ -369,18 +450,34 @@
 	if !updated {
 		return nil
 	}
-	return &action{project: e.Project, issue: e.Issue, ic: ic, body: body}
+	return &action{
+		project: e.Project,
+		issue:   e.Issue,
+		ic:      ic,
+		body:    body,
+	}
 }
 
-func (f *Fixer) runAction(ctx context.Context, a *action) error {
+func (f *Fixer) runAction(ctx context.Context, a *action) (applied bool, _ error) {
+	// Do not include this Fixer's name in the lock, so that separate
+	// fixers cannot operate on the same object at the same time.
+	lock := string(ordered.Encode("commentfix", a.ic.url()))
+	f.db.Lock(lock)
+	defer f.db.Unlock(lock)
+
+	if f.isApplied(a) {
+		f.slog.Info("commentfix already applied", "fixer", f.name, "project", a.project, "issue", a.issue, "url", a.ic.url())
+		return false, nil
+	}
+
 	live, err := a.ic.download(ctx, f.github)
 	if err != nil {
 		// unreachable unless github error
-		return fmt.Errorf("commentfix download error: project=%s issue=%d url=%s err=%w", a.project, a.issue, a.ic.url(), err)
+		return false, fmt.Errorf("commentfix download error: project=%s issue=%d url=%s err=%w", a.project, a.issue, a.ic.url(), err)
 	}
 	if live.body() != a.ic.body() {
 		f.slog.Info("commentfix stale", "project", a.project, "issue", a.issue, "url", a.ic.url())
-		return nil
+		return false, nil
 	}
 	f.slog.Info("commentfix rewrite", "project", a.project, "issue", a.issue, "url", a.ic.url(), "edit", f.edit, "diff", bodyDiff(a.ic.body(), a.body))
 	fmt.Fprintf(f.stderr(), "Fix %s:\n%s\n", a.ic.url(), bodyDiff(a.ic.body(), a.body))
@@ -388,10 +485,12 @@
 		f.slog.Info("commentfix editing github", "url", a.ic.url())
 		if err := a.ic.editBody(ctx, f.github, a.body); err != nil {
 			// unreachable unless github error
-			return fmt.Errorf("commentfix edit: project=%s issue=%d err=%w", a.project, a.issue, err)
+			return false, fmt.Errorf("commentfix edit: project=%s issue=%d err=%w", a.project, a.issue, err)
 		}
+		f.markApplied(a)
+		return true, nil
 	}
-	return nil
+	return false, nil
 }
 
 // Latest returns the latest known DBTime marked old by the Fixer's Watcher.
diff --git a/internal/commentfix/fix_test.go b/internal/commentfix/fix_test.go
index 211afc5..9a414d8 100644
--- a/internal/commentfix/fix_test.go
+++ b/internal/commentfix/fix_test.go
@@ -10,6 +10,7 @@
 	"io"
 	"path/filepath"
 	"strings"
+	"sync"
 	"testing"
 	"text/template"
 	"time"
@@ -89,45 +90,12 @@
 }
 
 func TestGitHub(t *testing.T) {
-	testGH := func() *github.Client {
-		db := storage.MemDB()
-		gh := github.New(testutil.Slogger(t), db, nil, nil)
-		gh.Testing().AddIssue("rsc/tmp", &github.Issue{
-			Number:    18,
-			Title:     "spellchecking",
-			Body:      "Contexts are cancelled.",
-			CreatedAt: "2024-06-17T20:16:49-04:00",
-			UpdatedAt: "2024-06-17T20:16:49-04:00",
-		})
-		gh.Testing().AddIssue("rsc/tmp", &github.Issue{
-			Number:      19,
-			Title:       "spellchecking",
-			Body:        "Contexts are cancelled.",
-			CreatedAt:   "2024-06-17T20:16:49-04:00",
-			UpdatedAt:   "2024-06-17T20:16:49-04:00",
-			PullRequest: new(struct{}),
-		})
-
-		gh.Testing().AddIssueComment("rsc/tmp", 18, &github.IssueComment{
-			Body:      "No really, contexts are cancelled.",
-			CreatedAt: "2024-06-17T20:16:49-04:00",
-			UpdatedAt: "2024-06-17T20:16:49-04:00",
-		})
-
-		gh.Testing().AddIssueComment("rsc/tmp", 18, &github.IssueComment{
-			Body:      "Completely unrelated.",
-			CreatedAt: "2024-06-17T20:16:49-04:00",
-			UpdatedAt: "2024-06-17T20:16:49-04:00",
-		})
-
-		return gh
-	}
-
 	// Check for comment with too-new cutoff and edits disabled.
 	// Finds nothing but also no-op.
-	gh := testGH()
+	gh := testGitHub(t)
+	db := storage.MemDB()
 	lg, buf := testutil.SlogBuffer()
-	f := New(lg, gh, "fixer1")
+	f := New(lg, gh, db, "fixer1")
 	f.SetStderr(testutil.LogWriter(t))
 	f.EnableProject("rsc/tmp")
 	f.SetTimeLimit(time.Date(2222, 1, 1, 1, 1, 1, 1, time.UTC))
@@ -140,7 +108,7 @@
 
 	// Check again with old enough cutoff.
 	// Finds comment but does not edit, does not advance cursor.
-	f = New(lg, gh, "fixer1")
+	f = New(lg, gh, db, "fixer1")
 	f.SetStderr(testutil.LogWriter(t))
 	f.EnableProject("rsc/tmp")
 	f.SetTimeLimit(time.Time{})
@@ -173,7 +141,7 @@
 
 	// Write comment (now using fixer2 to avoid 'marked as old' in fixer1).
 	lg, buf = testutil.SlogBuffer()
-	f = New(lg, gh, "fixer2")
+	f = New(lg, gh, db, "fixer2")
 	f.SetStderr(testutil.LogWriter(t))
 	f.EnableProject("rsc/tmp")
 	f.ReplaceText("cancelled", "canceled")
@@ -202,7 +170,7 @@
 
 	// Try again; comment should now be marked old in watcher.
 	lg, buf = testutil.SlogBuffer()
-	f = New(lg, gh, "fixer2")
+	f = New(lg, gh, db, "fixer2")
 	f.SetStderr(testutil.LogWriter(t))
 	f.EnableProject("rsc/tmp")
 	f.ReplaceText("cancelled", "canceled")
@@ -216,7 +184,7 @@
 
 	// Check that not enabling the project doesn't edit comments.
 	lg, buf = testutil.SlogBuffer()
-	f = New(lg, gh, "fixer3")
+	f = New(lg, gh, db, "fixer3")
 	f.SetStderr(testutil.LogWriter(t))
 	f.EnableProject("xyz/tmp")
 	f.ReplaceText("cancelled", "canceled")
@@ -230,7 +198,7 @@
 
 	// Check that when there's nothing to do, we still mark things old.
 	lg, buf = testutil.SlogBuffer()
-	f = New(lg, gh, "fixer4")
+	f = New(lg, gh, db, "fixer4")
 	f.SetStderr(testutil.LogWriter(t))
 	f.EnableProject("rsc/tmp")
 	f.ReplaceText("zyzzyva", "ZYZZYVA")
@@ -244,7 +212,7 @@
 
 	// Reverse the replacement and run again with same name; should not consider any comments.
 	lg, buf = testutil.SlogBuffer()
-	f = New(lg, gh, "fixer4")
+	f = New(lg, gh, db, "fixer4")
 	f.SetStderr(testutil.LogWriter(t))
 	f.EnableProject("rsc/tmp")
 	f.ReplaceText("c", "C")
@@ -256,3 +224,152 @@
 		t.Fatalf("logs incorrectly mention rewrite of comment:\n%s", buf.Bytes())
 	}
 }
+
+func TestFixGitHubIssue(t *testing.T) {
+	ctx := context.Background()
+	t.Run("basic", func(t *testing.T) {
+		f, project, buf, check := newFixer(t)
+		check(f.FixGitHubIssue(ctx, project, 18))
+		expect(t, buf, "commentfix rewrite", 2) // fix body and comment
+	})
+
+	t.Run("twice", func(t *testing.T) {
+		f, project, buf, check := newFixer(t)
+		check(f.FixGitHubIssue(ctx, project, 18))
+		expect(t, buf, "commentfix rewrite", 2) // fix body and comment
+
+		// Running Fix again doesn't change anything because fixes were
+		// already applied.
+		check(f.FixGitHubIssue(ctx, project, 18))
+		expect(t, buf, "commentfix rewrite", 2) // nothing new
+		expect(t, buf, "commentfix already applied", 2)
+	})
+
+	t.Run("fix-run", func(t *testing.T) {
+		f, project, buf, check := newFixer(t)
+		check(f.FixGitHubIssue(ctx, project, 20))
+		expect(t, buf, "commentfix rewrite", 1) // fix body of issue 20
+
+		// Run still fixes issue 18 because FixGitHubIssue
+		// doesn't modify Run's watcher.
+		check(f.Run(ctx))
+		expect(t, buf, "commentfix rewrite", 3) // fix body and comment of issue 18
+		expect(t, buf, "commentfix already applied", 1)
+	})
+
+	t.Run("fix-run-concurrent", func(t *testing.T) {
+		f, project, buf, check := newFixer(t)
+
+		var wg sync.WaitGroup
+
+		wg.Add(1)
+		go func() {
+			check(f.FixGitHubIssue(ctx, project, 20))
+			wg.Done()
+		}()
+
+		wg.Add(1)
+		go func() {
+			check(f.Run(ctx))
+			wg.Done()
+		}()
+
+		wg.Add(1)
+		go func() {
+			check(f.FixGitHubIssue(ctx, project, 18))
+			wg.Done()
+		}()
+
+		wg.Wait()
+
+		// Each action is attempted twice.
+		expect(t, buf, "commentfix rewrite", 3)
+		expect(t, buf, "commentfix already applied", 3)
+	})
+
+	t.Run("fix-concurrent", func(t *testing.T) {
+		f, project, buf, check := newFixer(t)
+
+		var wg sync.WaitGroup
+
+		n := 5
+		wg.Add(n)
+		for range n {
+			go func() {
+				check(f.FixGitHubIssue(ctx, project, 20))
+				wg.Done()
+			}()
+		}
+
+		wg.Wait()
+
+		expect(t, buf, "commentfix rewrite", 1)
+		expect(t, buf, "commentfix already applied", n-1)
+	})
+}
+
+func expect(t *testing.T, buf *bytes.Buffer, action string, n int) {
+	t.Helper()
+
+	if mentions := bytes.Count(buf.Bytes(), []byte(action)); mentions != n {
+		t.Errorf("logs mention %q %d times, want %d mentions:\n%s", action, mentions, n, buf.Bytes())
+	}
+}
+
+func newFixer(t *testing.T) (_ *Fixer, project string, _ *bytes.Buffer, check func(error)) {
+	gh := testGitHub(t)
+	db := storage.MemDB()
+	lg, buf := testutil.SlogBuffer()
+	f := New(lg, gh, db, t.Name())
+	f.SetStderr(testutil.LogWriter(t))
+	project = "rsc/tmp"
+	f.EnableProject(project)
+	f.ReplaceText("cancelled", "canceled")
+	f.SetTimeLimit(time.Time{})
+	f.EnableEdits()
+	return f, project, buf, testutil.Checker(t)
+}
+
+func testGitHub(t *testing.T) *github.Client {
+	db := storage.MemDB()
+	gh := github.New(testutil.Slogger(t), db, nil, nil)
+	gh.Testing().AddIssue("rsc/tmp", &github.Issue{
+		Number:    18,
+		Title:     "spellchecking",
+		Body:      "Contexts are cancelled.",
+		CreatedAt: "2024-06-17T20:16:49-04:00",
+		UpdatedAt: "2024-06-17T20:16:49-04:00",
+	})
+
+	// Ignored (pull request).
+	gh.Testing().AddIssue("rsc/tmp", &github.Issue{
+		Number:      19,
+		Title:       "spellchecking",
+		Body:        "Contexts are cancelled.",
+		CreatedAt:   "2024-06-17T20:16:49-04:00",
+		UpdatedAt:   "2024-06-17T20:16:49-04:00",
+		PullRequest: new(struct{}),
+	})
+
+	gh.Testing().AddIssueComment("rsc/tmp", 18, &github.IssueComment{
+		Body:      "No really, contexts are cancelled.",
+		CreatedAt: "2024-06-17T20:16:49-04:00",
+		UpdatedAt: "2024-06-17T20:16:49-04:00",
+	})
+
+	gh.Testing().AddIssueComment("rsc/tmp", 18, &github.IssueComment{
+		Body:      "Completely unrelated.",
+		CreatedAt: "2024-06-17T20:16:49-04:00",
+		UpdatedAt: "2024-06-17T20:16:49-04:00",
+	})
+
+	gh.Testing().AddIssue("rsc/tmp", &github.Issue{
+		Number:    20,
+		Title:     "spellchecking 2",
+		Body:      "Contexts are cancelled.",
+		CreatedAt: "2024-06-17T20:16:49-04:00",
+		UpdatedAt: "2024-06-17T20:16:49-04:00",
+	})
+
+	return gh
+}
diff --git a/internal/gaby/github_event.go b/internal/gaby/github_event.go
index 329b542..2a501b1 100644
--- a/internal/gaby/github_event.go
+++ b/internal/gaby/github_event.go
@@ -43,11 +43,9 @@
 
 // handleGitHubIssueEvent handles incoming GitHub "issue" events.
 //
-// If the event corresponds to a new issue, the function syncs
-// the corresponding GitHub project, posts related issues for the
-// new GitHub issue, and fixes comments in all new issues.
-// TODO(https://github.com/golang/oscar/issues/19): Run commentfix for
-// the single issue instead of all new issues.
+// If the event is a new issue, the function syncs
+// the corresponding GitHub project, posts related issues for and fixes
+// comments on the new GitHub issue.
 //
 // Otherwise, it logs the event and takes no other action.
 func (g *Gaby) handleGithubIssueEvent(ctx context.Context, event *github.WebhookIssueEvent) error {
@@ -75,7 +73,9 @@
 		if err := g.relatedPoster.Post(ctx, project, event.Issue.Number); err != nil {
 			return err
 		}
-		if err := g.fixAllComments(ctx); err != nil {
+		// No need to lock; [commentfix.Fixer.FixGitHubIssue] and
+		// [commentfix.Fixer.Run] can happen concurrently.
+		if err := g.commentFixer.FixGitHubIssue(ctx, project, event.Issue.Number); err != nil {
 			return err
 		}
 	}
diff --git a/internal/gaby/main.go b/internal/gaby/main.go
index 21cff4e..ac6e198 100644
--- a/internal/gaby/main.go
+++ b/internal/gaby/main.go
@@ -133,7 +133,7 @@
 		return
 	}
 
-	cf := commentfix.New(g.slog, g.github, "gerritlinks")
+	cf := commentfix.New(g.slog, g.github, g.db, "gerritlinks")
 	cf.EnableProject(g.githubProject)
 	cf.AutoLink(`\bCL ([0-9]+)\b`, "https://go.dev/cl/$1")
 	cf.ReplaceURL(`\Qhttps://go-review.git.corp.google.com/\E`, "https://go-review.googlesource.com/")