blob: e043c630470ce6c52893287536732e27e75079a1 [file] [log] [blame]
// Copyright 2021 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
/*
This package provides a facility to run a heterogeneous pool of workers.
Each worker is defined by an interface, and the pool execute's each worker's
Run method repeatedly concurrently. The worker may exit early by returning
the Done error. Each worker's Run method accepts a context.Context which is
passed to it through the pool. If this context is cancelled, it may cancel
workers and will always cancel the pool.
Each worker is guaranteed to start immediately when the pool's Run method is
called and not any sooner.
*/
package pool
import (
"context"
"errors"
"sync"
)
// Done is used to signal to the pool that the worker has no more useful work
// to do.
var Done error = errors.New("pool worker is done")
// Worker represents a stateful task which is executed repeatedly by calling
// its Run method. Any resources associated with the Worker may be freed with
// Close.
type Worker interface {
// Run executes a task once, returning an error on failure.
Run(context.Context) error
// Close releases any resources associated with the Worker.
Close() error
}
// P implements a heterogeneous pool of Workers.
type P struct {
workers []Worker
gun, cancel, done chan struct{}
errs chan error
}
// New creates a new pool of the given workers.
//
// The provided context will be passed to all workers' run methods.
func New(ctx context.Context, workers []Worker) *P {
gun := make(chan struct{})
cancel := make(chan struct{})
errs := make(chan error, len(workers))
ready := make(chan struct{}, len(workers))
// Spin up workers.
wg := sync.WaitGroup{}
for _, w := range workers {
go func(w Worker) {
wg.Add(1)
defer wg.Done()
ready <- struct{}{}
<-gun // wait for starting gun to close
loop:
for {
err := w.Run(ctx)
if err == Done {
break
} else if err != nil {
errs <- err
break
}
select {
case <-ctx.Done():
break loop
case <-cancel:
break loop
default:
}
}
}(w)
}
// Wait for all workers to be ready.
for range workers {
<-ready
}
// Spin up waiter.
done := make(chan struct{})
go func() {
wg.Wait()
done <- struct{}{}
}()
return &P{
workers: workers,
gun: gun,
cancel: cancel,
done: done,
errs: errs,
}
}
// Run signals all the workers to begin and waits for all of them to complete.
//
// Each Worker's Run method is called in a loop until the worker returns an
// error or the context passed to New is cancelled. If the error is Done, then
// it does not propagate to Run and instead the worker stops looping.
//
// If the context is cancelled for any reason no error is returned. Check the
// context for any errors in that case.
//
// Always cleans up the pool's workers by calling Close before returning.
//
// Returns the first error encountered from any worker that failed and cancels
// the rest immediately.
func (p *P) Run() error {
close(p.gun) // fire starting gun
defer func() {
// Clean up on exit.
for _, w := range p.workers {
w.Close()
}
}()
// Wait for completion.
select {
case <-p.done:
case err := <-p.errs:
// Broadcast cancel to all workers
// and wait until they're done.
close(p.cancel)
<-p.done
return err
}
return nil
}