internal/relui: add approval step to build workflow

This change adds a manual approval task to the build before making any
user-facing changes. It uses a sentinel log entry in the approval task's
log in order to verify whether a human has approved the workflow to
continue.

Ideally, we'd change the data model to allow for this, perhaps by adding
a specific type of task, or a special state column, but this should be
sufficient for testing the beta release. Also, there should be some
shared definition of approval, but leaving that for a follow-up CL.

Updates golang/go#53295
Updates golang/go#51797

Change-Id: Iacd8c8a3ba59f9e3a343916dbe1a3a8ba59ce80c
Reviewed-on: https://go-review.googlesource.com/c/build/+/411199
Reviewed-by: Dmitri Shuralyov <dmitshur@google.com>
Reviewed-by: Carlos Amedee <carlos@golang.org>
TryBot-Result: Gopher Robot <gobot@golang.org>
Run-TryBot: Alex Rakoczy <alex@golang.org>
Reviewed-by: Dmitri Shuralyov <dmitshur@golang.org>
diff --git a/cmd/relui/main.go b/cmd/relui/main.go
index 4f0136c..d25f13d 100644
--- a/cmd/relui/main.go
+++ b/cmd/relui/main.go
@@ -118,6 +118,11 @@
 	if err != nil {
 		log.Fatalf("Could not connect to GCS: %v", err)
 	}
+	db, err := pgxpool.Connect(ctx, *pgConnect)
+	if err != nil {
+		log.Fatal(err)
+	}
+	defer db.Close()
 
 	buildTasks := &relui.BuildReleaseTasks{
 		GerritURL:      "https://go.googlesource.com",
@@ -130,6 +135,7 @@
 		PublishFile: func(f *relui.WebsiteFile) error {
 			return publishFile(*websiteUploadURL, userPassAuth, f)
 		},
+		ApproveActionFunc: relui.ApproveActionDep(db),
 	}
 	githubHTTPClient := oauth2.NewClient(ctx, oauth2.StaticTokenSource(&oauth2.Token{AccessToken: *githubToken}))
 	milestoneTasks := &task.MilestoneTasks{
@@ -140,13 +146,8 @@
 		RepoOwner: "golang",
 		RepoName:  "go",
 	}
-
 	relui.RegisterReleaseWorkflows(dh, buildTasks, milestoneTasks, versionTasks)
-	db, err := pgxpool.Connect(ctx, *pgConnect)
-	if err != nil {
-		log.Fatal(err)
-	}
-	defer db.Close()
+
 	w := relui.NewWorker(dh, db, relui.NewPGListener(db))
 	go w.Run(ctx)
 	if err := w.ResumeAll(ctx); err != nil {
diff --git a/internal/relui/buildrelease_test.go b/internal/relui/buildrelease_test.go
index b47b706..3d2b93b 100644
--- a/internal/relui/buildrelease_test.go
+++ b/internal/relui/buildrelease_test.go
@@ -106,6 +106,11 @@
 		CreateBuildlet: fakeBuildlets.createBuildlet,
 		DownloadURL:    dlServer.URL,
 		PublishFile:    publishFile,
+		ApproveActionFunc: func(taskName string) func(*workflow.TaskContext, interface{}) error {
+			return func(_ *workflow.TaskContext, _ interface{}) error {
+				return nil
+			}
+		},
 	}
 	wd := workflow.New()
 	if err := addSingleReleaseWorkflow(buildTasks, milestoneTasks, versionTasks, wd, "go1.18", kind); err != nil {
diff --git a/internal/relui/templates/home.html b/internal/relui/templates/home.html
index 75fd59c..9df7c33 100644
--- a/internal/relui/templates/home.html
+++ b/internal/relui/templates/home.html
@@ -106,6 +106,14 @@
                       </form>
                     </div>
                     {{end}}
+                    {{if and (not $task.Finished) (hasPrefix $task.Name "APPROVE-")}}
+                    <div class="TaskList-approveTask">
+                      <form action="{{baseLink (printf "/workflows/%s/tasks/%s/approve" $workflow.ID $task.Name) }}" method="post">
+                        <input type="hidden" id="workflow.id" name="workflow.id" value="{{$workflow.ID}}" />
+                        <input name="task.approve" type="submit" value="Approve" onclick="return this.form.reportValidity() && confirm('This will mark the task approved and resume the workflow.\n\nReady to proceed?')" />
+                      </form>
+                    </div>
+                    {{end}}
                   </td>
                 </tr>
                 <tr class="TaskList-itemLogsRow">
diff --git a/internal/relui/web.go b/internal/relui/web.go
index e946ac2..8cafbac 100644
--- a/internal/relui/web.go
+++ b/internal/relui/web.go
@@ -76,12 +76,14 @@
 		header:  header,
 	}
 	helpers := map[string]interface{}{
-		"baseLink": s.BaseLink,
+		"baseLink":  s.BaseLink,
+		"hasPrefix": strings.HasPrefix,
 	}
 	layout := template.Must(template.New("layout.html").Funcs(helpers).ParseFS(templates, "templates/layout.html"))
 	s.homeTmpl = template.Must(template.Must(layout.Clone()).Funcs(helpers).ParseFS(templates, "templates/home.html"))
 	s.newWorkflowTmpl = template.Must(template.Must(layout.Clone()).Funcs(helpers).ParseFS(templates, "templates/new_workflow.html"))
 	s.m.POST("/workflows/:id/tasks/:name/retry", s.retryTaskHandler)
