internal/relui: add manual retry button to failed tasks

Sometimes, tasks fail. This change adds an ability for the user to retry
specific tasks in a workflow. Retrying a task will reset the state of
the task, clear the state of the workflow, and restart the workflow to
retry the task.

This also fixes an issue resuming pending tasks that have no output to
unmarshal.

Adds httprouter for routing nested parameter routes.

Updates golang/go#53165

Change-Id: I9a899d1e6e0d5d9511648b478148a665459aae72
Reviewed-on: https://go-review.googlesource.com/c/build/+/411074
Reviewed-by: Dmitri Shuralyov <dmitshur@google.com>
Run-TryBot: Alex Rakoczy <alex@golang.org>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Dmitri Shuralyov <dmitshur@golang.org>
diff --git a/go.mod b/go.mod
index 476d919..f378f9e 100644
--- a/go.mod
+++ b/go.mod
@@ -36,6 +36,7 @@
 	github.com/jackc/pgconn v1.11.0
 	github.com/jackc/pgx/v4 v4.13.0
 	github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1
+	github.com/julienschmidt/httprouter v1.3.0
 	github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51
 	github.com/mattn/go-sqlite3 v1.14.6
 	github.com/shurcooL/githubv4 v0.0.0-20220520033151-0b4e3294ff00
diff --git a/go.sum b/go.sum
index 6393236..b58de2c 100644
--- a/go.sum
+++ b/go.sum
@@ -592,6 +592,7 @@
 github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
 github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
 github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
+github.com/julienschmidt/httprouter v1.3.0 h1:U0609e9tgbseu3rBINet9P48AI/D3oJs4dN7jwJOQ1U=
 github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM=
 github.com/jung-kurt/gofpdf v1.0.0/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
 github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
diff --git a/internal/relui/db/workflows.sql.go b/internal/relui/db/workflows.sql.go
index 19e9a8f..11a780d 100644
--- a/internal/relui/db/workflows.sql.go
+++ b/internal/relui/db/workflows.sql.go
@@ -114,6 +114,68 @@
 	return i, err
 }
 
