internal/workflow: allow resuming of workflows

Immediately update all task states on (*Workflow).Run.

If a workflow had tasks that had never started, their state would have
never been persisted by the listener. If relui restarted before a task
had ever begun, it would be impossible to resume the workflow, as
(*Workflow).Resume validates state for all tasks is present to ensure
the WorkflowDefinition has not changed between service restarts.

Also, remove unused function.

Change-Id: I7fc50586001e319a8d3a35ae5cdcad7ea42ba55d
Reviewed-on: https://go-review.googlesource.com/c/build/+/411201
Reviewed-by: Dmitri Shuralyov <dmitshur@golang.org>
Reviewed-by: Dmitri Shuralyov <dmitshur@google.com>
Run-TryBot: Alex Rakoczy <alex@golang.org>
TryBot-Result: Gopher Robot <gobot@golang.org>
diff --git a/internal/relui/db/workflows.sql.go b/internal/relui/db/workflows.sql.go
index 41e10a7..ed512da 100644
--- a/internal/relui/db/workflows.sql.go
+++ b/internal/relui/db/workflows.sql.go
@@ -281,7 +281,7 @@
 const tasks = `-- name: Tasks :many
 SELECT tasks.workflow_id, tasks.name, tasks.finished, tasks.result, tasks.error, tasks.created_at, tasks.updated_at
 FROM tasks
-ORDER BY created_at
+ORDER BY updated_at
 `
 
 func (q *Queries) Tasks(ctx context.Context) ([]Task, error) {
diff --git a/internal/relui/queries/workflows.sql b/internal/relui/queries/workflows.sql
index 755a7d9..09c8e52 100644
--- a/internal/relui/queries/workflows.sql
+++ b/internal/relui/queries/workflows.sql
@@ -37,7 +37,7 @@
 -- name: Tasks :many
 SELECT tasks.*
 FROM tasks
-ORDER BY created_at;
+ORDER BY updated_at;
 
 -- name: TasksForWorkflow :many
 SELECT tasks.*
diff --git a/internal/relui/worker_test.go b/internal/relui/worker_test.go
index 9431cca..c4b8d17 100644
--- a/internal/relui/worker_test.go
+++ b/internal/relui/worker_test.go
@@ -186,7 +186,10 @@
 			UpdatedAt:  time.Now(), // cmpopts.EquateApproxTime
 		},
 	}
-	if diff := cmp.Diff(want, tasks, cmpopts.EquateApproxTime(time.Minute)); diff != "" {
+	sort := cmpopts.SortSlices(func(x db.Task, y db.Task) bool {
+		return x.WorkflowID.String() < y.WorkflowID.String()
+	})
+	if diff := cmp.Diff(want, tasks, cmpopts.EquateApproxTime(time.Minute), sort); diff != "" {
 		t.Errorf("q.Tasks() mismatch (-want +got):\n%s", diff)
 	}
 }
diff --git a/internal/relui/workflows.go b/internal/relui/workflows.go
index 1d9527d..fdd7ccb 100644
--- a/internal/relui/workflows.go
+++ b/internal/relui/workflows.go
@@ -558,10 +558,6 @@
 	return len(p), nil
 }
 
-func combineResults(ctx *workflow.TaskContext, artifacts []artifact) ([]artifact, error) {
-	return artifacts, nil
-}
-
 func (tasks *BuildReleaseTasks) copyToStaging(ctx *workflow.TaskContext, version string, artifacts []artifact) ([]artifact, error) {
 	scratchFS, err := gcsfs.FromURL(ctx, tasks.GCSClient, tasks.ScratchURL)
 	if err != nil {
diff --git a/internal/workflow/workflow.go b/internal/workflow/workflow.go
index ee44960..ee20c40 100644
--- a/internal/workflow/workflow.go
+++ b/internal/workflow/workflow.go
@@ -529,14 +529,18 @@
 }
 
 // Run runs a workflow to completion or quiescence and returns its outputs.
-// listener.TaskStateChanged will be called when each task starts and
-// finishes. It should be used only for monitoring and persistence purposes -
-// to read task results, register Outputs.
+// listener.TaskStateChanged will be called immediately, when each task starts,
+// and when they finish. It should be used only for monitoring and persistence
+// purposes. Register Outputs to read task results.
 func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]interface{}, error) {
 	if listener == nil {
 		listener = &defaultListener{}
 	}
 
+	for _, task := range w.tasks {
+		listener.TaskStateChanged(w.ID, task.def.name, task.toExported())
+	}
+
 	stateChan := make(chan taskState, 2*len(w.def.tasks))
 	for {
 		// If we have all the outputs, the workflow is done.