internal/workflow: make persistence easier
Loading arbitrarily-typed values from a database is hard -- you have to
know the type you're trying to load. The workflow host isn't in a great
position to know that. The workflow itself is. Do serialization in the
workflow.
We can require our results to be JSON-serializable, but not our errors.
Fortunately, right now we don't look at the value of errors, just their
presence, so we can afford to lose their types.
Change-Id: I7ff16ad381d0290bf9dd38aaff192c70537bb283
Reviewed-on: https://go-review.googlesource.com/c/build/+/350450
Trust: Heschi Kreinick <heschi@google.com>
Trust: Alexander Rakoczy <alex@golang.org>
Run-TryBot: Heschi Kreinick <heschi@google.com>
TryBot-Result: Go Bot <gobot@golang.org>
Reviewed-by: Alexander Rakoczy <alex@golang.org>
diff --git a/internal/workflow/workflow.go b/internal/workflow/workflow.go
index 36dbf6a..2720a37 100644
--- a/internal/workflow/workflow.go
+++ b/internal/workflow/workflow.go
@@ -29,6 +29,7 @@
import (
"context"
+ "encoding/json"
"fmt"
"reflect"
"sync"
@@ -202,10 +203,11 @@
// TaskState contains the state of a task in a running workflow. Once Finished
// is true, either Result or Error will be populated.
type TaskState struct {
- Name string
- Finished bool
- Result interface{}
- Error error
+ Name string
+ Finished bool
+ Result interface{}
+ SerializedResult []byte
+ Error string
}
// WorkflowState contains the shallow state of a running workflow.
@@ -251,12 +253,13 @@
}
type taskState struct {
- def *taskDefinition
- w *Workflow
- started bool
- finished bool
- result interface{}
- err error
+ def *taskDefinition
+ w *Workflow
+ started bool
+ finished bool
+ result interface{}
+ serializedResult []byte
+ err error
}
func (t *taskState) args() ([]reflect.Value, bool) {
@@ -273,12 +276,16 @@
}
func (t *taskState) toExported() *TaskState {
- return &TaskState{
- Name: t.def.name,
- Finished: t.finished,
- Result: t.result,
- Error: t.err,
+ state := &TaskState{
+ Name: t.def.name,
+ Finished: t.finished,
+ Result: t.result,
+ SerializedResult: append([]byte(nil), t.serializedResult...),
}
+ if t.err != nil {
+ state.Error = t.err.Error()
+ }
+ return state
}
// Start instantiates a workflow with the given parameters.
@@ -318,10 +325,12 @@
return nil
}
-// Resume restores a workflow from stored state. The WorkflowState can be
-// constructed by the host. TaskStates should be saved from Listener calls.
-// Tasks that had not finished will be restarted, but tasks that finished in
-// errors will not be retried.
+// Resume restores a workflow from stored state. Tasks that had not finished
+// will be restarted, but tasks that finished in errors will not be retried.
+//
+// The host must create the WorkflowState. TaskStates should be saved from
+// listener callbacks, but for ease of storage, their Result field does not
+// need to be populated.
func Resume(def *Definition, state *WorkflowState, taskStates map[string]*TaskState) (*Workflow, error) {
w := &Workflow{
ID: state.ID,
@@ -337,16 +346,25 @@
if !ok {
return nil, fmt.Errorf("task state for %q not found", taskDef.name)
}
- w.tasks[taskDef] = &taskState{
- def: taskDef,
- w: w,
- started: tState.Finished, // Can't resume tasks, so either it's new or done.
- finished: tState.Finished,
- result: tState.Result,
- err: tState.Error,
+ state := &taskState{
+ def: taskDef,
+ w: w,
+ started: tState.Finished, // Can't resume tasks, so either it's new or done.
+ finished: tState.Finished,
+ serializedResult: tState.SerializedResult,
}
+ if state.serializedResult != nil {
+ ptr := reflect.New(reflect.ValueOf(taskDef.f).Type().Out(0))
+ if err := json.Unmarshal(tState.SerializedResult, ptr.Interface()); err != nil {
+ return nil, fmt.Errorf("failed to unmarshal result of %v: %v", taskDef.name, err)
+ }
+ state.result = ptr.Elem().Interface()
+ }
+ if tState.Error != "" {
+ state.err = fmt.Errorf("serialized error: %v", tState.Error) // untyped, but hopefully that doesn't matter.
+ }
+ w.tasks[taskDef] = state
}
-
return w, nil
}
@@ -412,6 +430,9 @@
}
state.finished = true
state.result, state.err = out[0].Interface(), err
+ if err == nil {
+ state.serializedResult, state.err = json.Marshal(state.result)
+ }
return state
}
diff --git a/internal/workflow/workflow_test.go b/internal/workflow/workflow_test.go
index a4379bc..bacf4c1 100644
--- a/internal/workflow/workflow_test.go
+++ b/internal/workflow/workflow_test.go
@@ -15,6 +15,7 @@
"time"
"github.com/google/go-cmp/cmp"
+ "github.com/google/go-cmp/cmp/cmpopts"
"golang.org/x/build/internal/workflow"
)
@@ -200,6 +201,9 @@
wfState := &workflow.WorkflowState{ID: w.ID, Params: nil}
taskStates := storage.states[w.ID]
w2, err := workflow.Resume(wd, wfState, taskStates)
+ if err != nil {
+ t.Fatal(err)
+ }
out := runWorkflow(t, w2, storage)
if got, want := out["output"], "not blocked"; got != want {
t.Errorf("output from maybeBlock was %q, wanted %q", got, want)
@@ -230,12 +234,13 @@
}
func (l *mapListener) assertState(t *testing.T, w *workflow.Workflow, want map[string]*workflow.TaskState) {
- if diff := cmp.Diff(l.states[w.ID], want); diff != "" {
+ if diff := cmp.Diff(l.states[w.ID], want, cmpopts.IgnoreFields(workflow.TaskState{}, "SerializedResult")); diff != "" {
t.Errorf("task state didn't match expections: %v", diff)
}
}
func startWorkflow(t *testing.T, wd *workflow.Definition, params map[string]string) *workflow.Workflow {
+ t.Helper()
w, err := workflow.Start(wd, params)
if err != nil {
t.Fatal(err)
@@ -244,6 +249,7 @@
}
func runWorkflow(t *testing.T, w *workflow.Workflow, listener workflow.Listener) map[string]interface{} {
+ t.Helper()
if listener == nil {
listener = &verboseListener{t}
}
@@ -260,7 +266,7 @@
switch {
case !st.Finished:
l.t.Logf("task %-10v: started", st.Name)
- case st.Error != nil:
+ case st.Error != "":
l.t.Logf("task %-10v: error: %v", st.Name, st.Error)
default:
l.t.Logf("task %-10v: done: %v", st.Name, st.Result)