cmd/relui,internal/relui: resume workflows

Resume all unfinished workflows on relui startup. Moves most workflow
running logic to a new Worker type.

This also adds a couple new columns to the workflow to record output and
whether a workflow has completed. In order to actually resume workflows,
we'll have to either add functionality to unmark them as "finished"
(restarting manually, which seems reasonable), or not marking a workflow
finished if it terminates with context.Cancelled.

For golang/go#47401

Change-Id: I3e0ed021d7a47fb125f1034df83dc3c6d95887f8
Reviewed-on: https://go-review.googlesource.com/c/build/+/353553
Trust: Alexander Rakoczy <alex@golang.org>
Run-TryBot: Alexander Rakoczy <alex@golang.org>
TryBot-Result: Go Bot <gobot@golang.org>
Reviewed-by: Heschi Kreinick <heschi@google.com>
diff --git a/internal/relui/web_test.go b/internal/relui/web_test.go
index 2a12386..a968410 100644
--- a/internal/relui/web_test.go
+++ b/internal/relui/web_test.go
@@ -25,7 +25,6 @@
 	"github.com/google/uuid"
 	"github.com/jackc/pgx/v4"
 	"github.com/jackc/pgx/v4/pgxpool"
-	"golang.org/x/build/internal"
 	"golang.org/x/build/internal/relui/db"
 )
 
@@ -117,7 +116,7 @@
 	req := httptest.NewRequest(http.MethodGet, "/", nil)
 	w := httptest.NewRecorder()
 
-	s := NewServer(p)
+	s := NewServer(p, NewWorker(p, &PGListener{p}))
 	s.homeHandler(w, req)
 	resp := w.Result()
 
@@ -174,7 +173,6 @@
 		wantCode      int
 		wantHeaders   map[string]string
 		wantWorkflows []db.Workflow
-		wantTasks     []db.Task
 	}{
 		{
 			desc:     "no params",
@@ -205,21 +203,8 @@
 				{
 					ID:        uuid.New(), // SameUUIDVariant
 					Params:    nullString(`{"farewell": "bye", "greeting": "hello"}`),
-					Name:      nullString(`Echo`),
-					CreatedAt: time.Now(), // cmpopts.EquateApproxTime
-					UpdatedAt: time.Now(), // cmpopts.EquateApproxTime
-				},
-			},
-			wantTasks: []db.Task{
-				{
-					Name:      "greeting",
-					Error:     sql.NullString{},
-					CreatedAt: time.Now(), // cmpopts.EquateApproxTime
-					UpdatedAt: time.Now(), // cmpopts.EquateApproxTime
-				},
-				{
-					Name:      "farewell",
-					Error:     sql.NullString{},
+					Name:      nullString(`echo`),
+					Output:    "{}",
 					CreatedAt: time.Now(), // cmpopts.EquateApproxTime
 					UpdatedAt: time.Now(), // cmpopts.EquateApproxTime
 				},
@@ -231,12 +216,12 @@
 			p := testDB(ctx, t)
 			req := httptest.NewRequest(http.MethodPost, "/workflows/create", strings.NewReader(c.params.Encode()))
 			req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
-			w := httptest.NewRecorder()
+			rec := httptest.NewRecorder()
 			q := db.New(p)
 
-			s := NewServer(p)
-			s.createWorkflowHandler(w, req)
-			resp := w.Result()
+			s := NewServer(p, NewWorker(p, &PGListener{p}))
+			s.createWorkflowHandler(rec, req)
+			resp := rec.Result()
 
 			if resp.StatusCode != c.wantCode {
 				t.Errorf("rep.StatusCode = %d, wanted %d", resp.StatusCode, c.wantCode)
@@ -256,24 +241,6 @@
 			if diff := cmp.Diff(c.wantWorkflows, wfs, SameUUIDVariant(), cmpopts.EquateApproxTime(time.Minute)); diff != "" {
 				t.Fatalf("q.Workflows() mismatch (-want +got):\n%s", diff)
 			}
-			var tasks []db.Task
-			ctx, cancel := context.WithTimeout(ctx, time.Minute)
-			defer cancel()
-			internal.PeriodicallyDo(ctx, 10*time.Millisecond, func(ctx context.Context, _ time.Time) {
-				tasks, err = q.TasksForWorkflow(ctx, wfs[0].ID)
-				if err != nil {
-					t.Fatalf("q.TasksForWorkflow(_, %q) = %v, %v, wanted no error", wfs[0].ID, tasks, err)
-				}
-				if len(tasks) > 0 {
-					cancel()
-				}
-			})
-			for i := range c.wantTasks {
-				c.wantTasks[i].WorkflowID = wfs[0].ID
-			}
-			if diff := cmp.Diff(c.wantTasks, tasks, cmpopts.EquateApproxTime(time.Minute), cmpopts.IgnoreFields(db.Task{}, "Finished", "Result")); diff != "" {
-				t.Errorf("q.TasksForWorkflow(_, %q) mismatch (-want +got):\n%s", wfs[0].ID, diff)
-			}
 		})
 	}
 }
@@ -282,7 +249,7 @@
 // connection.
 //
 // All tables in the public schema of the connected database will be
-// truncated, with the exception of the migrations table.
+// truncated except for the migrations table.
 func resetDB(ctx context.Context, t *testing.T, p *pgxpool.Pool) {
 	t.Helper()
 	tableQuery := `SELECT table_name FROM information_schema.tables WHERE table_schema='public'`