cmd/relui,internal/relui: create and run workflows

Create and run workflows, storing state in a postgres database. Generate
database queries using sqlc, which allows the author to have complete
control over the query structure.

The basic structure of creating and running a workflow works, but
there is still much work to do in terms of selecting which workflow to
create, recording logs, maintaining a list of currently running
workflows, and recording workflow-level results.

Task persistence needs to be updated, as the current implementation
cannot be resumed due to losing the type during marshalling.

For golang/go#47401

Change-Id: I9ccddf117d023f70568de655c482820a1152d9cb
Reviewed-on: https://go-review.googlesource.com/c/build/+/350449
Trust: Alexander Rakoczy <alex@golang.org>
Run-TryBot: Alexander Rakoczy <alex@golang.org>
TryBot-Result: Go Bot <gobot@golang.org>
Reviewed-by: Heschi Kreinick <heschi@google.com>
diff --git a/internal/relui/web_test.go b/internal/relui/web_test.go
index bfed150..af6a7c7 100644
--- a/internal/relui/web_test.go
+++ b/internal/relui/web_test.go
@@ -8,11 +8,28 @@
 package relui
 
 import (
+	"context"
+	"database/sql"
 	"embed"
+	"fmt"
 	"io/ioutil"
+	"log"
 	"net/http"
 	"net/http/httptest"
+	"net/url"
+	"os"
+	"strings"
+	"sync"
 	"testing"
+	"time"
+
+	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
+	"github.com/google/uuid"
+	"github.com/jackc/pgx/v4"
+	"github.com/jackc/pgx/v4/pgxpool"
+	"golang.org/x/build/internal"
+	"golang.org/x/build/internal/relui/db"
 )
 
 // testStatic is our static web server content.
@@ -86,10 +103,24 @@
 }
 
 func TestServerHomeHandler(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	p := testDB(ctx, t)
+
+	q := db.New(p)
+	wf := db.CreateWorkflowParams{ID: uuid.New()}
+	if _, err := q.CreateWorkflow(ctx, wf); err != nil {
+		t.Fatalf("CreateWorkflow(_, %v) = _, %v, wanted no error", wf, err)
+	}
+	tp := db.CreateTaskParams{WorkflowID: wf.ID, Name: "TestTask"}
+	if _, err := q.CreateTask(ctx, tp); err != nil {
+		t.Fatalf("CreateTask(_, %v) = _, %v, wanted no error", tp, err)
+	}
+
 	req := httptest.NewRequest(http.MethodGet, "/", nil)
 	w := httptest.NewRecorder()
 
-	s := &Server{}
+	s := NewServer(p)
 	s.homeHandler(w, req)
 	resp := w.Result()
 
@@ -110,3 +141,222 @@
 		t.Errorf("rep.StatusCode = %d, wanted %d", resp.StatusCode, http.StatusOK)
 	}
 }
