internal/workflow: make persistence easier

Loading arbitrarily-typed values from a database is hard -- you have to
know the type you're trying to load. The workflow host isn't in a great
position to know that. The workflow itself is. Do serialization in the
workflow.

We can require our results to be JSON-serializable, but not our errors.
Fortunately, right now we don't look at the value of errors, just their
presence, so we can afford to lose their types.

Change-Id: I7ff16ad381d0290bf9dd38aaff192c70537bb283
Reviewed-on: https://go-review.googlesource.com/c/build/+/350450
Trust: Heschi Kreinick <heschi@google.com>
Trust: Alexander Rakoczy <alex@golang.org>
Run-TryBot: Heschi Kreinick <heschi@google.com>
TryBot-Result: Go Bot <gobot@golang.org>
Reviewed-by: Alexander Rakoczy <alex@golang.org>
diff --git a/internal/workflow/workflow.go b/internal/workflow/workflow.go
index 36dbf6a..2720a37 100644
--- a/internal/workflow/workflow.go
+++ b/internal/workflow/workflow.go
@@ -29,6 +29,7 @@
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"reflect"
 	"sync"
@@ -202,10 +203,11 @@
 // TaskState contains the state of a task in a running workflow. Once Finished
 // is true, either Result or Error will be populated.
 type TaskState struct {
-	Name     string
-	Finished bool
-	Result   interface{}
-	Error    error
+	Name             string
+	Finished         bool
+	Result           interface{}
+	SerializedResult []byte
+	Error            string
 }
 
 // WorkflowState contains the shallow state of a running workflow.
@@ -251,12 +253,13 @@
 }
 
 type taskState struct {
-	def      *taskDefinition
-	w        *Workflow
-	started  bool
-	finished bool
-	result   interface{}
-	err      error
+	def              *taskDefinition
+	w                *Workflow
+	started          bool
+	finished         bool
+	result           interface{}
+	serializedResult []byte
+	err              error
 }
 
 func (t *taskState) args() ([]reflect.Value, bool) {
@@ -273,12 +276,16 @@
 }
 
 func (t *taskState) toExported() *TaskState {
-	return &TaskState{
-		Name:     t.def.name,
-		Finished: t.finished,
-		Result:   t.result,
-		Error:    t.err,
+	state := &TaskState{
+		Name:             t.def.name,
+		Finished:         t.finished,
+		Result:           t.result,
+		SerializedResult: append([]byte(nil), t.serializedResult...),
 	}
+	if t.err != nil {
+		state.Error = t.err.Error()
+	}
+	return state
 }
 
 // Start instantiates a workflow with the given parameters.
@@ -318,10 +325,12 @@
 	return nil
 }
 
-// Resume restores a workflow from stored state. The WorkflowState can be
-// constructed by the host. TaskStates should be saved from Listener calls.
-// Tasks that had not finished will be restarted, but tasks that finished in
-// errors will not be retried.
+// Resume restores a workflow from stored state. Tasks that had not finished
+// will be restarted, but tasks that finished in errors will not be retried.
+//
+// The host must create the WorkflowState. TaskStates should be saved from
+// listener callbacks, but for ease of stoage, their Result field does not
+// need to be populated.
 func Resume(def *Definition, state *WorkflowState, taskStates map[string]*TaskState) (*Workflow, error) {
 	w := &Workflow{
 		ID:     state.ID,
@@ -337,16 +346,25 @@
 		if !ok {
 			return nil, fmt.Errorf("task state for %q not found", taskDef.name)
 		}
-		w.tasks[taskDef] = &taskState{
-			def:      taskDef,
-			w:        w,
-			started:  tState.Finished, // Can't resume tasks, so either it's new or done.
-			finished: tState.Finished,
-			result:   tState.Result,
-			err:      tState.Error,
+		state := &taskState{
+			def:              taskDef,
+			w:                w,
+			started:          tState.Finished, // Can't resume tasks, so either it's new or done.
+			finished:         tState.Finished,
+			serializedResult: tState.SerializedResult,
 		}
+		if state.serializedResult != nil {
+			ptr := reflect.New(reflect.ValueOf(taskDef.f).Type().Out(0))
+			if err := json.Unmarshal(tState.SerializedResult, ptr.Interface()); err != nil {
+				return nil, fmt.Errorf("failed to unmarshal result of %v: %v", taskDef.name, err)
+			}
+			state.result = ptr.Elem().Interface()
+		}
+		if tState.Error != "" {
+			state.err = fmt.Errorf("serialized error: %v", tState.Error) // untyped, but hopefully that doesn't matter.
+		}
+		w.tasks[taskDef] = state
 	}
-
 	return w, nil
 }
 
@@ -412,6 +430,9 @@
 	}
 	state.finished = true
 	state.result, state.err = out[0].Interface(), err
+	if err == nil {
+		state.serializedResult, state.err = json.Marshal(state.result)
+	}
 	return state
 }
 
diff --git a/internal/workflow/workflow_test.go b/internal/workflow/workflow_test.go
index a4379bc..bacf4c1 100644
--- a/internal/workflow/workflow_test.go
+++ b/internal/workflow/workflow_test.go
@@ -15,6 +15,7 @@
 	"time"
 
 	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
 	"golang.org/x/build/internal/workflow"
 )
 
@@ -200,6 +201,9 @@
 	wfState := &workflow.WorkflowState{ID: w.ID, Params: nil}
 	taskStates := storage.states[w.ID]
 	w2, err := workflow.Resume(wd, wfState, taskStates)
+	if err != nil {
+		t.Fatal(err)
+	}
 	out := runWorkflow(t, w2, storage)
 	if got, want := out["output"], "not blocked"; got != want {
 		t.Errorf("output from maybeBlock was %q, wanted %q", got, want)
@@ -230,12 +234,13 @@
 }
 
 func (l *mapListener) assertState(t *testing.T, w *workflow.Workflow, want map[string]*workflow.TaskState) {
-	if diff := cmp.Diff(l.states[w.ID], want); diff != "" {
+	if diff := cmp.Diff(l.states[w.ID], want, cmpopts.IgnoreFields(workflow.TaskState{}, "SerializedResult")); diff != "" {
 		t.Errorf("task state didn't match expections: %v", diff)
 	}
 }
 
 func startWorkflow(t *testing.T, wd *workflow.Definition, params map[string]string) *workflow.Workflow {
+	t.Helper()
 	w, err := workflow.Start(wd, params)
 	if err != nil {
 		t.Fatal(err)
@@ -244,6 +249,7 @@
 }
 
 func runWorkflow(t *testing.T, w *workflow.Workflow, listener workflow.Listener) map[string]interface{} {
+	t.Helper()
 	if listener == nil {
 		listener = &verboseListener{t}
 	}
@@ -260,7 +266,7 @@
 	switch {
 	case !st.Finished:
 		l.t.Logf("task %-10v: started", st.Name)
-	case st.Error != nil:
+	case st.Error != "":
 		l.t.Logf("task %-10v: error: %v", st.Name, st.Error)
 	default:
 		l.t.Logf("task %-10v: done: %v", st.Name, st.Result)