blob: 91d7d8dcc673a0ee864370c31504d74e2ab97029 [file] [log] [blame]
// Copyright 2021 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
4
// Package workflow declaratively defines computation graphs that support
// automatic parallelization, persistence, and monitoring.
//
// Workflows are a set of tasks that produce and consume Values. Tasks don't
// run until the workflow is started, so Values represent data that doesn't
// exist yet, and can't be used directly. Each value has a dynamic type, which
// must match its uses.
//
// To wrap an existing Go object in a Value, use Constant. To define a
// parameter that will be set when the workflow is started, use Parameter.
// To read a task's return value, register it as an Output, and it will be
// returned from Run. An arbitrary number of Values of the same type can
// be combined with Slice.
//
// Each task has a set of input Values, and returns a single output Value.
// Calling Task defines a task that will run a Go function when it runs. That
// function must take a *TaskContext or context.Context, followed by arguments
// corresponding to the dynamic type of the Values passed to it. The TaskContext
// can be used as a normal Context, and also supports unstructured logging.
//
// Once a Definition is complete, call Start to set its parameters and
// instantiate it into a Workflow. Call Run to execute the workflow until
// completion.
28package workflow
29
import (
	"context"
	"encoding/json"
	"fmt"
	"reflect"
	"sort"

	"github.com/google/uuid"
)
38
39// New creates a new workflow definition.
40func New() *Definition {
41 return &Definition{
42 parameterNames: map[string]struct{}{},
43 tasks: map[string]*taskDefinition{},
44 outputs: map[string]*taskResult{},
45 }
46}
47
48// A Definition defines the structure of a workflow.
49type Definition struct {
50 parameterNames map[string]struct{}
51 tasks map[string]*taskDefinition
52 outputs map[string]*taskResult
53}
54
55// A Value is a piece of data that will be produced or consumed when a task
56// runs. It cannot be read directly.
57type Value interface {
58 typ() reflect.Type
59 value(*Workflow) reflect.Value
60 deps() []*taskDefinition
61}
62
63// Parameter creates a Value that is filled in at workflow creation time.
64func (d *Definition) Parameter(name string) Value {
65 d.parameterNames[name] = struct{}{}
66 return &workflowParameter{name: name}
67}
68
Alexander Rakoczy88204a42021-09-17 12:15:05 -040069// ParameterNames returns the names of all parameters associated with
70// the Definition.
71func (d *Definition) ParameterNames() []string {
72 var names []string
73 for n := range d.parameterNames {
74 names = append(names, n)
75 }
76 return names
77}
78
Heschi Kreinick6437f7a2021-08-04 14:44:29 -040079type workflowParameter struct {
80 name string
81}
82
83func (wp *workflowParameter) typ() reflect.Type {
84 return reflect.TypeOf("")
85}
86
87func (wp *workflowParameter) value(w *Workflow) reflect.Value {
88 return reflect.ValueOf(w.params[wp.name])
89}
90
91func (wp *workflowParameter) deps() []*taskDefinition {
92 return nil
93}
94
95// Constant creates a Value from an existing object.
96func (d *Definition) Constant(value interface{}) Value {
97 return &constant{reflect.ValueOf(value)}
98}
99
100type constant struct {
101 v reflect.Value
102}
103
104func (c *constant) typ() reflect.Type { return c.v.Type() }
105func (c *constant) value(_ *Workflow) reflect.Value { return c.v }
106func (c *constant) deps() []*taskDefinition { return nil }
107
108// Slice combines multiple Values of the same type into a Value containing
109// a slice of that type.
110func (d *Definition) Slice(vs []Value) Value {
111 if len(vs) == 0 {
112 return &slice{}
113 }
114 typ := vs[0].typ()
115 for _, v := range vs[1:] {
116 if v.typ() != typ {
117 panic(fmt.Errorf("mismatched value types in Slice: %v vs. %v", v.typ(), typ))
118 }
119 }
120 return &slice{elt: typ, vals: vs}
121}
122
123type slice struct {
124 elt reflect.Type
125 vals []Value
126}
127
128func (s *slice) typ() reflect.Type {
129 return reflect.SliceOf(s.elt)
130}
131
132func (s *slice) value(w *Workflow) reflect.Value {
133 value := reflect.MakeSlice(reflect.SliceOf(s.elt), len(s.vals), len(s.vals))
134 for i, v := range s.vals {
135 value.Index(i).Set(v.value(w))
136 }
137 return value
138}
139
140func (s *slice) deps() []*taskDefinition {
141 var result []*taskDefinition
142 for _, v := range s.vals {
143 result = append(result, v.deps()...)
144 }
145 return result
146}
147
148// Output registers a Value as a workflow output which will be returned when
149// the workflow finishes.
150func (d *Definition) Output(name string, v Value) {
151 tr, ok := v.(*taskResult)
152 if !ok {
153 panic(fmt.Errorf("output must be a task result"))
154 }
155 d.outputs[name] = tr
156}
157
158// Task adds a task to the workflow definition. It can take any number of
159// arguments, and returns one output. name must uniquely identify the task in
160// the workflow.
161// f must be a function that takes a context.Context argument, followed by one
162// argument for each of args, corresponding to the Value's dynamic type.
163// It must return two values, the first of which will be returned as its Value,
164// and an error that will be used by the workflow engine. See the package
165// documentation for examples.
166func (d *Definition) Task(name string, f interface{}, args ...Value) Value {
167 if d.tasks[name] != nil {
168 panic(fmt.Errorf("task %q already exists in the workflow", name))
169 }
170 ftyp := reflect.ValueOf(f).Type()
171 if ftyp.Kind() != reflect.Func {
172 panic(fmt.Errorf("%v is not a function", f))
173 }
174 if ftyp.NumIn()-1 != len(args) {
175 panic(fmt.Errorf("%v takes %v non-Context arguments, but was passed %v", f, ftyp.NumIn()-1, len(args)))
176 }
Heschi Kreinickcc073c52021-09-01 17:51:05 -0400177 if !reflect.TypeOf((*TaskContext)(nil)).AssignableTo(ftyp.In(0)) {
178 panic(fmt.Errorf("the first argument of %v must be a context.Context or *TaskContext, is %v", f, ftyp.In(0)))
Heschi Kreinick6437f7a2021-08-04 14:44:29 -0400179 }
180 for i, arg := range args {
181 if !arg.typ().AssignableTo(ftyp.In(i + 1)) {
182 panic(fmt.Errorf("argument %v to %v is %v, but was passed %v", i, f, ftyp.In(i+1), arg.typ()))
183 }
184 }
185 if ftyp.NumOut() != 2 {
186 panic(fmt.Errorf("%v returns %v results, must return 2", f, ftyp.NumOut()))
187 }
188 if ftyp.Out(1) != reflect.TypeOf((*error)(nil)).Elem() {
189 panic(fmt.Errorf("%v's second return value must be error, is %v", f, ftyp.Out(1)))
190 }
191 td := &taskDefinition{name: name, args: args, f: f}
192 d.tasks[name] = td
193 return &taskResult{task: td}
194}
195
Heschi Kreinickcc073c52021-09-01 17:51:05 -0400196// A TaskContext is a context.Context, plus workflow-related features.
197type TaskContext struct {
198 context.Context
199 Logger
200}
201
202// A Listener is used to notify the workflow host of state changes, for display
203// and persistence.
204type Listener interface {
205 // TaskStateChanged is called when the state of a task changes.
206 // state is safe to store or modify.
Alexander Rakoczycbb51122021-09-15 20:49:39 -0400207 TaskStateChanged(workflowID uuid.UUID, taskID string, state *TaskState) error
Heschi Kreinickcc073c52021-09-01 17:51:05 -0400208 // Logger is called to obtain a Logger for a particular task.
Alexander Rakoczycbb51122021-09-15 20:49:39 -0400209 Logger(workflowID uuid.UUID, taskID string) Logger
Heschi Kreinickcc073c52021-09-01 17:51:05 -0400210}
211
// TaskState is the exported snapshot of one task in a running workflow. Once
// Finished is true, either Result or Error will be populated.
type TaskState struct {
	// Name is the task's name within its workflow definition.
	Name string
	// Finished reports whether the task has run to completion.
	Finished bool
	// Result is the value returned by the task's function.
	Result interface{}
	// SerializedResult is the JSON encoding of Result.
	SerializedResult []byte
	// Error is the message of the task's error, or empty if there was none.
	Error string
}
221
222// WorkflowState contains the shallow state of a running workflow.
223type WorkflowState struct {
Alexander Rakoczycbb51122021-09-15 20:49:39 -0400224 ID uuid.UUID
Heschi Kreinickcc073c52021-09-01 17:51:05 -0400225 Params map[string]string
226}
227
// A Logger receives debug output from a task implementation.
type Logger interface {
	// Printf logs a message described by format and its arguments v.
	Printf(format string, v ...interface{})
}
232
Heschi Kreinick6437f7a2021-08-04 14:44:29 -0400233type taskDefinition struct {
234 name string
235 args []Value
236 f interface{}
237}
238
239type taskResult struct {
240 task *taskDefinition
241}
242
243func (tr *taskResult) typ() reflect.Type {
244 return reflect.ValueOf(tr.task.f).Type().Out(0)
245}
246
247func (tr *taskResult) value(w *Workflow) reflect.Value {
248 return reflect.ValueOf(w.tasks[tr.task].result)
249}
250
251func (tr *taskResult) deps() []*taskDefinition {
252 return []*taskDefinition{tr.task}
253}
254
255// A Workflow is an instantiated workflow instance, ready to run.
256type Workflow struct {
Alexander Rakoczycbb51122021-09-15 20:49:39 -0400257 ID uuid.UUID
Heschi Kreinick6437f7a2021-08-04 14:44:29 -0400258 def *Definition
259 params map[string]string
260
261 tasks map[*taskDefinition]*taskState
262}
263
264type taskState struct {
Heschi Kreinick74917a52021-09-16 18:40:42 -0400265 def *taskDefinition
266 w *Workflow
267 started bool
268 finished bool
269 result interface{}
270 serializedResult []byte
271 err error
Heschi Kreinick6437f7a2021-08-04 14:44:29 -0400272}
273
274func (t *taskState) args() ([]reflect.Value, bool) {
275 var args []reflect.Value
276 for _, arg := range t.def.args {
277 for _, dep := range arg.deps() {
278 if depState, ok := t.w.tasks[dep]; !ok || !depState.finished || depState.err != nil {
279 return nil, false
280 }
281 }
282 args = append(args, arg.value(t.w))
283 }
284 return args, true
285}
286
287func (t *taskState) toExported() *TaskState {
Heschi Kreinick74917a52021-09-16 18:40:42 -0400288 state := &TaskState{
289 Name: t.def.name,
290 Finished: t.finished,
291 Result: t.result,
292 SerializedResult: append([]byte(nil), t.serializedResult...),
Heschi Kreinick6437f7a2021-08-04 14:44:29 -0400293 }
Heschi Kreinick74917a52021-09-16 18:40:42 -0400294 if t.err != nil {
295 state.Error = t.err.Error()
296 }
297 return state
Heschi Kreinick6437f7a2021-08-04 14:44:29 -0400298}
299
Heschi Kreinick6437f7a2021-08-04 14:44:29 -0400300// Start instantiates a workflow with the given parameters.
301func Start(def *Definition, params map[string]string) (*Workflow, error) {
302 w := &Workflow{
Alexander Rakoczycbb51122021-09-15 20:49:39 -0400303 ID: uuid.New(),
Heschi Kreinick6437f7a2021-08-04 14:44:29 -0400304 def: def,
305 params: params,
306 tasks: map[*taskDefinition]*taskState{},
307 }
Heschi Kreinickcc073c52021-09-01 17:51:05 -0400308 if err := w.validate(); err != nil {
309 return nil, err
310 }
Heschi Kreinick6437f7a2021-08-04 14:44:29 -0400311 for _, taskDef := range def.tasks {
312 w.tasks[taskDef] = &taskState{def: taskDef, w: w}
Heschi Kreinickcc073c52021-09-01 17:51:05 -0400313 }
314 return w, nil
315}
316
317func (w *Workflow) validate() error {
318 used := map[*taskDefinition]bool{}
319 for _, taskDef := range w.def.tasks {
Heschi Kreinick6437f7a2021-08-04 14:44:29 -0400320 for _, arg := range taskDef.args {
321 for _, argDep := range arg.deps() {
322 used[argDep] = true
323 }
324 }
325 }
Heschi Kreinickcc073c52021-09-01 17:51:05 -0400326 for _, output := range w.def.outputs {
Heschi Kreinick6437f7a2021-08-04 14:44:29 -0400327 used[output.task] = true
328 }
Heschi Kreinickcc073c52021-09-01 17:51:05 -0400329 for _, task := range w.def.tasks {
Heschi Kreinick6437f7a2021-08-04 14:44:29 -0400330 if !used[task] {
Heschi Kreinickcc073c52021-09-01 17:51:05 -0400331 return fmt.Errorf("task %v is not referenced and should be deleted", task.name)
Heschi Kreinick6437f7a2021-08-04 14:44:29 -0400332 }
333 }
Heschi Kreinickcc073c52021-09-01 17:51:05 -0400334 return nil
335}
336
Heschi Kreinick74917a52021-09-16 18:40:42 -0400337// Resume restores a workflow from stored state. Tasks that had not finished
338// will be restarted, but tasks that finished in errors will not be retried.
339//
340// The host must create the WorkflowState. TaskStates should be saved from
Alexander Rakoczy79bf6882021-09-30 11:51:42 -0400341// listener callbacks, but for ease of storage, their Result field does not
Heschi Kreinick74917a52021-09-16 18:40:42 -0400342// need to be populated.
Heschi Kreinickcc073c52021-09-01 17:51:05 -0400343func Resume(def *Definition, state *WorkflowState, taskStates map[string]*TaskState) (*Workflow, error) {
344 w := &Workflow{
345 ID: state.ID,
346 def: def,
347 params: state.Params,
348 tasks: map[*taskDefinition]*taskState{},
349 }
350 if err := w.validate(); err != nil {
351 return nil, err
352 }
353 for _, taskDef := range def.tasks {
354 tState, ok := taskStates[taskDef.name]
355 if !ok {
356 return nil, fmt.Errorf("task state for %q not found", taskDef.name)
357 }
Heschi Kreinick74917a52021-09-16 18:40:42 -0400358 state := &taskState{
359 def: taskDef,
360 w: w,
361 started: tState.Finished, // Can't resume tasks, so either it's new or done.
362 finished: tState.Finished,
363 serializedResult: tState.SerializedResult,
Heschi Kreinickcc073c52021-09-01 17:51:05 -0400364 }
Heschi Kreinick74917a52021-09-16 18:40:42 -0400365 if state.serializedResult != nil {
366 ptr := reflect.New(reflect.ValueOf(taskDef.f).Type().Out(0))
367 if err := json.Unmarshal(tState.SerializedResult, ptr.Interface()); err != nil {
368 return nil, fmt.Errorf("failed to unmarshal result of %v: %v", taskDef.name, err)
369 }
370 state.result = ptr.Elem().Interface()
371 }
372 if tState.Error != "" {
373 state.err = fmt.Errorf("serialized error: %v", tState.Error) // untyped, but hopefully that doesn't matter.
374 }
375 w.tasks[taskDef] = state
Heschi Kreinickcc073c52021-09-01 17:51:05 -0400376 }
Heschi Kreinick6437f7a2021-08-04 14:44:29 -0400377 return w, nil
378}
379
Heschi Kreinickc54e9bc2021-11-03 17:15:17 -0400380// Run runs a workflow to completion or quiescence and returns its outputs.
Heschi Kreinickcc073c52021-09-01 17:51:05 -0400381// listener.TaskStateChanged will be called when each task starts and
382// finishes. It should be used only for monitoring and persistence purposes -
383// to read task results, register Outputs.
384func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]interface{}, error) {
385 if listener == nil {
386 listener = &defaultListener{}
387 }
Heschi Kreinick6437f7a2021-08-04 14:44:29 -0400388
389 stateChan := make(chan taskState, 2*len(w.def.tasks))
390 for {
391 // If we have all the outputs, the workflow is done.
392 outValues := map[string]interface{}{}
393 for outName, outDef := range w.def.outputs {
394 if task := w.tasks[outDef.task]; task.finished && task.err == nil {
395 outValues[outName] = task.result
396 }
397 }
398 if len(outValues) == len(w.def.outputs) {
399 return outValues, nil
400 }
401
Heschi Kreinickc54e9bc2021-11-03 17:15:17 -0400402 running := 0
403 for _, task := range w.tasks {
404 if task.started && !task.finished {
405 running++
406 }
407 }
Bryan C. Millsf3c74042021-11-10 10:27:06 -0500408
409 if ctx.Err() == nil {
410 // Start any idle tasks whose dependencies are all done.
411 for _, task := range w.tasks {
412 if task.started {
413 continue
414 }
415 in, ready := task.args()
416 if !ready {
417 continue
418 }
419 task.started = true
420 running++
421 listener.TaskStateChanged(w.ID, task.def.name, task.toExported())
422 go func(task taskState) {
423 stateChan <- w.runTask(ctx, listener, task, in)
424 }(*task)
425 }
Heschi Kreinickc54e9bc2021-11-03 17:15:17 -0400426 }
427
Bryan C. Millsf3c74042021-11-10 10:27:06 -0500428 // Exit if we've run everything we can given errors.
429 if running == 0 {
430 select {
431 case <-ctx.Done():
432 return nil, ctx.Err()
433 default:
434 return nil, fmt.Errorf("workflow has progressed as far as it can")
435 }
Heschi Kreinick6437f7a2021-08-04 14:44:29 -0400436 }
Bryan C. Millsf3c74042021-11-10 10:27:06 -0500437
438 state := <-stateChan
439 w.tasks[state.def] = &state
440 if state.err != nil && ctx.Err() != nil {
441 // Don't report failures that occur after cancellation has begun.
442 // They may be due to the cancellation itself.
443 continue
444 }
445 listener.TaskStateChanged(w.ID, state.def.name, state.toExported())
Heschi Kreinick6437f7a2021-08-04 14:44:29 -0400446 }
447}
448
Heschi Kreinickcc073c52021-09-01 17:51:05 -0400449func (w *Workflow) runTask(ctx context.Context, listener Listener, state taskState, args []reflect.Value) taskState {
450 tctx := &TaskContext{Context: ctx, Logger: listener.Logger(w.ID, state.def.name)}
451 in := append([]reflect.Value{reflect.ValueOf(tctx)}, args...)
Heschi Kreinick6437f7a2021-08-04 14:44:29 -0400452 out := reflect.ValueOf(state.def.f).Call(in)
453 var err error
454 if !out[1].IsNil() {
455 err = out[1].Interface().(error)
456 }
457 state.finished = true
458 state.result, state.err = out[0].Interface(), err
Heschi Kreinick74917a52021-09-16 18:40:42 -0400459 if err == nil {
460 state.serializedResult, state.err = json.Marshal(state.result)
461 }
Heschi Kreinick6437f7a2021-08-04 14:44:29 -0400462 return state
463}
Heschi Kreinickcc073c52021-09-01 17:51:05 -0400464
465type defaultListener struct{}
466
Alexander Rakoczycbb51122021-09-15 20:49:39 -0400467func (s *defaultListener) TaskStateChanged(_ uuid.UUID, _ string, _ *TaskState) error {
Heschi Kreinickcc073c52021-09-01 17:51:05 -0400468 return nil
469}
470
Alexander Rakoczycbb51122021-09-15 20:49:39 -0400471func (s *defaultListener) Logger(_ uuid.UUID, task string) Logger {
Heschi Kreinickcc073c52021-09-01 17:51:05 -0400472 return &defaultLogger{}
473}
474
// defaultLogger is a logger that discards everything passed to it.
type defaultLogger struct{}

// Printf ignores its arguments.
func (*defaultLogger) Printf(string, ...interface{}) {}