internal/workflow: add persistence and logging support
Add workflow.Resume, which takes saved state and resumes a workflow.
The host can pass a Listener to capture task states to pass to it. There
is no offical API for modifying the state of the workflow before
resuming, e.g. to implement retries.
Add per-task logging support. Task functions can accept a *TaskContext
instead of a context.Context, which adds a logging interface that's
implemented by the host to do logging however it wants.
Change-Id: Iae9411ca55ac55718025e1e62f3010cba9b8363a
Reviewed-on: https://go-review.googlesource.com/c/build/+/348131
Trust: Heschi Kreinick <heschi@google.com>
Trust: Alexander Rakoczy <alex@golang.org>
Run-TryBot: Heschi Kreinick <heschi@google.com>
TryBot-Result: Go Bot <gobot@golang.org>
Reviewed-by: Alexander Rakoczy <alex@golang.org>
diff --git a/internal/workflow/workflow.go b/internal/workflow/workflow.go
index 781437f..36dbf6a 100644
--- a/internal/workflow/workflow.go
+++ b/internal/workflow/workflow.go
@@ -18,8 +18,9 @@
//
// Each task has a set of input Values, and returns a single output Value.
// Calling Task defines a task that will run a Go function when it runs. That
-// function must take a context.Context, followed by arguments corresponding to
-// the dynamic type of the Values passed to it.
+// function must take a *TaskContext or context.Context, followed by arguments
+// corresponding to the dynamic type of the Values passed to it. The TaskContext
+// can be used as a normal Context, and also supports unstructured logging.
//
// Once a Definition is complete, call Start to set its parameters and
// instantiate it into a Workflow. Call Run to execute the workflow until
@@ -163,8 +164,8 @@
if ftyp.NumIn()-1 != len(args) {
panic(fmt.Errorf("%v takes %v non-Context arguments, but was passed %v", f, ftyp.NumIn()-1, len(args)))
}
- if ftyp.In(0) != reflect.TypeOf((*context.Context)(nil)).Elem() {
- panic(fmt.Errorf("the first argument of %v must be a context.Context, is %v", f, ftyp.In(0)))
+ if !reflect.TypeOf((*TaskContext)(nil)).AssignableTo(ftyp.In(0)) {
+ panic(fmt.Errorf("the first argument of %v must be a context.Context or *TaskContext, is %v", f, ftyp.In(0)))
}
for i, arg := range args {
if !arg.typ().AssignableTo(ftyp.In(i + 1)) {
@@ -182,6 +183,42 @@
return &taskResult{task: td}
}
+// A TaskContext is a context.Context, plus workflow-related features.
+type TaskContext struct {
+ context.Context
+ Logger
+}
+
+// A Listener is used to notify the workflow host of state changes, for display
+// and persistence.
+type Listener interface {
+ // TaskStateChanged is called when the state of a task changes.
+ // state is safe to store or modify.
+ TaskStateChanged(workflowID, taskID string, state *TaskState) error
+ // Logger is called to obtain a Logger for a particular task.
+ Logger(workflowID, taskID string) Logger
+}
+
+// TaskState contains the state of a task in a running workflow. Once Finished
+// is true, either Result or Error will be populated.
+type TaskState struct {
+ Name string
+ Finished bool
+ Result interface{}
+ Error error
+}
+
+// WorkflowState contains the shallow state of a running workflow.
+type WorkflowState struct {
+ ID string
+ Params map[string]string
+}
+
+// A Logger is a debug logger passed to a task implementation.
+type Logger interface {
+ Printf(format string, v ...interface{})
+}
+
type taskDefinition struct {
name string
args []Value
@@ -206,7 +243,7 @@
// A Workflow is an instantiated workflow instance, ready to run.
type Workflow struct {
- id string
+ ID string
def *Definition
params map[string]string
@@ -244,47 +281,83 @@
}
}
-// TaskState contains the state of a task in a running workflow. Once finished
-// is true, either Result or Error will be populated.
-type TaskState struct {
- Name string
- Finished bool
- Result interface{}
- Error error
-}
-
// Start instantiates a workflow with the given parameters.
func Start(def *Definition, params map[string]string) (*Workflow, error) {
w := &Workflow{
- id: uuid.New().String(),
+ ID: uuid.New().String(),
def: def,
params: params,
tasks: map[*taskDefinition]*taskState{},
}
- used := map[*taskDefinition]bool{}
+ if err := w.validate(); err != nil {
+ return nil, err
+ }
for _, taskDef := range def.tasks {
w.tasks[taskDef] = &taskState{def: taskDef, w: w}
+ }
+ return w, nil
+}
+
+func (w *Workflow) validate() error {
+ used := map[*taskDefinition]bool{}
+ for _, taskDef := range w.def.tasks {
for _, arg := range taskDef.args {
for _, argDep := range arg.deps() {
used[argDep] = true
}
}
}
- for _, output := range def.outputs {
+ for _, output := range w.def.outputs {
used[output.task] = true
}
- for _, task := range def.tasks {
+ for _, task := range w.def.tasks {
if !used[task] {
- return nil, fmt.Errorf("task %v is not referenced and should be deleted", task.name)
+ return fmt.Errorf("task %v is not referenced and should be deleted", task.name)
}
}
+ return nil
+}
+
+// Resume restores a workflow from stored state. The WorkflowState can be
+// constructed by the host. TaskStates should be saved from Listener calls.
+// Tasks that had not finished will be restarted, but tasks that finished in
+// errors will not be retried.
+func Resume(def *Definition, state *WorkflowState, taskStates map[string]*TaskState) (*Workflow, error) {
+ w := &Workflow{
+ ID: state.ID,
+ def: def,
+ params: state.Params,
+ tasks: map[*taskDefinition]*taskState{},
+ }
+ if err := w.validate(); err != nil {
+ return nil, err
+ }
+ for _, taskDef := range def.tasks {
+ tState, ok := taskStates[taskDef.name]
+ if !ok {
+ return nil, fmt.Errorf("task state for %q not found", taskDef.name)
+ }
+ w.tasks[taskDef] = &taskState{
+ def: taskDef,
+ w: w,
+ started: tState.Finished, // Can't resume tasks, so either it's new or done.
+ finished: tState.Finished,
+ result: tState.Result,
+ err: tState.Error,
+ }
+ }
+
return w, nil
}
// Run runs a workflow to successful completion and returns its outputs.
-// statusFunc will be called when each task starts and finishes. It should be
-// used only for monitoring purposes - to read task results, register Outputs.
-func (w *Workflow) Run(ctx context.Context, stateFunc func(*TaskState)) (map[string]interface{}, error) {
+// listener.TaskStateChanged will be called when each task starts and
+// finishes. It should be used only for monitoring and persistence purposes -
+// to read task results, register Outputs.
+func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]interface{}, error) {
+ if listener == nil {
+ listener = &defaultListener{}
+ }
var running sync.WaitGroup
defer running.Wait()
@@ -311,10 +384,10 @@
continue
}
task.started = true
- stateFunc(task.toExported())
+ listener.TaskStateChanged(w.ID, task.def.name, task.toExported())
running.Add(1)
go func(task taskState) {
- stateChan <- w.runTask(ctx, task, in)
+ stateChan <- w.runTask(ctx, listener, task, in)
running.Done()
}(*task)
}
@@ -324,13 +397,14 @@
return nil, ctx.Err()
case state := <-stateChan:
w.tasks[state.def] = &state
- stateFunc(state.toExported())
+ listener.TaskStateChanged(w.ID, state.def.name, state.toExported())
}
}
}
-func (w *Workflow) runTask(ctx context.Context, state taskState, args []reflect.Value) taskState {
- in := append([]reflect.Value{reflect.ValueOf(ctx)}, args...)
+func (w *Workflow) runTask(ctx context.Context, listener Listener, state taskState, args []reflect.Value) taskState {
+ tctx := &TaskContext{Context: ctx, Logger: listener.Logger(w.ID, state.def.name)}
+ in := append([]reflect.Value{reflect.ValueOf(tctx)}, args...)
out := reflect.ValueOf(state.def.f).Call(in)
var err error
if !out[1].IsNil() {
@@ -340,3 +414,17 @@
state.result, state.err = out[0].Interface(), err
return state
}
+
+type defaultListener struct{}
+
+func (s *defaultListener) TaskStateChanged(_, _ string, _ *TaskState) error {
+ return nil
+}
+
+func (s *defaultListener) Logger(_, task string) Logger {
+ return &defaultLogger{}
+}
+
+type defaultLogger struct{}
+
+func (l *defaultLogger) Printf(format string, v ...interface{}) {}