cmd/relui,internal/relui: resume workflows
Resume all unfinished workflows on relui startup. Moves most workflow
running logic to a new Worker type.
This also adds a couple new columns to the workflow to record output and
whether a workflow has completed. In order to actually resume workflows,
we'll have to either add functionality to unmark them as "finished"
(restarting manually, which seems reasonable), or not marking a workflow
finished if it terminates with context.Cancelled.
For golang/go#47401
Change-Id: I3e0ed021d7a47fb125f1034df83dc3c6d95887f8
Reviewed-on: https://go-review.googlesource.com/c/build/+/353553
Trust: Alexander Rakoczy <alex@golang.org>
Run-TryBot: Alexander Rakoczy <alex@golang.org>
TryBot-Result: Go Bot <gobot@golang.org>
Reviewed-by: Heschi Kreinick <heschi@google.com>
diff --git a/internal/relui/web_test.go b/internal/relui/web_test.go
index 2a12386..a968410 100644
--- a/internal/relui/web_test.go
+++ b/internal/relui/web_test.go
@@ -25,7 +25,6 @@
"github.com/google/uuid"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
- "golang.org/x/build/internal"
"golang.org/x/build/internal/relui/db"
)
@@ -117,7 +116,7 @@
req := httptest.NewRequest(http.MethodGet, "/", nil)
w := httptest.NewRecorder()
- s := NewServer(p)
+ s := NewServer(p, NewWorker(p, &PGListener{p}))
s.homeHandler(w, req)
resp := w.Result()
@@ -174,7 +173,6 @@
wantCode int
wantHeaders map[string]string
wantWorkflows []db.Workflow
- wantTasks []db.Task
}{
{
desc: "no params",
@@ -205,21 +203,8 @@
{
ID: uuid.New(), // SameUUIDVariant
Params: nullString(`{"farewell": "bye", "greeting": "hello"}`),
- Name: nullString(`Echo`),
- CreatedAt: time.Now(), // cmpopts.EquateApproxTime
- UpdatedAt: time.Now(), // cmpopts.EquateApproxTime
- },
- },
- wantTasks: []db.Task{
- {
- Name: "greeting",
- Error: sql.NullString{},
- CreatedAt: time.Now(), // cmpopts.EquateApproxTime
- UpdatedAt: time.Now(), // cmpopts.EquateApproxTime
- },
- {
- Name: "farewell",
- Error: sql.NullString{},
+ Name: nullString(`echo`),
+ Output: "{}",
CreatedAt: time.Now(), // cmpopts.EquateApproxTime
UpdatedAt: time.Now(), // cmpopts.EquateApproxTime
},
@@ -231,12 +216,12 @@
p := testDB(ctx, t)
req := httptest.NewRequest(http.MethodPost, "/workflows/create", strings.NewReader(c.params.Encode()))
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
- w := httptest.NewRecorder()
+ rec := httptest.NewRecorder()
q := db.New(p)
- s := NewServer(p)
- s.createWorkflowHandler(w, req)
- resp := w.Result()
+ s := NewServer(p, NewWorker(p, &PGListener{p}))
+ s.createWorkflowHandler(rec, req)
+ resp := rec.Result()
if resp.StatusCode != c.wantCode {
t.Errorf("rep.StatusCode = %d, wanted %d", resp.StatusCode, c.wantCode)
@@ -256,24 +241,6 @@
if diff := cmp.Diff(c.wantWorkflows, wfs, SameUUIDVariant(), cmpopts.EquateApproxTime(time.Minute)); diff != "" {
t.Fatalf("q.Workflows() mismatch (-want +got):\n%s", diff)
}
- var tasks []db.Task
- ctx, cancel := context.WithTimeout(ctx, time.Minute)
- defer cancel()
- internal.PeriodicallyDo(ctx, 10*time.Millisecond, func(ctx context.Context, _ time.Time) {
- tasks, err = q.TasksForWorkflow(ctx, wfs[0].ID)
- if err != nil {
- t.Fatalf("q.TasksForWorkflow(_, %q) = %v, %v, wanted no error", wfs[0].ID, tasks, err)
- }
- if len(tasks) > 0 {
- cancel()
- }
- })
- for i := range c.wantTasks {
- c.wantTasks[i].WorkflowID = wfs[0].ID
- }
- if diff := cmp.Diff(c.wantTasks, tasks, cmpopts.EquateApproxTime(time.Minute), cmpopts.IgnoreFields(db.Task{}, "Finished", "Result")); diff != "" {
- t.Errorf("q.TasksForWorkflow(_, %q) mismatch (-want +got):\n%s", wfs[0].ID, diff)
- }
})
}
}
@@ -282,7 +249,7 @@
// connection.
//
// All tables in the public schema of the connected database will be
-// truncated, with the exception of the migrations table.
+// truncated except for the migrations table.
func resetDB(ctx context.Context, t *testing.T, p *pgxpool.Pool) {
t.Helper()
tableQuery := `SELECT table_name FROM information_schema.tables WHERE table_schema='public'`