internal/relui: add approval step to build workflow
This change adds a manual approval task to the build before making any
user-facing changes. It uses a sentinel log entry in the approval task's
log in order to verify whether a human has approved the workflow to
continue.
Ideally, we'd change the data model to allow for this, perhaps by adding
a specific type of task, or a special state column, but this should be
sufficient for testing the beta release. Also, there should be some
shared definition of approval, but leaving that for a follow-up CL.
Updates golang/go#53295
Updates golang/go#51797
Change-Id: Iacd8c8a3ba59f9e3a343916dbe1a3a8ba59ce80c
Reviewed-on: https://go-review.googlesource.com/c/build/+/411199
Reviewed-by: Dmitri Shuralyov <dmitshur@google.com>
Reviewed-by: Carlos Amedee <carlos@golang.org>
TryBot-Result: Gopher Robot <gobot@golang.org>
Run-TryBot: Alex Rakoczy <alex@golang.org>
Reviewed-by: Dmitri Shuralyov <dmitshur@golang.org>
diff --git a/cmd/relui/main.go b/cmd/relui/main.go
index 4f0136c..d25f13d 100644
--- a/cmd/relui/main.go
+++ b/cmd/relui/main.go
@@ -118,6 +118,11 @@
if err != nil {
log.Fatalf("Could not connect to GCS: %v", err)
}
+ db, err := pgxpool.Connect(ctx, *pgConnect)
+ if err != nil {
+ log.Fatal(err)
+ }
+ defer db.Close()
buildTasks := &relui.BuildReleaseTasks{
GerritURL: "https://go.googlesource.com",
@@ -130,6 +135,7 @@
PublishFile: func(f *relui.WebsiteFile) error {
return publishFile(*websiteUploadURL, userPassAuth, f)
},
+ ApproveActionFunc: relui.ApproveActionDep(db),
}
githubHTTPClient := oauth2.NewClient(ctx, oauth2.StaticTokenSource(&oauth2.Token{AccessToken: *githubToken}))
milestoneTasks := &task.MilestoneTasks{
@@ -140,13 +146,8 @@
RepoOwner: "golang",
RepoName: "go",
}
-
relui.RegisterReleaseWorkflows(dh, buildTasks, milestoneTasks, versionTasks)
- db, err := pgxpool.Connect(ctx, *pgConnect)
- if err != nil {
- log.Fatal(err)
- }
- defer db.Close()
+
w := relui.NewWorker(dh, db, relui.NewPGListener(db))
go w.Run(ctx)
if err := w.ResumeAll(ctx); err != nil {
diff --git a/internal/relui/buildrelease_test.go b/internal/relui/buildrelease_test.go
index b47b706..3d2b93b 100644
--- a/internal/relui/buildrelease_test.go
+++ b/internal/relui/buildrelease_test.go
@@ -106,6 +106,11 @@
CreateBuildlet: fakeBuildlets.createBuildlet,
DownloadURL: dlServer.URL,
PublishFile: publishFile,
+ ApproveActionFunc: func(taskName string) func(*workflow.TaskContext, interface{}) error {
+ return func(_ *workflow.TaskContext, _ interface{}) error {
+ return nil
+ }
+ },
}
wd := workflow.New()
if err := addSingleReleaseWorkflow(buildTasks, milestoneTasks, versionTasks, wd, "go1.18", kind); err != nil {
diff --git a/internal/relui/templates/home.html b/internal/relui/templates/home.html
index 75fd59c..9df7c33 100644
--- a/internal/relui/templates/home.html
+++ b/internal/relui/templates/home.html
@@ -106,6 +106,14 @@
</form>
</div>
{{end}}
+ {{if and (not $task.Finished) (hasPrefix $task.Name "APPROVE-")}}
+ <div class="TaskList-approveTask">
+ <form action="{{baseLink (printf "/workflows/%s/tasks/%s/approve" $workflow.ID $task.Name) }}" method="post">
+ <input type="hidden" id="workflow.id" name="workflow.id" value="{{$workflow.ID}}" />
+ <input name="task.approve" type="submit" value="Approve" onclick="return this.form.reportValidity() && confirm('This will mark the task approved and resume the workflow.\n\nReady to proceed?')" />
+ </form>
+ </div>
+ {{end}}
</td>
</tr>
<tr class="TaskList-itemLogsRow">
diff --git a/internal/relui/web.go b/internal/relui/web.go
index e946ac2..8cafbac 100644
--- a/internal/relui/web.go
+++ b/internal/relui/web.go
@@ -76,12 +76,14 @@
header: header,
}
helpers := map[string]interface{}{
- "baseLink": s.BaseLink,
+ "baseLink": s.BaseLink,
+ "hasPrefix": strings.HasPrefix,
}
layout := template.Must(template.New("layout.html").Funcs(helpers).ParseFS(templates, "templates/layout.html"))
s.homeTmpl = template.Must(template.Must(layout.Clone()).Funcs(helpers).ParseFS(templates, "templates/home.html"))
s.newWorkflowTmpl = template.Must(template.Must(layout.Clone()).Funcs(helpers).ParseFS(templates, "templates/new_workflow.html"))
s.m.POST("/workflows/:id/tasks/:name/retry", s.retryTaskHandler)
+ s.m.POST("/workflows/:id/tasks/:name/approve", s.approveTaskHandler)
s.m.Handler(http.MethodGet, "/workflows/new", http.HandlerFunc(s.newWorkflowHandler))
s.m.Handler(http.MethodPost, "/workflows", http.HandlerFunc(s.createWorkflowHandler))
s.m.Handler(http.MethodGet, "/static/*path", fileServerHandler(static))
@@ -305,3 +307,25 @@
l.Printf("workflow reset. Previous state: %#v", wf)
return nil
}
+
+func (s *Server) approveTaskHandler(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
+ id, err := uuid.Parse(params.ByName("id"))
+ if err != nil {
+ log.Printf("approveTaskHandler(_, _, %v) uuid.Parse(%v): %v", params, params.ByName("id"), err)
+ http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
+ return
+ }
+ q := db.New(s.db)
+ t, err := q.Task(r.Context(), db.TaskParams{WorkflowID: id, Name: params.ByName("name")})
+ if errors.Is(err, sql.ErrNoRows) || errors.Is(err, pgx.ErrNoRows) {
+ http.Error(w, http.StatusText(http.StatusNotFound), http.StatusNotFound)
+ return
+ } else if err != nil {
+ log.Printf("q.Task(_, %q): %v", id, err)
+ http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError)
+ return
+ }
+ // This log entry serves as approval.
+ s.w.l.Logger(id, t.Name).Printf("USER-APPROVED")
+ http.Redirect(w, r, s.BaseLink("/"), http.StatusSeeOther)
+}
diff --git a/internal/relui/web_test.go b/internal/relui/web_test.go
index 40db6b3..f225f68 100644
--- a/internal/relui/web_test.go
+++ b/internal/relui/web_test.go
@@ -423,6 +423,16 @@
hourAgo := time.Now().Add(-1 * time.Hour)
wfID := uuid.New()
+ unchangedWorkflow := db.Workflow{
+ ID: wfID,
+ Params: nullString(`{"farewell": "bye", "greeting": "hello"}`),
+ Name: nullString(`echo`),
+ Finished: true,
+ Output: `{"some": "thing"}`,
+ Error: "internal explosion",
+ CreatedAt: hourAgo, // cmpopts.EquateApproxTime
+ UpdatedAt: hourAgo, // cmpopts.EquateApproxTime
+ }
cases := []struct {
desc string
@@ -432,54 +442,21 @@
wantWorkflows []db.Workflow
}{
{
- desc: "no params",
- wantCode: http.StatusNotFound,
- wantWorkflows: []db.Workflow{
- {
- ID: wfID,
- Params: nullString(`{"farewell": "bye", "greeting": "hello"}`),
- Name: nullString(`echo`),
- Finished: true,
- Output: `{"some": "thing"}`,
- Error: "internal explosion",
- CreatedAt: hourAgo, // cmpopts.EquateApproxTime
- UpdatedAt: hourAgo, // cmpopts.EquateApproxTime
- },
- },
+ desc: "no params",
+ wantCode: http.StatusNotFound,
+ wantWorkflows: []db.Workflow{unchangedWorkflow},
},
{
- desc: "invalid workflow id",
- params: map[string]string{"id": "invalid", "name": "greeting"},
- wantCode: http.StatusNotFound,
- wantWorkflows: []db.Workflow{
- {
- ID: wfID,
- Params: nullString(`{"farewell": "bye", "greeting": "hello"}`),
- Name: nullString(`echo`),
- Finished: true,
- Output: `{"some": "thing"}`,
- Error: "internal explosion",
- CreatedAt: hourAgo, // cmpopts.EquateApproxTime
- UpdatedAt: hourAgo, // cmpopts.EquateApproxTime
- },
- },
+ desc: "invalid workflow id",
+ params: map[string]string{"id": "invalid", "name": "greeting"},
+ wantCode: http.StatusNotFound,
+ wantWorkflows: []db.Workflow{unchangedWorkflow},
},
{
- desc: "wrong workflow id",
- params: map[string]string{"id": uuid.New().String(), "name": "greeting"},
- wantCode: http.StatusNotFound,
- wantWorkflows: []db.Workflow{
- {
- ID: wfID,
- Params: nullString(`{"farewell": "bye", "greeting": "hello"}`),
- Name: nullString(`echo`),
- Finished: true,
- Output: `{"some": "thing"}`,
- Error: "internal explosion",
- CreatedAt: hourAgo, // cmpopts.EquateApproxTime
- UpdatedAt: hourAgo, // cmpopts.EquateApproxTime
- },
- },
+ desc: "wrong workflow id",
+ params: map[string]string{"id": uuid.New().String(), "name": "greeting"},
+ wantCode: http.StatusNotFound,
+ wantWorkflows: []db.Workflow{unchangedWorkflow},
},
{
desc: "invalid task name",
@@ -594,3 +571,112 @@
})
}
}
+
+func TestServerApproveTaskHandler(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ hourAgo := time.Now().Add(-1 * time.Hour)
+ wfID := uuid.New()
+
+ cases := []struct {
+ desc string
+ params map[string]string
+ wantCode int
+ wantHeaders map[string]string
+ wantLogs []db.TaskLog
+ }{
+ {
+ desc: "no params",
+ wantCode: http.StatusNotFound,
+ },
+ {
+ desc: "invalid workflow id",
+ params: map[string]string{"id": "invalid", "name": "greeting"},
+ wantCode: http.StatusNotFound,
+ },
+ {
+ desc: "wrong workflow id",
+ params: map[string]string{"id": uuid.New().String(), "name": "greeting"},
+ wantCode: http.StatusNotFound,
+ },
+ {
+ desc: "invalid task name",
+ params: map[string]string{"id": wfID.String(), "name": "invalid"},
+ wantCode: http.StatusNotFound,
+ },
+ {
+ desc: "successful reset",
+ params: map[string]string{"id": wfID.String(), "name": "APPROVE-please"},
+ wantCode: http.StatusSeeOther,
+ wantHeaders: map[string]string{
+ "Location": "/",
+ },
+ wantLogs: []db.TaskLog{{
+ WorkflowID: wfID,
+ TaskName: "APPROVE-please",
+ Body: "USER-APPROVED",
+ CreatedAt: time.Now(),
+ UpdatedAt: time.Now(),
+ }},
+ },
+ }
+ for _, c := range cases {
+ t.Run(c.desc, func(t *testing.T) {
+ p := testDB(ctx, t)
+ q := db.New(p)
+
+ wf := db.CreateWorkflowParams{
+ ID: wfID,
+ Params: nullString(`{"farewell": "bye", "greeting": "hello"}`),
+ Name: nullString(`echo`),
+ CreatedAt: hourAgo,
+ UpdatedAt: hourAgo,
+ }
+ if _, err := q.CreateWorkflow(ctx, wf); err != nil {
+ t.Fatalf("CreateWorkflow(_, %v) = _, %v, wanted no error", wf, err)
+ }
+ gtg := db.CreateTaskParams{
+ WorkflowID: wf.ID,
+ Name: "APPROVE-please",
+ Finished: true,
+ Error: nullString("internal explosion"),
+ CreatedAt: hourAgo,
+ UpdatedAt: hourAgo,
+ }
+ if _, err := q.CreateTask(ctx, gtg); err != nil {
+ t.Fatalf("CreateTask(_, %v) = _, %v, wanted no error", gtg, err)
+ }
+
+ req := httptest.NewRequest(http.MethodPost, path.Join("/workflows/", c.params["id"], "tasks", c.params["name"], "approve"), nil)
+ req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
+ rec := httptest.NewRecorder()
+ s := NewServer(p, NewWorker(NewDefinitionHolder(), p, &PGListener{p}), nil, SiteHeader{})
+
+ s.m.ServeHTTP(rec, req)
+ resp := rec.Result()
+
+ if resp.StatusCode != c.wantCode {
+ t.Errorf("rep.StatusCode = %d, wanted %d", resp.StatusCode, c.wantCode)
+ }
+ for k, v := range c.wantHeaders {
+ if resp.Header.Get(k) != v {
+ t.Errorf("resp.Header.Get(%q) = %q, wanted %q", k, resp.Header.Get(k), v)
+ }
+ }
+ if c.wantCode == http.StatusBadRequest {
+ return
+ }
+ logs, err := q.TaskLogsForTask(ctx, db.TaskLogsForTaskParams{
+ WorkflowID: wfID,
+ TaskName: "APPROVE-please",
+ })
+ if err != nil {
+ t.Fatalf("q.TaskLogsForTask() = %v, %v, wanted no error", logs, err)
+ }
+ if diff := cmp.Diff(c.wantLogs, logs, SameUUIDVariant(), cmpopts.EquateApproxTime(time.Minute), cmpopts.IgnoreFields(db.TaskLog{}, "ID")); diff != "" {
+ t.Fatalf("q.TaskLogsForTask() mismatch (-want +got):\n%s", diff)
+ }
+ })
+ }
+}
diff --git a/internal/relui/workflows.go b/internal/relui/workflows.go
index 29b6824..1d9527d 100644
--- a/internal/relui/workflows.go
+++ b/internal/relui/workflows.go
@@ -12,14 +12,17 @@
"math/rand"
"net/http"
"path"
+ "strings"
"sync"
"time"
"cloud.google.com/go/storage"
+ "github.com/jackc/pgx/v4/pgxpool"
"golang.org/x/build/buildlet"
"golang.org/x/build/dashboard"
"golang.org/x/build/internal/gcsfs"
"golang.org/x/build/internal/releasetargets"
+ "golang.org/x/build/internal/relui/db"
"golang.org/x/build/internal/task"
"golang.org/x/build/internal/workflow"
)
@@ -187,6 +190,64 @@
return arg, nil
}
+type AwaitConditionFunc func(ctx *workflow.TaskContext) (done bool, err error)
+
+// AwaitFunc is a workflow.Task that polls the provided awaitCondition
+// every period until it either returns true or returns an error.
+func AwaitFunc(ctx *workflow.TaskContext, period time.Duration, awaitCondition AwaitConditionFunc) (bool, error) {
+ ticker := time.NewTicker(period)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ctx.Done():
+ return false, ctx.Err()
+ case <-ticker.C:
+ ok, err := awaitCondition(ctx)
+ if ok || err != nil {
+ return ok, err
+ }
+ }
+ }
+}
+
+// AwaitAction is a workflow.Action that is a convenience wrapper
+// around AwaitFunc.
+func AwaitAction(ctx *workflow.TaskContext, period time.Duration, awaitCondition AwaitConditionFunc) error {
+ _, err := AwaitFunc(ctx, period, awaitCondition)
+ return err
+}
+
+func checkTaskApproved(ctx *workflow.TaskContext, p *pgxpool.Pool, taskName string) (bool, error) {
+ q := db.New(p)
+ logs, err := q.TaskLogsForTask(ctx, db.TaskLogsForTaskParams{
+ WorkflowID: ctx.WorkflowID,
+ TaskName: taskName,
+ })
+ if err != nil {
+ return false, err
+ }
+ for _, l := range logs {
+ if strings.Contains(l.Body, "USER-APPROVED") {
+ return true, nil
+ }
+ }
+ return false, nil
+}
+
+func approveActionDep(p *pgxpool.Pool, taskName string) func(*workflow.TaskContext, interface{}) error {
+ return func(ctx *workflow.TaskContext, _ interface{}) error {
+ return AwaitAction(ctx, 10*time.Second, func(ctx *workflow.TaskContext) (done bool, err error) {
+ return checkTaskApproved(ctx, p, taskName)
+ })
+ }
+}
+
+func ApproveActionDep(p *pgxpool.Pool) func(taskName string) func(*workflow.TaskContext, interface{}) error {
+ return func(taskName string) func(*workflow.TaskContext, interface{}) error {
+ return approveActionDep(p, taskName)
+ }
+}
+
func RegisterReleaseWorkflows(h *DefinitionHolder, build *BuildReleaseTasks, milestone *task.MilestoneTasks, version *task.VersionTasks) error {
createSingle := func(name, major string, kind task.ReleaseKind) error {
wd := workflow.New()
@@ -250,8 +311,11 @@
return err
}
+ verifiedName := "APPROVE-Wait for Release Coordinator Approval"
+ verified := wd.Action(verifiedName, build.ApproveActionFunc(verifiedName), signedAndTestedArtifacts)
+
// Tag version and upload to CDN/website.
- uploaded := wd.Action("Upload artifacts to CDN", build.uploadArtifacts, signedAndTestedArtifacts)
+ uploaded := wd.Action("Upload artifacts to CDN", build.uploadArtifacts, signedAndTestedArtifacts, verified)
tagCommit := branchHead
if branch != "master" {
@@ -322,6 +386,7 @@
DownloadURL string
PublishFile func(*WebsiteFile) error
CreateBuildlet func(string) (buildlet.Client, error)
+ ApproveActionFunc func(taskName string) func(*workflow.TaskContext, interface{}) error
}
func (b *BuildReleaseTasks) buildSource(ctx *workflow.TaskContext, revision, version string) (artifact, error) {
diff --git a/internal/relui/workflows_test.go b/internal/relui/workflows_test.go
new file mode 100644
index 0000000..cc326d8
--- /dev/null
+++ b/internal/relui/workflows_test.go
@@ -0,0 +1,156 @@
+package relui
+
+import (
+ "context"
+ "errors"
+ "testing"
+ "time"
+
+ "github.com/google/go-cmp/cmp"
+ "github.com/google/uuid"
+ "golang.org/x/build/internal/relui/db"
+ "golang.org/x/build/internal/workflow"
+)
+
+func TestAwaitFunc(t *testing.T) {
+ cases := []struct {
+ desc string
+ want map[string]interface{}
+ wantErr bool
+ wantCancel bool
+ }{
+ {
+ desc: "success",
+ want: map[string]interface{}{"await": true},
+ },
+ {
+ desc: "error",
+ wantErr: true,
+ },
+ {
+ desc: "cancel",
+ wantCancel: true,
+ wantErr: true,
+ },
+ }
+ for _, c := range cases {
+ t.Run(c.desc, func(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ didWork := make(chan struct{}, 2)
+ success := make(chan interface{})
+ done := make(chan interface{})
+ cond := func(ctx *workflow.TaskContext) (bool, error) {
+ select {
+ case <-success:
+ if c.wantCancel {
+ cancel()
+ return false, ctx.Err()
+ } else if c.wantErr {
+ return false, errors.New("someError")
+ }
+ return true, nil
+ case <-ctx.Done():
+ return false, ctx.Err()
+ case didWork <- struct{}{}:
+ return false, nil
+ }
+ }
+ wd := workflow.New()
+ await := wd.Task("AwaitFunc", AwaitFunc, wd.Constant(10*time.Millisecond), wd.Constant(cond))
+ wd.Output("await", await)
+
+ w, err := workflow.Start(wd, nil)
+ if err != nil {
+ t.Fatalf("workflow.Start(%v, %v) = %v, %v, wanted no error", wd, nil, w, err)
+ }
+ go func() {
+ outputs, err := runWorkflow(t, ctx, w, nil)
+ if diff := cmp.Diff(c.want, outputs); diff != "" {
+ t.Errorf("runWorkflow() mismatch (-want +got):\n%s", diff)
+ }
+ if (err != nil) != c.wantErr {
+ t.Errorf("runworkflow() = _, %v, wantErr: %v", err, c.wantErr)
+ }
+ close(done)
+ }()
+
+ select {
+ case <-time.After(5 * time.Second):
+ t.Error("AwaitFunc() never called f, wanted at least one call")
+ case <-didWork:
+ // AwaitFunc() called f successfully.
+ }
+ select {
+ case <-done:
+ t.Errorf("AwaitFunc() finished early, wanted it to still be looping")
+ case <-didWork:
+ close(success)
+ }
+ <-done
+ })
+ }
+}
+
+func TestCheckTaskApproved(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ hourAgo := time.Now().Add(-1 * time.Hour)
+ p := testDB(ctx, t)
+ q := db.New(p)
+
+ wf := db.CreateWorkflowParams{
+ ID: uuid.New(),
+ Params: nullString(`{"farewell": "bye", "greeting": "hello"}`),
+ Name: nullString(`echo`),
+ CreatedAt: hourAgo,
+ UpdatedAt: hourAgo,
+ }
+ if _, err := q.CreateWorkflow(ctx, wf); err != nil {
+ t.Fatalf("CreateWorkflow(_, %v) = _, %v, wanted no error", wf, err)
+ }
+ gtg := db.CreateTaskParams{
+ WorkflowID: wf.ID,
+ Name: "APPROVE-please",
+ Finished: true,
+ Error: nullString("internal explosion"),
+ CreatedAt: hourAgo,
+ UpdatedAt: hourAgo,
+ }
+ if _, err := q.CreateTask(ctx, gtg); err != nil {
+ t.Fatalf("CreateTask(_, %v) = _, %v, wanted no error", gtg, err)
+ }
+ tctx := &workflow.TaskContext{Context: ctx, WorkflowID: wf.ID}
+
+ got, err := checkTaskApproved(tctx, p, gtg.Name)
+ if err != nil || got {
+ t.Errorf("checkTaskApproved(_, %v, %q) = %t, %v wanted %t, %v", p, gtg.Name, got, err, false, nil)
+ }
+
+ ctlp := db.CreateTaskLogParams{
+ WorkflowID: wf.ID,
+ TaskName: gtg.Name,
+ Body: "USER-APPROVED",
+ }
+ _, err = q.CreateTaskLog(ctx, ctlp)
+ if err != nil {
+ t.Errorf("CreateTaskLog(_, %v) = _, %v, wanted no error", ctlp, err)
+ }
+
+ got, err = checkTaskApproved(tctx, p, gtg.Name)
+ if err != nil || !got {
+ t.Errorf("checkTaskApproved(_, %v, %q) = %t, %v wanted %t, %v", p, gtg.Name, got, err, true, nil)
+ }
+}
+
+func runWorkflow(t *testing.T, ctx context.Context, w *workflow.Workflow, listener workflow.Listener) (map[string]interface{}, error) {
+ ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+ defer cancel()
+ t.Helper()
+ if listener == nil {
+ listener = &verboseListener{t}
+ }
+ return w.Run(ctx, listener)
+}