internal/workflow: add sub-workflows

The minor release workflow runs the single-release workflow twice in
parallel. Add support for nesting workflows to make that easy.

Adds (*Definition).Sub, which returns a definition that shares state
with its parent, but prefixes all names to avoid clashes.

For golang/go#51797.

Change-Id: Ia2d8965a7fc733e99ae4f0563f4018642352a29f
Reviewed-on: https://go-review.googlesource.com/c/build/+/410825
Run-TryBot: Heschi Kreinick <heschi@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Dmitri Shuralyov <dmitshur@golang.org>
Auto-Submit: Heschi Kreinick <heschi@google.com>
Reviewed-by: Alex Rakoczy <alex@golang.org>
diff --git a/internal/workflow/workflow.go b/internal/workflow/workflow.go
index 8bcd579..ee44960 100644
--- a/internal/workflow/workflow.go
+++ b/internal/workflow/workflow.go
@@ -46,13 +46,31 @@
 // New creates a new workflow definition.
 func New() *Definition {
 	return &Definition{
-		tasks:   make(map[string]*taskDefinition),
-		outputs: make(map[string]*taskResult),
+		definitionState: &definitionState{
+			tasks:   make(map[string]*taskDefinition),
+			outputs: make(map[string]*taskResult),
+		},
 	}
 }
 
 // A Definition defines the structure of a workflow.
 type Definition struct {
+	namePrefix string // For sub-workflows, the prefix that will be prepended to various names.
+	*definitionState
+}
+
+func (d *Definition) Sub(name string) *Definition {
+	return &Definition{
+		namePrefix:      name + ": " + d.namePrefix,
+		definitionState: d.definitionState,
+	}
+}
+
+func (d *Definition) name(name string) string {
+	return d.namePrefix + name
+}
+
+type definitionState struct {
 	parameters []Parameter // Ordered according to registration, unique parameter names.
 	tasks      map[string]*taskDefinition
 	outputs    map[string]*taskResult
@@ -142,6 +160,7 @@
 	if p.Name == "" {
 		panic(fmt.Errorf("parameter name must be non-empty"))
 	}
+	p.Name = d.name(p.Name)
 	if p.ParameterType == (ParameterType{}) {
 		p.ParameterType = BasicString
 	}
@@ -227,7 +246,7 @@
 	if !ok {
 		panic(fmt.Errorf("output must be a task result"))
 	}
-	d.outputs[name] = tr
+	d.outputs[d.name(name)] = tr
 }
 
 // Task adds a task to the workflow definition. It can take any number of
@@ -244,6 +263,7 @@
 }
 
 func (d *Definition) addTask(hasResult bool, name string, f interface{}, inputs ...TaskInput) *taskDefinition {
+	name = d.name(name)
 	var args []Value
 	for _, arg := range inputs {
 		val, ok := arg.(Value)
diff --git a/internal/workflow/workflow_test.go b/internal/workflow/workflow_test.go
index 6950dfb..b0cc255 100644
--- a/internal/workflow/workflow_test.go
+++ b/internal/workflow/workflow_test.go
@@ -86,6 +86,28 @@
 	}
 }
 
+func TestSub(t *testing.T) {
+	hi := func(ctx context.Context) (string, error) {
+		return "hi", nil
+	}
+	concat := func(ctx context.Context, s1, s2 string) (string, error) {
+		return s1 + " " + s2, nil
+	}
+
+	wd := workflow.New()
+	sub1 := wd.Sub("sub1")
+	g1 := sub1.Task("Greeting", hi)
+	sub2 := wd.Sub("sub2")
+	g2 := sub2.Task("Greeting", hi)
+	wd.Output("result", wd.Task("Concatenate", concat, g1, g2))
+
+	w := startWorkflow(t, wd, nil)
+	outputs := runWorkflow(t, w, nil)
+	if got, want := outputs["result"], "hi hi"; got != want {
+		t.Errorf("result = %q, want %q", got, want)
+	}
+}
+
 func TestStuck(t *testing.T) {
 	fail := func(context.Context) (string, error) {
 		return "", fmt.Errorf("goodbye world")