internal/workflow: add value-less dependencies

While building the release workflows, I'm frequently needing to express
a dependency on a function succeeding, not a value it produces. For
example, we need to wait for tests to pass, or check that there are no
release blockers. So far I've been expressing those dependencies as
empty strings, but it feels silly: the producing function has to return
"" everywhere, and the consuming function has know that it needs to wait
for something and accept a dummy argument.

Instead, introduce the concept of Actions to the workflow package. An
Action is similar to a Task, but its function returns only an error, and
its result is a Dependency, not a Value. Dependencies can be passed to
any Action or Task to express a dependency between the two, and are
invisible to the dependant function.

I'm not terribly fond of this API, so suggestions for improvement are
welcome.
 - Task and Action are not clearly distinguished. Perhaps Task should be
   renamed Function?
 - I don't like that you can't tell just by looking at a call site which
   of the TaskInputs are arguments that need to match the function's
   arguments. We could try introducing a fluent API or a builder struct.
   I fear either would be too verbose.

For golang/go#51797.

Change-Id: Ibd60b90ce0bf0ae83f7320f6a4e610dde8994d94
Reviewed-on: https://go-review.googlesource.com/c/build/+/410243
Reviewed-by: Dmitri Shuralyov <dmitshur@google.com>
Run-TryBot: Heschi Kreinick <heschi@google.com>
Auto-Submit: Heschi Kreinick <heschi@google.com>
Reviewed-by: Alex Rakoczy <alex@golang.org>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Dmitri Shuralyov <dmitshur@golang.org>
diff --git a/internal/workflow/workflow.go b/internal/workflow/workflow.go
index 9e7fa49..8bcd579 100644
--- a/internal/workflow/workflow.go
+++ b/internal/workflow/workflow.go
@@ -5,10 +5,10 @@
 // Package workflow declaratively defines computation graphs that support
 // automatic parallelization, persistence, and monitoring.
 //
-// Workflows are a set of tasks that produce and consume Values. Tasks don't
-// run until the workflow is started, so Values represent data that doesn't
-// exist yet, and can't be used directly. Each value has a dynamic type, which
-// must match its uses.
+// Workflows are a set of tasks and actions that produce and consume Values.
+// Tasks don't run until the workflow is started, so Values represent data that
+// doesn't exist yet, and can't be used directly. Each value has a dynamic type,
+// which must match its uses.
 //
 // To wrap an existing Go object in a Value, use Constant. To define a
 // parameter that will be set when the workflow is started, use Parameter.
@@ -16,11 +16,17 @@
 // returned from Run. An arbitrary number of Values of the same type can
 // be combined with Slice.
 //
-// Each task has a set of input Values, and returns a single output Value.
+// Each Task has a set of input Values, and returns a single output Value.
 // Calling Task defines a task that will run a Go function when it runs. That
 // function must take a context.Context or *TaskContext, followed by arguments
-// corresponding to the dynamic type of the Values passed to it. The TaskContext
-// can be used as a normal Context, and also supports unstructured logging.
+// corresponding to the dynamic type of the Values passed to it. It must return
+// a value of any type and an error. The TaskContext can be used as a normal
+// Context, and also supports workflow features like unstructured logging.
+// A task only runs once all of its input Values and Dependencies are ready.
+//
+// In addition to Tasks, a workflow can have Actions, which represent functions
+// that don't produce an output. Their Go function must only return an error,
+// and their definition results in a Dependency rather than a Value.
 //
 // Once a Definition is complete, call Start to set its parameters and
 // instantiate it into a Workflow. Call Run to execute the workflow until
@@ -52,12 +58,24 @@
 	outputs    map[string]*taskResult
 }
 
+// A TaskInput is any input to the definition of a task.
+type TaskInput interface {
+	deps() []*taskDefinition
+}
+
 // A Value is a piece of data that will be produced or consumed when a task
 // runs. It cannot be read directly.
 type Value interface {
+	TaskInput
 	typ() reflect.Type
 	value(*Workflow) reflect.Value
-	deps() []*taskDefinition
+}
+
+// A Dependency represents a dependency on a prior task that does not produce
+// any in-band Value.
+type Dependency interface {
+	TaskInput
+	dependencyOnly()
 }
 
 // Parameter describes a Value that is filled in at workflow creation time.
