internal/relui: lock concurrent runs of a workflow
This change adds a map containing a list of running workflow ids along
with a lock. Attempting to re-start a running workflow will log an error.
Updates golang/go#53165
Change-Id: I8eb144a2920ec8da17edbc5bbd722279721420f6
Reviewed-on: https://go-review.googlesource.com/c/build/+/411075
Reviewed-by: Dmitri Shuralyov <dmitshur@golang.org>
Run-TryBot: Alex Rakoczy <alex@golang.org>
Reviewed-by: Dmitri Shuralyov <dmitshur@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
diff --git a/internal/relui/worker.go b/internal/relui/worker.go
index cc5424d..100745a 100644
--- a/internal/relui/worker.go
+++ b/internal/relui/worker.go
@@ -10,6 +10,7 @@
"errors"
"fmt"
"log"
+ "sync"
"github.com/google/uuid"
"github.com/jackc/pgx/v4"
@@ -35,6 +36,12 @@
done chan struct{}
pending chan *workflow.Workflow
+
+ mu sync.Mutex
+ // running is a set of currently running Workflow ids. Run uses
+ // this set to prevent starting a simultaneous execution of a
+ // currently running Workflow.
+ running map[string]struct{}
}
// NewWorker returns a Worker ready to accept and run workflows.
@@ -45,6 +52,7 @@
l: l,
done: make(chan struct{}),
pending: make(chan *workflow.Workflow, 1),
+ running: make(map[string]struct{}),
}
}
@@ -64,6 +72,12 @@
return ctx.Err()
case wf := <-w.pending:
eg.Go(func() error {
+ if err := w.markRunning(wf); err != nil {
+ log.Println(err)
+ return nil
+ }
+ defer w.markStopped(wf)
+
outputs, err := wf.Run(ctx, w.l)
if wfErr := w.l.WorkflowFinished(ctx, wf.ID, outputs, err); wfErr != nil {
return fmt.Errorf("w.l.WorkflowFinished(_, %q, %v, %q) = %w", wf.ID, outputs, err, wfErr)
@@ -74,6 +88,22 @@
}
}
+func (w *Worker) markRunning(wf *workflow.Workflow) error {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+ if _, ok := w.running[wf.ID.String()]; ok {
+ return fmt.Errorf("workflow %q already running", wf.ID)
+ }
+ w.running[wf.ID.String()] = struct{}{}
+ return nil
+}
+
+func (w *Worker) markStopped(wf *workflow.Workflow) {
+ w.mu.Lock()
+ defer w.mu.Unlock()
+ delete(w.running, wf.ID.String())
+}
+
func (w *Worker) run(wf *workflow.Workflow) error {
select {
case <-w.done:
diff --git a/internal/relui/worker_test.go b/internal/relui/worker_test.go
index 41b91fb..9431cca 100644
--- a/internal/relui/worker_test.go
+++ b/internal/relui/worker_test.go
@@ -191,27 +191,6 @@
}
}
-func TestWorkerRunListenerError(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- dbp := testDB(ctx, t)
- q := db.New(dbp)
- dh := NewDefinitionHolder()
- w := NewWorker(dh, dbp, &unimplementedListener{})
-
- wd := newTestEchoWorkflow()
- dh.RegisterDefinition(t.Name(), wd)
- wfid := createUnfinishedEchoWorkflow(t, ctx, q)
-
- if err := w.Resume(ctx, wfid); err != nil {
- t.Fatalf("w.Resume(_, %v) = %v, wanted no error", wfid, err)
- }
-
- if err := w.Run(ctx); err == nil {
- t.Fatalf("w.Run() = %v, wanted error", err)
- }
-}
-
func newTestEchoWorkflow() *workflow.Definition {
wd := workflow.New()
echo := func(ctx context.Context, arg string) (string, error) {