internal/workflow: allow resuming of workflows
Immediately update all task states on (*Workflow).Run.
If a workflow had tasks that had never started, their state would have
never been persisted by the listener. If relui restarted before a task
had ever begun, it would be impossible to resume the workflow, as
(*Workflow).Resume validates state for all tasks is present to ensure
the WorkflowDefinition has not changed between service restarts.
Also, remove unused function.
Change-Id: I7fc50586001e319a8d3a35ae5cdcad7ea42ba55d
Reviewed-on: https://go-review.googlesource.com/c/build/+/411201
Reviewed-by: Dmitri Shuralyov <dmitshur@golang.org>
Reviewed-by: Dmitri Shuralyov <dmitshur@google.com>
Run-TryBot: Alex Rakoczy <alex@golang.org>
TryBot-Result: Gopher Robot <gobot@golang.org>
diff --git a/internal/relui/db/workflows.sql.go b/internal/relui/db/workflows.sql.go
index 41e10a7..ed512da 100644
--- a/internal/relui/db/workflows.sql.go
+++ b/internal/relui/db/workflows.sql.go
@@ -281,7 +281,7 @@
const tasks = `-- name: Tasks :many
SELECT tasks.workflow_id, tasks.name, tasks.finished, tasks.result, tasks.error, tasks.created_at, tasks.updated_at
FROM tasks
-ORDER BY created_at
+ORDER BY updated_at
`
func (q *Queries) Tasks(ctx context.Context) ([]Task, error) {
diff --git a/internal/relui/queries/workflows.sql b/internal/relui/queries/workflows.sql
index 755a7d9..09c8e52 100644
--- a/internal/relui/queries/workflows.sql
+++ b/internal/relui/queries/workflows.sql
@@ -37,7 +37,7 @@
-- name: Tasks :many
SELECT tasks.*
FROM tasks
-ORDER BY created_at;
+ORDER BY updated_at;
-- name: TasksForWorkflow :many
SELECT tasks.*
diff --git a/internal/relui/worker_test.go b/internal/relui/worker_test.go
index 9431cca..c4b8d17 100644
--- a/internal/relui/worker_test.go
+++ b/internal/relui/worker_test.go
@@ -186,7 +186,10 @@
UpdatedAt: time.Now(), // cmpopts.EquateApproxTime
},
}
- if diff := cmp.Diff(want, tasks, cmpopts.EquateApproxTime(time.Minute)); diff != "" {
+ sort := cmpopts.SortSlices(func(x db.Task, y db.Task) bool {
+ return x.WorkflowID.String() < y.WorkflowID.String()
+ })
+ if diff := cmp.Diff(want, tasks, cmpopts.EquateApproxTime(time.Minute), sort); diff != "" {
t.Errorf("q.Tasks() mismatch (-want +got):\n%s", diff)
}
}
diff --git a/internal/relui/workflows.go b/internal/relui/workflows.go
index 1d9527d..fdd7ccb 100644
--- a/internal/relui/workflows.go
+++ b/internal/relui/workflows.go
@@ -558,10 +558,6 @@
return len(p), nil
}
-func combineResults(ctx *workflow.TaskContext, artifacts []artifact) ([]artifact, error) {
- return artifacts, nil
-}
-
func (tasks *BuildReleaseTasks) copyToStaging(ctx *workflow.TaskContext, version string, artifacts []artifact) ([]artifact, error) {
scratchFS, err := gcsfs.FromURL(ctx, tasks.GCSClient, tasks.ScratchURL)
if err != nil {
diff --git a/internal/workflow/workflow.go b/internal/workflow/workflow.go
index ee44960..ee20c40 100644
--- a/internal/workflow/workflow.go
+++ b/internal/workflow/workflow.go
@@ -529,14 +529,18 @@
}
// Run runs a workflow to completion or quiescence and returns its outputs.
-// listener.TaskStateChanged will be called when each task starts and
-// finishes. It should be used only for monitoring and persistence purposes -
-// to read task results, register Outputs.
+// listener.TaskStateChanged will be called immediately, when each task starts,
+// and when they finish. It should be used only for monitoring and persistence
+// purposes. Register Outputs to read task results.
func (w *Workflow) Run(ctx context.Context, listener Listener) (map[string]interface{}, error) {
if listener == nil {
listener = &defaultListener{}
}
+ for _, task := range w.tasks {
+ listener.TaskStateChanged(w.ID, task.def.name, task.toExported())
+ }
+
stateChan := make(chan taskState, 2*len(w.def.tasks))
for {
// If we have all the outputs, the workflow is done.