internal/workflow: run at most one expansion at a time
We've inadvertently been running expansions concurrently during minor
releases. For better or worse, it happened to be seemingly fine, and
problems started to be noticeable only when needing to restart or
approve tasks, which failed with puzzling "unknown task" errors.
It's useful to be able to plan builders for the two Go releases in
parallel, and it's completely fine for it not to happen concurrently.
Instead of getting the workflow to arrange for that, it seems we can
do it in the workflow package itself.
The new TestManualRetryMultipleExpansions test fails before the change,
and passes after.
Fixes golang/go#70249.
Change-Id: Id87323f77f573d9ac364010dfc0b8581e57ce9b8
Reviewed-on: https://go-review.googlesource.com/c/build/+/626335
Auto-Submit: Dmitri Shuralyov <dmitshur@golang.org>
Reviewed-by: Dmitri Shuralyov <dmitshur@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Carlos Amedee <carlos@golang.org>
diff --git a/internal/workflow/workflow.go b/internal/workflow/workflow.go
index 94e0a21..5a9533b 100644
--- a/internal/workflow/workflow.go
+++ b/internal/workflow/workflow.go
@@ -35,8 +35,8 @@
// definition rather than producing an output. Unlike Actions and Tasks, they
// execute multiple times and must produce exactly the same workflow
// modifications each time. As such, they should be pure functions of their
-// inputs. Producing different modifications, or running multiple expansions
-// concurrently, is an error that will corrupt the workflow's state.
+// inputs. Producing different modifications is an error that will corrupt
+// the workflow's state. A workflow will run at most one expansion at a time.
//
// Once a Definition is complete, call Start to set its parameters and
// instantiate it into a Workflow. Call Run to execute the workflow until
@@ -486,8 +486,7 @@
// Unlike normal tasks, expansions may run multiple times and must produce
// the exact same changes to the definition each time.
//
-// Running more than one expansion concurrently is an error and will corrupt
-// the workflow.
+// A workflow will run at most one expansion at a time.
func Expand0[O1 any](d *Definition, name string, f func(*Definition) (Value[O1], error), opts ...TaskOption) Value[O1] {
return addExpansion[O1](d, name, f, nil, opts)
}
@@ -807,6 +806,7 @@
doneOnce := ctx.Done()
for {
running := 0
+ runningExpansion := false // Whether an expansion is running, and hasn't completed yet.
allDone := true
for _, task := range w.tasks {
if !task.created {
@@ -834,11 +834,16 @@
if !ready {
continue
}
+ if task.def.isExpansion && runningExpansion {
+ // Don't start a new expansion until the currently running one completes.
+ continue
+ }
task.started = true
running++
listener.TaskStateChanged(w.ID, task.def.name, task.toExported())
taskCopy := *task
if task.def.isExpansion {
+ runningExpansion = true
defCopy := w.def.shallowClone()
go func() { stateChan <- runExpansion(defCopy, taskCopy, args) }()
} else {
@@ -861,6 +866,7 @@
case state := <-stateChan:
if state.def.isExpansion && state.finished && state.err == nil {
state.err = w.expand(state.expanded)
+ runningExpansion = false
}
listener.TaskStateChanged(w.ID, state.def.name, state.toExported())
w.tasks[state.def] = &state
diff --git a/internal/workflow/workflow_test.go b/internal/workflow/workflow_test.go
index b85be29..61276fb 100644
--- a/internal/workflow/workflow_test.go
+++ b/internal/workflow/workflow_test.go
@@ -332,6 +332,74 @@
}
}
+// Test that manual retry works on tasks that come from different expansions.
+//
+// This is similar to how the Go minor release workflow plans builders for
+// both releases. It previously failed due to expansions racing with with other,
+// leading to "unknown task" errors when retrying. See go.dev/issue/70249.
+func TestManualRetryMultipleExpansions(t *testing.T) {
+ // Create two sub-workflows, each one with an expansion that adds one work task.
+ // The work tasks fail on the first try, and require being successfully restarted
+ // for the workflow to complete.
+ var counters, retried [2]int
+ wd := wf.New(wf.ACL{})
+ sub1 := wd.Sub("sub1")
+ sub2 := wd.Sub("sub2")
+ for i, wd := range []*wf.Definition{sub1, sub2} {
+ out := wf.Expand0(wd, fmt.Sprintf("expand %d", i+1), func(wd *wf.Definition) (wf.Value[string], error) {
+ return wf.Task0(wd, fmt.Sprintf("work %d", i+1), func(ctx *wf.TaskContext) (string, error) {
+ ctx.DisableRetries()
+ counters[i]++
+ if counters[i] == 1 {
+ return "", fmt.Errorf("first try fail")
+ }
+ return "", nil
+ }), nil
+ })
+ wf.Output(wd, "out", out)
+ }
+
+ w := startWorkflow(t, wd, nil)
+ listener := &errorListener{
+ taskName: "work 1",
+ callback: func(string) {
+ go func() {
+ retried[0]++
+ err := w.RetryTask(context.Background(), "work 1")
+ if err != nil {
+ t.Errorf(`RetryTask("work 1") failed: %v`, err)
+ }
+ }()
+ },
+ Listener: &errorListener{
+ taskName: "work 2",
+ callback: func(string) {
+ go func() {
+ retried[1]++
+ err := w.RetryTask(context.Background(), "work 2")
+ if err != nil {
+ t.Errorf(`RetryTask("work 2") failed: %v`, err)
+ }
+ }()
+ },
+ Listener: &verboseListener{t},
+ },
+ }
+ runWorkflow(t, w, listener)
+ if counters[0] != 2 {
+ t.Errorf("sub1 task ran %v times, wanted 2", counters[0])
+ }
+ if retried[0] != 1 {
+ t.Errorf("sub1 task was retried %v times, wanted 1", retried[0])
+ }
+ if counters[1] != 2 {
+ t.Errorf("sub2 task ran %v times, wanted 2", counters[1])
+ }
+ if retried[1] != 1 {
+ t.Errorf("sub2 task was retried %v times, wanted 1", retried[1])
+ }
+}
+
func TestAutomaticRetry(t *testing.T) {
counter := 0
needsRetry := func(ctx *wf.TaskContext) (string, error) {