+	s.m.POST("/workflows/:id/tasks/:name/approve", s.approveTaskHandler)
 	s.m.Handler(http.MethodGet, "/workflows/new", http.HandlerFunc(s.newWorkflowHandler))
 	s.m.Handler(http.MethodPost, "/workflows", http.HandlerFunc(s.createWorkflowHandler))
 	s.m.Handler(http.MethodGet, "/static/*path", fileServerHandler(static))
@@ -305,3 +307,25 @@
 	l.Printf("workflow reset. Previous state: %#v", wf)
 	return nil
 }
+
+func (s *Server) approveTaskHandler(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
+	id, err := uuid.Parse(params.ByName("id"))
+	if err != nil {
+		log.Printf("approveTaskHandler(_, _, %v) uuid.Parse(%v): %v", params, params.ByName("id"), err)
+		http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
+		return
+	}
+	q := db.New(s.db)
+	t, err := q.Task(r.Context(), db.TaskParams{WorkflowID: id, Name: params.ByName("name")})
+	if errors.Is(err, sql.ErrNoRows) || errors.Is(err, pgx.ErrNoRows) {
+		http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
+		return
+	} else if err != nil {
+		log.Printf("q.Task(_, %q): %v", id, err)
+		http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
+		return
+	}
+	// This log entry serves as approval.
+	s.w.l.Logger(id, t.Name).Printf("USER-APPROVED")
+	http.Redirect(w, r, s.BaseLink("/"), http.StatusSeeOther)
+}
diff --git a/internal/relui/web_test.go b/internal/relui/web_test.go
index 40db6b3..f225f68 100644
--- a/internal/relui/web_test.go
+++ b/internal/relui/web_test.go
@@ -423,6 +423,16 @@
 
 	hourAgo := time.Now().Add(-1 * time.Hour)
 	wfID := uuid.New()