@@ -213,14 +231,26 @@
 }
 
 // Task adds a task to the workflow definition. It can take any number of
-// arguments, and returns one output. name must uniquely identify the task in
+// inputs, and returns one output. name must uniquely identify the task in
 // the workflow.
 // f must be a function that takes a context.Context or *TaskContext argument,
-// followed by one argument for each of args, corresponding to the Value's dynamic type.
-// It must return two values, the first of which will be returned as its Value,
-// and an error that will be used by the workflow engine. See the package
-// documentation for examples.
-func (d *Definition) Task(name string, f interface{}, args ...Value) Value {
+// followed by one argument for each Value in inputs, corresponding to the
+// Value's dynamic type. It must return two values, the first of which will
+// be returned as its Value, and an error that will be used by the workflow
+// engine. See the package documentation for examples.
+func (d *Definition) Task(name string, f interface{}, inputs ...TaskInput) Value {
+	td := d.addTask(true, name, f, inputs...)
+	return &taskResult{td}
+}
+
+func (d *Definition) addTask(hasResult bool, name string, f interface{}, inputs ...TaskInput) *taskDefinition {
+	var args []Value
+	for _, arg := range inputs {
+		val, ok := arg.(Value)
+		if ok {
+			args = append(args, val)
+		}
+	}
 	if d.tasks[name] != nil {
 		panic(fmt.Errorf("task %q already exists in the workflow", name))
 	}
@@ -234,20 +264,42 @@
 	if !reflect.TypeOf((*TaskContext)(nil)).AssignableTo(ftyp.In(0)) {
 		panic(fmt.Errorf("the first argument of %v must be a context.Context or *TaskContext, is %v", f, ftyp.In(0)))
 	}
-	for i, arg := range args {
-		if !arg.typ().AssignableTo(ftyp.In(i + 1)) {
-			panic(fmt.Errorf("argument %v to %v is %v, but was passed %v", i, f, ftyp.In(i+1), arg.typ()))
+	for i, val := range args {
+		if !val.typ().AssignableTo(ftyp.In(i + 1)) {
+			panic(fmt.Errorf("argument %v to %v is %v, but was passed %v", i, f, ftyp.In(i+1), val.typ()))
 		}
 	}
-	if ftyp.NumOut() != 2 {
-		panic(fmt.Errorf("%v returns %v results, must return 2", f, ftyp.NumOut()))
+	wantOuts := 2
+	if !hasResult {
+		wantOuts = 1
 	}
-	if ftyp.Out(1) != reflect.TypeOf((*error)(nil)).Elem() {
-		panic(fmt.Errorf("%v's second return value must be error, is %v", f, ftyp.Out(1)))
+	if ftyp.NumOut() != wantOuts {
+		panic(fmt.Errorf("function for task %v returns %v values, must return %v", name, ftyp.NumOut(), wantOuts))
 	}
-	td := &taskDefinition{name: name, args: args, f: f}
+	if ftyp.Out(wantOuts-1) != reflect.TypeOf((*error)(nil)).Elem() {
+		panic(fmt.Errorf("%v's last return value must be error, is %v", f, ftyp.Out(wantOuts-1)))
+	}
+	td := &taskDefinition{name: name, inputs: inputs, f: f}
 	d.tasks[name] = td
-	return &taskResult{task: td}
+	return td
+}
+
+// Action adds an Action to the workflow definition. Its behavior and
+// requirements are the same as Task, except that f must only return an error,
+// and the result of the definition is a Dependency.
+func (d *Definition) Action(name string, f interface{}, inputs ...TaskInput) Dependency {
+	td := d.addTask(false, name, f, inputs...)
+	return &dependency{td}
+}
+
+type dependency struct {
+	task *taskDefinition
+}
+
+func (d *dependency) dependencyOnly() {}
+
+func (d *dependency) deps() []*taskDefinition {
+	return []*taskDefinition{d.task}
 }
 
 // A TaskContext is a context.Context, plus workflow-related features.
@@ -289,9 +341,9 @@
 }
 
 type taskDefinition struct {
-	name string
-	args []Value
-	f    interface{}
+	name   string
+	inputs []TaskInput
+	f      interface{}
 }
 
 type taskResult struct {
@@ -331,13 +383,15 @@
 
 func (t *taskState) args() ([]reflect.Value, bool) {
 	var args []reflect.Value
-	for _, arg := range t.def.args {
+	for _, arg := range t.def.inputs {
 		for _, dep := range arg.deps() {
 			if depState, ok := t.w.tasks[dep]; !ok || !depState.finished || depState.err != nil {
 				return nil, false
 			}
 		}
-		args = append(args, arg.value(t.w))
+		if v, ok := arg.(Value); ok {
+			args = append(args, v.value(t.w))
+		}
 	}
 	return args, true
 }
@@ -376,7 +430,7 @@
 	// Validate tasks.
 	used := map[*taskDefinition]bool{}
 	for _, taskDef := range w.def.tasks {
-		for _, arg := range taskDef.args {
+		for _, arg := range taskDef.inputs {
 			for _, argDep := range arg.deps() {
 				used[argDep] = true
 			}
@@ -531,13 +585,15 @@
 	}
 	in := append([]reflect.Value{reflect.ValueOf(tctx)}, args...)
 	out := reflect.ValueOf(state.def.f).Call(in)
-	var err error
-	if !out[1].IsNil() {
-		err = out[1].Interface().(error)
+
+	if errIdx := len(out) - 1; !out[errIdx].IsNil() {
+		state.err = out[errIdx].Interface().(error)
 	}
 	state.finished = true
-	state.result, state.err = out[0].Interface(), err
-	if err == nil {
+	if len(out) == 2 {
+		state.result = out[0].Interface()
+	}
+	if state.err == nil {
 		state.serializedResult, state.err = json.Marshal(state.result)
 	}
 	return state
diff --git a/internal/workflow/workflow_test.go b/internal/workflow/workflow_test.go
index 74dc165..6950dfb 100644
--- a/internal/workflow/workflow_test.go
+++ b/internal/workflow/workflow_test.go
@@ -36,6 +36,56 @@
 	}
 }
 
+func TestDependency(t *testing.T) {
+	var actionRan, checkRan bool
+	action := func(ctx context.Context) error {
+		actionRan = true
+		return nil
+	}
+	checkAction := func(ctx context.Context) error {
+		if !actionRan {
+			return fmt.Errorf("prior action didn't run")
+		}
+		checkRan = true
+		return nil
+	}
+	hi := func(ctx context.Context) (string, error) {
+		if !actionRan || !checkRan {
+			return "", fmt.Errorf("either action (%v) or checkAction (%v) didn't run", actionRan, checkRan)
+		}
+		return "hello world", nil
+	}
+
+	wd := workflow.New()
+	firstDep := wd.Action("first action", action)
+	secondDep := wd.Action("check action", checkAction, firstDep)
+	wd.Output("greeting", wd.Task("say hi", hi, secondDep))
+
+	w := startWorkflow(t, wd, nil)
+	outputs := runWorkflow(t, w, nil)
+	if got, want := outputs["greeting"], "hello world"; got != want {
+		t.Errorf("greeting = %q, want %q", got, want)
+	}
+}
+
+func TestDependencyError(t *testing.T) {
+	action := func(ctx context.Context) error {
+		return fmt.Errorf("hardcoded error")
+	}
+	task := func(ctx context.Context) (string, error) {
+		return "", fmt.Errorf("unexpected error")
+	}
+
+	wd := workflow.New()
+	dep := wd.Action("failing action", action)
+	wd.Output("output", wd.Task("task", task, dep))
+	w := startWorkflow(t, wd, nil)
+	l := &verboseListener{t: t}
+	if _, err := w.Run(context.Background(), l); err == nil {
+		t.Errorf("workflow finished successfully, expected an error")
+	}
+}
+
 func TestStuck(t *testing.T) {
 	fail := func(context.Context) (string, error) {
 		return "", fmt.Errorf("goodbye world")