blob: 353b7864c8d467854d8a051da110305bde93c8a2 [file] [log] [blame]
// Copyright 2021 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package workflow declaratively defines computation graphs that support
// automatic parallelization, persistence, and monitoring.
//
// Workflows are a set of tasks and actions that produce and consume Values.
// Tasks don't run until the workflow is started, so Values represent data that
// doesn't exist yet, and can't be used directly. Each value has a dynamic type,
// which must match its uses.
//
// To wrap an existing Go object in a Value, use Const. To define a
// parameter that will be set when the workflow is started, use Param.
// To read a task's return value, register it as an Output, and it will be
// returned from Run. An arbitrary number of Values of the same type can
// be combined with Slice.
//
// Each Task has a set of input Values, and returns a single output Value.
// Calling Task defines a task that will run a Go function when it runs. That
// function must take a context.Context or *TaskContext, followed by arguments
// corresponding to the dynamic type of the Values passed to it. It must return
// a value of any type and an error. The TaskContext can be used as a normal
// Context, and also supports workflow features like unstructured logging.
// A task only runs once all of its inputs are ready. All task outputs must be
// used either as inputs to another task or as a workflow Output.
//
// In addition to Tasks, a workflow can have Actions, which represent functions
// that don't produce an output. Their Go function must only return an error,
// and their definition results in a Dependency rather than a Value. Both
// Dependencies and Values can be passed to After and then to Task and Action
// definitions to create an ordering dependency that doesn't correspond to a
// function argument.
//
// Once a Definition is complete, call Start to set its parameters and
// instantiate it into a Workflow. Call Run to execute the workflow until
// completion.
package workflow
import (
"context"
"encoding/json"
"fmt"
"reflect"
"strings"
"time"
"github.com/google/uuid"
)
// New creates a new workflow definition.
func New() *Definition {
return &Definition{
definitionState: &definitionState{
tasks: make(map[string]*taskDefinition),
outputs: make(map[string]*taskDefinition),
},
}
}
// A Definition defines the structure of a workflow.
type Definition struct {
namePrefix string // For sub-workflows, the prefix that will be prepended to various names.
*definitionState
}
func (d *Definition) Sub(name string) *Definition {
return &Definition{
namePrefix: name + ": " + d.namePrefix,
definitionState: d.definitionState,
}
}
func (d *Definition) name(name string) string {
return d.namePrefix + name
}
type definitionState struct {
parameters []MetaParameter // Ordered according to registration, unique parameter names.
tasks map[string]*taskDefinition
outputs map[string]*taskDefinition
}
// A TaskOption affects the execution of a task but is not an argument to its function.
type TaskOption interface {
taskOption()
}
// A Value is a piece of data that will be produced or consumed when a task
// runs. It cannot be read directly.
type Value[T any] interface {
// This function prevents Values of different types from being convertible
// to each other.
valueType(T)
metaValue
}
type metaValue interface {
Dependency
typ() reflect.Type
value(*Workflow) reflect.Value
}
type MetaParameter interface {
// RequireNonZero reports whether parameter p is required to have a non-zero value.
RequireNonZero() bool
Name() string
Type() reflect.Type
HTMLElement() string
HTMLInputType() string
Doc() string
Example() string
}
// ParamDef describes a Value that is filled in at workflow creation time.
//
// It can be registered to a workflow with the Parameter function.
type ParamDef[T any] struct {
Name string // Name identifies the parameter within a workflow. Must be non-empty.
ParamType[T] // Parameter type. For strings, defaults to BasicString if not specified.
Doc string // Doc documents the parameter. Optional.
Example string // Example is an example value. Optional.
}
// parameter adds Value methods to ParamDef, so that users can't accidentally
// use a ParamDef without registering it.
type parameter[T any] struct {
d ParamDef[T]
}
func (p parameter[T]) Name() string { return p.d.Name }
func (p parameter[T]) Type() reflect.Type { return p.typ() }
func (p parameter[T]) HTMLElement() string { return p.d.HTMLElement }
func (p parameter[T]) HTMLInputType() string { return p.d.HTMLInputType }
func (p parameter[T]) Doc() string { return p.d.Doc }
func (p parameter[T]) Example() string { return p.d.Example }
func (p parameter[T]) RequireNonZero() bool {
return !strings.HasSuffix(p.d.Name, " (optional)")
}
func (p parameter[T]) valueType(T) {}
func (p parameter[T]) typ() reflect.Type {
var zero T
return reflect.TypeOf(zero)
}
func (p parameter[T]) value(w *Workflow) reflect.Value { return reflect.ValueOf(w.params[p.d.Name]) }
func (p parameter[T]) dependencies() []*taskDefinition { return nil }
// ParamType defines the type of a workflow parameter.
//
// Since parameters are entered via an HTML form,
// there are some HTML-related knobs available.
type ParamType[T any] struct {
// HTMLElement configures the HTML element for entering the parameter value.
// Supported values are "input" and "textarea".
HTMLElement string
// HTMLInputType optionally configures the <input> type attribute when HTMLElement is "input".
// If this attribute is not specified, <input> elements default to type="text".
// See https://developer.mozilla.org/en-US/docs/Web/HTML/Element/input#input_types.
HTMLInputType string
}
var (
// String parameter types.
BasicString = ParamType[string]{
HTMLElement: "input",
}
URL = ParamType[string]{
HTMLElement: "input",
HTMLInputType: "url",
}
// Slice of string parameter types.
SliceShort = ParamType[[]string]{
HTMLElement: "input",
}
SliceLong = ParamType[[]string]{
HTMLElement: "textarea",
}
)
// Param registers a new parameter p that is filled in at
// workflow creation time and returns the corresponding Value.
// Param name must be non-empty and uniquely identify the
// parameter in the workflow definition.
func Param[T any](d *Definition, p ParamDef[T]) Value[T] {
if p.Name == "" {
panic(fmt.Errorf("parameter name must be non-empty"))
}
p.Name = d.name(p.Name)
if p.HTMLElement == "" {
var zero T
switch any(zero).(type) {
case string:
p.HTMLElement = "input"
default:
panic(fmt.Errorf("must specify ParamType for %T", zero))
}
}
for _, old := range d.parameters {
if p.Name == old.Name() {
panic(fmt.Errorf("parameter with name %q was already registered with this workflow definition", p.Name))
}
}
d.parameters = append(d.parameters, parameter[T]{p})
return parameter[T]{p}
}
// Parameters returns parameters associated with the Definition
// in the same order that they were registered.
func (d *Definition) Parameters() []MetaParameter {
return d.parameters
}
// Const creates a Value from an existing object.
func Const[T any](value T) Value[T] {
return &constant[T]{value}
}
type constant[T any] struct {
v T
}
func (c *constant[T]) valueType(T) {}
func (c *constant[T]) typ() reflect.Type {
var zero []T
return reflect.TypeOf(zero)
}
func (c *constant[T]) value(_ *Workflow) reflect.Value { return reflect.ValueOf(c.v) }
func (c *constant[T]) dependencies() []*taskDefinition { return nil }
// Slice combines multiple Values of the same type into a Value containing
// a slice of that type.
func Slice[T any](vs ...Value[T]) Value[[]T] {
return &slice[T]{vals: vs}
}
type slice[T any] struct {
vals []Value[T]
}
func (s *slice[T]) valueType([]T) {}
func (s *slice[T]) typ() reflect.Type {
var zero []T
return reflect.TypeOf(zero)
}
func (s *slice[T]) value(w *Workflow) reflect.Value {
value := reflect.ValueOf(make([]T, len(s.vals)))
for i, v := range s.vals {
value.Index(i).Set(v.value(w))
}
return value
}
func (s *slice[T]) dependencies() []*taskDefinition {
var result []*taskDefinition
for _, v := range s.vals {
result = append(result, v.dependencies()...)
}
return result
}
// Output registers a Value as a workflow output which will be returned when
// the workflow finishes.
func Output[T any](d *Definition, name string, v Value[T]) {
tr, ok := v.(*taskResult[T])
if !ok {
panic(fmt.Errorf("output must be a task result, is %T", v))
}
d.outputs[d.name(name)] = tr.task
}
// A Dependency represents a dependency on a prior task.
type Dependency interface {
dependencies() []*taskDefinition
}
// After represents an ordering dependency on another Task or Action. It can be
// passed in addition to any arguments to the task's function.
func After(afters ...Dependency) TaskOption {
var deps []*taskDefinition
for _, a := range afters {
deps = append(deps, a.dependencies()...)
}
return &after{deps}
}
type after struct {
deps []*taskDefinition
}
func (a *after) taskOption() {}
// TaskN adds a task to the workflow definition. It takes N inputs, and returns
// one output. name must uniquely identify the task in the workflow.
// f must be a function that takes a context.Context or *TaskContext argument,
// followed by one argument for each Value in inputs, corresponding to the
// Value's dynamic type. It must return two values, the first of which will
// be returned as its Value, and an error that will be used by the workflow
// engine. See the package documentation for examples.
func Task0[C context.Context, O1 any](d *Definition, name string, f func(C) (O1, error), opts ...TaskOption) Value[O1] {
return addTask[O1](d, name, f, nil, opts)
}
func Task1[C context.Context, I1, O1 any](d *Definition, name string, f func(C, I1) (O1, error), i1 Value[I1], opts ...TaskOption) Value[O1] {
return addTask[O1](d, name, f, []metaValue{i1}, opts)
}
func Task2[C context.Context, I1, I2, O1 any](d *Definition, name string, f func(C, I1, I2) (O1, error), i1 Value[I1], i2 Value[I2], opts ...TaskOption) Value[O1] {
return addTask[O1](d, name, f, []metaValue{i1, i2}, opts)
}
func Task3[C context.Context, I1, I2, I3, O1 any](d *Definition, name string, f func(C, I1, I2, I3) (O1, error), i1 Value[I1], i2 Value[I2], i3 Value[I3], opts ...TaskOption) Value[O1] {
return addTask[O1](d, name, f, []metaValue{i1, i2, i3}, opts)
}
func Task4[C context.Context, I1, I2, I3, I4, O1 any](d *Definition, name string, f func(C, I1, I2, I3, I4) (O1, error), i1 Value[I1], i2 Value[I2], i3 Value[I3], i4 Value[I4], opts ...TaskOption) Value[O1] {
return addTask[O1](d, name, f, []metaValue{i1, i2, i3, i4}, opts)
}
func Task5[C context.Context, I1, I2, I3, I4, I5, O1 any](d *Definition, name string, f func(C, I1, I2, I3, I4, I5) (O1, error), i1 Value[I1], i2 Value[I2], i3 Value[I3], i4 Value[I4], i5 Value[I5], opts ...TaskOption) Value[O1] {
return addTask[O1](d, name, f, []metaValue{i1, i2, i3, i4, i5}, opts)
}
func addTask[O1 any](d *Definition, name string, f interface{}, inputs []metaValue, opts []TaskOption) *taskResult[O1] {
name = d.name(name)
td := &taskDefinition{name: name, f: f, args: inputs}
for _, input := range inputs {
td.deps = append(td.deps, input.dependencies()...)
}
for _, opt := range opts {
td.deps = append(td.deps, opt.(*after).deps...)
}
d.tasks[name] = td
return &taskResult[O1]{td}
}
func addAction(d *Definition, name string, f interface{}, inputs []metaValue, opts []TaskOption) *dependency {
tr := addTask[interface{}](d, name, f, inputs, opts)
return &dependency{tr.task}
}
// ActionN adds an Action to the workflow definition. Its behavior and
// requirements are the same as Task, except that f must only return an error,
// and the result of the definition is a Dependency.
func Action0[C context.Context](d *Definition, name string, f func(C) error, opts ...TaskOption) Dependency {
return addAction(d, name, f, nil, opts)
}
func Action1[C context.Context, I1 any](d *Definition, name string, f func(C, I1) error, i1 Value[I1], opts ...TaskOption) Dependency {
return addAction(d, name, f, []metaValue{i1}, opts)
}
func Action2[C context.Context, I1, I2 any](d *Definition, name string, f func(C, I1, I2) error, i1 Value[I1], i2 Value[I2], opts ...TaskOption) Dependency {
return addAction(d, name, f, []metaValue{i1, i2}, opts)
}
func Action3[C context.Context, I1, I2, I3 any](d *Definition, name string, f func(C, I1, I2, I3) error, i1 Value[I1], i2 Value[I2], i3 Value[I3], opts ...TaskOption) Dependency {
return addAction(d, name, f, []metaValue{i1, i2, i3}, opts)
}
func Action4[C context.Context, I1, I2, I3, I4 any](d *Definition, name string, f func(C, I1, I2, I3, I4) error, i1 Value[I1], i2 Value[I2], i3 Value[I3], i4 Value[I4], opts ...TaskOption) Dependency {
return addAction(d, name, f, []metaValue{i1, i2, i3, i4}, opts)
}
func Action5[C context.Context, I1, I2, I3, I4, I5 any](d *Definition, name string, f func(C, I1, I2, I3, I4, I5) error, i1 Value[I1], i2 Value[I2], i3 Value[I3], i4 Value[I4], i5 Value[I5], opts ...TaskOption) Dependency {
return addAction(d, name, f, []metaValue{i1, i2, i3, i4, i5}, opts)
}
type dependency struct {
task *taskDefinition
}
func (d *dependency) dependencies() []*taskDefinition {
return []*taskDefinition{d.task}
}
// A TaskContext is a context.Context, plus workflow-related features.
type TaskContext struct {
disableRetries bool
context.Context
Logger Logger
TaskName string
WorkflowID uuid.UUID
watchdogTimer *time.Timer
}
func (c *TaskContext) Printf(format string, v ...interface{}) {
c.ResetWatchdog()
c.Logger.Printf(format, v...)
}
func (c *TaskContext) DisableRetries() {
c.disableRetries = true
}
func (c *TaskContext) ResetWatchdog() {
c.watchdogTimer.Reset(WatchdogDelay)
}
// A Listener is used to notify the workflow host of state changes, for display
// and persistence.
type Listener interface {
// TaskStateChanged is called when the state of a task changes.
// state is safe to store or modify.
TaskStateChanged(workflowID uuid.UUID, taskID string, state *TaskState) error
// Logger is called to obtain a Logger for a particular task.
Logger(workflowID uuid.UUID, taskID string) Logger
}
// TaskState contains the state of a task in a running workflow. Once Finished
// is true, either Result or Error will be populated.
type TaskState struct {
Name string
Started bool
Finished bool
Result interface{}
SerializedResult []byte
Error string
RetryCount int
}
// WorkflowState contains the shallow state of a running workflow.
type WorkflowState struct {
ID uuid.UUID
Params map[string]interface{}
}
// A Logger is a debug logger passed to a task implementation.
type Logger interface {
Printf(format string, v ...interface{})
}
type taskDefinition struct {
name string
args []metaValue
deps []*taskDefinition
f interface{}
}
type taskResult[T any] struct {
task *taskDefinition
}
func (tr *taskResult[T]) valueType(T) {}
func (tr *taskResult[T]) typ() reflect.Type {
var zero []T
return reflect.TypeOf(zero)
}
func (tr *taskResult[T]) value(w *Workflow) reflect.Value {
return reflect.ValueOf(w.tasks[tr.task].result)
}
func (tr *taskResult[T]) dependencies() []*taskDefinition {
return []*taskDefinition{tr.task}
}
// A Workflow is an instantiated workflow instance, ready to run.
type Workflow struct {
ID uuid.UUID
def *Definition
params map[string]interface{}
tasks map[*taskDefinition]*taskState
}
type taskState struct {
def *taskDefinition
w *Workflow
started bool
finished bool
result interface{}
serializedResult []byte
err error
retryCount int
}
func (t *taskState) args() ([]reflect.Value, bool) {
for _, dep := range t.def.deps {
if depState, ok := t.w.tasks[dep]; !ok || !depState.finished || depState.err != nil {
return nil, false
}
}
var args []reflect.Value
for _, v := range t.def.args {
args = append(args, v.value(t.w))
}
return args, true
}
func (t *taskState) toExported() *TaskState {
state := &TaskState{
Name: t.def.name,
Finished: t.finished,
Result: t.result,
SerializedResult: append([]byte(nil), t.serializedResult...),
Started: t.started,
RetryCount: t.retryCount,
}
if t.err != nil {
state.Error = t.err.Error()
}
return state
}
// Start instantiates a workflow with the given parameters.
func Start(def *Definition, params map[string]interface{}) (*Workflow, error) {
w := &Workflow{
ID: uuid.New(),
def: def,
params: params,
tasks: map[*taskDefinition]*taskState{},
}
if err := w.validate(); err != nil {
return nil, err
}
for _, taskDef := range def.tasks {
w.tasks[taskDef] = &taskState{def: taskDef, w: w}
}
return w, nil
}
func (w *Workflow) validate() error {
// Validate tasks.
used := map[*taskDefinition]bool{}
for _, taskDef := range w.def.tasks {
for _, dep := range taskDef.deps {
used[dep] = true
}
}
for _, output := range w.def.outputs {
used[output] = true
}
for _, task := range w.def.tasks {
if !used[task] {
return fmt.Errorf("task %v is not referenced and should be deleted", task.name)
}
}
// Validate parameters.
if got, want := len(w.params), len(w.def.parameters); got != want {
return fmt.Errorf("parameter count mismatch: workflow instance has %d, but definition has %d", got, want)
}
paramDefs := map[string]MetaParameter{} // Key is parameter name.
for _, p := range w.def.parameters {
if _, ok := w.params[p.Name()]; !ok {
return fmt.Errorf("parameter name mismatch: workflow instance doesn't have %q, but definition requires it", p.Name())
}
paramDefs[p.Name()] = p
}
for name, v := range w.params {
if !paramDefs[name].Type().AssignableTo(reflect.TypeOf(v)) {
return fmt.Errorf("parameter type mismatch: value of parameter %q has type %v, but definition specifies %v", name, reflect.TypeOf(v), paramDefs[name].Type())
}
}
return nil
}
// Resume restores a workflow from stored state. Tasks that had not finished
// will be restarted, but tasks that finished in errors will not be retried.
//
// The host must create the WorkflowState. TaskStates should be saved from
// listener callbacks, but for ease of storage, their Result field does not
// need to be populated.
func Resume(def *Definition, state *WorkflowState, taskStates map[string]*TaskState) (*Workflow, error) {
w := &Workflow{
ID: state.ID,
def: def,
params: state.Params,
tasks: map[*taskDefinition]*taskState{},
}
if err := w.validate(); err != nil {
return nil, err
}
for _, taskDef := range def.tasks {
tState, ok := taskStates[taskDef.name]
if !ok {
return nil, fmt.Errorf("task state for %q not found", taskDef.name)
}
state := &taskState{
def: taskDef,
w: w,
started: tState.Finished, // Can't resume tasks, so either it's new or done.
finished: tState.Finished,
serializedResult: tState.SerializedResult,
retryCount: tState.RetryCount,
}
if state.serializedResult != nil {
result, err := unmarshalNew(reflect.ValueOf(taskDef.f).Type().Out(0), tState.SerializedResult)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal result of %v: %v", taskDef.name, err)
}
state.result = result
}
if tState.Error != "" {
state.err = fmt.Errorf("serialized error: %v", tState.Error) // untyped, but hopefully that doesn't matter.
}
w.tasks[taskDef] = state
}
return w, nil
}
func unmarshalNew(t reflect.Type, data []byte) (interface{}, error) {
ptr := reflect.New(t)
if err := json.Unmarshal(data, ptr.Interface()); err != nil {
return nil, err
}
return ptr.Elem().Interface(), nil
}
// Run runs a workflow to completion or quiescence and returns its outputs.
// listener.TaskStateChanged will be called immediately, when each task starts,
// and when they finish. It should be used only for monitoring and persistence
// purposes. Register Outputs to read task results.
func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]interface{}, error) {
if listener == nil {
listener = &defaultListener{}
}
for _, task := range w.tasks {
listener.TaskStateChanged(w.ID, task.def.name, task.toExported())
}
stateChan := make(chan taskState, 2*len(w.def.tasks))
for {
// If we have all the outputs, the workflow is done.
outValues := map[string]interface{}{}
for outName, outDef := range w.def.outputs {
if task := w.tasks[outDef]; task.finished && task.err == nil {
outValues[outName] = task.result
}
}
if len(outValues) == len(w.def.outputs) {
return outValues, nil
}
running := 0
for _, task := range w.tasks {
if task.started && !task.finished {
running++
}
}
if ctx.Err() == nil {
// Start any idle tasks whose dependencies are all done.
for _, task := range w.tasks {
if task.started {
continue
}
in, ready := task.args()
if !ready {
continue
}
task.started = true
running++
listener.TaskStateChanged(w.ID, task.def.name, task.toExported())
go func(task taskState) {
stateChan <- w.runTask(ctx, listener, task, in)
}(*task)
}
}
// Exit if we've run everything we can given errors.
if running == 0 {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
return nil, fmt.Errorf("workflow has progressed as far as it can")
}
}
state := <-stateChan
listener.TaskStateChanged(w.ID, state.def.name, state.toExported())
w.tasks[state.def] = &state
}
}
// Maximum number of retries. This could be a workflow property.
var MaxRetries = 3
var WatchdogDelay = 10 * time.Minute
func (w *Workflow) runTask(ctx context.Context, listener Listener, state taskState, args []reflect.Value) taskState {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
tctx := &TaskContext{
Context: ctx,
Logger: listener.Logger(w.ID, state.def.name),
TaskName: state.def.name,
WorkflowID: w.ID,
watchdogTimer: time.AfterFunc(WatchdogDelay, cancel),
}
in := append([]reflect.Value{reflect.ValueOf(tctx)}, args...)
fv := reflect.ValueOf(state.def.f)
out := fv.Call(in)
if !tctx.watchdogTimer.Stop() {
state.err = fmt.Errorf("task did not log for %v, assumed hung", WatchdogDelay)
} else if errIdx := len(out) - 1; !out[errIdx].IsNil() {
state.err = out[errIdx].Interface().(error)
}
state.finished = true
if len(out) == 2 && state.err == nil {
state.serializedResult, state.err = json.Marshal(out[0].Interface())
if state.err == nil {
state.result, state.err = unmarshalNew(fv.Type().Out(0), state.serializedResult)
}
if state.err == nil && !reflect.DeepEqual(out[0].Interface(), state.result) {
state.err = fmt.Errorf("JSON marshaling changed result from %#v to %#v", out[0].Interface(), state.result)
}
}
if state.err != nil && !tctx.disableRetries && state.retryCount+1 < MaxRetries {
tctx.Printf("task failed, will retry (%v of %v): %v", state.retryCount+1, MaxRetries, state.err)
state = taskState{
def: state.def,
w: state.w,
retryCount: state.retryCount + 1,
}
}
return state
}
type defaultListener struct{}
func (s *defaultListener) TaskStateChanged(_ uuid.UUID, _ string, _ *TaskState) error {
return nil
}
func (s *defaultListener) Logger(_ uuid.UUID, task string) Logger {
return &defaultLogger{}
}
type defaultLogger struct{}
func (l *defaultLogger) Printf(format string, v ...interface{}) {}