+
+func TestServerCreateWorkflowHandler(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	cases := []struct {
+		desc          string
+		params        url.Values
+		wantCode      int
+		wantHeaders   map[string]string
+		wantWorkflows []db.Workflow
+		wantTasks     []db.Task
+	}{
+		{
+			desc:     "bad request",
+			wantCode: http.StatusBadRequest,
+		},
+		{
+			desc:     "successful creation",
+			params:   url.Values{"workflow.params.revision": []string{"abc"}},
+			wantCode: http.StatusSeeOther,
+			wantHeaders: map[string]string{
+				"Location": "/",
+			},
+			wantWorkflows: []db.Workflow{
+				{
+					ID:        uuid.New(), // SameUUIDVariant
+					Params:    nullString(`{"greeting": "abc"}`),
+					Name:      nullString(`Echo`),
+					CreatedAt: time.Now(), // cmpopts.EquateApproxTime
+					UpdatedAt: time.Now(), // cmpopts.EquateApproxTime
+				},
+			},
+			wantTasks: []db.Task{
+				{
+					Name:      "echo",
+					Finished:  false,
+					Error:     sql.NullString{},
+					CreatedAt: time.Now(), // cmpopts.EquateApproxTime
+					UpdatedAt: time.Now(), // cmpopts.EquateApproxTime
+				},
+			},
+		},
+	}
+	for _, c := range cases {
+		t.Run(c.desc, func(t *testing.T) {
+			p := testDB(ctx, t)
+			req := httptest.NewRequest(http.MethodPost, "/workflows/create", strings.NewReader(c.params.Encode()))
+			req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
+			w := httptest.NewRecorder()
+			q := db.New(p)
+
+			s := NewServer(p)
+			s.createWorkflowHandler(w, req)
+			resp := w.Result()
+
+			if resp.StatusCode != c.wantCode {
+				t.Errorf("rep.StatusCode = %d, wanted %d", resp.StatusCode, c.wantCode)
+			}
+			for k, v := range c.wantHeaders {
+				if resp.Header.Get(k) != v {
+					t.Errorf("resp.Header.Get(%q) = %q, wanted %q", k, resp.Header.Get(k), v)
+				}
+			}
+			if c.wantCode == http.StatusBadRequest {
+				return
+			}
+			wfs, err := q.Workflows(ctx)
+			if err != nil {
+				t.Fatalf("q.Workflows() = %v, %v, wanted no error", wfs, err)
+			}
+			if diff := cmp.Diff(c.wantWorkflows, wfs, SameUUIDVariant(), cmpopts.EquateApproxTime(time.Minute)); diff != "" {
+				t.Fatalf("q.Workflows() mismatch (-want +got):\n%s", diff)
+			}
+			var tasks []db.Task
+			ctx, cancel := context.WithTimeout(ctx, time.Minute)
+			defer cancel()
+			internal.PeriodicallyDo(ctx, 10*time.Millisecond, func(ctx context.Context, _ time.Time) {
+				tasks, err = q.TasksForWorkflow(ctx, wfs[0].ID)
+				if err != nil {
+					t.Fatalf("q.TasksForWorkflow(_, %q) = %v, %v, wanted no error", wfs[0].ID, tasks, err)
+				}
+				if len(tasks) > 0 {
+					cancel()
+				}
+			})
+			if len(c.wantTasks) > 0 {
+				c.wantTasks[0].WorkflowID = wfs[0].ID
+			}
+			if diff := cmp.Diff(c.wantTasks, tasks, cmpopts.EquateApproxTime(time.Minute), cmpopts.IgnoreFields(db.Task{}, "Finished", "Result")); diff != "" {
+				t.Errorf("q.TasksForWorkflow(_, %q) mismatch (-want +got):\n%s", wfs[0].ID, diff)
+			}
+		})
+	}
+}
+
+// resetDB truncates the db connected to in the pgxpool.Pool
+// connection.
+//
+// All tables in the public schema of the connected database will be
+// truncated, with the exception of the migrations table.
+func resetDB(ctx context.Context, t *testing.T, p *pgxpool.Pool) {
+	t.Helper()
+	tableQuery := `SELECT table_name FROM information_schema.tables WHERE table_schema='public'`
+	rows, err := p.Query(ctx, tableQuery)
+	if err != nil {
+		t.Fatalf("p.Query(_, %q, %q) = %v, %v, wanted no error", tableQuery, "public", rows, err)
+	}
+	defer rows.Close()
+	for rows.Next() {
+		var name string
+		if err := rows.Scan(&name); err != nil {
+			t.Fatalf("rows.Scan() = %v, wanted no error", err)
+		}
+		if name == "migrations" {
+			continue
+		}
+		truncQ := fmt.Sprintf("TRUNCATE %s CASCADE", pgx.Identifier{name}.Sanitize())
+		c, err := p.Exec(ctx, truncQ)
+		if err != nil {
+			t.Fatalf("p.Exec(_, %q) = %v, %v", truncQ, c, err)
+		}
+	}
+	if err := rows.Err(); err != nil {
+		log.Fatalf("rows.Err() = %v, wanted no error", err)
+	}
+}
+
+var testPoolOnce sync.Once
+var testPool *pgxpool.Pool
+
+// testDB connects, creates, and migrates a database in preparation
+// for testing, and returns a connection pool to the prepared
+// database.
+//
+// The connection pool is closed as part of a t.Cleanup handler.
+// Database connections are expected to be configured through libpq
+// compatible environment variables. If no PGDATABASE is specified,
+// relui-test will be used.
+//
+// https://www.postgresql.org/docs/current/libpq-envars.html
+func testDB(ctx context.Context, t *testing.T) *pgxpool.Pool {
+	t.Helper()
+	if testing.Short() {
+		t.Skip("Skipping database tests in short mode.")
+	}
+	testPoolOnce.Do(func() {
+		pgdb := url.QueryEscape(os.Getenv("PGDATABASE"))
+		if pgdb == "" {
+			pgdb = "relui-test"
+		}
+		if err := InitDB(ctx, fmt.Sprintf("database=%v", pgdb)); err != nil {
+			t.Skipf("Skipping database integration test: %v", err)
+		}
+		p, err := pgxpool.Connect(ctx, fmt.Sprintf("database=%v", pgdb))
+		if err != nil {
+			t.Skipf("Skipping database integration test: %v", err)
+		}
+		testPool = p
+	})
+	if testPool == nil {
+		t.Skip("Skipping database integration test: testdb = nil. See first error for details.")
+		return nil
+	}
+	t.Cleanup(func() {
+		resetDB(context.Background(), t, testPool)
+	})
+	return testPool
+}
+
+// SameUUIDVariant considers UUIDs equal if they are both the same
+// uuid.Variant. Zero-value uuids are considered equal.
+func SameUUIDVariant() cmp.Option {
+	return cmp.Transformer("SameVariant", func(v uuid.UUID) uuid.Variant {
+		return v.Variant()
+	})
+}
+
+func TestSameUUIDVariant(t *testing.T) {
+	cases := []struct {
+		desc string
+		x    uuid.UUID
+		y    uuid.UUID
+		want bool
+	}{
+		{
+			desc: "both set",
+			x:    uuid.New(),
+			y:    uuid.New(),
+			want: true,
+		},
+		{
+			desc: "both unset",
+			want: true,
+		},
+		{
+			desc: "just x",
+			x:    uuid.New(),
+			want: false,
+		},
+		{
+			desc: "just y",
+			y:    uuid.New(),
+			want: false,
+		},
+	}
+	for _, c := range cases {
+		t.Run(c.desc, func(t *testing.T) {
+			if got := cmp.Equal(c.x, c.y, SameUUIDVariant()); got != c.want {
+				t.Fatalf("cmp.Equal(%v, %v, SameUUIDVariant()) = %t, wanted %t", c.x, c.y, got, c.want)
+			}
+		})
+	}
+}
+
+// nullString returns a sql.NullString for a string.
+func nullString(val string) sql.NullString {
+	return sql.NullString{String: val, Valid: true}
+}