+const resetTask = `-- name: ResetTask :one
+UPDATE tasks
+SET finished   = false,
+    result     = DEFAULT,
+    error      = DEFAULT,
+    updated_at = $3
+WHERE workflow_id = $1 AND name = $2
+RETURNING workflow_id, name, finished, result, error, created_at, updated_at
+`
+
+type ResetTaskParams struct {
+	WorkflowID uuid.UUID
+	Name       string
+	UpdatedAt  time.Time
+}
+
+func (q *Queries) ResetTask(ctx context.Context, arg ResetTaskParams) (Task, error) {
+	row := q.db.QueryRow(ctx, resetTask, arg.WorkflowID, arg.Name, arg.UpdatedAt)
+	var i Task
+	err := row.Scan(
+		&i.WorkflowID,
+		&i.Name,
+		&i.Finished,
+		&i.Result,
+		&i.Error,
+		&i.CreatedAt,
+		&i.UpdatedAt,
+	)
+	return i, err
+}
+
+const resetWorkflow = `-- name: ResetWorkflow :one
+UPDATE workflows
+SET finished = false,
+    output = DEFAULT,
+    error = DEFAULT,
+    updated_at = $2
+WHERE id = $1
+RETURNING id, params, name, created_at, updated_at, finished, output, error
+`
+
+type ResetWorkflowParams struct {
+	ID        uuid.UUID
+	UpdatedAt time.Time
+}
+
+func (q *Queries) ResetWorkflow(ctx context.Context, arg ResetWorkflowParams) (Workflow, error) {
+	row := q.db.QueryRow(ctx, resetWorkflow, arg.ID, arg.UpdatedAt)
+	var i Workflow
+	err := row.Scan(
+		&i.ID,
+		&i.Params,
+		&i.Name,
+		&i.CreatedAt,
+		&i.UpdatedAt,
+		&i.Finished,
+		&i.Output,
+		&i.Error,
+	)
+	return i, err
+}
+
 const taskLogs = `-- name: TaskLogs :many
 SELECT task_logs.id, task_logs.workflow_id, task_logs.task_name, task_logs.body, task_logs.created_at, task_logs.updated_at
 FROM task_logs
diff --git a/internal/relui/queries/workflows.sql b/internal/relui/queries/workflows.sql
index de78bea..4a21cac 100644
--- a/internal/relui/queries/workflows.sql
+++ b/internal/relui/queries/workflows.sql
@@ -76,3 +76,20 @@
 WHERE workflows.id = $1
 RETURNING *;
 
+-- name: ResetTask :one
+UPDATE tasks
+SET finished   = false,
+    result     = DEFAULT,
+    error      = DEFAULT,
+    updated_at = $3
+WHERE workflow_id = $1 AND name = $2
+RETURNING *;
+
+-- name: ResetWorkflow :one
+UPDATE workflows
+SET finished = false,
+    output = DEFAULT,
+    error = DEFAULT,
+    updated_at = $2
+WHERE id = $1
+RETURNING *;
diff --git a/internal/relui/static/styles.css b/internal/relui/static/styles.css
index 50aca0e..051f9e5 100644
--- a/internal/relui/static/styles.css
+++ b/internal/relui/static/styles.css
@@ -218,10 +218,10 @@
   padding: 0 1rem;
   white-space: pre-wrap;
   tab-size: 4;
+  font-family: monospace;
 }
 .TaskList-itemLogLine:nth-child(even) {
   background-color: #fafafa;
-  font-family: monospace;
 }
 .TaskList-itemLogLineError {
   background-color: #c9483c;
diff --git a/internal/relui/templates/home.html b/internal/relui/templates/home.html
index 3ba985b..41f0bdf 100644
--- a/internal/relui/templates/home.html
+++ b/internal/relui/templates/home.html
@@ -61,6 +61,7 @@
                 <th class="TaskList-itemHeaderCol TaskList-itemStarted">Started</th>
                 <th class="TaskList-itemHeaderCol TaskList-itemUpdated">Updated</th>
                 <th class="TaskList-itemHeaderCol TaskList-itemResult">Result</th>
+                <th class="TaskList-itemHeaderCol TaskList-itemActions">Actions</th>
               </tr>
             </thead>
             {{$tasks := index $.WorkflowTasks $workflow.ID}}
@@ -96,9 +97,19 @@
                   <td class="TaskList-itemCol TaskList-itemResult">
                     {{$task.Result}}
                   </td>
+                  <td class="TaskList-itemCol TaskList-itemAction">
+                    {{if $task.Error.Valid}}
+                    <div class="TaskList-retryTask">
+                      <form action="{{baseLink (printf "/workflows/%s/tasks/%s/retry" $workflow.ID $task.Name) }}" method="post">
+                        <input type="hidden" id="workflow.id" name="workflow.id" value="{{$workflow.ID}}" />
+                        <input name="task.reset" type="submit" value="Retry" onclick="return this.form.reportValidity() && confirm('This will retry the task and clear workflow errors.\n\nReady to proceed?')" />
+                      </form>
+                    </div>
+                    {{end}}
+                  </td>
                 </tr>
                 <tr class="TaskList-itemLogsRow">
-                  <td class="TaskList-itemLogs" colspan="6">
+                  <td class="TaskList-itemLogs" colspan="7">
                     {{if $task.Error.Valid}}
                       <div class="TaskList-itemLogLine TaskList-itemLogLineError">
                         {{- $task.Error.Value -}}
diff --git a/internal/relui/templates/new_workflow.html b/internal/relui/templates/new_workflow.html
index c9ba871..4810a8f 100644
--- a/internal/relui/templates/new_workflow.html
+++ b/internal/relui/templates/new_workflow.html
@@ -21,7 +21,7 @@
       </noscript>
     </form>
     {{if .Selected}}
-      <form action="{{baseLink "/workflows/create"}}" method="post">
+      <form action="{{baseLink "/workflows"}}" method="post">
         <input type="hidden" id="workflow.name" name="workflow.name" value="{{$.Name}}" />
         {{range $_, $p := .Selected.Parameters}}
           {{if eq $p.Type.String "string"}}
diff --git a/internal/relui/web.go b/internal/relui/web.go
index eaa0271..807ee0d 100644
--- a/internal/relui/web.go
+++ b/internal/relui/web.go
@@ -7,7 +7,9 @@
 import (
 	"bytes"
 	"context"
+	"database/sql"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"html/template"
 	"io"
@@ -18,24 +20,22 @@
 	"net/url"
 	"path"
 	"strings"
+	"time"
 
 	"github.com/google/uuid"
+	"github.com/jackc/pgx/v4"
 	"github.com/jackc/pgx/v4/pgxpool"
+	"github.com/julienschmidt/httprouter"
 	"golang.org/x/build/internal/relui/db"
 	"golang.org/x/build/internal/workflow"
 )
 
-// fileServerHandler returns a http.Handler rooted at root. It will
-// call the next handler provided for requests to "/".
+// fileServerHandler returns a http.Handler for serving static assets.
 //
 // The returned handler sets the appropriate Content-Type and
 // Cache-Control headers for the returned file.
-func fileServerHandler(fs fs.FS, next http.Handler) http.Handler {
+func fileServerHandler(fs fs.FS) http.Handler {
 	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		if r.URL.Path == "/" {
-			next.ServeHTTP(w, r)
-			return
-		}
 		w.Header().Set("Content-Type", mime.TypeByExtension(path.Ext(r.URL.Path)))
 		w.Header().Set("Cache-Control", "no-cache, private, max-age=0")
 		s := http.FileServer(http.FS(fs))
@@ -52,7 +52,7 @@
 // Server implements the http handlers for relui.
 type Server struct {
 	db      *pgxpool.Pool
-	m       *http.ServeMux
+	m       *httprouter.Router
 	w       *Worker
 	baseURL *url.URL // nil means "/".
 	header  SiteHeader
@@ -70,7 +70,7 @@
 func NewServer(p *pgxpool.Pool, w *Worker, baseURL *url.URL, header SiteHeader) *Server {
 	s := &Server{
 		db:      p,
-		m:       new(http.ServeMux),
+		m:       httprouter.New(),
 		w:       w,
 		baseURL: baseURL,
 		header:  header,
@@ -81,9 +81,11 @@
 	layout := template.Must(template.New("layout.html").Funcs(helpers).ParseFS(templates, "templates/layout.html"))
 	s.homeTmpl = template.Must(template.Must(layout.Clone()).Funcs(helpers).ParseFS(templates, "templates/home.html"))
 	s.newWorkflowTmpl = template.Must(template.Must(layout.Clone()).Funcs(helpers).ParseFS(templates, "templates/new_workflow.html"))
-	s.m.Handle("/workflows/create", http.HandlerFunc(s.createWorkflowHandler))
-	s.m.Handle("/workflows/new", http.HandlerFunc(s.newWorkflowHandler))
-	s.m.Handle("/", fileServerHandler(static, http.HandlerFunc(s.homeHandler)))
+	s.m.POST("/workflows/:id/tasks/:name/retry", s.retryTaskHandler)
+	s.m.Handler(http.MethodGet, "/workflows/new", http.HandlerFunc(s.newWorkflowHandler))
+	s.m.Handler(http.MethodPost, "/workflows", http.HandlerFunc(s.createWorkflowHandler))
+	s.m.Handler(http.MethodGet, "/static/*path", fileServerHandler(static))
+	s.m.Handler(http.MethodGet, "/", http.HandlerFunc(s.homeHandler))
 	if baseURL != nil && baseURL.Path != "/" && baseURL.Path != "" {
 		nosuffix := strings.TrimSuffix(baseURL.Path, "/")
 		s.bm = new(http.ServeMux)
@@ -249,3 +251,49 @@
 	}
 	http.Redirect(w, r, s.BaseLink("/"), http.StatusSeeOther)
 }
+
+func (s *Server) retryTaskHandler(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
+	id, err := uuid.Parse(params.ByName("id"))
+	if err != nil {
+		log.Printf("retryTaskHandler(_, _, %v) uuid.Parse(%v): %v", params, params.ByName("id"), err)
+		http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
+		return
+	}
+	if err := s.retryTask(r.Context(), id, params.ByName("name")); err != nil {
+		log.Printf("s.retryTask(_, %q, %q): %v", id, params.ByName("id"), err)
+		if errors.Is(err, sql.ErrNoRows) || errors.Is(err, pgx.ErrNoRows) {
+			http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
+			return
+		}
+		http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
+		return
+	}
+	if err := s.w.Resume(r.Context(), id); err != nil {
+		log.Printf("s.w.Resume(_, %q): %v", id, err)
+		http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
+		return
+	}
+	http.Redirect(w, r, s.BaseLink("/"), http.StatusSeeOther)
+}
+
+func (s *Server) retryTask(ctx context.Context, id uuid.UUID, name string) error {
+	tx, err := s.db.Begin(ctx)
+	if err != nil {
+		return fmt.Errorf("tx.Begin(): %w", err)
+	}
+	q := db.New(s.db)
+	q = q.WithTx(tx)
+	if _, err := q.ResetTask(ctx, db.ResetTaskParams{WorkflowID: id, Name: name, UpdatedAt: time.Now()}); err != nil {
+		tx.Rollback(ctx)
+		return fmt.Errorf("q.ResetTask: %w", err)
+	}
+	if _, err := q.ResetWorkflow(ctx, db.ResetWorkflowParams{ID: id, UpdatedAt: time.Now()}); err != nil {
+		tx.Rollback(ctx)
+		return fmt.Errorf("q.ResetWorkflow: %w", err)
+	}
+	if err := tx.Commit(ctx); err != nil {
+		tx.Rollback(ctx)
+		return fmt.Errorf("tx.Commit: %w", err)
+	}
+	return nil
+}
diff --git a/internal/relui/web_test.go b/internal/relui/web_test.go
index 0698158..40db6b3 100644
--- a/internal/relui/web_test.go
+++ b/internal/relui/web_test.go
@@ -15,6 +15,7 @@
 	"net/http/httptest"
 	"net/url"
 	"os"
+	"path"
 	"strings"
 	"sync"
 	"testing"
@@ -34,9 +35,7 @@
 var testStatic embed.FS
 
 func TestFileServerHandler(t *testing.T) {
-	h := fileServerHandler(testStatic, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
-		w.Write([]byte("Home"))
-	}))
+	h := fileServerHandler(testStatic)
 
 	cases := []struct {
 		desc        string
@@ -46,12 +45,6 @@
 		wantHeaders map[string]string
 	}{
 		{
-			desc:     "fallback to next handler",
-			path:     "/",
-			wantCode: http.StatusOK,
-			wantBody: "Home",
-		},
-		{
 			desc:     "sets headers and returns file",
 			path:     "/testing/test.css",
 			wantCode: http.StatusOK,
@@ -423,3 +416,181 @@
 		})
 	}
 }
+
+func TestServerRetryTaskHandler(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	hourAgo := time.Now().Add(-1 * time.Hour)
+	wfID := uuid.New()
+
+	cases := []struct {
+		desc          string
+		params        map[string]string
+		wantCode      int
+		wantHeaders   map[string]string
+		wantWorkflows []db.Workflow
+	}{
+		{
+			desc:     "no params",
+			wantCode: http.StatusNotFound,
+			wantWorkflows: []db.Workflow{
+				{
+					ID:        wfID,
+					Params:    nullString(`{"farewell": "bye", "greeting": "hello"}`),
+					Name:      nullString(`echo`),
+					Finished:  true,
+					Output:    `{"some": "thing"}`,
+					Error:     "internal explosion",
+					CreatedAt: hourAgo, // cmpopts.EquateApproxTime
+					UpdatedAt: hourAgo, // cmpopts.EquateApproxTime
+				},
+			},
+		},
+		{
+			desc:     "invalid workflow id",
+			params:   map[string]string{"id": "invalid", "name": "greeting"},
+			wantCode: http.StatusNotFound,
+			wantWorkflows: []db.Workflow{
+				{
+					ID:        wfID,
+					Params:    nullString(`{"farewell": "bye", "greeting": "hello"}`),
+					Name:      nullString(`echo`),
+					Finished:  true,
+					Output:    `{"some": "thing"}`,
+					Error:     "internal explosion",
+					CreatedAt: hourAgo, // cmpopts.EquateApproxTime
+					UpdatedAt: hourAgo, // cmpopts.EquateApproxTime
+				},
+			},
+		},
+		{
+			desc:     "wrong workflow id",
+			params:   map[string]string{"id": uuid.New().String(), "name": "greeting"},
+			wantCode: http.StatusNotFound,
+			wantWorkflows: []db.Workflow{
+				{
+					ID:        wfID,
+					Params:    nullString(`{"farewell": "bye", "greeting": "hello"}`),
+					Name:      nullString(`echo`),
+					Finished:  true,
+					Output:    `{"some": "thing"}`,
+					Error:     "internal explosion",
+					CreatedAt: hourAgo, // cmpopts.EquateApproxTime
+					UpdatedAt: hourAgo, // cmpopts.EquateApproxTime
+				},
+			},
+		},
+		{
+			desc:     "invalid task name",
+			params:   map[string]string{"id": wfID.String(), "name": "invalid"},
+			wantCode: http.StatusNotFound,
+			wantWorkflows: []db.Workflow{
+				{
+					ID:        wfID,
+					Params:    nullString(`{"farewell": "bye", "greeting": "hello"}`),
+					Name:      nullString(`echo`),
+					Finished:  true,
+					Output:    `{"some": "thing"}`,
+					Error:     "internal explosion",
+					CreatedAt: hourAgo, // cmpopts.EquateApproxTime
+					UpdatedAt: hourAgo, // cmpopts.EquateApproxTime
+				},
+			},
+		},
+		{
+			desc:     "successful reset",
+			params:   map[string]string{"id": wfID.String(), "name": "greeting"},
+			wantCode: http.StatusSeeOther,
+			wantHeaders: map[string]string{
+				"Location": "/",
+			},
+			wantWorkflows: []db.Workflow{
+				{
+					ID:        wfID,
+					Params:    nullString(`{"farewell": "bye", "greeting": "hello"}`),
+					Name:      nullString(`echo`),
+					Output:    "{}",
+					CreatedAt: hourAgo,    // cmpopts.EquateApproxTime
+					UpdatedAt: time.Now(), // cmpopts.EquateApproxTime
+				},
+			},
+		},
+	}
+	for _, c := range cases {
+		t.Run(c.desc, func(t *testing.T) {
+			p := testDB(ctx, t)
+			q := db.New(p)
+
+			wf := db.CreateWorkflowParams{
+				ID:        wfID,
+				Params:    nullString(`{"farewell": "bye", "greeting": "hello"}`),
+				Name:      nullString(`echo`),
+				CreatedAt: hourAgo,
+				UpdatedAt: hourAgo,
+			}
+			if _, err := q.CreateWorkflow(ctx, wf); err != nil {
+				t.Fatalf("CreateWorkflow(_, %v) = _, %v, wanted no error", wf, err)
+			}
+			wff := db.WorkflowFinishedParams{
+				ID:        wf.ID,
+				Finished:  true,
+				Output:    `{"some": "thing"}`,
+				Error:     "internal explosion",
+				UpdatedAt: hourAgo,
+			}
+			if _, err := q.WorkflowFinished(ctx, wff); err != nil {
+				t.Fatalf("WorkflowFinished(_, %v) = _, %v, wanted no error", wff, err)
+			}
+			gtg := db.CreateTaskParams{
+				WorkflowID: wf.ID,
+				Name:       "greeting",
+				Finished:   true,
+				Error:      nullString("internal explosion"),
+				CreatedAt:  hourAgo,
+				UpdatedAt:  hourAgo,
+			}
+			if _, err := q.CreateTask(ctx, gtg); err != nil {
+				t.Fatalf("CreateTask(_, %v) = _, %v, wanted no error", gtg, err)
+			}
+			fw := db.CreateTaskParams{
+				WorkflowID: wf.ID,
+				Name:       "farewell",
+				Finished:   true,
+				Error:      nullString("internal explosion"),
+				CreatedAt:  hourAgo,
+				UpdatedAt:  hourAgo,
+			}
+			if _, err := q.CreateTask(ctx, fw); err != nil {
+				t.Fatalf("CreateTask(_, %v) = _, %v, wanted no error", fw, err)
+			}
+
+			req := httptest.NewRequest(http.MethodPost, path.Join("/workflows/", c.params["id"], "tasks", c.params["name"], "retry"), nil)
+			req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
+			rec := httptest.NewRecorder()
+			s := NewServer(p, NewWorker(NewDefinitionHolder(), p, &PGListener{p}), nil, SiteHeader{})
+
+			s.m.ServeHTTP(rec, req)
+			resp := rec.Result()
+
+			if resp.StatusCode != c.wantCode {
+				t.Errorf("rep.StatusCode = %d, wanted %d", resp.StatusCode, c.wantCode)
+			}
+			for k, v := range c.wantHeaders {
+				if resp.Header.Get(k) != v {
+					t.Errorf("resp.Header.Get(%q) = %q, wanted %q", k, resp.Header.Get(k), v)
+				}
+			}
+			if c.wantCode == http.StatusBadRequest {
+				return
+			}
+			wfs, err := q.Workflows(ctx)
+			if err != nil {
+				t.Fatalf("q.Workflows() = %v, %v, wanted no error", wfs, err)
+			}
+			if diff := cmp.Diff(c.wantWorkflows, wfs, SameUUIDVariant(), cmpopts.EquateApproxTime(time.Minute)); diff != "" {
+				t.Fatalf("q.Workflows() mismatch (-want +got):\n%s", diff)
+			}
+		})
+	}
+}
diff --git a/internal/relui/worker.go b/internal/relui/worker.go
index 100745a..d14a3fb 100644
--- a/internal/relui/worker.go
+++ b/internal/relui/worker.go
@@ -174,10 +174,12 @@
 	taskStates := make(map[string]*workflow.TaskState)
 	for _, t := range tasks {
 		taskStates[t.Name] = &workflow.TaskState{
-			Name:             t.Name,
-			Finished:         t.Finished,
-			SerializedResult: []byte(t.Result.String),
-			Error:            t.Error.String,
+			Name:     t.Name,
+			Finished: t.Finished,
+			Error:    t.Error.String,
+		}
+		if t.Result.Valid {
+			taskStates[t.Name].SerializedResult = []byte(t.Result.String)
 		}
 	}
 	res, err := workflow.Resume(d, state, taskStates)