internal/workflow: make persistence easier

Loading arbitrarily-typed values from a database is hard -- you have to
know the type you're trying to load. The workflow host isn't in a great
position to know that. The workflow itself is. Do serialization in the
workflow.

We can require our results to be JSON-serializable, but not our errors.
Fortunately, right now we don't look at the value of errors, just their
presence, so we can afford to lose their types.

Change-Id: I7ff16ad381d0290bf9dd38aaff192c70537bb283
Reviewed-on: https://go-review.googlesource.com/c/build/+/350450
Trust: Heschi Kreinick <heschi@google.com>
Trust: Alexander Rakoczy <alex@golang.org>
Run-TryBot: Heschi Kreinick <heschi@google.com>
TryBot-Result: Go Bot <gobot@golang.org>
Reviewed-by: Alexander Rakoczy <alex@golang.org>
diff --git a/internal/workflow/workflow.go b/internal/workflow/workflow.go
index 36dbf6a..2720a37 100644
--- a/internal/workflow/workflow.go
+++ b/internal/workflow/workflow.go
@@ -29,6 +29,7 @@
 
 import (
 	"context"
+	"encoding/json"
 	"fmt"
 	"reflect"
 	"sync"
@@ -202,10 +203,11 @@
 // TaskState contains the state of a task in a running workflow. Once Finished
 // is true, either Result or Error will be populated.
 type TaskState struct {
-	Name     string
-	Finished bool
-	Result   interface{}
-	Error    error
+	Name             string
+	Finished         bool
+	Result           interface{}
+	SerializedResult []byte
+	Error            string
 }
 
 // WorkflowState contains the shallow state of a running workflow.
@@ -251,12 +253,13 @@
 }
 
 type taskState struct {
-	def      *taskDefinition
-	w        *Workflow
-	started  bool
-	finished bool
-	result   interface{}
-	err      error
+	def              *taskDefinition
+	w                *Workflow
+	started          bool
+	finished         bool
+	result           interface{}
+	serializedResult []byte
+	err              error
 }
 
 func (t *taskState) args() ([]reflect.Value, bool) {
@@ -273,12 +276,16 @@
 }
 
 func (t *taskState) toExported() *TaskState {
-	return &TaskState{
-		Name:     t.def.name,
-		Finished: t.finished,
-		Result:   t.result,
-		Error:    t.err,
+	state := &TaskState{
+		Name:             t.def.name,
+		Finished:         t.finished,
+		Result:           t.result,
+		SerializedResult: append([]byte(nil), t.serializedResult...),
 	}
+	if t.err != nil {
+		state.Error = t.err.Error()
+	}
+	return state
 }
 
 // Start instantiates a workflow with the given parameters.
@@ -318,10 +325,12 @@
 	return nil
 }
 
-// Resume restores a workflow from stored state. The WorkflowState can be
-// constructed by the host. TaskStates should be saved from Listener calls.
-// Tasks that had not finished will be restarted, but tasks that finished in
-// errors will not be retried.
+// Resume restores a workflow from stored state. Tasks that had not finished
+// will be restarted, but tasks that finished in errors will not be retried.
+//
+// The host must create the WorkflowState. TaskStates should be saved from
+// listener callbacks, but for ease of stoage, their Result field does not
+// need to be populated.
 func Resume(def *Definition, state *WorkflowState, taskStates map[string]*TaskState) (*Workflow, error) {
 	w := &Workflow{
 		ID:     state.ID,
@@ -337,16 +346,25 @@
 		if !ok {
 			return nil, fmt.Errorf("task state for %q not found", taskDef.name)
 		}
-		w.tasks[taskDef] = &taskState{
-			def:      taskDef,
-			w:        w,
-			started:  tState.Finished, // Can't resume tasks, so either it's new or done.
-			finished: tState.Finished,
-			result:   tState.Result,
-			err:      tState.Error,
+		state := &taskState{
+			def:              taskDef,
+			w:                w,
+			started:          tState.Finished, // Can't resume tasks, so either it's new or done.
+			finished:         tState.Finished,
+			serializedResult: tState.SerializedResult,
 		}
+		if state.serializedResult != nil {
+			ptr := reflect.New(reflect.ValueOf(taskDef.f).Type().Out(0))
+			if err := json.Unmarshal(tState.SerializedResult, ptr.Interface()); err != nil {
+				return nil, fmt.Errorf("failed to unmarshal result of %v: %v", taskDef.name, err)
+			}
+			state.result = ptr.Elem().Interface()
+		}
+		if tState.Error != "" {
+			state.err = fmt.Errorf("serialized error: %v", tState.Error) // untyped, but hopefully that doesn't matter.
+		}
+		w.tasks[taskDef] = state
 	}
-
 	return w, nil
 }
 
@@ -412,6 +430,9 @@
 	}
 	state.finished = true
 	state.result, state.err = out[0].Interface(), err
+	if err == nil {
+		state.serializedResult, state.err = json.Marshal(state.result)
+	}
 	return state
 }