cmd/relui,internal/relui: create and run workflows Create and run workflows, storing state in a postgres database. Generate database queries using sqlc, which allows the author to have complete control over the query structure. The basic structure of creating and running a workflow works, but there is still much work to do in terms of selecting which workflow to create, recording logs, maintaining a list of currently running workflows, and recording workflow-level results. Task persistence needs to be updated, as the current implementation cannot be resumed due to losing the type during marshalling. For golang/go#47401 Change-Id: I9ccddf117d023f70568de655c482820a1152d9cb Reviewed-on: https://go-review.googlesource.com/c/build/+/350449 Trust: Alexander Rakoczy <alex@golang.org> Run-TryBot: Alexander Rakoczy <alex@golang.org> TryBot-Result: Go Bot <gobot@golang.org> Reviewed-by: Heschi Kreinick <heschi@google.com>
diff --git a/cmd/relui/Makefile b/cmd/relui/Makefile index 6427cc5..82e1b56 100644 --- a/cmd/relui/Makefile +++ b/cmd/relui/Makefile
@@ -18,7 +18,7 @@ .PHONY: dev dev: postgres-dev docker - docker run --rm --name=relui-dev -v $(POSTGRES_RUN_DEV) -p 8080:8080 $(DOCKER_TAG) + docker run --rm --name=relui-dev -v $(POSTGRES_RUN_DEV) -e PGUSER=$(POSTGRES_USER) -e PGDATABASE=relui-dev -p 8080:8080 $(DOCKER_TAG) .PHONY: postgres-dev postgres-dev: $(DEV_CFG)/pgpass @@ -38,7 +38,7 @@ .PHONY: test test: postgres-dev docker-test - docker run --rm --name=relui-test -v $(POSTGRES_RUN_DEV) -e RELUI_TEST_DATABASE="host=/var/run/postgresql user=postgres database=relui-test" golang/relui-test:$(VERSION) + docker run --rm --name=relui-test -v $(POSTGRES_RUN_DEV) -e PGUSER=$(POSTGRES_USER) -e PGDATABASE=relui-test golang/relui-test:$(VERSION) .PHONY: docker docker:
diff --git a/cmd/relui/README.md b/cmd/relui/README.md index 0f5d73e..d40048a 100644 --- a/cmd/relui/README.md +++ b/cmd/relui/README.md
@@ -9,3 +9,44 @@ ``` relui is a web interface for managing the release process of Go. + +## Development + +Run the command with the appropriate +[libpq-style environment variables](https://www.postgresql.org/docs/current/libpq-envars.html) +set. + +```bash +PGHOST=localhost PGDATABASE=relui-dev PGUSER=postgres go run ./ +``` + +Alternatively, using docker: + +```bash +make dev +``` + +### Updating Queries + +Create or edit SQL files in `internal/relui/queries`. +After editing the query, run `sqlc generate` in this directory. The +`internal/relui/db` package contains the generated code. + +See [sqlc documentation](https://docs.sqlc.dev/en/stable/) for further +details. + +## Testing + +Run go test with the appropriate +[libpq-style environment variables](https://www.postgresql.org/docs/current/libpq-envars.html) +set. If the database connection fails, database integration tests will +be skipped. If PGDATABSE is unset, relui-test is used by default. + +```bash +PGHOST=localhost PGUSER=postgres go test -v ./... ../../internal/relui/... +``` + +Alternatively, using docker: +```bash +make test +```
diff --git a/cmd/relui/main.go b/cmd/relui/main.go index c347549..121932a 100644 --- a/cmd/relui/main.go +++ b/cmd/relui/main.go
@@ -14,11 +14,12 @@ "log" "os" + "github.com/jackc/pgx/v4/pgxpool" "golang.org/x/build/internal/relui" ) var ( - pgConnect = flag.String("pg-connect", "host=/var/run/postgresql user=postgres database=relui-dev", "Postgres connection string or URI") + pgConnect = flag.String("pg-connect", "", "Postgres connection string or URI. If empty, libpq connection defaults are used.") onlyMigrate = flag.Bool("only-migrate", false, "Exit after running migrations. Migrations are run by default.") ) @@ -31,12 +32,12 @@ if *onlyMigrate { return } - d := new(relui.PgStore) - if err := d.Connect(ctx, *pgConnect); err != nil { + db, err := pgxpool.Connect(ctx, *pgConnect) + if err != nil { log.Fatal(err) } - defer d.Close() - s, err := relui.NewServer(ctx, d) + defer db.Close() + s := relui.NewServer(db) if err != nil { log.Fatalf("relui.NewServer() = %v", err) }
diff --git a/cmd/relui/sqlc.yaml b/cmd/relui/sqlc.yaml new file mode 100644 index 0000000..6e54f54 --- /dev/null +++ b/cmd/relui/sqlc.yaml
@@ -0,0 +1,12 @@ +version: "1" +packages: + - name: "db" + path: "../../internal/relui/db" + queries: "../../internal/relui/queries" + schema: "schema.sql" + engine: "postgresql" + sql_package: "pgx/v4" +overrides: + - go_type: "database/sql.NullString" + db_type: "jsonb" + nullable: true
diff --git a/go.mod b/go.mod index a29f62c..cce0575 100644 --- a/go.mod +++ b/go.mod
@@ -24,6 +24,7 @@ github.com/googleapis/gax-go/v2 v2.0.5 github.com/googleapis/google-cloud-go-testing v0.0.0-20200911160855-bcd43fbb19e8 github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 + github.com/jackc/pgconn v1.10.0 github.com/jackc/pgx/v4 v4.13.0 github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1 github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07
diff --git a/internal/relui/db/db.go b/internal/relui/db/db.go new file mode 100644 index 0000000..ba9b109 --- /dev/null +++ b/internal/relui/db/db.go
@@ -0,0 +1,30 @@ +// Code generated by sqlc. DO NOT EDIT. + +package db + +import ( + "context" + + "github.com/jackc/pgconn" + "github.com/jackc/pgx/v4" +) + +type DBTX interface { + Exec(context.Context, string, ...interface{}) (pgconn.CommandTag, error) + Query(context.Context, string, ...interface{}) (pgx.Rows, error) + QueryRow(context.Context, string, ...interface{}) pgx.Row +} + +func New(db DBTX) *Queries { + return &Queries{db: db} +} + +type Queries struct { + db DBTX +} + +func (q *Queries) WithTx(tx pgx.Tx) *Queries { + return &Queries{ + db: tx, + } +}
diff --git a/internal/relui/db/models.go b/internal/relui/db/models.go new file mode 100644 index 0000000..d5027b2 --- /dev/null +++ b/internal/relui/db/models.go
@@ -0,0 +1,33 @@ +// Code generated by sqlc. DO NOT EDIT. + +package db + +import ( + "database/sql" + "time" + + "github.com/google/uuid" +) + +type Migration struct { + Version int64 + Dirty bool +} + +type Task struct { + WorkflowID uuid.UUID + Name string + Finished bool + Result sql.NullString + Error sql.NullString + CreatedAt time.Time + UpdatedAt time.Time +} + +type Workflow struct { + ID uuid.UUID + Params sql.NullString + Name sql.NullString + CreatedAt time.Time + UpdatedAt time.Time +}
diff --git a/internal/relui/db/workflows.sql.go b/internal/relui/db/workflows.sql.go new file mode 100644 index 0000000..f665e34 --- /dev/null +++ b/internal/relui/db/workflows.sql.go
@@ -0,0 +1,249 @@ +// Code generated by sqlc. DO NOT EDIT. +// source: workflows.sql + +package db + +import ( + "context" + "database/sql" + "time" + + "github.com/google/uuid" +) + +const createTask = `-- name: CreateTask :one +INSERT INTO tasks (workflow_id, name, finished, result, error, created_at, updated_at) +VALUES ($1, $2, $3, $4, $5, $6, $7) +RETURNING workflow_id, name, finished, result, error, created_at, updated_at +` + +type CreateTaskParams struct { + WorkflowID uuid.UUID + Name string + Finished bool + Result sql.NullString + Error sql.NullString + CreatedAt time.Time + UpdatedAt time.Time +} + +func (q *Queries) CreateTask(ctx context.Context, arg CreateTaskParams) (Task, error) { + row := q.db.QueryRow(ctx, createTask, + arg.WorkflowID, + arg.Name, + arg.Finished, + arg.Result, + arg.Error, + arg.CreatedAt, + arg.UpdatedAt, + ) + var i Task + err := row.Scan( + &i.WorkflowID, + &i.Name, + &i.Finished, + &i.Result, + &i.Error, + &i.CreatedAt, + &i.UpdatedAt, + ) + return i, err +} + +const createWorkflow = `-- name: CreateWorkflow :one +INSERT INTO workflows (id, params, name, created_at, updated_at) +VALUES ($1, $2, $3, $4, $5) +RETURNING id, params, name, created_at, updated_at +` + +type CreateWorkflowParams struct { + ID uuid.UUID + Params sql.NullString + Name sql.NullString + CreatedAt time.Time + UpdatedAt time.Time +} + +func (q *Queries) CreateWorkflow(ctx context.Context, arg CreateWorkflowParams) (Workflow, error) { + row := q.db.QueryRow(ctx, createWorkflow, + arg.ID, + arg.Params, + arg.Name, + arg.CreatedAt, + arg.UpdatedAt, + ) + var i Workflow + err := row.Scan( + &i.ID, + &i.Params, + &i.Name, + &i.CreatedAt, + &i.UpdatedAt, + ) + return i, err +} + +const tasks = `-- name: Tasks :many +SELECT tasks.workflow_id, tasks.name, tasks.finished, tasks.result, tasks.error, tasks.created_at, tasks.updated_at +FROM tasks +ORDER BY created_at +` + +func (q *Queries) Tasks(ctx context.Context) ([]Task, error) { + rows, err := q.db.Query(ctx, tasks) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Task + for rows.Next() { + var i Task + if err := rows.Scan( + &i.WorkflowID, + &i.Name, + &i.Finished, + &i.Result, + &i.Error, + &i.CreatedAt, + &i.UpdatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const tasksForWorkflow = `-- name: TasksForWorkflow :many +SELECT tasks.workflow_id, tasks.name, tasks.finished, tasks.result, tasks.error, tasks.created_at, tasks.updated_at +FROM tasks +WHERE workflow_id=$1 +ORDER BY created_At +` + +func (q *Queries) TasksForWorkflow(ctx context.Context, workflowID uuid.UUID) ([]Task, error) { + rows, err := q.db.Query(ctx, tasksForWorkflow, workflowID) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Task + for rows.Next() { + var i Task + if err := rows.Scan( + &i.WorkflowID, + &i.Name, + &i.Finished, + &i.Result, + &i.Error, + &i.CreatedAt, + &i.UpdatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const upsertTask = `-- name: UpsertTask :one +INSERT INTO tasks (workflow_id, name, finished, result, error, created_at, updated_at) +VALUES ($1, $2, $3, $4, $5, $6, $7) +ON CONFLICT (workflow_id, name) DO UPDATE + SET workflow_id = excluded.workflow_id, + name = excluded.name, + finished = excluded.finished, + result = excluded.result, + updated_at = excluded.updated_at +RETURNING workflow_id, name, finished, result, error, created_at, updated_at +` + +type UpsertTaskParams struct { + WorkflowID uuid.UUID + Name string + Finished bool + Result sql.NullString + Error sql.NullString + CreatedAt time.Time + UpdatedAt time.Time +} + +func (q *Queries) UpsertTask(ctx context.Context, arg UpsertTaskParams) (Task, error) { + row := q.db.QueryRow(ctx, upsertTask, + arg.WorkflowID, + arg.Name, + arg.Finished, + arg.Result, + arg.Error, + arg.CreatedAt, + arg.UpdatedAt, + ) + var i Task + err := row.Scan( + &i.WorkflowID, + &i.Name, + &i.Finished, + &i.Result, + &i.Error, + &i.CreatedAt, + &i.UpdatedAt, + ) + return i, err +} + +const workflow = `-- name: Workflow :one +SELECT id, params, name, created_at, updated_at +FROM workflows +WHERE id = $1 +` + +func (q *Queries) Workflow(ctx context.Context, id uuid.UUID) (Workflow, error) { + row := q.db.QueryRow(ctx, workflow, id) + var i Workflow + err := row.Scan( + &i.ID, + &i.Params, + &i.Name, + &i.CreatedAt, + &i.UpdatedAt, + ) + return i, err +} + +const workflows = `-- name: Workflows :many +SELECT id, params, name, created_at, updated_at +FROM workflows +ORDER BY created_at DESC +` + +func (q *Queries) Workflows(ctx context.Context) ([]Workflow, error) { + rows, err := q.db.Query(ctx, workflows) + if err != nil { + return nil, err + } + defer rows.Close() + var items []Workflow + for rows.Next() { + var i Workflow + if err := rows.Scan( + &i.ID, + &i.Params, + &i.Name, + &i.CreatedAt, + &i.UpdatedAt, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +}
diff --git a/internal/relui/queries/workflows.sql b/internal/relui/queries/workflows.sql new file mode 100644 index 0000000..fe58679 --- /dev/null +++ b/internal/relui/queries/workflows.sql
@@ -0,0 +1,45 @@ +-- Copyright 2021 The Go Authors. All rights reserved. +-- Use of this source code is governed by a BSD-style +-- license that can be found in the LICENSE file. + +-- name: Workflows :many +SELECT id, params, name, created_at, updated_at +FROM workflows +ORDER BY created_at DESC; + +-- name: Workflow :one +SELECT id, params, name, created_at, updated_at +FROM workflows +WHERE id = $1; + +-- name: CreateWorkflow :one +INSERT INTO workflows (id, params, name, created_at, updated_at) +VALUES ($1, $2, $3, $4, $5) +RETURNING *; + +-- name: CreateTask :one +INSERT INTO tasks (workflow_id, name, finished, result, error, created_at, updated_at) +VALUES ($1, $2, $3, $4, $5, $6, $7) +RETURNING *; + +-- name: UpsertTask :one +INSERT INTO tasks (workflow_id, name, finished, result, error, created_at, updated_at) +VALUES ($1, $2, $3, $4, $5, $6, $7) +ON CONFLICT (workflow_id, name) DO UPDATE + SET workflow_id = excluded.workflow_id, + name = excluded.name, + finished = excluded.finished, + result = excluded.result, + updated_at = excluded.updated_at +RETURNING *; + +-- name: Tasks :many +SELECT tasks.* +FROM tasks +ORDER BY created_at; + +-- name: TasksForWorkflow :many +SELECT tasks.* +FROM tasks +WHERE workflow_id=$1 +ORDER BY created_At;
diff --git a/internal/relui/store.go b/internal/relui/store.go index 3df8061..7f6d383 100644 --- a/internal/relui/store.go +++ b/internal/relui/store.go
@@ -17,40 +17,10 @@ dbpgx "github.com/golang-migrate/migrate/v4/database/pgx" "github.com/golang-migrate/migrate/v4/source/iofs" "github.com/jackc/pgx/v4" - "github.com/jackc/pgx/v4/pgxpool" ) var errDBNotExist = errors.New("database does not exist") -// store is a persistence interface for saving data. -type store interface { -} - -var _ store = (*PgStore)(nil) - -// PgStore is a store backed by a Postgres database. -type PgStore struct { - db *pgxpool.Pool -} - -// Connect connects to the database using the credentials supplied in -// the provided connString. -// -// Any key/value or URI string compatible with libpq is valid. -func (p *PgStore) Connect(ctx context.Context, connString string) error { - pool, err := pgxpool.Connect(ctx, connString) - if err != nil { - return err - } - p.db = pool - return nil -} - -// Close closes the pgxpool.Pool. -func (p *PgStore) Close() { - p.db.Close() -} - // InitDB creates and applies all migrations to the database specified // in conn. //
diff --git a/internal/relui/store_test.go b/internal/relui/store_test.go index a26c831..05d3407 100644 --- a/internal/relui/store_test.go +++ b/internal/relui/store_test.go
@@ -7,29 +7,15 @@ import ( "context" "errors" - "os" "testing" - - "github.com/jackc/pgx/v4" ) -const dbEnvKey = "RELUI_TEST_DATABASE" - func TestCreateDBIfNotExists(t *testing.T) { - dbEnv := os.Getenv(dbEnvKey) - if dbEnv == "" { - t.Skipf("%q is not set. Skipping.", dbEnvKey) - } - if testing.Short() { - t.Skip("Skipping database tests in short mode.") - } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + db := testDB(ctx, t) - ctx := context.Background() - cfg, err := pgx.ParseConfig(dbEnv) - if err != nil { - t.Fatalf("pgx.ParseConfig(os.Getenv(%q)) = %v, wanted no error", dbEnvKey, err) - } - testCfg := cfg.Copy() + testCfg := db.Config().ConnConfig.Copy() testCfg.Database = "relui-test-nonexistent" if err := DropDB(ctx, testCfg); err != nil && !errors.Is(err, errDBNotExist) { t.Fatalf("p.DropDB() = %v, wanted %q or nil", err, errDBNotExist)
diff --git a/internal/relui/templates/home.html b/internal/relui/templates/home.html index 9240f95..3d87afd 100644 --- a/internal/relui/templates/home.html +++ b/internal/relui/templates/home.html
@@ -12,14 +12,16 @@ <ul class="WorkflowList"> {{range $workflow := .Workflows}} <li class="WorkflowList-item"> - <h3>{{$workflow.Name}} - {{index $workflow.GitSource}}</h3> + <h3>{{$workflow.Name.String}} - {{index $workflow.ID}}</h3> <h4 class="WorkflowList-sectionTitle">Tasks</h4> <ul class="TaskList"> - {{range $task := $workflow.BuildableTasks}} + {{$tasks := index $.WorkflowTasks $workflow.ID}} + {{range $task := $tasks}} <li class="TaskList-item"> <span class="TaskList-itemTitle">{{$task.Name}}</span> - Status: {{$task.Status}} - ID: {{$task.Id}} + Finished: {{$task.Finished}} + Result: {{$task.Result.String}} + Name: {{$task.Name}} </li> {{end}} <li class="TaskList-item">
diff --git a/internal/relui/templates/new_workflow.html b/internal/relui/templates/new_workflow.html index 227e493..a3e71bd 100644 --- a/internal/relui/templates/new_workflow.html +++ b/internal/relui/templates/new_workflow.html
@@ -9,7 +9,7 @@ <form action="/workflows/create" method="post"> <label> Revision - <input name="workflow.revision" value="master" /> + <input name="workflow.params.revision" value="main" /> </label> <input name="workflow.create" type="submit" value="Create" /> </form>
diff --git a/internal/relui/web.go b/internal/relui/web.go index 368fc71..42b0dcd 100644 --- a/internal/relui/web.go +++ b/internal/relui/web.go
@@ -10,6 +10,9 @@ import ( "bytes" "context" + "database/sql" + "encoding/json" + "fmt" "html/template" "io" "io/fs" @@ -17,6 +20,13 @@ "mime" "net/http" "path" + "time" + + "github.com/google/uuid" + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" + "golang.org/x/build/internal/relui/db" + "golang.org/x/build/internal/workflow" ) // fileServerHandler returns a http.Handler rooted at root. It will @@ -45,33 +55,53 @@ // Server implements the http handlers for relui. type Server struct { - // store is for persisting application state. - store store + db *pgxpool.Pool + m *http.ServeMux } -// NewServer initializes a server with a database connection specified -// by pgConn. Any libpq compatible connection strings or URIs are -// supported. -func NewServer(ctx context.Context, st store) (*Server, error) { - s := &Server{store: st} - http.Handle("/workflows/create", http.HandlerFunc(s.createWorkflowHandler)) - http.Handle("/workflows/new", http.HandlerFunc(s.newWorkflowHandler)) - http.Handle("/", fileServerHandler(static, http.HandlerFunc(s.homeHandler))) - return s, nil +// NewServer initializes a server with the provided connection pool. +func NewServer(p *pgxpool.Pool) *Server { + s := &Server{db: p, m: &http.ServeMux{}} + s.m.Handle("/workflows/create", http.HandlerFunc(s.createWorkflowHandler)) + s.m.Handle("/workflows/new", http.HandlerFunc(s.newWorkflowHandler)) + s.m.Handle("/", fileServerHandler(static, http.HandlerFunc(s.homeHandler))) + return s +} + +func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) { + s.m.ServeHTTP(w, r) } func (s *Server) Serve(port string) error { - return http.ListenAndServe(":"+port, http.DefaultServeMux) + return http.ListenAndServe(":"+port, s.m) } type homeResponse struct { - Workflows []interface{} + Workflows []db.Workflow + WorkflowTasks map[uuid.UUID][]db.Task } // homeHandler renders the homepage. -func (s *Server) homeHandler(w http.ResponseWriter, _ *http.Request) { +func (s *Server) homeHandler(w http.ResponseWriter, r *http.Request) { + q := db.New(s.db) + ws, err := q.Workflows(r.Context()) + if err != nil { + log.Printf("homeHandler: %v", err) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } + tasks, err := q.Tasks(r.Context()) + if err != nil { + log.Printf("homeHandler: %v", err) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } + wfTasks := make(map[uuid.UUID][]db.Task, len(ws)) + for _, t := range tasks { + wfTasks[t.WorkflowID] = append(wfTasks[t.WorkflowID], t) + } out := bytes.Buffer{} - if err := homeTmpl.Execute(&out, homeResponse{}); err != nil { + if err := homeTmpl.Execute(&out, homeResponse{Workflows: ws, WorkflowTasks: wfTasks}); err != nil { log.Printf("homeHandler: %v", err) http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) return @@ -90,7 +120,116 @@ io.Copy(w, &out) } -// createWorkflowHandler persists a new workflow in the datastore. -func (s *Server) createWorkflowHandler(w http.ResponseWriter, _ *http.Request) { - http.Error(w, "Unable to create workflow: no workflows configured", http.StatusInternalServerError) +// createWorkflowHandler persists a new workflow in the datastore, and +// starts the workflow in a goroutine. +func (s *Server) createWorkflowHandler(w http.ResponseWriter, r *http.Request) { + if err := r.ParseForm(); err != nil { + http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) + return + } + ref := r.Form.Get("workflow.params.revision") + if ref == "" { + http.Error(w, "workflow revision is required", http.StatusBadRequest) + return + } + params := map[string]string{"greeting": ref} + wf, err := workflow.Start(newEchoWorkflow(ref), params) + if err != nil { + log.Printf("createWorkflowHandler: %v", err) + http.Error(w, http.StatusText(http.StatusInternalServerError), http.StatusInternalServerError) + return + } + err = s.db.BeginFunc(r.Context(), func(tx pgx.Tx) error { + q := db.New(tx) + m, err := json.Marshal(params) + if err != nil { + return err + } + updated := time.Now() + _, err = q.CreateWorkflow(r.Context(), db.CreateWorkflowParams{ + ID: wf.ID, + Name: sql.NullString{String: "Echo", Valid: true}, + Params: sql.NullString{String: string(m), Valid: len(m) > 0}, + CreatedAt: updated, + UpdatedAt: updated, + }) + if err != nil { + return err + } + return nil + }) + if err != nil { + log.Printf("createWorkflowHandler: %v", err) + http.Error(w, "Error creating workflow", http.StatusInternalServerError) + } + go func(wf *workflow.Workflow, db *pgxpool.Pool) { + result, err := wf.Run(context.TODO(), &listener{db}) + log.Printf("wf.Run() = %v, %v", result, err) + }(wf, s.db) + http.Redirect(w, r, "/", http.StatusSeeOther) +} + +// listener implements workflow.Listener for recording workflow state. +type listener struct { + db *pgxpool.Pool +} + +// TaskStateChanged is called whenever a task is updated by the +// workflow. The workflow.TaskState is persisted as a db.Task, +// creating or updating a row as necessary. +func (l *listener) TaskStateChanged(workflowID uuid.UUID, taskID string, state *workflow.TaskState) error { + log.Printf("TaskStateChanged(%q, %q, %v)", workflowID, taskID, state) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + result, err := json.Marshal(state.Result) + if err != nil { + return err + } + err = l.db.BeginFunc(ctx, func(tx pgx.Tx) error { + q := db.New(tx) + updated := time.Now() + _, err := q.UpsertTask(ctx, db.UpsertTaskParams{ + WorkflowID: workflowID, + Name: taskID, + Finished: state.Finished, + Result: sql.NullString{String: string(result), Valid: len(result) > 0}, + Error: sql.NullString{}, + CreatedAt: updated, + UpdatedAt: updated, + }) + if err != nil { + return err + } + return nil + }) + if err != nil { + log.Printf("TaskStateChanged(%q, %q, %v) = %v", workflowID, taskID, state, err) + } + return err +} + +func (l *listener) Logger(workflowID uuid.UUID, taskID string) workflow.Logger { + return &stdoutLogger{WorkflowID: workflowID, TaskID: taskID} +} + +type stdoutLogger struct { + WorkflowID uuid.UUID + TaskID string +} + +func (l *stdoutLogger) Printf(format string, v ...interface{}) { + log.Printf("%q(%q): %v", l.WorkflowID, l.TaskID, fmt.Sprintf(format, v...)) +} + +// newEchoWorkflow returns a runnable workflow.Definition for +// development. +func newEchoWorkflow(greeting string) *workflow.Definition { + wd := workflow.New() + gt := wd.Task("echo", echo, wd.Constant(greeting)) + wd.Output("greeting", gt) + return wd +} + +func echo(_ context.Context, arg string) (string, error) { + return arg, nil }
diff --git a/internal/relui/web_test.go b/internal/relui/web_test.go index bfed150..af6a7c7 100644 --- a/internal/relui/web_test.go +++ b/internal/relui/web_test.go
@@ -8,11 +8,28 @@ package relui import ( + "context" + "database/sql" "embed" + "fmt" "io/ioutil" + "log" "net/http" "net/http/httptest" + "net/url" + "os" + "strings" + "sync" "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/google/uuid" + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/pgxpool" + "golang.org/x/build/internal" + "golang.org/x/build/internal/relui/db" ) // testStatic is our static web server content. @@ -86,10 +103,24 @@ } func TestServerHomeHandler(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + p := testDB(ctx, t) + + q := db.New(p) + wf := db.CreateWorkflowParams{ID: uuid.New()} + if _, err := q.CreateWorkflow(ctx, wf); err != nil { + t.Fatalf("CreateWorkflow(_, %v) = _, %v, wanted no error", wf, err) + } + tp := db.CreateTaskParams{WorkflowID: wf.ID, Name: "TestTask"} + if _, err := q.CreateTask(ctx, tp); err != nil { + t.Fatalf("CreateTask(_, %v) = _, %v, wanted no error", tp, err) + } + req := httptest.NewRequest(http.MethodGet, "/", nil) w := httptest.NewRecorder() - s := &Server{} + s := NewServer(p) s.homeHandler(w, req) resp := w.Result() @@ -110,3 +141,222 @@ t.Errorf("rep.StatusCode = %d, wanted %d", resp.StatusCode, http.StatusOK) } } + +func TestServerCreateWorkflowHandler(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cases := []struct { + desc string + params url.Values + wantCode int + wantHeaders map[string]string + wantWorkflows []db.Workflow + wantTasks []db.Task + }{ + { + desc: "bad request", + wantCode: http.StatusBadRequest, + }, + { + desc: "successful creation", + params: url.Values{"workflow.params.revision": []string{"abc"}}, + wantCode: http.StatusSeeOther, + wantHeaders: map[string]string{ + "Location": "/", + }, + wantWorkflows: []db.Workflow{ + { + ID: uuid.New(), // SameUUIDVariant + Params: nullString(`{"greeting": "abc"}`), + Name: nullString(`Echo`), + CreatedAt: time.Now(), // cmpopts.EquateApproxTime + UpdatedAt: time.Now(), // cmpopts.EquateApproxTime + }, + }, + wantTasks: []db.Task{ + { + Name: "echo", + Finished: false, + Error: sql.NullString{}, + CreatedAt: time.Now(), // cmpopts.EquateApproxTime + UpdatedAt: time.Now(), // cmpopts.EquateApproxTime + }, + }, + }, + } + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + p := testDB(ctx, t) + req := httptest.NewRequest(http.MethodPost, "/workflows/create", strings.NewReader(c.params.Encode())) + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") + w := httptest.NewRecorder() + q := db.New(p) + + s := NewServer(p) + s.createWorkflowHandler(w, req) + resp := w.Result() + + if resp.StatusCode != c.wantCode { + t.Errorf("rep.StatusCode = %d, wanted %d", resp.StatusCode, c.wantCode) + } + for k, v := range c.wantHeaders { + if resp.Header.Get(k) != v { + t.Errorf("resp.Header.Get(%q) = %q, wanted %q", k, resp.Header.Get(k), v) + } + } + if c.wantCode == http.StatusBadRequest { + return + } + wfs, err := q.Workflows(ctx) + if err != nil { + t.Fatalf("q.Workflows() = %v, %v, wanted no error", wfs, err) + } + if diff := cmp.Diff(c.wantWorkflows, wfs, SameUUIDVariant(), cmpopts.EquateApproxTime(time.Minute)); diff != "" { + t.Fatalf("q.Workflows() mismatch (-want +got):\n%s", diff) + } + var tasks []db.Task + ctx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() + internal.PeriodicallyDo(ctx, 10*time.Millisecond, func(ctx context.Context, _ time.Time) { + tasks, err = q.TasksForWorkflow(ctx, wfs[0].ID) + if err != nil { + t.Fatalf("q.TasksForWorkflow(_, %q) = %v, %v, wanted no error", wfs[0].ID, tasks, err) + } + if len(tasks) > 0 { + cancel() + } + }) + if len(c.wantTasks) > 0 { + c.wantTasks[0].WorkflowID = wfs[0].ID + } + if diff := cmp.Diff(c.wantTasks, tasks, cmpopts.EquateApproxTime(time.Minute), cmpopts.IgnoreFields(db.Task{}, "Finished", "Result")); diff != "" { + t.Errorf("q.TasksForWorkflow(_, %q) mismatch (-want +got):\n%s", wfs[0].ID, diff) + } + }) + } +} + +// resetDB truncates the db connected to in the pgxpool.Pool +// connection. +// +// All tables in the public schema of the connected database will be +// truncated, with the exception of the migrations table. +func resetDB(ctx context.Context, t *testing.T, p *pgxpool.Pool) { + t.Helper() + tableQuery := `SELECT table_name FROM information_schema.tables WHERE table_schema='public'` + rows, err := p.Query(ctx, tableQuery) + if err != nil { + t.Fatalf("p.Query(_, %q, %q) = %v, %v, wanted no error", tableQuery, "public", rows, err) + } + defer rows.Close() + for rows.Next() { + var name string + if err := rows.Scan(&name); err != nil { + t.Fatalf("rows.Scan() = %v, wanted no error", err) + } + if name == "migrations" { + continue + } + truncQ := fmt.Sprintf("TRUNCATE %s CASCADE", pgx.Identifier{name}.Sanitize()) + c, err := p.Exec(ctx, truncQ) + if err != nil { + t.Fatalf("p.Exec(_, %q) = %v, %v", truncQ, c, err) + } + } + if err := rows.Err(); err != nil { + log.Fatalf("rows.Err() = %v, wanted no error", err) + } +} + +var testPoolOnce sync.Once +var testPool *pgxpool.Pool + +// testDB connects, creates, and migrates a database in preparation +// for testing, and returns a connection pool to the prepared +// database. +// +// The connection pool is closed as part of a t.Cleanup handler. +// Database connections are expected to be configured through libpq +// compatible environment variables. If no PGDATABASE is specified, +// relui-test will be used. +// +// https://www.postgresql.org/docs/current/libpq-envars.html +func testDB(ctx context.Context, t *testing.T) *pgxpool.Pool { + t.Helper() + if testing.Short() { + t.Skip("Skipping database tests in short mode.") + } + testPoolOnce.Do(func() { + pgdb := url.QueryEscape(os.Getenv("PGDATABASE")) + if pgdb == "" { + pgdb = "relui-test" + } + if err := InitDB(ctx, fmt.Sprintf("database=%v", pgdb)); err != nil { + t.Skipf("Skipping database integration test: %v", err) + } + p, err := pgxpool.Connect(ctx, fmt.Sprintf("database=%v", pgdb)) + if err != nil { + t.Skipf("Skipping database integration test: %v", err) + } + testPool = p + }) + if testPool == nil { + t.Skip("Skipping database integration test: testdb = nil. See first error for details.") + return nil + } + t.Cleanup(func() { + resetDB(context.Background(), t, testPool) + }) + return testPool +} + +// SameUUIDVariant considers UUIDs equal if they are both the same +// uuid.Variant. Zero-value uuids are considered equal. +func SameUUIDVariant() cmp.Option { + return cmp.Transformer("SameVariant", func(v uuid.UUID) uuid.Variant { + return v.Variant() + }) +} + +func TestSameUUIDVariant(t *testing.T) { + cases := []struct { + desc string + x uuid.UUID + y uuid.UUID + want bool + }{ + { + desc: "both set", + x: uuid.New(), + y: uuid.New(), + want: true, + }, + { + desc: "both unset", + want: true, + }, + { + desc: "just x", + x: uuid.New(), + want: false, + }, + { + desc: "just y", + y: uuid.New(), + want: false, + }, + } + for _, c := range cases { + t.Run(c.desc, func(t *testing.T) { + if got := cmp.Equal(c.x, c.y, SameUUIDVariant()); got != c.want { + t.Fatalf("cmp.Equal(%v, %v, SameUUIDVariant()) = %t, wanted %t", c.x, c.y, got, c.want) + } + }) + } +} + +// nullString returns a sql.NullString for a string. +func nullString(val string) sql.NullString { + return sql.NullString{String: val, Valid: true} +}
diff --git a/internal/workflow/workflow.go b/internal/workflow/workflow.go index 2720a37..016b14e 100644 --- a/internal/workflow/workflow.go +++ b/internal/workflow/workflow.go
@@ -195,9 +195,9 @@ type Listener interface { // TaskStateChanged is called when the state of a task changes. // state is safe to store or modify. - TaskStateChanged(workflowID, taskID string, state *TaskState) error + TaskStateChanged(workflowID uuid.UUID, taskID string, state *TaskState) error // Logger is called to obtain a Logger for a particular task. - Logger(workflowID, taskID string) Logger + Logger(workflowID uuid.UUID, taskID string) Logger } // TaskState contains the state of a task in a running workflow. Once Finished @@ -212,7 +212,7 @@ // WorkflowState contains the shallow state of a running workflow. type WorkflowState struct { - ID string + ID uuid.UUID Params map[string]string } @@ -245,7 +245,7 @@ // A Workflow is an instantiated workflow instance, ready to run. type Workflow struct { - ID string + ID uuid.UUID def *Definition params map[string]string @@ -291,7 +291,7 @@ // Start instantiates a workflow with the given parameters. func Start(def *Definition, params map[string]string) (*Workflow, error) { w := &Workflow{ - ID: uuid.New().String(), + ID: uuid.New(), def: def, params: params, tasks: map[*taskDefinition]*taskState{}, @@ -438,11 +438,11 @@ type defaultListener struct{} -func (s *defaultListener) TaskStateChanged(_, _ string, _ *TaskState) error { +func (s *defaultListener) TaskStateChanged(_ uuid.UUID, _ string, _ *TaskState) error { return nil } -func (s *defaultListener) Logger(_, task string) Logger { +func (s *defaultListener) Logger(_ uuid.UUID, task string) Logger { return &defaultLogger{} }
diff --git a/internal/workflow/workflow_test.go b/internal/workflow/workflow_test.go index bacf4c1..4044ddf 100644 --- a/internal/workflow/workflow_test.go +++ b/internal/workflow/workflow_test.go
@@ -16,6 +16,7 @@ "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "github.com/google/uuid" "golang.org/x/build/internal/workflow" ) @@ -141,7 +142,7 @@ logger workflow.Logger } -func (l *logTestListener) Logger(_, _ string) workflow.Logger { +func (l *logTestListener) Logger(_ uuid.UUID, _ string) workflow.Logger { return l.logger } @@ -219,12 +220,12 @@ type mapListener struct { workflow.Listener - states map[string]map[string]*workflow.TaskState + states map[uuid.UUID]map[string]*workflow.TaskState } -func (l *mapListener) TaskStateChanged(workflowID, taskID string, state *workflow.TaskState) error { +func (l *mapListener) TaskStateChanged(workflowID uuid.UUID, taskID string, state *workflow.TaskState) error { if l.states == nil { - l.states = map[string]map[string]*workflow.TaskState{} + l.states = map[uuid.UUID]map[string]*workflow.TaskState{} } if l.states[workflowID] == nil { l.states[workflowID] = map[string]*workflow.TaskState{} @@ -262,7 +263,7 @@ type verboseListener struct{ t *testing.T } -func (l *verboseListener) TaskStateChanged(_, _ string, st *workflow.TaskState) error { +func (l *verboseListener) TaskStateChanged(_ uuid.UUID, _ string, st *workflow.TaskState) error { switch { case !st.Finished: l.t.Logf("task %-10v: started", st.Name) @@ -274,7 +275,7 @@ return nil } -func (l *verboseListener) Logger(_, task string) workflow.Logger { +func (l *verboseListener) Logger(_ uuid.UUID, task string) workflow.Logger { return &testLogger{t: l.t, task: task} }