internal/task,internal/relui: add helper for awaiting a condition
In preparation for adding a watchdog timer, use a common helper function
that can reset the watchdog automatically.
For golang/go#54134.
Change-Id: I257a97cfeb540e90b0c005e255c40ee1d054bae0
Reviewed-on: https://go-review.googlesource.com/c/build/+/420540
Run-TryBot: Heschi Kreinick <heschi@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Jenny Rakoczy <jenny@golang.org>
diff --git a/internal/relui/buildrelease_test.go b/internal/relui/buildrelease_test.go
index 2bdbb86..2c6beef 100644
--- a/internal/relui/buildrelease_test.go
+++ b/internal/relui/buildrelease_test.go
@@ -69,6 +69,8 @@
}
func newReleaseTestDeps(t *testing.T, wantVersion string) *releaseTestDeps {
+ task.AwaitDivisor = 100
+ t.Cleanup(func() { task.AwaitDivisor = 1 })
ctx, cancel := context.WithCancel(context.Background())
if runtime.GOOS != "linux" && runtime.GOOS != "darwin" {
t.Skip("Requires bash shell scripting support.")
@@ -86,7 +88,6 @@
// Set up the fake signing process.
scratchDir := t.TempDir()
- signingPollDuration = 100 * time.Millisecond
argRe := regexp.MustCompile(`--relui_staging="(.*?)"`)
outputListener := func(taskName string, output interface{}) {
if taskName != "Start signing command" {
@@ -109,7 +110,6 @@
dlServer := httptest.NewServer(http.FileServer(http.FS(os.DirFS(dlDir))))
t.Cleanup(dlServer.Close)
go fakeCDNLoad(ctx, t, servingDir, dlDir)
- uploadPollDuration = 100 * time.Millisecond
// Set up the fake website to publish to.
var filesMu sync.Mutex
@@ -569,8 +569,8 @@
return "fake~12345", nil
}
-func (g *fakeGerrit) AwaitSubmit(ctx context.Context, changeID, baseCommit string) (string, error) {
- return "fakehash", nil
+func (g *fakeGerrit) Submitted(ctx context.Context, changeID, baseCommit string) (string, bool, error) {
+ return "fakehash", true, nil
}
func (g *fakeGerrit) ListTags(ctx context.Context, project string) ([]string, error) {
diff --git a/internal/relui/workflows.go b/internal/relui/workflows.go
index ea1dcd7..f448f00 100644
--- a/internal/relui/workflows.go
+++ b/internal/relui/workflows.go
@@ -260,26 +260,6 @@
return arg, nil
}
-type AwaitConditionFunc func(ctx *wf.TaskContext) (done bool, err error)
-
-// AwaitFunc is a wf.Task that polls the provided awaitCondition
-// every period until it either returns true or returns an error.
-func AwaitFunc(ctx *wf.TaskContext, period time.Duration, awaitCondition AwaitConditionFunc) error {
- ticker := time.NewTicker(period)
- defer ticker.Stop()
- for {
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-ticker.C:
- ok, err := awaitCondition(ctx)
- if ok || err != nil {
- return err
- }
- }
- }
-}
-
func checkTaskApproved(ctx *wf.TaskContext, p *pgxpool.Pool) (bool, error) {
q := db.New(p)
t, err := q.Task(ctx, db.TaskParams{
@@ -312,9 +292,11 @@
// waitAction := wf.ActionN(wd, "Wait for Approval", ApproveActionDep(db), wf.After(someDependency))
func ApproveActionDep(p *pgxpool.Pool) func(*wf.TaskContext) error {
return func(ctx *wf.TaskContext) error {
- return AwaitFunc(ctx, 5*time.Second, func(ctx *wf.TaskContext) (done bool, err error) {
- return checkTaskApproved(ctx, p)
+ _, err := task.AwaitCondition(ctx, 5*time.Second, func() (int, bool, error) {
+ done, err := checkTaskApproved(ctx, p)
+ return 0, done, err
})
+ return err
}
}
@@ -941,8 +923,6 @@
return path.Join(ctx.WorkflowID.String(), "signing", version)
}
-var signingPollDuration = 30 * time.Second
-
// awaitSigned waits for all of artifacts to be signed, plus the pkgs for
// darwinTargets.
func (tasks *BuildReleaseTasks) awaitSigned(ctx *wf.TaskContext, version string, darwinTargets []*releasetargets.Target, artifacts []artifact) ([]artifact, error) {
@@ -967,11 +947,12 @@
todo[a] = true
}
var signedArtifacts []artifact
- for {
+
+ check := func() ([]artifact, bool, error) {
for a := range todo {
signed, ok, err := readSignedArtifact(ctx, scratchFS, version, a)
if err != nil {
- return nil, err
+ return nil, false, err
}
if !ok {
continue
@@ -980,17 +961,9 @@
signedArtifacts = append(signedArtifacts, signed)
delete(todo, a)
}
-
- if len(todo) == 0 {
- return signedArtifacts, nil
- }
- select {
- case <-ctx.Done():
- return nil, ctx.Err()
- case <-time.After(signingPollDuration):
- ctx.Printf("Still waiting for %v artifacts to be signed", len(todo))
- }
+ return signedArtifacts, len(todo) == 0, nil
}
+ return task.AwaitCondition(ctx, 30*time.Second, check)
}
func readSignedArtifact(ctx *wf.TaskContext, scratchFS fs.FS, version string, a artifact) (_ artifact, ok bool, _ error) {
@@ -1056,8 +1029,6 @@
return signed, true, nil
}
-var uploadPollDuration = 30 * time.Second
-
func (tasks *BuildReleaseTasks) uploadArtifacts(ctx *wf.TaskContext, artifacts []artifact) error {
scratchFS, err := gcsfs.FromURL(ctx, tasks.GCSClient, tasks.ScratchURL)
if err != nil {
@@ -1076,13 +1047,13 @@
todo[a] = true
}
- for {
+ check := func() (int, bool, error) {
for _, a := range artifacts {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
resp, err := ctxhttp.Head(ctx, http.DefaultClient, tasks.DownloadURL+"/"+a.Filename)
if err != nil && err != context.DeadlineExceeded {
- return err
+ return 0, false, err
}
resp.Body.Close()
cancel()
@@ -1090,17 +1061,10 @@
delete(todo, a)
}
}
-
- if len(todo) == 0 {
- return nil
- }
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-time.After(uploadPollDuration):
- ctx.Printf("Still waiting for %v artifacts to be published", len(todo))
- }
+ return 0, len(todo) == 0, nil
}
+ _, err = task.AwaitCondition(ctx, 30*time.Second, check)
+ return err
}
func uploadArtifact(scratchFS, servingFS fs.FS, a artifact) error {
diff --git a/internal/relui/workflows_test.go b/internal/relui/workflows_test.go
index 3b77eb1..0551683 100644
--- a/internal/relui/workflows_test.go
+++ b/internal/relui/workflows_test.go
@@ -10,6 +10,7 @@
"github.com/google/go-cmp/cmp"
"github.com/google/uuid"
"golang.org/x/build/internal/relui/db"
+ "golang.org/x/build/internal/task"
"golang.org/x/build/internal/workflow"
)
@@ -42,24 +43,28 @@
didWork := make(chan struct{}, 2)
success := make(chan interface{})
done := make(chan interface{})
- cond := func(ctx *workflow.TaskContext) (bool, error) {
- select {
- case <-success:
- if c.wantCancel {
- cancel()
- return false, ctx.Err()
- } else if c.wantErr {
- return false, errors.New("someError")
- }
- return true, nil
- case <-ctx.Done():
- return false, ctx.Err()
- case didWork <- struct{}{}:
- return false, nil
- }
- }
wd := workflow.New()
- await := workflow.Action2(wd, "AwaitFunc", AwaitFunc, workflow.Const(10*time.Millisecond), workflow.Const(AwaitConditionFunc(cond)))
+
+ awaitFunc := func(ctx *workflow.TaskContext) error {
+ _, err := task.AwaitCondition(ctx, 10*time.Millisecond, func() (int, bool, error) {
+ select {
+ case <-success:
+ if c.wantCancel {
+ cancel()
+ return 0, false, ctx.Err()
+ } else if c.wantErr {
+ return 0, false, errors.New("someError")
+ }
+ return 0, true, nil
+ case <-ctx.Done():
+ return 0, false, ctx.Err()
+ case didWork <- struct{}{}:
+ return 0, false, nil
+ }
+ })
+ return err
+ }
+ await := workflow.Action0(wd, "AwaitFunc", awaitFunc)
truth := workflow.Task0(wd, "truth", func(_ context.Context) (bool, error) { return true, nil }, workflow.After(await))
workflow.Output(wd, "await", truth)
diff --git a/internal/task/announce.go b/internal/task/announce.go
index 87fc7ae..1c47db1 100644
--- a/internal/task/announce.go
+++ b/internal/task/announce.go
@@ -301,37 +301,17 @@
// to show up on Google Groups, and returns its canonical URL.
func (t AnnounceMailTasks) AwaitAnnounceMail(ctx *workflow.TaskContext, m SentMail) (announcementURL string, _ error) {
// Find the URL for the announcement while giving the email a chance to be received and moderated.
- started := time.Now()
- poll := time.NewTicker(10 * time.Second)
- defer poll.Stop()
- updateLog := time.NewTicker(time.Minute)
- defer updateLog.Stop()
- for {
- // Wait a bit, updating the log periodically.
- select {
- case <-ctx.Done():
- return "", ctx.Err()
- case <-poll.C:
- case t := <-updateLog.C:
- if log := ctx.Logger; log != nil {
- log.Printf("... still waiting for %q to appear after %v ...\n", m.Subject, t.Sub(started))
- }
- continue
- }
-
+ check := func() (string, bool, error) {
// See if our email is available by now.
threadURL, err := findGoogleGroupsThread(ctx, m.Subject)
if err != nil {
- if log := ctx.Logger; log != nil {
- log.Printf("findGoogleGroupsThread: %v", err)
- }
- continue
- } else if threadURL == "" {
- // Our email hasn't yet shown up. Wait more and try again.
- continue
+ ctx.Printf("findGoogleGroupsThread: %v", err)
+ return "", false, nil
}
- return threadURL, nil
+ return threadURL, threadURL != "", nil
+
}
+ return AwaitCondition(ctx, 10*time.Second, check)
}
// findGoogleGroupsThread fetches the first page of threads from the golang-announce
diff --git a/internal/task/gerrit.go b/internal/task/gerrit.go
index 2f9f825..23ec96f 100644
--- a/internal/task/gerrit.go
+++ b/internal/task/gerrit.go
@@ -5,7 +5,6 @@
"errors"
"fmt"
"strings"
- "time"
"golang.org/x/build/gerrit"
)
@@ -17,10 +16,10 @@
// If the requested contents match the state of the repository, no change
// is created and the returned change ID will be empty.
CreateAutoSubmitChange(ctx context.Context, input gerrit.ChangeInput, contents map[string]string) (string, error)
- // AwaitSubmit waits for the specified change to be auto-submitted or fail
+ // Submitted checks if the specified change has been submitted or failed
// trybots. If the CL is submitted, returns the submitted commit hash.
// If parentCommit is non-empty, the submitted CL's parent must match it.
- AwaitSubmit(ctx context.Context, changeID, parentCommit string) (string, error)
+ Submitted(ctx context.Context, changeID, parentCommit string) (string, bool, error)
// Tag creates a tag on project at the specified commit.
Tag(ctx context.Context, project, tag, commit string) error
// ListTags returns all the tags on project.
@@ -76,33 +75,26 @@
return changeID, nil
}
-func (c *RealGerritClient) AwaitSubmit(ctx context.Context, changeID, parentCommit string) (string, error) {
- for {
- detail, err := c.Client.GetChangeDetail(ctx, changeID, gerrit.QueryChangesOpt{
- Fields: []string{"CURRENT_REVISION", "DETAILED_LABELS", "CURRENT_COMMIT"},
- })
- if err != nil {
- return "", err
+func (c *RealGerritClient) Submitted(ctx context.Context, changeID, parentCommit string) (string, bool, error) {
+ detail, err := c.Client.GetChangeDetail(ctx, changeID, gerrit.QueryChangesOpt{
+ Fields: []string{"CURRENT_REVISION", "DETAILED_LABELS", "CURRENT_COMMIT"},
+ })
+ if err != nil {
+ return "", false, err
+ }
+ if detail.Status == "MERGED" {
+ parents := detail.Revisions[detail.CurrentRevision].Commit.Parents
+ if parentCommit != "" && (len(parents) != 1 || parents[0].CommitID != parentCommit) {
+ return "", false, fmt.Errorf("expected merged CL %v to have one parent commit %v, has %v", ChangeLink(changeID), parentCommit, parents)
}
- if detail.Status == "MERGED" {
- parents := detail.Revisions[detail.CurrentRevision].Commit.Parents
- if parentCommit != "" && (len(parents) != 1 || parents[0].CommitID != parentCommit) {
- return "", fmt.Errorf("expected merged CL %v to have one parent commit %v, has %v", ChangeLink(changeID), parentCommit, parents)
- }
- return detail.CurrentRevision, nil
- }
- for _, approver := range detail.Labels["TryBot-Result"].All {
- if approver.Value < 0 {
- return "", fmt.Errorf("trybots failed on %v", ChangeLink(changeID))
- }
- }
-
- select {
- case <-ctx.Done():
- return "", ctx.Err()
- case <-time.After(10 * time.Second):
+ return detail.CurrentRevision, true, nil
+ }
+ for _, approver := range detail.Labels["TryBot-Result"].All {
+ if approver.Value < 0 {
+ return "", false, fmt.Errorf("trybots failed on %v", ChangeLink(changeID))
}
}
+ return "", false, nil
}
func (c *RealGerritClient) Tag(ctx context.Context, project, tag, commit string) error {
diff --git a/internal/task/task.go b/internal/task/task.go
index b77b535..129251f 100644
--- a/internal/task/task.go
+++ b/internal/task/task.go
@@ -5,8 +5,40 @@
// Package task implements tasks involved in making a Go release.
package task
+import (
+ "time"
+
+ "golang.org/x/build/internal/workflow"
+)
+
// CommunicationTasks combines communication tasks together.
type CommunicationTasks struct {
AnnounceMailTasks
TweetTasks
}
+
+var AwaitDivisor int = 1
+
+// AwaitCondition calls the condition function every period until it returns
+// true to indicate success, or an error. If the condition succeeds,
+// AwaitCondition returns its result.
+func AwaitCondition[T any](ctx *workflow.TaskContext, period time.Duration, condition func() (T, bool, error)) (T, error) {
+ pollTimer := time.NewTicker(period / time.Duration(AwaitDivisor))
+ defer pollTimer.Stop()
+ heartbeatTimer := time.NewTicker(time.Minute)
+ defer heartbeatTimer.Stop()
+ for {
+ select {
+ case <-ctx.Done():
+ var zero T
+ return zero, ctx.Err()
+ case <-heartbeatTimer.C:
+ // TODO: reset watchdog
+ case <-pollTimer.C:
+ res, done, err := condition()
+ if done || err != nil {
+ return res, err
+ }
+ }
+ }
+}
diff --git a/internal/task/version.go b/internal/task/version.go
index b2a2e22..efb0a2c 100644
--- a/internal/task/version.go
+++ b/internal/task/version.go
@@ -4,6 +4,7 @@
"fmt"
"strconv"
"strings"
+ "time"
"golang.org/x/build/gerrit"
"golang.org/x/build/internal/workflow"
@@ -110,7 +111,9 @@
}
ctx.Printf("Awaiting review/submit of %v", ChangeLink(changeID))
- return t.Gerrit.AwaitSubmit(ctx, changeID, baseCommit)
+ return AwaitCondition(ctx, 10*time.Second, func() (string, bool, error) {
+ return t.Gerrit.Submitted(ctx, changeID, baseCommit)
+ })
}
// ReadBranchHead returns the current HEAD revision of branch.