internal/relui: add DefinitionHolder, remove package-scoped definitions
Up to this point, workflows have been simple and did not have external
configuration, so it was possible to define all of them them at package
initialization time in a package-scoped map. This doesn't scale well to
upcoming workflows that involve external service configuration being
provided to relui.
Make the trivial refactor to wrap the definitions map in a holder type
(similar to http.ServeMux) to get rid of the package-scoped variable,
without trying to make more invasive changes at this point.
A Worker now accepts a DefinitionHolder during creation, and uses it.
A Server already has a Worker, and uses its DefinitionHolder as needed.
Maybe in the future it'll make sense to merge the functionality of
DefinitionHolder into Worker, and it can stop being its own type.
Change-Id: Ia1539e0621db83c8fc5959b39445090d1f0034e1
Reviewed-on: https://go-review.googlesource.com/c/build/+/385614
Trust: Dmitri Shuralyov <dmitshur@golang.org>
Run-TryBot: Dmitri Shuralyov <dmitshur@golang.org>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Alex Rakoczy <alex@golang.org>
diff --git a/cmd/relui/main.go b/cmd/relui/main.go
index cf516de..87d89db 100644
--- a/cmd/relui/main.go
+++ b/cmd/relui/main.go
@@ -39,12 +39,13 @@
}
return
}
+ dh := relui.NewDefinitionHolder()
db, err := pgxpool.Connect(ctx, *pgConnect)
if err != nil {
log.Fatal(err)
}
defer db.Close()
- w := relui.NewWorker(db, relui.NewPGListener(db))
+ w := relui.NewWorker(dh, db, relui.NewPGListener(db))
go w.Run(ctx)
if err := w.ResumeAll(ctx); err != nil {
log.Printf("w.ResumeAll() = %v", err)
diff --git a/internal/relui/web.go b/internal/relui/web.go
index d481f91..32aeada 100644
--- a/internal/relui/web.go
+++ b/internal/relui/web.go
@@ -186,7 +186,7 @@
func (s *Server) newWorkflowHandler(w http.ResponseWriter, r *http.Request) {
out := bytes.Buffer{}
resp := &newWorkflowResponse{
- Definitions: Definitions(),
+ Definitions: s.w.dh.Definitions(),
Name: r.FormValue("workflow.name"),
}
if err := s.newWorkflowTmpl.Execute(&out, resp); err != nil {
@@ -201,7 +201,7 @@
// starts the workflow in a goroutine.
func (s *Server) createWorkflowHandler(w http.ResponseWriter, r *http.Request) {
name := r.FormValue("workflow.name")
- d := Definition(name)
+ d := s.w.dh.Definition(name)
if d == nil {
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
diff --git a/internal/relui/web_test.go b/internal/relui/web_test.go
index 52dfe92..9dda486 100644
--- a/internal/relui/web_test.go
+++ b/internal/relui/web_test.go
@@ -116,7 +116,7 @@
req := httptest.NewRequest(http.MethodGet, "/", nil)
w := httptest.NewRecorder()
- s := NewServer(p, NewWorker(p, &PGListener{p}), nil)
+ s := NewServer(p, NewWorker(NewDefinitionHolder(), p, &PGListener{p}), nil)
s.homeHandler(w, req)
resp := w.Result()
@@ -152,7 +152,7 @@
req := httptest.NewRequest(http.MethodGet, u.String(), nil)
w := httptest.NewRecorder()
- s := NewServer(nil, nil, nil)
+ s := NewServer(nil, NewWorker(NewDefinitionHolder(), nil, nil), nil)
s.newWorkflowHandler(w, req)
resp := w.Result()
@@ -219,7 +219,7 @@
rec := httptest.NewRecorder()
q := db.New(p)
- s := NewServer(p, NewWorker(p, &PGListener{p}), nil)
+ s := NewServer(p, NewWorker(NewDefinitionHolder(), p, &PGListener{p}), nil)
s.createWorkflowHandler(rec, req)
resp := rec.Result()
diff --git a/internal/relui/worker.go b/internal/relui/worker.go
index 45d1cc2..95a88ac 100644
--- a/internal/relui/worker.go
+++ b/internal/relui/worker.go
@@ -28,6 +28,8 @@
// Worker runs workflows, and persists their state.
type Worker struct {
+ dh *DefinitionHolder
+
db *pgxpool.Pool
l Listener
@@ -36,8 +38,9 @@
}
// NewWorker returns a Worker ready to accept and run workflows.
-func NewWorker(db *pgxpool.Pool, l Listener) *Worker {
+func NewWorker(dh *DefinitionHolder, db *pgxpool.Pool, l Listener) *Worker {
return &Worker{
+ dh: dh,
db: db,
l: l,
done: make(chan struct{}),
@@ -130,7 +133,7 @@
if err != nil {
return err
}
- d := Definition(wf.Name.String)
+ d := w.dh.Definition(wf.Name.String)
if d == nil {
return fmt.Errorf("no workflow named %q", wf.Name.String)
}
diff --git a/internal/relui/worker_test.go b/internal/relui/worker_test.go
index 5031f5a..b5094cd 100644
--- a/internal/relui/worker_test.go
+++ b/internal/relui/worker_test.go
@@ -26,13 +26,14 @@
dbp := testDB(ctx, t)
q := db.New(dbp)
wg := sync.WaitGroup{}
- w := NewWorker(dbp, &testWorkflowListener{
+ dh := NewDefinitionHolder()
+ w := NewWorker(dh, dbp, &testWorkflowListener{
Listener: &PGListener{dbp},
onFinished: wg.Done,
})
wd := newTestEchoWorkflow()
- RegisterDefinition(t.Name(), wd)
+ dh.RegisterDefinition(t.Name(), wd)
params := map[string]string{"echo": "greetings"}
wg.Add(1)
@@ -85,13 +86,14 @@
dbp := testDB(ctx, t)
q := db.New(dbp)
wg := sync.WaitGroup{}
- w := NewWorker(dbp, &testWorkflowListener{
+ dh := NewDefinitionHolder()
+ w := NewWorker(dh, dbp, &testWorkflowListener{
Listener: &PGListener{dbp},
onFinished: wg.Done,
})
wd := newTestEchoWorkflow()
- RegisterDefinition(t.Name(), wd)
+ dh.RegisterDefinition(t.Name(), wd)
wfid := createUnfinishedEchoWorkflow(t, ctx, q)
wg.Add(1)
@@ -124,7 +126,7 @@
defer cancel()
dbp := testDB(ctx, t)
q := db.New(dbp)
- w := NewWorker(dbp, &PGListener{dbp})
+ w := NewWorker(NewDefinitionHolder(), dbp, &PGListener{dbp})
cwp := db.CreateWorkflowParams{ID: uuid.New(), Name: nullString(t.Name()), Params: nullString("{}")}
if wf, err := q.CreateWorkflow(ctx, cwp); err != nil {
@@ -142,13 +144,14 @@
dbp := testDB(ctx, t)
q := db.New(dbp)
wg := sync.WaitGroup{}
- w := NewWorker(dbp, &testWorkflowListener{
+ dh := NewDefinitionHolder()
+ w := NewWorker(dh, dbp, &testWorkflowListener{
Listener: &PGListener{dbp},
onFinished: wg.Done,
})
wd := newTestEchoWorkflow()
- RegisterDefinition(t.Name(), wd)
+ dh.RegisterDefinition(t.Name(), wd)
wfid1 := createUnfinishedEchoWorkflow(t, ctx, q)
wfid2 := createUnfinishedEchoWorkflow(t, ctx, q)
@@ -193,10 +196,11 @@
defer cancel()
dbp := testDB(ctx, t)
q := db.New(dbp)
- w := NewWorker(dbp, &unimplementedListener{})
+ dh := NewDefinitionHolder()
+ w := NewWorker(dh, dbp, &unimplementedListener{})
wd := newTestEchoWorkflow()
- RegisterDefinition(t.Name(), wd)
+ dh.RegisterDefinition(t.Name(), wd)
wfid := createUnfinishedEchoWorkflow(t, ctx, q)
if err := w.Resume(ctx, wfid); err != nil {
diff --git a/internal/relui/workflows.go b/internal/relui/workflows.go
index c1a6c76..afc47d3 100644
--- a/internal/relui/workflows.go
+++ b/internal/relui/workflows.go
@@ -10,32 +10,45 @@
"golang.org/x/build/internal/workflow"
)
-var dmut sync.Mutex
-var definitions = map[string]*workflow.Definition{
- "echo": newEchoWorkflow(),
+// DefinitionHolder holds workflow definitions.
+type DefinitionHolder struct {
+ mu sync.Mutex
+ definitions map[string]*workflow.Definition
+}
+
+// NewDefinitionHolder creates a new DefinitionHolder,
+// initialized with a sample "echo" workflow.
+func NewDefinitionHolder() *DefinitionHolder {
+ return &DefinitionHolder{definitions: map[string]*workflow.Definition{
+ "echo": newEchoWorkflow(),
+ }}
}
// Definition returns the initialized workflow.Definition registered
// for a given name.
-func Definition(name string) *workflow.Definition {
- dmut.Lock()
- defer dmut.Unlock()
- return definitions[name]
+func (h *DefinitionHolder) Definition(name string) *workflow.Definition {
+ h.mu.Lock()
+ defer h.mu.Unlock()
+ return h.definitions[name]
}
// RegisterDefinition registers a definition with a name.
-func RegisterDefinition(name string, d *workflow.Definition) {
- dmut.Lock()
- defer dmut.Unlock()
- definitions[name] = d
+// If a definition with the same name already exists, RegisterDefinition panics.
+func (h *DefinitionHolder) RegisterDefinition(name string, d *workflow.Definition) {
+ h.mu.Lock()
+ defer h.mu.Unlock()
+ if _, exist := h.definitions[name]; exist {
+ panic("relui: multiple registrations for " + name)
+ }
+ h.definitions[name] = d
}
// Definitions returns the names of all registered definitions.
-func Definitions() map[string]*workflow.Definition {
- dmut.Lock()
- defer dmut.Unlock()
+func (h *DefinitionHolder) Definitions() map[string]*workflow.Definition {
+ h.mu.Lock()
+ defer h.mu.Unlock()
defs := make(map[string]*workflow.Definition)
- for k, v := range definitions {
+ for k, v := range h.definitions {
defs[k] = v
}
return defs