internal/workflow: add sub-workflows
The minor release workflow runs the single-release workflow twice in
parallel. Add support for nesting workflows to make that easy.
Adds (*Definition).Sub, which returns a definition that shares state
with its parent, but prefixes all names to avoid clashes.
For golang/go#51797.
Change-Id: Ia2d8965a7fc733e99ae4f0563f4018642352a29f
Reviewed-on: https://go-review.googlesource.com/c/build/+/410825
Run-TryBot: Heschi Kreinick <heschi@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Dmitri Shuralyov <dmitshur@golang.org>
Auto-Submit: Heschi Kreinick <heschi@google.com>
Reviewed-by: Alex Rakoczy <alex@golang.org>
diff --git a/internal/workflow/workflow.go b/internal/workflow/workflow.go
index 8bcd579..ee44960 100644
--- a/internal/workflow/workflow.go
+++ b/internal/workflow/workflow.go
@@ -46,13 +46,31 @@
// New creates a new workflow definition.
func New() *Definition {
return &Definition{
- tasks: make(map[string]*taskDefinition),
- outputs: make(map[string]*taskResult),
+ definitionState: &definitionState{
+ tasks: make(map[string]*taskDefinition),
+ outputs: make(map[string]*taskResult),
+ },
}
}
// A Definition defines the structure of a workflow.
type Definition struct {
+ namePrefix string // For sub-workflows, the prefix that will be prepended to various names.
+ *definitionState
+}
+
+func (d *Definition) Sub(name string) *Definition {
+ return &Definition{
+ namePrefix: name + ": " + d.namePrefix,
+ definitionState: d.definitionState,
+ }
+}
+
+func (d *Definition) name(name string) string {
+ return d.namePrefix + name
+}
+
+type definitionState struct {
parameters []Parameter // Ordered according to registration, unique parameter names.
tasks map[string]*taskDefinition
outputs map[string]*taskResult
@@ -142,6 +160,7 @@
if p.Name == "" {
panic(fmt.Errorf("parameter name must be non-empty"))
}
+ p.Name = d.name(p.Name)
if p.ParameterType == (ParameterType{}) {
p.ParameterType = BasicString
}
@@ -227,7 +246,7 @@
if !ok {
panic(fmt.Errorf("output must be a task result"))
}
- d.outputs[name] = tr
+ d.outputs[d.name(name)] = tr
}
// Task adds a task to the workflow definition. It can take any number of
@@ -244,6 +263,7 @@
}
func (d *Definition) addTask(hasResult bool, name string, f interface{}, inputs ...TaskInput) *taskDefinition {
+ name = d.name(name)
var args []Value
for _, arg := range inputs {
val, ok := arg.(Value)
diff --git a/internal/workflow/workflow_test.go b/internal/workflow/workflow_test.go
index 6950dfb..b0cc255 100644
--- a/internal/workflow/workflow_test.go
+++ b/internal/workflow/workflow_test.go
@@ -86,6 +86,28 @@
}
}
+func TestSub(t *testing.T) {
+ hi := func(ctx context.Context) (string, error) {
+ return "hi", nil
+ }
+ concat := func(ctx context.Context, s1, s2 string) (string, error) {
+ return s1 + " " + s2, nil
+ }
+
+ wd := workflow.New()
+ sub1 := wd.Sub("sub1")
+ g1 := sub1.Task("Greeting", hi)
+ sub2 := wd.Sub("sub2")
+ g2 := sub2.Task("Greeting", hi)
+ wd.Output("result", wd.Task("Concatenate", concat, g1, g2))
+
+ w := startWorkflow(t, wd, nil)
+ outputs := runWorkflow(t, w, nil)
+ if got, want := outputs["result"], "hi hi"; got != want {
+ t.Errorf("result = %q, want %q", got, want)
+ }
+}
+
func TestStuck(t *testing.T) {
fail := func(context.Context) (string, error) {
return "", fmt.Errorf("goodbye world")