internal/workflow: add basic workflow functionality

Creates the workflow package, which runs computation graphs defined by
a declarative API inspired by Beam. See the package documentation for an
overview.

Future work:
- Allow tasks to log, preferably in a streaming way.
- Enable persistence.
- Add whatever additional observability we need to write a pretty UI.

Updates golang/go#47406.

Change-Id: I6f720191a48549cfee3ad6c305399fe9b63edf9d
Reviewed-on: https://go-review.googlesource.com/c/build/+/339949
Trust: Heschi Kreinick <heschi@google.com>
Run-TryBot: Heschi Kreinick <heschi@google.com>
TryBot-Result: Go Bot <gobot@golang.org>
Reviewed-by: Alexander Rakoczy <alex@golang.org>
Reviewed-by: Dmitri Shuralyov <dmitshur@golang.org>
Reviewed-by: Carlos Amedee <carlos@golang.org>
diff --git a/internal/workflow/workflow.go b/internal/workflow/workflow.go
new file mode 100644
index 0000000..781437f
--- /dev/null
+++ b/internal/workflow/workflow.go
@@ -0,0 +1,342 @@
+// Copyright 2021 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package workflow declaratively defines computation graphs that support
+// automatic parallelization, persistance, and monitoring.
+//
+// Workflows are a set of tasks that produce and consume Values. Tasks don't
+// run until the workflow is started, so Values represent data that doesn't
+// exist yet, and can't be used directly. Each value has a dynamic type, which
+// must match its uses.
+//
+// To wrap an existing Go object in a Value, use Constant. To define a
+// parameter that will be set when the workflow is started, use Parameter.
+// To read a task's return value, register it as an Output, and it will be
+// returned from Run. An arbitrary number of Values of the same type can
+// be combined with Slice.
+//
+// Each task has a set of input Values, and returns a single output Value.
+// Calling Task defines a task that will run a Go function when it runs. That
+// function must take a context.Context, followed by arguments corresponding to
+// the dynamic type of the Values passed to it.
+//
+// Once a Definition is complete, call Start to set its parameters and
+// instantiate it into a Workflow. Call Run to execute the workflow until
+// completion.
+package workflow
+
+import (
+	"context"
+	"fmt"
+	"reflect"
+	"sync"
+
+	"github.com/google/uuid"
+)
+
+// New creates a new workflow definition.
+func New() *Definition {
+	return &Definition{
+		parameterNames: map[string]struct{}{},
+		tasks:          map[string]*taskDefinition{},
+		outputs:        map[string]*taskResult{},
+	}
+}
+
+// A Definition defines the structure of a workflow.
+type Definition struct {
+	parameterNames map[string]struct{}
+	tasks          map[string]*taskDefinition
+	outputs        map[string]*taskResult
+}
+
+// A Value is a piece of data that will be produced or consumed when a task
+// runs. It cannot be read directly.
+type Value interface {
+	typ() reflect.Type
+	value(*Workflow) reflect.Value
+	deps() []*taskDefinition
+}
+
+// Parameter creates a Value that is filled in at workflow creation time.
+func (d *Definition) Parameter(name string) Value {
+	d.parameterNames[name] = struct{}{}
+	return &workflowParameter{name: name}
+}
+
+type workflowParameter struct {
+	name string
+}
+
+func (wp *workflowParameter) typ() reflect.Type {
+	return reflect.TypeOf("")
+}
+
+func (wp *workflowParameter) value(w *Workflow) reflect.Value {
+	return reflect.ValueOf(w.params[wp.name])
+}
+
+func (wp *workflowParameter) deps() []*taskDefinition {
+	return nil
+}
+
+// Constant creates a Value from an existing object.
+func (d *Definition) Constant(value interface{}) Value {
+	return &constant{reflect.ValueOf(value)}
+}
+
+type constant struct {
+	v reflect.Value
+}
+
+func (c *constant) typ() reflect.Type               { return c.v.Type() }
+func (c *constant) value(_ *Workflow) reflect.Value { return c.v }
+func (c *constant) deps() []*taskDefinition         { return nil }
+
+// Slice combines multiple Values of the same type into a Value containing
+// a slice of that type.
+func (d *Definition) Slice(vs []Value) Value {
+	if len(vs) == 0 {
+		return &slice{}
+	}
+	typ := vs[0].typ()
+	for _, v := range vs[1:] {
+		if v.typ() != typ {
+			panic(fmt.Errorf("mismatched value types in Slice: %v vs. %v", v.typ(), typ))
+		}
+	}
+	return &slice{elt: typ, vals: vs}
+}
+
+type slice struct {
+	elt  reflect.Type
+	vals []Value
+}
+
+func (s *slice) typ() reflect.Type {
+	return reflect.SliceOf(s.elt)
+}
+
+func (s *slice) value(w *Workflow) reflect.Value {
+	value := reflect.MakeSlice(reflect.SliceOf(s.elt), len(s.vals), len(s.vals))
+	for i, v := range s.vals {
+		value.Index(i).Set(v.value(w))
+	}
+	return value
+}
+
+func (s *slice) deps() []*taskDefinition {
+	var result []*taskDefinition
+	for _, v := range s.vals {
+		result = append(result, v.deps()...)
+	}
+	return result
+}
+
+// Output registers a Value as a workflow output which will be returned when
+// the workflow finishes.
+func (d *Definition) Output(name string, v Value) {
+	tr, ok := v.(*taskResult)
+	if !ok {
+		panic(fmt.Errorf("output must be a task result"))
+	}
+	d.outputs[name] = tr
+}
+
+// Task adds a task to the workflow definition. It can take any number of
+// arguments, and returns one output. name must uniquely identify the task in
+// the workflow.
+// f must be a function that takes a context.Context argument, followed by one
+// argument for each of args, corresponding to the Value's dynamic type.
+// It must return two values, the first of which will be returned as its Value,
+// and an error that will be used by the workflow engine. See the package
+// documentation for examples.
+func (d *Definition) Task(name string, f interface{}, args ...Value) Value {
+	if d.tasks[name] != nil {
+		panic(fmt.Errorf("task %q already exists in the workflow", name))
+	}
+	ftyp := reflect.ValueOf(f).Type()
+	if ftyp.Kind() != reflect.Func {
+		panic(fmt.Errorf("%v is not a function", f))
+	}
+	if ftyp.NumIn()-1 != len(args) {
+		panic(fmt.Errorf("%v takes %v non-Context arguments, but was passed %v", f, ftyp.NumIn()-1, len(args)))
+	}
+	if ftyp.In(0) != reflect.TypeOf((*context.Context)(nil)).Elem() {
+		panic(fmt.Errorf("the first argument of %v must be a context.Context, is %v", f, ftyp.In(0)))
+	}
+	for i, arg := range args {
+		if !arg.typ().AssignableTo(ftyp.In(i + 1)) {
+			panic(fmt.Errorf("argument %v to %v is %v, but was passed %v", i, f, ftyp.In(i+1), arg.typ()))
+		}
+	}
+	if ftyp.NumOut() != 2 {
+		panic(fmt.Errorf("%v returns %v results, must return 2", f, ftyp.NumOut()))
+	}
+	if ftyp.Out(1) != reflect.TypeOf((*error)(nil)).Elem() {
+		panic(fmt.Errorf("%v's second return value must be error, is %v", f, ftyp.Out(1)))
+	}
+	td := &taskDefinition{name: name, args: args, f: f}
+	d.tasks[name] = td
+	return &taskResult{task: td}
+}
+
+type taskDefinition struct {
+	name string
+	args []Value
+	f    interface{}
+}
+
+type taskResult struct {
+	task *taskDefinition
+}
+
+func (tr *taskResult) typ() reflect.Type {
+	return reflect.ValueOf(tr.task.f).Type().Out(0)
+}
+
+func (tr *taskResult) value(w *Workflow) reflect.Value {
+	return reflect.ValueOf(w.tasks[tr.task].result)
+}
+
+func (tr *taskResult) deps() []*taskDefinition {
+	return []*taskDefinition{tr.task}
+}
+
+// A Workflow is an instantiated workflow instance, ready to run.
+type Workflow struct {
+	id     string
+	def    *Definition
+	params map[string]string
+
+	tasks map[*taskDefinition]*taskState
+}
+
+type taskState struct {
+	def      *taskDefinition
+	w        *Workflow
+	started  bool
+	finished bool
+	result   interface{}
+	err      error
+}
+
+func (t *taskState) args() ([]reflect.Value, bool) {
+	var args []reflect.Value
+	for _, arg := range t.def.args {
+		for _, dep := range arg.deps() {
+			if depState, ok := t.w.tasks[dep]; !ok || !depState.finished || depState.err != nil {
+				return nil, false
+			}
+		}
+		args = append(args, arg.value(t.w))
+	}
+	return args, true
+}
+
+func (t *taskState) toExported() *TaskState {
+	return &TaskState{
+		Name:     t.def.name,
+		Finished: t.finished,
+		Result:   t.result,
+		Error:    t.err,
+	}
+}
+
+// TaskState contains the state of a task in a running workflow. Once finished
+// is true, either Result or Error will be populated.
+type TaskState struct {
+	Name     string
+	Finished bool
+	Result   interface{}
+	Error    error
+}
+
+// Start instantiates a workflow with the given parameters.
+func Start(def *Definition, params map[string]string) (*Workflow, error) {
+	w := &Workflow{
+		id:     uuid.New().String(),
+		def:    def,
+		params: params,
+		tasks:  map[*taskDefinition]*taskState{},
+	}
+	used := map[*taskDefinition]bool{}
+	for _, taskDef := range def.tasks {
+		w.tasks[taskDef] = &taskState{def: taskDef, w: w}
+		for _, arg := range taskDef.args {
+			for _, argDep := range arg.deps() {
+				used[argDep] = true
+			}
+		}
+	}
+	for _, output := range def.outputs {
+		used[output.task] = true
+	}
+	for _, task := range def.tasks {
+		if !used[task] {
+			return nil, fmt.Errorf("task %v is not referenced and should be deleted", task.name)
+		}
+	}
+	return w, nil
+}
+
+// Run runs a workflow to successful completion and returns its outputs.
+// statusFunc will be called when each task starts and finishes. It should be
+// used only for monitoring purposes - to read task results, register Outputs.
+func (w *Workflow) Run(ctx context.Context, stateFunc func(*TaskState)) (map[string]interface{}, error) {
+	var running sync.WaitGroup
+	defer running.Wait()
+
+	stateChan := make(chan taskState, 2*len(w.def.tasks))
+	for {
+		// If we have all the outputs, the workflow is done.
+		outValues := map[string]interface{}{}
+		for outName, outDef := range w.def.outputs {
+			if task := w.tasks[outDef.task]; task.finished && task.err == nil {
+				outValues[outName] = task.result
+			}
+		}
+		if len(outValues) == len(w.def.outputs) {
+			return outValues, nil
+		}
+
+		// Start any idle tasks whose dependencies are all done.
+		for _, task := range w.tasks {
+			if task.started {
+				continue
+			}
+			in, ready := task.args()
+			if !ready {
+				continue
+			}
+			task.started = true
+			stateFunc(task.toExported())
+			running.Add(1)
+			go func(task taskState) {
+				stateChan <- w.runTask(ctx, task, in)
+				running.Done()
+			}(*task)
+		}
+
+		select {
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		case state := <-stateChan:
+			w.tasks[state.def] = &state
+			stateFunc(state.toExported())
+		}
+	}
+}
+
+func (w *Workflow) runTask(ctx context.Context, state taskState, args []reflect.Value) taskState {
+	in := append([]reflect.Value{reflect.ValueOf(ctx)}, args...)
+	out := reflect.ValueOf(state.def.f).Call(in)
+	var err error
+	if !out[1].IsNil() {
+		err = out[1].Interface().(error)
+	}
+	state.finished = true
+	state.result, state.err = out[0].Interface(), err
+	return state
+}
diff --git a/internal/workflow/workflow_test.go b/internal/workflow/workflow_test.go
new file mode 100644
index 0000000..aac7cf5
--- /dev/null
+++ b/internal/workflow/workflow_test.go
@@ -0,0 +1,129 @@
+// Copyright 2020 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package workflow_test
+
+import (
+	"context"
+	"fmt"
+	"reflect"
+	"strings"
+	"testing"
+	"time"
+
+	"golang.org/x/build/internal/workflow"
+)
+
+func TestTrivial(t *testing.T) {
+	wd := workflow.New()
+	greeting := wd.Task("echo", echo, wd.Constant("hello world"))
+	wd.Output("greeting", greeting)
+
+	w, err := workflow.Start(wd, map[string]string{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	outputs, err := w.Run(context.Background(), loggingListener(t))
+	if err != nil {
+		t.Fatal(err)
+	}
+	if got, want := outputs["greeting"], "hello world"; got != want {
+		t.Errorf("greeting = %q, want %q", got, want)
+	}
+}
+
+func TestSplitJoin(t *testing.T) {
+	wd := workflow.New()
+	in := wd.Task("echo", echo, wd.Constant("string #"))
+	add1 := wd.Task("add 1", appendInt, in, wd.Constant(1))
+	add2 := wd.Task("add 2", appendInt, in, wd.Constant(2))
+	both := wd.Slice([]workflow.Value{add1, add2})
+	out := wd.Task("join", join, both)
+	wd.Output("strings", out)
+
+	w, err := workflow.Start(wd, map[string]string{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	outputs, err := w.Run(context.Background(), loggingListener(t))
+	if err != nil {
+		t.Fatal(err)
+	}
+	if got, want := outputs["strings"], "string #1,string #2"; got != want {
+		t.Errorf("joined output = %q, want %q", got, want)
+	}
+}
+
+func TestParallelism(t *testing.T) {
+	wd := workflow.New()
+	out1 := wd.Task("sleep #1", sleep, wd.Constant(100*time.Millisecond))
+	out2 := wd.Task("sleep #2", sleep, wd.Constant(100*time.Millisecond))
+	wd.Output("out1", out1)
+	wd.Output("out2", out2)
+
+	w, err := workflow.Start(wd, map[string]string{})
+	if err != nil {
+		t.Fatal(err)
+	}
+	start := time.Now()
+	_, err = w.Run(context.Background(), loggingListener(t))
+	if err != nil {
+		t.Fatal(err)
+	}
+	if delay := time.Since(start); delay > 150*time.Millisecond {
+		t.Errorf("too much time elapsed: %v", delay)
+	}
+}
+
+func TestParameters(t *testing.T) {
+	wd := workflow.New()
+	param1 := wd.Parameter("param1")
+	param2 := wd.Parameter("param2")
+	out1 := wd.Task("echo 1", echo, param1)
+	out2 := wd.Task("echo 2", echo, param2)
+	wd.Output("out1", out1)
+	wd.Output("out2", out2)
+
+	w, err := workflow.Start(wd, map[string]string{"param1": "#1", "param2": "#2"})
+	if err != nil {
+		t.Fatal(err)
+	}
+	outputs, err := w.Run(context.Background(), loggingListener(t))
+	if err != nil {
+		t.Fatal(err)
+	}
+	if want := map[string]interface{}{"out1": "#1", "out2": "#2"}; !reflect.DeepEqual(outputs, want) {
+		t.Errorf("outputs = %#v, want %#v", outputs, want)
+	}
+}
+
+func sleep(ctx context.Context, d time.Duration) (struct{}, error) {
+	time.Sleep(d)
+	return struct{}{}, nil
+}
+
+func appendInt(ctx context.Context, s string, i int) (string, error) {
+	return fmt.Sprintf("%v%v", s, i), nil
+}
+
+func join(ctx context.Context, s []string) (string, error) {
+	return strings.Join(s, ","), nil
+}
+
+func echo(ctx context.Context, arg string) (string, error) {
+	return arg, nil
+}
+
+func loggingListener(t *testing.T) func(*workflow.TaskState) {
+	return func(st *workflow.TaskState) {
+		switch {
+		case !st.Finished:
+			t.Logf("task %-10v: started", st.Name)
+		case st.Error != nil:
+			t.Logf("task %-10v: error: %v", st.Name, st.Error)
+		default:
+			t.Logf("task %-10v: done: %v", st.Name, st.Result)
+		}
+	}
+}