internal/workflow: hang less
We don't want to abort a workflow when a single task fails -- we want to
let other tasks finish if they can. However, once everything that can
run has, there's no point in sticking around. Exit in that case.
Also set timeouts on most tests.
Updates golang/go#49318.
Change-Id: I3dcdb7eb5703e6502cf7f155213cdad6595c4bac
Reviewed-on: https://go-review.googlesource.com/c/build/+/361195
Trust: Heschi Kreinick <heschi@google.com>
Run-TryBot: Heschi Kreinick <heschi@google.com>
TryBot-Result: Go Bot <gobot@golang.org>
Reviewed-by: Dmitri Shuralyov <dmitshur@golang.org>
diff --git a/internal/workflow/workflow.go b/internal/workflow/workflow.go
index ff50cd7..f097fc5 100644
--- a/internal/workflow/workflow.go
+++ b/internal/workflow/workflow.go
@@ -378,7 +378,7 @@
return w, nil
}
-// Run runs a workflow to successful completion and returns its outputs.
+// Run runs a workflow to completion or quiescence and returns its outputs.
// listener.TaskStateChanged will be called when each task starts and
// finishes. It should be used only for monitoring and persistence purposes -
// to read task results, register Outputs.
@@ -420,6 +420,17 @@
}(*task)
}
+ // Exit if we've run everything we can given errors.
+ running := 0
+ for _, task := range w.tasks {
+ if task.started && !task.finished {
+ running++
+ }
+ }
+ if running == 0 {
+ return nil, fmt.Errorf("workflow has progressed as far as it can")
+ }
+
select {
case <-ctx.Done():
return nil, ctx.Err()
diff --git a/internal/workflow/workflow_test.go b/internal/workflow/workflow_test.go
index 4044ddf..b57889e 100644
--- a/internal/workflow/workflow_test.go
+++ b/internal/workflow/workflow_test.go
@@ -36,6 +36,22 @@
}
}
+func TestStuck(t *testing.T) {
+ fail := func(context.Context) (string, error) {
+ return "", fmt.Errorf("goodbye world")
+ }
+
+ wd := workflow.New()
+ nothing := wd.Task("fail", fail)
+ wd.Output("nothing", nothing)
+
+ w := startWorkflow(t, wd, nil)
+ _, err := w.Run(context.Background(), &verboseListener{t: t})
+ if err == nil || !strings.Contains(err.Error(), "as far as it can") {
+ t.Errorf("Run of stuck workflow = %v, wanted it to give up early", err)
+ }
+}
+
func TestSplitJoin(t *testing.T) {
echo := func(ctx context.Context, arg string) (string, error) {
return arg, nil
@@ -250,11 +266,13 @@
}
func runWorkflow(t *testing.T, w *workflow.Workflow, listener workflow.Listener) map[string]interface{} {
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
t.Helper()
if listener == nil {
listener = &verboseListener{t}
}
- outputs, err := w.Run(context.Background(), listener)
+ outputs, err := w.Run(ctx, listener)
if err != nil {
t.Fatal(err)
}