+	unchangedWorkflow := db.Workflow{
+		ID:        wfID,
+		Params:    nullString(`{"farewell": "bye", "greeting": "hello"}`),
+		Name:      nullString(`echo`),
+		Finished:  true,
+		Output:    `{"some": "thing"}`,
+		Error:     "internal explosion",
+		CreatedAt: hourAgo, // cmpopts.EquateApproxTime
+		UpdatedAt: hourAgo, // cmpopts.EquateApproxTime
+	}
 
 	cases := []struct {
 		desc          string
@@ -432,54 +442,21 @@
 		wantWorkflows []db.Workflow
 	}{
 		{
-			desc:     "no params",
-			wantCode: http.StatusNotFound,
-			wantWorkflows: []db.Workflow{
-				{
-					ID:        wfID,
-					Params:    nullString(`{"farewell": "bye", "greeting": "hello"}`),
-					Name:      nullString(`echo`),
-					Finished:  true,
-					Output:    `{"some": "thing"}`,
-					Error:     "internal explosion",
-					CreatedAt: hourAgo, // cmpopts.EquateApproxTime
-					UpdatedAt: hourAgo, // cmpopts.EquateApproxTime
-				},
-			},
+			desc:          "no params",
+			wantCode:      http.StatusNotFound,
+			wantWorkflows: []db.Workflow{unchangedWorkflow},
 		},
 		{
-			desc:     "invalid workflow id",
-			params:   map[string]string{"id": "invalid", "name": "greeting"},
-			wantCode: http.StatusNotFound,
-			wantWorkflows: []db.Workflow{
-				{
-					ID:        wfID,
-					Params:    nullString(`{"farewell": "bye", "greeting": "hello"}`),
-					Name:      nullString(`echo`),
-					Finished:  true,
-					Output:    `{"some": "thing"}`,
-					Error:     "internal explosion",
-					CreatedAt: hourAgo, // cmpopts.EquateApproxTime
-					UpdatedAt: hourAgo, // cmpopts.EquateApproxTime
-				},
-			},
+			desc:          "invalid workflow id",
+			params:        map[string]string{"id": "invalid", "name": "greeting"},
+			wantCode:      http.StatusNotFound,
+			wantWorkflows: []db.Workflow{unchangedWorkflow},
 		},
 		{
-			desc:     "wrong workflow id",
-			params:   map[string]string{"id": uuid.New().String(), "name": "greeting"},
-			wantCode: http.StatusNotFound,
-			wantWorkflows: []db.Workflow{
-				{
-					ID:        wfID,
-					Params:    nullString(`{"farewell": "bye", "greeting": "hello"}`),
-					Name:      nullString(`echo`),
-					Finished:  true,
-					Output:    `{"some": "thing"}`,
-					Error:     "internal explosion",
-					CreatedAt: hourAgo, // cmpopts.EquateApproxTime
-					UpdatedAt: hourAgo, // cmpopts.EquateApproxTime
-				},
-			},
+			desc:          "wrong workflow id",
+			params:        map[string]string{"id": uuid.New().String(), "name": "greeting"},
+			wantCode:      http.StatusNotFound,
+			wantWorkflows: []db.Workflow{unchangedWorkflow},
 		},
 		{
 			desc:     "invalid task name",
@@ -594,3 +571,112 @@
 		})
 	}
 }
