Heschi Kreinick | 6437f7a | 2021-08-04 14:44:29 -0400 | [diff] [blame] | 1 | // Copyright 2021 The Go Authors. All rights reserved. |
| 2 | // Use of this source code is governed by a BSD-style |
| 3 | // license that can be found in the LICENSE file. |
| 4 | |
| 5 | // Package workflow declaratively defines computation graphs that support |
| 6 | // automatic parallelization, persistance, and monitoring. |
| 7 | // |
| 8 | // Workflows are a set of tasks that produce and consume Values. Tasks don't |
| 9 | // run until the workflow is started, so Values represent data that doesn't |
| 10 | // exist yet, and can't be used directly. Each value has a dynamic type, which |
| 11 | // must match its uses. |
| 12 | // |
| 13 | // To wrap an existing Go object in a Value, use Constant. To define a |
| 14 | // parameter that will be set when the workflow is started, use Parameter. |
| 15 | // To read a task's return value, register it as an Output, and it will be |
| 16 | // returned from Run. An arbitrary number of Values of the same type can |
| 17 | // be combined with Slice. |
| 18 | // |
| 19 | // Each task has a set of input Values, and returns a single output Value. |
| 20 | // Calling Task defines a task that will run a Go function when it runs. That |
Heschi Kreinick | cc073c5 | 2021-09-01 17:51:05 -0400 | [diff] [blame] | 21 | // function must take a *TaskContext or context.Context, followed by arguments |
| 22 | // corresponding to the dynamic type of the Values passed to it. The TaskContext |
| 23 | // can be used as a normal Context, and also supports unstructured logging. |
Heschi Kreinick | 6437f7a | 2021-08-04 14:44:29 -0400 | [diff] [blame] | 24 | // |
| 25 | // Once a Definition is complete, call Start to set its parameters and |
| 26 | // instantiate it into a Workflow. Call Run to execute the workflow until |
| 27 | // completion. |
| 28 | package workflow |
| 29 | |
| 30 | import ( |
| 31 | "context" |
Heschi Kreinick | 74917a5 | 2021-09-16 18:40:42 -0400 | [diff] [blame] | 32 | "encoding/json" |
Heschi Kreinick | 6437f7a | 2021-08-04 14:44:29 -0400 | [diff] [blame] | 33 | "fmt" |
| 34 | "reflect" |
Heschi Kreinick | 6437f7a | 2021-08-04 14:44:29 -0400 | [diff] [blame] | 35 | |
| 36 | "github.com/google/uuid" |
| 37 | ) |
| 38 | |
| 39 | // New creates a new workflow definition. |
| 40 | func New() *Definition { |
| 41 | return &Definition{ |
| 42 | parameterNames: map[string]struct{}{}, |
| 43 | tasks: map[string]*taskDefinition{}, |
| 44 | outputs: map[string]*taskResult{}, |
| 45 | } |
| 46 | } |
| 47 | |
| 48 | // A Definition defines the structure of a workflow. |
| 49 | type Definition struct { |
| 50 | parameterNames map[string]struct{} |
| 51 | tasks map[string]*taskDefinition |
| 52 | outputs map[string]*taskResult |
| 53 | } |
| 54 | |
| 55 | // A Value is a piece of data that will be produced or consumed when a task |
| 56 | // runs. It cannot be read directly. |
| 57 | type Value interface { |
| 58 | typ() reflect.Type |
| 59 | value(*Workflow) reflect.Value |
| 60 | deps() []*taskDefinition |
| 61 | } |
| 62 | |
| 63 | // Parameter creates a Value that is filled in at workflow creation time. |
| 64 | func (d *Definition) Parameter(name string) Value { |
| 65 | d.parameterNames[name] = struct{}{} |
| 66 | return &workflowParameter{name: name} |
| 67 | } |
| 68 | |
Alexander Rakoczy | 88204a4 | 2021-09-17 12:15:05 -0400 | [diff] [blame] | 69 | // ParameterNames returns the names of all parameters associated with |
| 70 | // the Definition. |
| 71 | func (d *Definition) ParameterNames() []string { |
| 72 | var names []string |
| 73 | for n := range d.parameterNames { |
| 74 | names = append(names, n) |
| 75 | } |
| 76 | return names |
| 77 | } |
| 78 | |
Heschi Kreinick | 6437f7a | 2021-08-04 14:44:29 -0400 | [diff] [blame] | 79 | type workflowParameter struct { |
| 80 | name string |
| 81 | } |
| 82 | |
| 83 | func (wp *workflowParameter) typ() reflect.Type { |
| 84 | return reflect.TypeOf("") |
| 85 | } |
| 86 | |
| 87 | func (wp *workflowParameter) value(w *Workflow) reflect.Value { |
| 88 | return reflect.ValueOf(w.params[wp.name]) |
| 89 | } |
| 90 | |
| 91 | func (wp *workflowParameter) deps() []*taskDefinition { |
| 92 | return nil |
| 93 | } |
| 94 | |
| 95 | // Constant creates a Value from an existing object. |
| 96 | func (d *Definition) Constant(value interface{}) Value { |
| 97 | return &constant{reflect.ValueOf(value)} |
| 98 | } |
| 99 | |
| 100 | type constant struct { |
| 101 | v reflect.Value |
| 102 | } |
| 103 | |
| 104 | func (c *constant) typ() reflect.Type { return c.v.Type() } |
| 105 | func (c *constant) value(_ *Workflow) reflect.Value { return c.v } |
| 106 | func (c *constant) deps() []*taskDefinition { return nil } |
| 107 | |
| 108 | // Slice combines multiple Values of the same type into a Value containing |
| 109 | // a slice of that type. |
| 110 | func (d *Definition) Slice(vs []Value) Value { |
| 111 | if len(vs) == 0 { |
| 112 | return &slice{} |
| 113 | } |
| 114 | typ := vs[0].typ() |
| 115 | for _, v := range vs[1:] { |
| 116 | if v.typ() != typ { |
| 117 | panic(fmt.Errorf("mismatched value types in Slice: %v vs. %v", v.typ(), typ)) |
| 118 | } |
| 119 | } |
| 120 | return &slice{elt: typ, vals: vs} |
| 121 | } |
| 122 | |
| 123 | type slice struct { |
| 124 | elt reflect.Type |
| 125 | vals []Value |
| 126 | } |
| 127 | |
| 128 | func (s *slice) typ() reflect.Type { |
| 129 | return reflect.SliceOf(s.elt) |
| 130 | } |
| 131 | |
| 132 | func (s *slice) value(w *Workflow) reflect.Value { |
| 133 | value := reflect.MakeSlice(reflect.SliceOf(s.elt), len(s.vals), len(s.vals)) |
| 134 | for i, v := range s.vals { |
| 135 | value.Index(i).Set(v.value(w)) |
| 136 | } |
| 137 | return value |
| 138 | } |
| 139 | |
| 140 | func (s *slice) deps() []*taskDefinition { |
| 141 | var result []*taskDefinition |
| 142 | for _, v := range s.vals { |
| 143 | result = append(result, v.deps()...) |
| 144 | } |
| 145 | return result |
| 146 | } |
| 147 | |
| 148 | // Output registers a Value as a workflow output which will be returned when |
| 149 | // the workflow finishes. |
| 150 | func (d *Definition) Output(name string, v Value) { |
| 151 | tr, ok := v.(*taskResult) |
| 152 | if !ok { |
| 153 | panic(fmt.Errorf("output must be a task result")) |
| 154 | } |
| 155 | d.outputs[name] = tr |
| 156 | } |
| 157 | |
| 158 | // Task adds a task to the workflow definition. It can take any number of |
| 159 | // arguments, and returns one output. name must uniquely identify the task in |
| 160 | // the workflow. |
| 161 | // f must be a function that takes a context.Context argument, followed by one |
| 162 | // argument for each of args, corresponding to the Value's dynamic type. |
| 163 | // It must return two values, the first of which will be returned as its Value, |
| 164 | // and an error that will be used by the workflow engine. See the package |
| 165 | // documentation for examples. |
| 166 | func (d *Definition) Task(name string, f interface{}, args ...Value) Value { |
| 167 | if d.tasks[name] != nil { |
| 168 | panic(fmt.Errorf("task %q already exists in the workflow", name)) |
| 169 | } |
| 170 | ftyp := reflect.ValueOf(f).Type() |
| 171 | if ftyp.Kind() != reflect.Func { |
| 172 | panic(fmt.Errorf("%v is not a function", f)) |
| 173 | } |
| 174 | if ftyp.NumIn()-1 != len(args) { |
| 175 | panic(fmt.Errorf("%v takes %v non-Context arguments, but was passed %v", f, ftyp.NumIn()-1, len(args))) |
| 176 | } |
Heschi Kreinick | cc073c5 | 2021-09-01 17:51:05 -0400 | [diff] [blame] | 177 | if !reflect.TypeOf((*TaskContext)(nil)).AssignableTo(ftyp.In(0)) { |
| 178 | panic(fmt.Errorf("the first argument of %v must be a context.Context or *TaskContext, is %v", f, ftyp.In(0))) |
Heschi Kreinick | 6437f7a | 2021-08-04 14:44:29 -0400 | [diff] [blame] | 179 | } |
| 180 | for i, arg := range args { |
| 181 | if !arg.typ().AssignableTo(ftyp.In(i + 1)) { |
| 182 | panic(fmt.Errorf("argument %v to %v is %v, but was passed %v", i, f, ftyp.In(i+1), arg.typ())) |
| 183 | } |
| 184 | } |
| 185 | if ftyp.NumOut() != 2 { |
| 186 | panic(fmt.Errorf("%v returns %v results, must return 2", f, ftyp.NumOut())) |
| 187 | } |
| 188 | if ftyp.Out(1) != reflect.TypeOf((*error)(nil)).Elem() { |
| 189 | panic(fmt.Errorf("%v's second return value must be error, is %v", f, ftyp.Out(1))) |
| 190 | } |
| 191 | td := &taskDefinition{name: name, args: args, f: f} |
| 192 | d.tasks[name] = td |
| 193 | return &taskResult{task: td} |
| 194 | } |
| 195 | |
Heschi Kreinick | cc073c5 | 2021-09-01 17:51:05 -0400 | [diff] [blame] | 196 | // A TaskContext is a context.Context, plus workflow-related features. |
| 197 | type TaskContext struct { |
| 198 | context.Context |
| 199 | Logger |
| 200 | } |
| 201 | |
| 202 | // A Listener is used to notify the workflow host of state changes, for display |
| 203 | // and persistence. |
| 204 | type Listener interface { |
| 205 | // TaskStateChanged is called when the state of a task changes. |
| 206 | // state is safe to store or modify. |
Alexander Rakoczy | cbb5112 | 2021-09-15 20:49:39 -0400 | [diff] [blame] | 207 | TaskStateChanged(workflowID uuid.UUID, taskID string, state *TaskState) error |
Heschi Kreinick | cc073c5 | 2021-09-01 17:51:05 -0400 | [diff] [blame] | 208 | // Logger is called to obtain a Logger for a particular task. |
Alexander Rakoczy | cbb5112 | 2021-09-15 20:49:39 -0400 | [diff] [blame] | 209 | Logger(workflowID uuid.UUID, taskID string) Logger |
Heschi Kreinick | cc073c5 | 2021-09-01 17:51:05 -0400 | [diff] [blame] | 210 | } |
| 211 | |
| 212 | // TaskState contains the state of a task in a running workflow. Once Finished |
| 213 | // is true, either Result or Error will be populated. |
| 214 | type TaskState struct { |
Heschi Kreinick | 74917a5 | 2021-09-16 18:40:42 -0400 | [diff] [blame] | 215 | Name string |
| 216 | Finished bool |
| 217 | Result interface{} |
| 218 | SerializedResult []byte |
| 219 | Error string |
Heschi Kreinick | cc073c5 | 2021-09-01 17:51:05 -0400 | [diff] [blame] | 220 | } |
| 221 | |
| 222 | // WorkflowState contains the shallow state of a running workflow. |
| 223 | type WorkflowState struct { |
Alexander Rakoczy | cbb5112 | 2021-09-15 20:49:39 -0400 | [diff] [blame] | 224 | ID uuid.UUID |
Heschi Kreinick | cc073c5 | 2021-09-01 17:51:05 -0400 | [diff] [blame] | 225 | Params map[string]string |
| 226 | } |
| 227 | |
| 228 | // A Logger is a debug logger passed to a task implementation. |
| 229 | type Logger interface { |
| 230 | Printf(format string, v ...interface{}) |
| 231 | } |
| 232 | |
Heschi Kreinick | 6437f7a | 2021-08-04 14:44:29 -0400 | [diff] [blame] | 233 | type taskDefinition struct { |
| 234 | name string |
| 235 | args []Value |
| 236 | f interface{} |
| 237 | } |
| 238 | |
| 239 | type taskResult struct { |
| 240 | task *taskDefinition |
| 241 | } |
| 242 | |
| 243 | func (tr *taskResult) typ() reflect.Type { |
| 244 | return reflect.ValueOf(tr.task.f).Type().Out(0) |
| 245 | } |
| 246 | |
| 247 | func (tr *taskResult) value(w *Workflow) reflect.Value { |
| 248 | return reflect.ValueOf(w.tasks[tr.task].result) |
| 249 | } |
| 250 | |
| 251 | func (tr *taskResult) deps() []*taskDefinition { |
| 252 | return []*taskDefinition{tr.task} |
| 253 | } |
| 254 | |
| 255 | // A Workflow is an instantiated workflow instance, ready to run. |
| 256 | type Workflow struct { |
Alexander Rakoczy | cbb5112 | 2021-09-15 20:49:39 -0400 | [diff] [blame] | 257 | ID uuid.UUID |
Heschi Kreinick | 6437f7a | 2021-08-04 14:44:29 -0400 | [diff] [blame] | 258 | def *Definition |
| 259 | params map[string]string |
| 260 | |
| 261 | tasks map[*taskDefinition]*taskState |
| 262 | } |
| 263 | |
| 264 | type taskState struct { |
Heschi Kreinick | 74917a5 | 2021-09-16 18:40:42 -0400 | [diff] [blame] | 265 | def *taskDefinition |
| 266 | w *Workflow |
| 267 | started bool |
| 268 | finished bool |
| 269 | result interface{} |
| 270 | serializedResult []byte |
| 271 | err error |
Heschi Kreinick | 6437f7a | 2021-08-04 14:44:29 -0400 | [diff] [blame] | 272 | } |
| 273 | |
| 274 | func (t *taskState) args() ([]reflect.Value, bool) { |
| 275 | var args []reflect.Value |
| 276 | for _, arg := range t.def.args { |
| 277 | for _, dep := range arg.deps() { |
| 278 | if depState, ok := t.w.tasks[dep]; !ok || !depState.finished || depState.err != nil { |
| 279 | return nil, false |
| 280 | } |
| 281 | } |
| 282 | args = append(args, arg.value(t.w)) |
| 283 | } |
| 284 | return args, true |
| 285 | } |
| 286 | |
| 287 | func (t *taskState) toExported() *TaskState { |
Heschi Kreinick | 74917a5 | 2021-09-16 18:40:42 -0400 | [diff] [blame] | 288 | state := &TaskState{ |
| 289 | Name: t.def.name, |
| 290 | Finished: t.finished, |
| 291 | Result: t.result, |
| 292 | SerializedResult: append([]byte(nil), t.serializedResult...), |
Heschi Kreinick | 6437f7a | 2021-08-04 14:44:29 -0400 | [diff] [blame] | 293 | } |
Heschi Kreinick | 74917a5 | 2021-09-16 18:40:42 -0400 | [diff] [blame] | 294 | if t.err != nil { |
| 295 | state.Error = t.err.Error() |
| 296 | } |
| 297 | return state |
Heschi Kreinick | 6437f7a | 2021-08-04 14:44:29 -0400 | [diff] [blame] | 298 | } |
| 299 | |
Heschi Kreinick | 6437f7a | 2021-08-04 14:44:29 -0400 | [diff] [blame] | 300 | // Start instantiates a workflow with the given parameters. |
| 301 | func Start(def *Definition, params map[string]string) (*Workflow, error) { |
| 302 | w := &Workflow{ |
Alexander Rakoczy | cbb5112 | 2021-09-15 20:49:39 -0400 | [diff] [blame] | 303 | ID: uuid.New(), |
Heschi Kreinick | 6437f7a | 2021-08-04 14:44:29 -0400 | [diff] [blame] | 304 | def: def, |
| 305 | params: params, |
| 306 | tasks: map[*taskDefinition]*taskState{}, |
| 307 | } |
Heschi Kreinick | cc073c5 | 2021-09-01 17:51:05 -0400 | [diff] [blame] | 308 | if err := w.validate(); err != nil { |
| 309 | return nil, err |
| 310 | } |
Heschi Kreinick | 6437f7a | 2021-08-04 14:44:29 -0400 | [diff] [blame] | 311 | for _, taskDef := range def.tasks { |
| 312 | w.tasks[taskDef] = &taskState{def: taskDef, w: w} |
Heschi Kreinick | cc073c5 | 2021-09-01 17:51:05 -0400 | [diff] [blame] | 313 | } |
| 314 | return w, nil |
| 315 | } |
| 316 | |
| 317 | func (w *Workflow) validate() error { |
| 318 | used := map[*taskDefinition]bool{} |
| 319 | for _, taskDef := range w.def.tasks { |
Heschi Kreinick | 6437f7a | 2021-08-04 14:44:29 -0400 | [diff] [blame] | 320 | for _, arg := range taskDef.args { |
| 321 | for _, argDep := range arg.deps() { |
| 322 | used[argDep] = true |
| 323 | } |
| 324 | } |
| 325 | } |
Heschi Kreinick | cc073c5 | 2021-09-01 17:51:05 -0400 | [diff] [blame] | 326 | for _, output := range w.def.outputs { |
Heschi Kreinick | 6437f7a | 2021-08-04 14:44:29 -0400 | [diff] [blame] | 327 | used[output.task] = true |
| 328 | } |
Heschi Kreinick | cc073c5 | 2021-09-01 17:51:05 -0400 | [diff] [blame] | 329 | for _, task := range w.def.tasks { |
Heschi Kreinick | 6437f7a | 2021-08-04 14:44:29 -0400 | [diff] [blame] | 330 | if !used[task] { |
Heschi Kreinick | cc073c5 | 2021-09-01 17:51:05 -0400 | [diff] [blame] | 331 | return fmt.Errorf("task %v is not referenced and should be deleted", task.name) |
Heschi Kreinick | 6437f7a | 2021-08-04 14:44:29 -0400 | [diff] [blame] | 332 | } |
| 333 | } |
Heschi Kreinick | cc073c5 | 2021-09-01 17:51:05 -0400 | [diff] [blame] | 334 | return nil |
| 335 | } |
| 336 | |
Heschi Kreinick | 74917a5 | 2021-09-16 18:40:42 -0400 | [diff] [blame] | 337 | // Resume restores a workflow from stored state. Tasks that had not finished |
| 338 | // will be restarted, but tasks that finished in errors will not be retried. |
| 339 | // |
| 340 | // The host must create the WorkflowState. TaskStates should be saved from |
Alexander Rakoczy | 79bf688 | 2021-09-30 11:51:42 -0400 | [diff] [blame] | 341 | // listener callbacks, but for ease of storage, their Result field does not |
Heschi Kreinick | 74917a5 | 2021-09-16 18:40:42 -0400 | [diff] [blame] | 342 | // need to be populated. |
Heschi Kreinick | cc073c5 | 2021-09-01 17:51:05 -0400 | [diff] [blame] | 343 | func Resume(def *Definition, state *WorkflowState, taskStates map[string]*TaskState) (*Workflow, error) { |
| 344 | w := &Workflow{ |
| 345 | ID: state.ID, |
| 346 | def: def, |
| 347 | params: state.Params, |
| 348 | tasks: map[*taskDefinition]*taskState{}, |
| 349 | } |
| 350 | if err := w.validate(); err != nil { |
| 351 | return nil, err |
| 352 | } |
| 353 | for _, taskDef := range def.tasks { |
| 354 | tState, ok := taskStates[taskDef.name] |
| 355 | if !ok { |
| 356 | return nil, fmt.Errorf("task state for %q not found", taskDef.name) |
| 357 | } |
Heschi Kreinick | 74917a5 | 2021-09-16 18:40:42 -0400 | [diff] [blame] | 358 | state := &taskState{ |
| 359 | def: taskDef, |
| 360 | w: w, |
| 361 | started: tState.Finished, // Can't resume tasks, so either it's new or done. |
| 362 | finished: tState.Finished, |
| 363 | serializedResult: tState.SerializedResult, |
Heschi Kreinick | cc073c5 | 2021-09-01 17:51:05 -0400 | [diff] [blame] | 364 | } |
Heschi Kreinick | 74917a5 | 2021-09-16 18:40:42 -0400 | [diff] [blame] | 365 | if state.serializedResult != nil { |
| 366 | ptr := reflect.New(reflect.ValueOf(taskDef.f).Type().Out(0)) |
| 367 | if err := json.Unmarshal(tState.SerializedResult, ptr.Interface()); err != nil { |
| 368 | return nil, fmt.Errorf("failed to unmarshal result of %v: %v", taskDef.name, err) |
| 369 | } |
| 370 | state.result = ptr.Elem().Interface() |
| 371 | } |
| 372 | if tState.Error != "" { |
| 373 | state.err = fmt.Errorf("serialized error: %v", tState.Error) // untyped, but hopefully that doesn't matter. |
| 374 | } |
| 375 | w.tasks[taskDef] = state |
Heschi Kreinick | cc073c5 | 2021-09-01 17:51:05 -0400 | [diff] [blame] | 376 | } |
Heschi Kreinick | 6437f7a | 2021-08-04 14:44:29 -0400 | [diff] [blame] | 377 | return w, nil |
| 378 | } |
| 379 | |
Heschi Kreinick | c54e9bc | 2021-11-03 17:15:17 -0400 | [diff] [blame] | 380 | // Run runs a workflow to completion or quiescence and returns its outputs. |
Heschi Kreinick | cc073c5 | 2021-09-01 17:51:05 -0400 | [diff] [blame] | 381 | // listener.TaskStateChanged will be called when each task starts and |
| 382 | // finishes. It should be used only for monitoring and persistence purposes - |
| 383 | // to read task results, register Outputs. |
| 384 | func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]interface{}, error) { |
| 385 | if listener == nil { |
| 386 | listener = &defaultListener{} |
| 387 | } |
Heschi Kreinick | 6437f7a | 2021-08-04 14:44:29 -0400 | [diff] [blame] | 388 | |
| 389 | stateChan := make(chan taskState, 2*len(w.def.tasks)) |
| 390 | for { |
| 391 | // If we have all the outputs, the workflow is done. |
| 392 | outValues := map[string]interface{}{} |
| 393 | for outName, outDef := range w.def.outputs { |
| 394 | if task := w.tasks[outDef.task]; task.finished && task.err == nil { |
| 395 | outValues[outName] = task.result |
| 396 | } |
| 397 | } |
| 398 | if len(outValues) == len(w.def.outputs) { |
| 399 | return outValues, nil |
| 400 | } |
| 401 | |
Heschi Kreinick | c54e9bc | 2021-11-03 17:15:17 -0400 | [diff] [blame] | 402 | running := 0 |
| 403 | for _, task := range w.tasks { |
| 404 | if task.started && !task.finished { |
| 405 | running++ |
| 406 | } |
| 407 | } |
Bryan C. Mills | f3c7404 | 2021-11-10 10:27:06 -0500 | [diff] [blame] | 408 | |
| 409 | if ctx.Err() == nil { |
| 410 | // Start any idle tasks whose dependencies are all done. |
| 411 | for _, task := range w.tasks { |
| 412 | if task.started { |
| 413 | continue |
| 414 | } |
| 415 | in, ready := task.args() |
| 416 | if !ready { |
| 417 | continue |
| 418 | } |
| 419 | task.started = true |
| 420 | running++ |
| 421 | listener.TaskStateChanged(w.ID, task.def.name, task.toExported()) |
| 422 | go func(task taskState) { |
| 423 | stateChan <- w.runTask(ctx, listener, task, in) |
| 424 | }(*task) |
| 425 | } |
Heschi Kreinick | c54e9bc | 2021-11-03 17:15:17 -0400 | [diff] [blame] | 426 | } |
| 427 | |
Bryan C. Mills | f3c7404 | 2021-11-10 10:27:06 -0500 | [diff] [blame] | 428 | // Exit if we've run everything we can given errors. |
| 429 | if running == 0 { |
| 430 | select { |
| 431 | case <-ctx.Done(): |
| 432 | return nil, ctx.Err() |
| 433 | default: |
| 434 | return nil, fmt.Errorf("workflow has progressed as far as it can") |
| 435 | } |
Heschi Kreinick | 6437f7a | 2021-08-04 14:44:29 -0400 | [diff] [blame] | 436 | } |
Bryan C. Mills | f3c7404 | 2021-11-10 10:27:06 -0500 | [diff] [blame] | 437 | |
| 438 | state := <-stateChan |
| 439 | w.tasks[state.def] = &state |
| 440 | if state.err != nil && ctx.Err() != nil { |
| 441 | // Don't report failures that occur after cancellation has begun. |
| 442 | // They may be due to the cancellation itself. |
| 443 | continue |
| 444 | } |
| 445 | listener.TaskStateChanged(w.ID, state.def.name, state.toExported()) |
Heschi Kreinick | 6437f7a | 2021-08-04 14:44:29 -0400 | [diff] [blame] | 446 | } |
| 447 | } |
| 448 | |
Heschi Kreinick | cc073c5 | 2021-09-01 17:51:05 -0400 | [diff] [blame] | 449 | func (w *Workflow) runTask(ctx context.Context, listener Listener, state taskState, args []reflect.Value) taskState { |
| 450 | tctx := &TaskContext{Context: ctx, Logger: listener.Logger(w.ID, state.def.name)} |
| 451 | in := append([]reflect.Value{reflect.ValueOf(tctx)}, args...) |
Heschi Kreinick | 6437f7a | 2021-08-04 14:44:29 -0400 | [diff] [blame] | 452 | out := reflect.ValueOf(state.def.f).Call(in) |
| 453 | var err error |
| 454 | if !out[1].IsNil() { |
| 455 | err = out[1].Interface().(error) |
| 456 | } |
| 457 | state.finished = true |
| 458 | state.result, state.err = out[0].Interface(), err |
Heschi Kreinick | 74917a5 | 2021-09-16 18:40:42 -0400 | [diff] [blame] | 459 | if err == nil { |
| 460 | state.serializedResult, state.err = json.Marshal(state.result) |
| 461 | } |
Heschi Kreinick | 6437f7a | 2021-08-04 14:44:29 -0400 | [diff] [blame] | 462 | return state |
| 463 | } |
Heschi Kreinick | cc073c5 | 2021-09-01 17:51:05 -0400 | [diff] [blame] | 464 | |
| 465 | type defaultListener struct{} |
| 466 | |
Alexander Rakoczy | cbb5112 | 2021-09-15 20:49:39 -0400 | [diff] [blame] | 467 | func (s *defaultListener) TaskStateChanged(_ uuid.UUID, _ string, _ *TaskState) error { |
Heschi Kreinick | cc073c5 | 2021-09-01 17:51:05 -0400 | [diff] [blame] | 468 | return nil |
| 469 | } |
| 470 | |
Alexander Rakoczy | cbb5112 | 2021-09-15 20:49:39 -0400 | [diff] [blame] | 471 | func (s *defaultListener) Logger(_ uuid.UUID, task string) Logger { |
Heschi Kreinick | cc073c5 | 2021-09-01 17:51:05 -0400 | [diff] [blame] | 472 | return &defaultLogger{} |
| 473 | } |
| 474 | |
| 475 | type defaultLogger struct{} |
| 476 | |
| 477 | func (l *defaultLogger) Printf(format string, v ...interface{}) {} |