internal/relui: lock concurrent runs of a workflow

This change adds a map containing a list of running workflow ids along
with a lock. Attempting to re-start a running workflow will log an error.

Updates golang/go#53165

Change-Id: I8eb144a2920ec8da17edbc5bbd722279721420f6
Reviewed-on: https://go-review.googlesource.com/c/build/+/411075
Reviewed-by: Dmitri Shuralyov <dmitshur@golang.org>
Run-TryBot: Alex Rakoczy <alex@golang.org>
Reviewed-by: Dmitri Shuralyov <dmitshur@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
diff --git a/internal/relui/worker.go b/internal/relui/worker.go
index cc5424d..100745a 100644
--- a/internal/relui/worker.go
+++ b/internal/relui/worker.go
@@ -10,6 +10,7 @@
 	"errors"
 	"fmt"
 	"log"
+	"sync"
 
 	"github.com/google/uuid"
 	"github.com/jackc/pgx/v4"
@@ -35,6 +36,12 @@
 
 	done    chan struct{}
 	pending chan *workflow.Workflow
+
+	mu sync.Mutex
+	// running is a set of currently running Workflow ids. Run uses
+	// this set to prevent starting a simultaneous execution of a
+	// currently running Workflow.
+	running map[string]struct{}
 }
 
 // NewWorker returns a Worker ready to accept and run workflows.
@@ -45,6 +52,7 @@
 		l:       l,
 		done:    make(chan struct{}),
 		pending: make(chan *workflow.Workflow, 1),
+		running: make(map[string]struct{}),
 	}
 }
 
@@ -64,6 +72,12 @@
 			return ctx.Err()
 		case wf := <-w.pending:
 			eg.Go(func() error {
+				if err := w.markRunning(wf); err != nil {
+					log.Println(err)
+					return nil
+				}
+				defer w.markStopped(wf)
+
 				outputs, err := wf.Run(ctx, w.l)
 				if wfErr := w.l.WorkflowFinished(ctx, wf.ID, outputs, err); wfErr != nil {
 					return fmt.Errorf("w.l.WorkflowFinished(_, %q, %v, %q) = %w", wf.ID, outputs, err, wfErr)
@@ -74,6 +88,22 @@
 	}
 }
 
+func (w *Worker) markRunning(wf *workflow.Workflow) error {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	if _, ok := w.running[wf.ID.String()]; ok {
+		return fmt.Errorf("workflow %q already running", wf.ID)
+	}
+	w.running[wf.ID.String()] = struct{}{}
+	return nil
+}
+
+func (w *Worker) markStopped(wf *workflow.Workflow) {
+	w.mu.Lock()
+	defer w.mu.Unlock()
+	delete(w.running, wf.ID.String())
+}
+
 func (w *Worker) run(wf *workflow.Workflow) error {
 	select {
 	case <-w.done:
diff --git a/internal/relui/worker_test.go b/internal/relui/worker_test.go
index 41b91fb..9431cca 100644
--- a/internal/relui/worker_test.go
+++ b/internal/relui/worker_test.go
@@ -191,27 +191,6 @@
 	}
 }
 
-func TestWorkerRunListenerError(t *testing.T) {
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	dbp := testDB(ctx, t)
-	q := db.New(dbp)
-	dh := NewDefinitionHolder()
-	w := NewWorker(dh, dbp, &unimplementedListener{})
-
-	wd := newTestEchoWorkflow()
-	dh.RegisterDefinition(t.Name(), wd)
-	wfid := createUnfinishedEchoWorkflow(t, ctx, q)
-
-	if err := w.Resume(ctx, wfid); err != nil {
-		t.Fatalf("w.Resume(_, %v) = %v, wanted no error", wfid, err)
-	}
-
-	if err := w.Run(ctx); err == nil {
-		t.Fatalf("w.Run() = %v, wanted error", err)
-	}
-}
-
 func newTestEchoWorkflow() *workflow.Definition {
 	wd := workflow.New()
 	echo := func(ctx context.Context, arg string) (string, error) {