+
+func TestServerApproveTaskHandler(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	hourAgo := time.Now().Add(-1 * time.Hour)
+	wfID := uuid.New()
+
+	cases := []struct {
+		desc        string
+		params      map[string]string
+		wantCode    int
+		wantHeaders map[string]string
+		wantLogs    []db.TaskLog
+	}{
+		{
+			desc:     "no params",
+			wantCode: http.StatusNotFound,
+		},
+		{
+			desc:     "invalid workflow id",
+			params:   map[string]string{"id": "invalid", "name": "greeting"},
+			wantCode: http.StatusNotFound,
+		},
+		{
+			desc:     "wrong workflow id",
+			params:   map[string]string{"id": uuid.New().String(), "name": "greeting"},
+			wantCode: http.StatusNotFound,
+		},
+		{
+			desc:     "invalid task name",
+			params:   map[string]string{"id": wfID.String(), "name": "invalid"},
+			wantCode: http.StatusNotFound,
+		},
+		{
+			desc:     "successful reset",
+			params:   map[string]string{"id": wfID.String(), "name": "APPROVE-please"},
+			wantCode: http.StatusSeeOther,
+			wantHeaders: map[string]string{
+				"Location": "/",
+			},
+			wantLogs: []db.TaskLog{{
+				WorkflowID: wfID,
+				TaskName:   "APPROVE-please",
+				Body:       "USER-APPROVED",
+				CreatedAt:  time.Now(),
+				UpdatedAt:  time.Now(),
+			}},
+		},
+	}
+	for _, c := range cases {
+		t.Run(c.desc, func(t *testing.T) {
+			p := testDB(ctx, t)
+			q := db.New(p)
+
+			wf := db.CreateWorkflowParams{
+				ID:        wfID,
+				Params:    nullString(`{"farewell": "bye", "greeting": "hello"}`),
+				Name:      nullString(`echo`),
+				CreatedAt: hourAgo,
+				UpdatedAt: hourAgo,
+			}
+			if _, err := q.CreateWorkflow(ctx, wf); err != nil {
+				t.Fatalf("CreateWorkflow(_, %v) = _, %v, wanted no error", wf, err)
+			}
+			gtg := db.CreateTaskParams{
+				WorkflowID: wf.ID,
+				Name:       "APPROVE-please",
+				Finished:   true,
+				Error:      nullString("internal explosion"),
+				CreatedAt:  hourAgo,
+				UpdatedAt:  hourAgo,
+			}
+			if _, err := q.CreateTask(ctx, gtg); err != nil {
+				t.Fatalf("CreateTask(_, %v) = _, %v, wanted no error", gtg, err)
+			}
+
+			req := httptest.NewRequest(http.MethodPost, path.Join("/workflows/", c.params["id"], "tasks", c.params["name"], "approve"), nil)
+			req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
+			rec := httptest.NewRecorder()
+			s := NewServer(p, NewWorker(NewDefinitionHolder(), p, &PGListener{p}), nil, SiteHeader{})
+
+			s.m.ServeHTTP(rec, req)
+			resp := rec.Result()
+
+			if resp.StatusCode != c.wantCode {
+				t.Errorf("rep.StatusCode = %d, wanted %d", resp.StatusCode, c.wantCode)
+			}
+			for k, v := range c.wantHeaders {
+				if resp.Header.Get(k) != v {
+					t.Errorf("resp.Header.Get(%q) = %q, wanted %q", k, resp.Header.Get(k), v)
+				}
+			}
+			if c.wantCode == http.StatusBadRequest {
+				return
+			}
+			logs, err := q.TaskLogsForTask(ctx, db.TaskLogsForTaskParams{
+				WorkflowID: wfID,
+				TaskName:   "APPROVE-please",
+			})
+			if err != nil {
+				t.Fatalf("q.TaskLogsForTask() = %v, %v, wanted no error", logs, err)
+			}
+			if diff := cmp.Diff(c.wantLogs, logs, SameUUIDVariant(), cmpopts.EquateApproxTime(time.Minute), cmpopts.IgnoreFields(db.TaskLog{}, "ID")); diff != "" {
+				t.Fatalf("q.TaskLogsForTask() mismatch (-want +got):\n%s", diff)
+			}
+		})
+	}
+}
diff --git a/internal/relui/workflows.go b/internal/relui/workflows.go
index 29b6824..1d9527d 100644
--- a/internal/relui/workflows.go
+++ b/internal/relui/workflows.go
@@ -12,14 +12,17 @@
 	"math/rand"
 	"net/http"
 	"path"
+	"strings"
 	"sync"
 	"time"
 
 	"cloud.google.com/go/storage"
+	"github.com/jackc/pgx/v4/pgxpool"
 	"golang.org/x/build/buildlet"
 	"golang.org/x/build/dashboard"
 	"golang.org/x/build/internal/gcsfs"
 	"golang.org/x/build/internal/releasetargets"
+	"golang.org/x/build/internal/relui/db"
 	"golang.org/x/build/internal/task"
 	"golang.org/x/build/internal/workflow"
 )
@@ -187,6 +190,64 @@
 	return arg, nil
 }
 
