blob: d7ce705ae706aa93af8803c9d91e47efbfbed27d [file] [log] [blame]
// Copyright 2021 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
/*
Package pool provides a facility to run a heterogeneous pool of workers.
Each worker is defined by an interface, and the pool executes each worker's
Run method repeatedly, with all workers running concurrently. A worker may
exit early by returning the Done error. Each worker's Run method accepts a
context.Context which is passed to it through the pool. If this context is
cancelled, it may cancel workers and will always cancel the pool.
Each worker is guaranteed to start immediately when the pool's Run method is
called and not any sooner.
*/
package pool
import (
"context"
"errors"
"sync"
"golang.org/x/sync/errgroup"
)
// Done is the sentinel a Worker returns from Run to tell the pool that it
// has no more useful work to do. The pool stops calling that worker's Run
// method and does not treat the return as a failure.
var Done = errors.New("pool worker is done")
// Worker is a stateful task that the pool drives by invoking Run over and
// over. Whatever resources the Worker holds are released by calling Close.
type Worker interface {
	// Run performs a single execution of the task. A non-nil error reports
	// that the execution failed.
	Run(ctx context.Context) error

	// Close frees the resources held by the Worker.
	Close() error
}
// P implements a heterogeneous pool of Workers.
//
// A P is built by New, which starts one goroutine per Worker, and is driven
// by Run, which releases those goroutines and waits for them to finish.
type P struct {
// workers holds the Workers handed to New; Run calls Close on each of them
// before it returns.
workers []Worker
// gun is the starting gun: every worker goroutine blocks receiving from it
// until Run closes it.
gun chan struct{}
// g is the errgroup in which the worker goroutines run; Run waits on it.
g *errgroup.Group
}
// New creates a new pool of the given workers.
//
// One goroutine is started per worker. Each goroutine announces that it is
// running and then parks on the pool's starting gun; New does not return
// until every goroutine has made that announcement, so that all workers can
// begin together as soon as Run fires the gun. Because the goroutines stay
// parked until the gun is closed, Run must eventually be called on the
// returned pool or they will never exit.
//
// The provided context will be passed to all workers' Run methods, wrapped by
// errgroup.WithContext so that the first worker failure cancels it for the
// remaining workers.
//
// Once released, each worker's Run method is called in a loop that ends when:
//   - Run returns Done (possibly wrapped), which is not reported as an error;
//   - the context has been cancelled, in which case any error from that call
//     is dropped; or
//   - Run returns any other non-nil error, which is reported to the pool.
func New(ctx context.Context, workers []Worker) *P {
	g, ctx := errgroup.WithContext(ctx)
	gun := make(chan struct{})

	var ready sync.WaitGroup
	ready.Add(len(workers))

	// Spin up workers.
	for _, w := range workers {
		w := w // per-iteration copy for the closure (required before Go 1.22)
		g.Go(func() error {
			ready.Done()
			<-gun // wait for starting gun to close
			for {
				err := w.Run(ctx)
				switch {
				case errors.Is(err, Done), ctx.Err() != nil:
					// Either the worker finished voluntarily or the pool
					// is shutting down. errors.Is also recognises a Done
					// that the worker wrapped with additional context.
					return nil
				case err != nil:
					return err
				}
			}
		})
	}

	// Wait for all workers to be ready.
	ready.Wait()

	return &P{
		workers: workers,
		gun:     gun,
		g:       g,
	}
}
// Run releases all of the pool's workers at once and blocks until every one
// of them has finished.
//
// A worker finishes when its Run method returns an error or when the context
// given to New is cancelled. Returning Done ends that worker's loop without
// surfacing an error from the pool's Run.
//
// Cancellation of the context, whatever its cause, is not reported as an
// error here; inspect the context itself to learn why it was cancelled.
//
// Every worker's Close method is called before Run returns, regardless of the
// outcome. Errors returned by Close are discarded.
//
// The result is the first error returned by any failing worker; such a
// failure immediately cancels the remaining workers.
//
// Run must be called at most once: a second call would close the
// already-closed starting gun and panic.
func (p *P) Run() error {
	// Fire the starting gun: closing the channel unblocks every worker
	// goroutine waiting on it.
	close(p.gun)

	// Release the workers' resources on the way out, whatever Wait reports.
	defer func() {
		pending := p.workers
		for i := range pending {
			// Cleanup is best-effort; a Close error is deliberately dropped.
			_ = pending[i].Close()
		}
	}()

	return p.g.Wait()
}