sweet/benchmarks/internal/pool: use errgroup to manage errors and cancellation

Also simplify concurrency in TestPoolCancel: t.Fatalf was called in a
background goroutine, which is not allowed (because it invokes
runtime.Goexit), but the background goroutine isn't really necessary
anyway. (If the test deadlocks, it's more useful to just let it time
out anyway so that the test will give us a useful goroutine dump.)

Fixes golang/go#50096

Change-Id: Ica9abf427f662f4ed1f7cd64ef906b49744009ef
Reviewed-on: https://go-review.googlesource.com/c/benchmarks/+/370974
Trust: Bryan Mills <bcmills@google.com>
Run-TryBot: Bryan Mills <bcmills@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Michael Knyszek <mknyszek@google.com>
diff --git a/go.mod b/go.mod
index f4b739b..0f45938 100644
--- a/go.mod
+++ b/go.mod
@@ -21,6 +21,7 @@
 	github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9
 	gitlab.com/golang-commonmark/markdown v0.0.0-20211110145824-bf3e522c626a
 	golang.org/x/oauth2 v0.0.0-20211104180415-d3ed0bb246c8
+	golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
 	golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359
 	google.golang.org/api v0.60.0
 )
diff --git a/go.sum b/go.sum
index 1f2ea2c..6a3614d 100644
--- a/go.sum
+++ b/go.sum
@@ -458,6 +458,7 @@
 golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
diff --git a/sweet/benchmarks/internal/pool/pool.go b/sweet/benchmarks/internal/pool/pool.go
index e043c63..b35e447 100644
--- a/sweet/benchmarks/internal/pool/pool.go
+++ b/sweet/benchmarks/internal/pool/pool.go
@@ -20,6 +20,8 @@
 	"context"
 	"errors"
 	"sync"
+
+	"golang.org/x/sync/errgroup"
 )
 
 // Done is used to signal to the pool that the worker has no more useful work
@@ -39,66 +41,45 @@
 
 // P implements a heterogeneous pool of Workers.
 type P struct {
-	workers           []Worker
-	gun, cancel, done chan struct{}
-	errs              chan error
+	workers []Worker
+	gun     chan struct{}
+	g       *errgroup.Group
 }
 
 // New creates a new pool of the given workers.
 //
 // The provided context will be passed to all workers' run methods.
 func New(ctx context.Context, workers []Worker) *P {
+	g, ctx := errgroup.WithContext(ctx)
 	gun := make(chan struct{})
-	cancel := make(chan struct{})
-	errs := make(chan error, len(workers))
-	ready := make(chan struct{}, len(workers))
+
+	var ready sync.WaitGroup
+	ready.Add(len(workers))
 
 	// Spin up workers.
-	wg := sync.WaitGroup{}
 	for _, w := range workers {
-		go func(w Worker) {
-			wg.Add(1)
-			defer wg.Done()
-			ready <- struct{}{}
+		w := w
+		g.Go(func() error {
+			ready.Done()
 			<-gun // wait for starting gun to close
-		loop:
 			for {
 				err := w.Run(ctx)
-				if err == Done {
-					break
+				if err == Done || ctx.Err() != nil {
+					return nil
 				} else if err != nil {
-					errs <- err
-					break
-				}
-				select {
-				case <-ctx.Done():
-					break loop
-				case <-cancel:
-					break loop
-				default:
+					return err
 				}
 			}
-		}(w)
+		})
 	}
 
 	// Wait for all workers to be ready.
-	for range workers {
-		<-ready
-	}
-
-	// Spin up waiter.
-	done := make(chan struct{})
-	go func() {
-		wg.Wait()
-		done <- struct{}{}
-	}()
+	ready.Wait()
 
 	return &P{
 		workers: workers,
 		gun:     gun,
-		cancel:  cancel,
-		done:    done,
-		errs:    errs,
+		g:       g,
 	}
 }
 
@@ -123,15 +104,5 @@
 			w.Close()
 		}
 	}()
-	// Wait for completion.
-	select {
-	case <-p.done:
-	case err := <-p.errs:
-		// Broadcast cancel to all workers
-		// and wait until they're done.
-		close(p.cancel)
-		<-p.done
-		return err
-	}
-	return nil
+	return p.g.Wait()
 }
diff --git a/sweet/benchmarks/internal/pool/pool_test.go b/sweet/benchmarks/internal/pool/pool_test.go
index 6647183..849af4a 100644
--- a/sweet/benchmarks/internal/pool/pool_test.go
+++ b/sweet/benchmarks/internal/pool/pool_test.go
@@ -8,7 +8,6 @@
 	"context"
 	"io"
 	"testing"
-	"time"
 )
 
 func TestEmptyPool(t *testing.T) {
@@ -96,17 +95,8 @@
 	ctx := context.Background()
 	ctx, cancel := context.WithCancel(ctx)
 	p := New(ctx, workers)
-	sig := make(chan struct{})
-	go func() {
-		if err := p.Run(); err != nil {
-			t.Fatalf("got error from good pool: %v", err)
-		}
-		sig <- struct{}{}
-	}()
 	cancel()
-	select {
-	case <-sig:
-	case <-time.After(3 * time.Second):
-		t.Fatal("test timed out after 3 seconds")
+	if err := p.Run(); err != nil {
+		t.Fatalf("got error from good pool: %v", err)
 	}
 }