+type AwaitConditionFunc func(ctx *workflow.TaskContext) (done bool, err error)
+
+// AwaitFunc is a workflow.Task that polls the provided awaitCondition
+// every period until it either returns true or returns an error.
+func AwaitFunc(ctx *workflow.TaskContext, period time.Duration, awaitCondition AwaitConditionFunc) (bool, error) {
+	ticker := time.NewTicker(period)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return false, ctx.Err()
+		case <-ticker.C:
+			ok, err := awaitCondition(ctx)
+			if ok || err != nil {
+				return ok, err
+			}
+		}
+	}
+}
+
+// AwaitAction is a workflow.Action that is a convenience wrapper
+// around AwaitFunc.
+func AwaitAction(ctx *workflow.TaskContext, period time.Duration, awaitCondition AwaitConditionFunc) error {
+	_, err := AwaitFunc(ctx, period, awaitCondition)
+	return err
+}
+
+func checkTaskApproved(ctx *workflow.TaskContext, p *pgxpool.Pool, taskName string) (bool, error) {
+	q := db.New(p)
+	logs, err := q.TaskLogsForTask(ctx, db.TaskLogsForTaskParams{
+		WorkflowID: ctx.WorkflowID,
+		TaskName:   taskName,
+	})
+	if err != nil {
+		return false, err
+	}
+	for _, l := range logs {
+		if strings.Contains(l.Body, "USER-APPROVED") {
+			return true, nil
+		}
+	}
+	return false, nil
+}
+
+func approveActionDep(p *pgxpool.Pool, taskName string) func(*workflow.TaskContext, interface{}) error {
+	return func(ctx *workflow.TaskContext, _ interface{}) error {
+		return AwaitAction(ctx, 10*time.Second, func(ctx *workflow.TaskContext) (done bool, err error) {
+			return checkTaskApproved(ctx, p, taskName)
+		})
+	}
+}
+
+func ApproveActionDep(p *pgxpool.Pool) func(taskName string) func(*workflow.TaskContext, interface{}) error {
+	return func(taskName string) func(*workflow.TaskContext, interface{}) error {
+		return approveActionDep(p, taskName)
+	}
+}
+
 func RegisterReleaseWorkflows(h *DefinitionHolder, build *BuildReleaseTasks, milestone *task.MilestoneTasks, version *task.VersionTasks) error {
 	createSingle := func(name, major string, kind task.ReleaseKind) error {
 		wd := workflow.New()
@@ -250,8 +311,11 @@
 		return err
 	}
 
+	verifiedName := "APPROVE-Wait for Release Coordinator Approval"
+	verified := wd.Action(verifiedName, build.ApproveActionFunc(verifiedName), signedAndTestedArtifacts)
+
 	// Tag version and upload to CDN/website.
-	uploaded := wd.Action("Upload artifacts to CDN", build.uploadArtifacts, signedAndTestedArtifacts)
+	uploaded := wd.Action("Upload artifacts to CDN", build.uploadArtifacts, signedAndTestedArtifacts, verified)
 
 	tagCommit := branchHead
 	if branch != "master" {
@@ -322,6 +386,7 @@
 	DownloadURL                        string
 	PublishFile                        func(*WebsiteFile) error
 	CreateBuildlet                     func(string) (buildlet.Client, error)
+	ApproveActionFunc                  func(taskName string) func(*workflow.TaskContext, interface{}) error
 }
 
 func (b *BuildReleaseTasks) buildSource(ctx *workflow.TaskContext, revision, version string) (artifact, error) {
diff --git a/internal/relui/workflows_test.go b/internal/relui/workflows_test.go
new file mode 100644
index 0000000..cc326d8
--- /dev/null
+++ b/internal/relui/workflows_test.go
@@ -0,0 +1,156 @@
+package relui
+
+import (
+	"context"
+	"errors"
+	"testing"
+	"time"
+
+	"github.com/google/go-cmp/cmp"
+	"github.com/google/uuid"
+	"golang.org/x/build/internal/relui/db"
+	"golang.org/x/build/internal/workflow"
+)
+
+func TestAwaitFunc(t *testing.T) {
+	cases := []struct {
+		desc       string
+		want       map[string]interface{}
+		wantErr    bool
+		wantCancel bool
+	}{
+		{
+			desc: "success",
+			want: map[string]interface{}{"await": true},
+		},
+		{
+			desc:    "error",
+			wantErr: true,
+		},
+		{
+			desc:       "cancel",
+			wantCancel: true,
+			wantErr:    true,
+		},
+	}
+	for _, c := range cases {
+		t.Run(c.desc, func(t *testing.T) {
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+
+			didWork := make(chan struct{}, 2)
+			success := make(chan interface{})
+			done := make(chan interface{})
+			cond := func(ctx *workflow.TaskContext) (bool, error) {
+				select {
+				case <-success:
+					if c.wantCancel {
+						cancel()
+						return false, ctx.Err()
+					} else if c.wantErr {
+						return false, errors.New("someError")
+					}
+					return true, nil
+				case <-ctx.Done():
+					return false, ctx.Err()
+				case didWork <- struct{}{}:
+					return false, nil
+				}
+			}
+			wd := workflow.New()
+			await := wd.Task("AwaitFunc", AwaitFunc, wd.Constant(10*time.Millisecond), wd.Constant(cond))
+			wd.Output("await", await)
+
+			w, err := workflow.Start(wd, nil)
+			if err != nil {
+				t.Fatalf("workflow.Start(%v, %v) = %v, %v, wanted no error", wd, nil, w, err)
+			}
+			go func() {
+				outputs, err := runWorkflow(t, ctx, w, nil)
+				if diff := cmp.Diff(c.want, outputs); diff != "" {
+					t.Errorf("runWorkflow() mismatch (-want +got):\n%s", diff)
+				}
+				if (err != nil) != c.wantErr {
+					t.Errorf("runworkflow() = _, %v, wantErr: %v", err, c.wantErr)
+				}
+				close(done)
+			}()
+
+			select {
+			case <-time.After(5 * time.Second):
+				t.Error("AwaitFunc() never called f, wanted at least one call")
+			case <-didWork:
+				// AwaitFunc() called f successfully.
+			}
+			select {
+			case <-done:
+				t.Errorf("AwaitFunc() finished early, wanted it to still be looping")
+			case <-didWork:
+				close(success)
+			}
+			<-done
+		})
+	}
+}
+
+func TestCheckTaskApproved(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	hourAgo := time.Now().Add(-1 * time.Hour)
+	p := testDB(ctx, t)
+	q := db.New(p)
+
+	wf := db.CreateWorkflowParams{
+		ID:        uuid.New(),
+		Params:    nullString(`{"farewell": "bye", "greeting": "hello"}`),
+		Name:      nullString(`echo`),
+		CreatedAt: hourAgo,
+		UpdatedAt: hourAgo,
+	}
+	if _, err := q.CreateWorkflow(ctx, wf); err != nil {
+		t.Fatalf("CreateWorkflow(_, %v) = _, %v, wanted no error", wf, err)
+	}
+	gtg := db.CreateTaskParams{
+		WorkflowID: wf.ID,
+		Name:       "APPROVE-please",
+		Finished:   true,
+		Error:      nullString("internal explosion"),
+		CreatedAt:  hourAgo,
+		UpdatedAt:  hourAgo,
+	}
+	if _, err := q.CreateTask(ctx, gtg); err != nil {
+		t.Fatalf("CreateTask(_, %v) = _, %v, wanted no error", gtg, err)
+	}
+	tctx := &workflow.TaskContext{Context: ctx, WorkflowID: wf.ID}
+
+	got, err := checkTaskApproved(tctx, p, gtg.Name)
+	if err != nil || got {
+		t.Errorf("checkTaskApproved(_, %v, %q) = %t, %v wanted %t, %v", p, gtg.Name, got, err, false, nil)
+	}
+
+	ctlp := db.CreateTaskLogParams{
+		WorkflowID: wf.ID,
+		TaskName:   gtg.Name,
+		Body:       "USER-APPROVED",
+	}
+	_, err = q.CreateTaskLog(ctx, ctlp)
+	if err != nil {
+		t.Errorf("CreateTaskLog(_, %v) = _, %v, wanted no error", ctlp, err)
+	}
+
+	got, err = checkTaskApproved(tctx, p, gtg.Name)
+	if err != nil || !got {
+		t.Errorf("checkTaskApproved(_, %v, %q) = %t, %v wanted %t, %v", p, gtg.Name, got, err, true, nil)
+	}
+}
+
+func runWorkflow(t *testing.T, ctx context.Context, w *workflow.Workflow, listener workflow.Listener) (map[string]interface{}, error) {
+	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+	defer cancel()
+	t.Helper()
+	if listener == nil {
+		listener = &verboseListener{t}
+	}
+	return w.Run(ctx, listener)
+}