internal/task,internal/relui: add helper for awaiting a condition

In preparation for adding a watchdog timer, use a common helper function
that can reset the watchdog automatically.

For golang/go#54134.

Change-Id: I257a97cfeb540e90b0c005e255c40ee1d054bae0
Reviewed-on: https://go-review.googlesource.com/c/build/+/420540
Run-TryBot: Heschi Kreinick <heschi@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Jenny Rakoczy <jenny@golang.org>
diff --git a/internal/relui/buildrelease_test.go b/internal/relui/buildrelease_test.go
index 2bdbb86..2c6beef 100644
--- a/internal/relui/buildrelease_test.go
+++ b/internal/relui/buildrelease_test.go
@@ -69,6 +69,8 @@
 }
 
 func newReleaseTestDeps(t *testing.T, wantVersion string) *releaseTestDeps {
+	task.AwaitDivisor = 100
+	t.Cleanup(func() { task.AwaitDivisor = 1 })
 	ctx, cancel := context.WithCancel(context.Background())
 	if runtime.GOOS != "linux" && runtime.GOOS != "darwin" {
 		t.Skip("Requires bash shell scripting support.")
@@ -86,7 +88,6 @@
 
 	// Set up the fake signing process.
 	scratchDir := t.TempDir()
-	signingPollDuration = 100 * time.Millisecond
 	argRe := regexp.MustCompile(`--relui_staging="(.*?)"`)
 	outputListener := func(taskName string, output interface{}) {
 		if taskName != "Start signing command" {
@@ -109,7 +110,6 @@
 	dlServer := httptest.NewServer(http.FileServer(http.FS(os.DirFS(dlDir))))
 	t.Cleanup(dlServer.Close)
 	go fakeCDNLoad(ctx, t, servingDir, dlDir)
-	uploadPollDuration = 100 * time.Millisecond
 
 	// Set up the fake website to publish to.
 	var filesMu sync.Mutex
@@ -569,8 +569,8 @@
 	return "fake~12345", nil
 }
 
-func (g *fakeGerrit) AwaitSubmit(ctx context.Context, changeID, baseCommit string) (string, error) {
-	return "fakehash", nil
+func (g *fakeGerrit) Submitted(ctx context.Context, changeID, baseCommit string) (string, bool, error) {
+	return "fakehash", true, nil
 }
 
 func (g *fakeGerrit) ListTags(ctx context.Context, project string) ([]string, error) {
diff --git a/internal/relui/workflows.go b/internal/relui/workflows.go
index ea1dcd7..f448f00 100644
--- a/internal/relui/workflows.go
+++ b/internal/relui/workflows.go
@@ -260,26 +260,6 @@
 	return arg, nil
 }
 
-type AwaitConditionFunc func(ctx *wf.TaskContext) (done bool, err error)
-
-// AwaitFunc is a wf.Task that polls the provided awaitCondition
-// every period until it either returns true or returns an error.
-func AwaitFunc(ctx *wf.TaskContext, period time.Duration, awaitCondition AwaitConditionFunc) error {
-	ticker := time.NewTicker(period)
-	defer ticker.Stop()
-	for {
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		case <-ticker.C:
-			ok, err := awaitCondition(ctx)
-			if ok || err != nil {
-				return err
-			}
-		}
-	}
-}
-
 func checkTaskApproved(ctx *wf.TaskContext, p *pgxpool.Pool) (bool, error) {
 	q := db.New(p)
 	t, err := q.Task(ctx, db.TaskParams{
@@ -312,9 +292,11 @@
 //	waitAction := wf.ActionN(wd, "Wait for Approval", ApproveActionDep(db), wf.After(someDependency))
 func ApproveActionDep(p *pgxpool.Pool) func(*wf.TaskContext) error {
 	return func(ctx *wf.TaskContext) error {
-		return AwaitFunc(ctx, 5*time.Second, func(ctx *wf.TaskContext) (done bool, err error) {
-			return checkTaskApproved(ctx, p)
+		_, err := task.AwaitCondition(ctx, 5*time.Second, func() (int, bool, error) {
+			done, err := checkTaskApproved(ctx, p)
+			return 0, done, err
 		})
+		return err
 	}
 }
 
@@ -941,8 +923,6 @@
 	return path.Join(ctx.WorkflowID.String(), "signing", version)
 }
 
-var signingPollDuration = 30 * time.Second
-
 // awaitSigned waits for all of artifacts to be signed, plus the pkgs for
 // darwinTargets.
 func (tasks *BuildReleaseTasks) awaitSigned(ctx *wf.TaskContext, version string, darwinTargets []*releasetargets.Target, artifacts []artifact) ([]artifact, error) {
@@ -967,11 +947,12 @@
 		todo[a] = true
 	}
 	var signedArtifacts []artifact
-	for {
+
+	check := func() ([]artifact, bool, error) {
 		for a := range todo {
 			signed, ok, err := readSignedArtifact(ctx, scratchFS, version, a)
 			if err != nil {
-				return nil, err
+				return nil, false, err
 			}
 			if !ok {
 				continue
@@ -980,17 +961,9 @@
 			signedArtifacts = append(signedArtifacts, signed)
 			delete(todo, a)
 		}
-
-		if len(todo) == 0 {
-			return signedArtifacts, nil
-		}
-		select {
-		case <-ctx.Done():
-			return nil, ctx.Err()
-		case <-time.After(signingPollDuration):
-			ctx.Printf("Still waiting for %v artifacts to be signed", len(todo))
-		}
+		return signedArtifacts, len(todo) == 0, nil
 	}
+	return task.AwaitCondition(ctx, 30*time.Second, check)
 }
 
 func readSignedArtifact(ctx *wf.TaskContext, scratchFS fs.FS, version string, a artifact) (_ artifact, ok bool, _ error) {
@@ -1056,8 +1029,6 @@
 	return signed, true, nil
 }
 
-var uploadPollDuration = 30 * time.Second
-
 func (tasks *BuildReleaseTasks) uploadArtifacts(ctx *wf.TaskContext, artifacts []artifact) error {
 	scratchFS, err := gcsfs.FromURL(ctx, tasks.GCSClient, tasks.ScratchURL)
 	if err != nil {
@@ -1076,13 +1047,13 @@
 		todo[a] = true
 	}
 
-	for {
+	check := func() (int, bool, error) {
 		for _, a := range artifacts {
 			ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
 			defer cancel()
 			resp, err := ctxhttp.Head(ctx, http.DefaultClient, tasks.DownloadURL+"/"+a.Filename)
 			if err != nil && err != context.DeadlineExceeded {
-				return err
+				return 0, false, err
 			}
 			resp.Body.Close()
 			cancel()
@@ -1090,17 +1061,10 @@
 				delete(todo, a)
 			}
 		}
-
-		if len(todo) == 0 {
-			return nil
-		}
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		case <-time.After(uploadPollDuration):
-			ctx.Printf("Still waiting for %v artifacts to be published", len(todo))
-		}
+		return 0, len(todo) == 0, nil
 	}
+	_, err = task.AwaitCondition(ctx, 30*time.Second, check)
+	return err
 }
 
 func uploadArtifact(scratchFS, servingFS fs.FS, a artifact) error {
diff --git a/internal/relui/workflows_test.go b/internal/relui/workflows_test.go
index 3b77eb1..0551683 100644
--- a/internal/relui/workflows_test.go
+++ b/internal/relui/workflows_test.go
@@ -10,6 +10,7 @@
 	"github.com/google/go-cmp/cmp"
 	"github.com/google/uuid"
 	"golang.org/x/build/internal/relui/db"
+	"golang.org/x/build/internal/task"
 	"golang.org/x/build/internal/workflow"
 )
 
@@ -42,24 +43,28 @@
 			didWork := make(chan struct{}, 2)
 			success := make(chan interface{})
 			done := make(chan interface{})
-			cond := func(ctx *workflow.TaskContext) (bool, error) {
-				select {
-				case <-success:
-					if c.wantCancel {
-						cancel()
-						return false, ctx.Err()
-					} else if c.wantErr {
-						return false, errors.New("someError")
-					}
-					return true, nil
-				case <-ctx.Done():
-					return false, ctx.Err()
-				case didWork <- struct{}{}:
-					return false, nil
-				}
-			}
 			wd := workflow.New()
-			await := workflow.Action2(wd, "AwaitFunc", AwaitFunc, workflow.Const(10*time.Millisecond), workflow.Const(AwaitConditionFunc(cond)))
+
+			awaitFunc := func(ctx *workflow.TaskContext) error {
+				_, err := task.AwaitCondition(ctx, 10*time.Millisecond, func() (int, bool, error) {
+					select {
+					case <-success:
+						if c.wantCancel {
+							cancel()
+							return 0, false, ctx.Err()
+						} else if c.wantErr {
+							return 0, false, errors.New("someError")
+						}
+						return 0, true, nil
+					case <-ctx.Done():
+						return 0, false, ctx.Err()
+					case didWork <- struct{}{}:
+						return 0, false, nil
+					}
+				})
+				return err
+			}
+			await := workflow.Action0(wd, "AwaitFunc", awaitFunc)
 			truth := workflow.Task0(wd, "truth", func(_ context.Context) (bool, error) { return true, nil }, workflow.After(await))
 			workflow.Output(wd, "await", truth)
 
diff --git a/internal/task/announce.go b/internal/task/announce.go
index 87fc7ae..1c47db1 100644
--- a/internal/task/announce.go
+++ b/internal/task/announce.go
@@ -301,37 +301,17 @@
 // to show up on Google Groups, and returns its canonical URL.
 func (t AnnounceMailTasks) AwaitAnnounceMail(ctx *workflow.TaskContext, m SentMail) (announcementURL string, _ error) {
 	// Find the URL for the announcement while giving the email a chance to be received and moderated.
-	started := time.Now()
-	poll := time.NewTicker(10 * time.Second)
-	defer poll.Stop()
-	updateLog := time.NewTicker(time.Minute)
-	defer updateLog.Stop()
-	for {
-		// Wait a bit, updating the log periodically.
-		select {
-		case <-ctx.Done():
-			return "", ctx.Err()
-		case <-poll.C:
-		case t := <-updateLog.C:
-			if log := ctx.Logger; log != nil {
-				log.Printf("... still waiting for %q to appear after %v ...\n", m.Subject, t.Sub(started))
-			}
-			continue
-		}
-
+	check := func() (string, bool, error) {
 		// See if our email is available by now.
 		threadURL, err := findGoogleGroupsThread(ctx, m.Subject)
 		if err != nil {
-			if log := ctx.Logger; log != nil {
-				log.Printf("findGoogleGroupsThread: %v", err)
-			}
-			continue
-		} else if threadURL == "" {
-			// Our email hasn't yet shown up. Wait more and try again.
-			continue
+			ctx.Printf("findGoogleGroupsThread: %v", err)
+			return "", false, nil
 		}
-		return threadURL, nil
+		return threadURL, threadURL != "", nil
+
 	}
+	return AwaitCondition(ctx, 10*time.Second, check)
 }
 
 // findGoogleGroupsThread fetches the first page of threads from the golang-announce
diff --git a/internal/task/gerrit.go b/internal/task/gerrit.go
index 2f9f825..23ec96f 100644
--- a/internal/task/gerrit.go
+++ b/internal/task/gerrit.go
@@ -5,7 +5,6 @@
 	"errors"
 	"fmt"
 	"strings"
-	"time"
 
 	"golang.org/x/build/gerrit"
 )
@@ -17,10 +16,10 @@
 	// If the requested contents match the state of the repository, no change
 	// is created and the returned change ID will be empty.
 	CreateAutoSubmitChange(ctx context.Context, input gerrit.ChangeInput, contents map[string]string) (string, error)
-	// AwaitSubmit waits for the specified change to be auto-submitted or fail
+	// Submitted checks if the specified change has been submitted or failed
 	// trybots. If the CL is submitted, returns the submitted commit hash.
 	// If parentCommit is non-empty, the submitted CL's parent must match it.
-	AwaitSubmit(ctx context.Context, changeID, parentCommit string) (string, error)
+	Submitted(ctx context.Context, changeID, parentCommit string) (string, bool, error)
 	// Tag creates a tag on project at the specified commit.
 	Tag(ctx context.Context, project, tag, commit string) error
 	// ListTags returns all the tags on project.
@@ -76,33 +75,26 @@
 	return changeID, nil
 }
 
-func (c *RealGerritClient) AwaitSubmit(ctx context.Context, changeID, parentCommit string) (string, error) {
-	for {
-		detail, err := c.Client.GetChangeDetail(ctx, changeID, gerrit.QueryChangesOpt{
-			Fields: []string{"CURRENT_REVISION", "DETAILED_LABELS", "CURRENT_COMMIT"},
-		})
-		if err != nil {
-			return "", err
+func (c *RealGerritClient) Submitted(ctx context.Context, changeID, parentCommit string) (string, bool, error) {
+	detail, err := c.Client.GetChangeDetail(ctx, changeID, gerrit.QueryChangesOpt{
+		Fields: []string{"CURRENT_REVISION", "DETAILED_LABELS", "CURRENT_COMMIT"},
+	})
+	if err != nil {
+		return "", false, err
+	}
+	if detail.Status == "MERGED" {
+		parents := detail.Revisions[detail.CurrentRevision].Commit.Parents
+		if parentCommit != "" && (len(parents) != 1 || parents[0].CommitID != parentCommit) {
+			return "", false, fmt.Errorf("expected merged CL %v to have one parent commit %v, has %v", ChangeLink(changeID), parentCommit, parents)
 		}
-		if detail.Status == "MERGED" {
-			parents := detail.Revisions[detail.CurrentRevision].Commit.Parents
-			if parentCommit != "" && (len(parents) != 1 || parents[0].CommitID != parentCommit) {
-				return "", fmt.Errorf("expected merged CL %v to have one parent commit %v, has %v", ChangeLink(changeID), parentCommit, parents)
-			}
-			return detail.CurrentRevision, nil
-		}
-		for _, approver := range detail.Labels["TryBot-Result"].All {
-			if approver.Value < 0 {
-				return "", fmt.Errorf("trybots failed on %v", ChangeLink(changeID))
-			}
-		}
-
-		select {
-		case <-ctx.Done():
-			return "", ctx.Err()
-		case <-time.After(10 * time.Second):
+		return detail.CurrentRevision, true, nil
+	}
+	for _, approver := range detail.Labels["TryBot-Result"].All {
+		if approver.Value < 0 {
+			return "", false, fmt.Errorf("trybots failed on %v", ChangeLink(changeID))
 		}
 	}
+	return "", false, nil
 }
 
 func (c *RealGerritClient) Tag(ctx context.Context, project, tag, commit string) error {
diff --git a/internal/task/task.go b/internal/task/task.go
index b77b535..129251f 100644
--- a/internal/task/task.go
+++ b/internal/task/task.go
@@ -5,8 +5,40 @@
 // Package task implements tasks involved in making a Go release.
 package task
 
+import (
+	"time"
+
+	"golang.org/x/build/internal/workflow"
+)
+
 // CommunicationTasks combines communication tasks together.
 type CommunicationTasks struct {
 	AnnounceMailTasks
 	TweetTasks
 }
+
+var AwaitDivisor int = 1
+
+// AwaitCondition calls the condition function every period until it returns
+// true to indicate success, or an error. If the condition succeeds,
+// AwaitCondition returns its result.
+func AwaitCondition[T any](ctx *workflow.TaskContext, period time.Duration, condition func() (T, bool, error)) (T, error) {
+	pollTimer := time.NewTicker(period / time.Duration(AwaitDivisor))
+	defer pollTimer.Stop()
+	heartbeatTimer := time.NewTicker(time.Minute)
+	defer heartbeatTimer.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			var zero T
+			return zero, ctx.Err()
+		case <-heartbeatTimer.C:
+			// TODO: reset watchdog
+		case <-pollTimer.C:
+			res, done, err := condition()
+			if done || err != nil {
+				return res, err
+			}
+		}
+	}
+}
diff --git a/internal/task/version.go b/internal/task/version.go
index b2a2e22..efb0a2c 100644
--- a/internal/task/version.go
+++ b/internal/task/version.go
@@ -4,6 +4,7 @@
 	"fmt"
 	"strconv"
 	"strings"
+	"time"
 
 	"golang.org/x/build/gerrit"
 	"golang.org/x/build/internal/workflow"
@@ -110,7 +111,9 @@
 	}
 
 	ctx.Printf("Awaiting review/submit of %v", ChangeLink(changeID))
-	return t.Gerrit.AwaitSubmit(ctx, changeID, baseCommit)
+	return AwaitCondition(ctx, 10*time.Second, func() (string, bool, error) {
+		return t.Gerrit.Submitted(ctx, changeID, baseCommit)
+	})
 }
 
 // ReadBranchHead returns the current HEAD revision of branch.