// blob: 438a0180fbc5eff3d0095f8eadd156c3e21fdb7a
// Request is one unit of work: an operation to run together with the channel
// on which its integer result is handed back.
type Request struct {
	// fn is the operation to perform.
	fn func() int
	// c is the channel on which the result is returned.
	c chan int
}
// requester simulates a client generating load. It loops forever: pause for a
// random interval, submit a Request on work, block until the answer arrives
// on its reply channel, then pass the answer to furtherProcess.
func requester(work chan<- Request) {
	// A single unbuffered reply channel is reused for every request; each
	// iteration waits for its answer before the next request is sent.
	reply := make(chan int)
	for {
		// Kill some time (fake load).
		Sleep(rand.Int63n(nWorker * 2 * Second))
		// Send the request...
		work <- Request{fn: workFn, c: reply}
		// ...then wait for the answer and process it.
		furtherProcess(<-reply)
	}
}
// work is the worker's service loop. For every Request received on
// w.requests it runs the request's fn, sends the result on the request's own
// channel, and then sends itself on done to report that the request is
// finished. It never returns.
func (w *Worker) work(done chan *Worker) {
	for {
		job := <-w.requests // get Request from balancer
		result := job.fn()  // run the operation
		job.c <- result     // deliver the result to whoever is reading job.c
		done <- w           // we've finished this request
	}
}
// Pool is the collection of workers. Its Less method orders workers by
// pending count and the balancer hands &b.pool to container/heap, so the
// pool is maintained as a min-heap keyed on pending work.
type Pool []*Worker
// Balancer spreads incoming Requests over a pool of Workers.
type Balancer struct {
	// pool holds the workers, kept in heap order by dispatch and completed.
	pool Pool
	// done is the channel from which balance receives workers that have
	// finished a request.
	done chan *Worker
}
// balance is the balancer's event loop. It waits for whichever comes first:
// a new Request on work, which it passes to dispatch, or a *Worker on b.done,
// which it passes to completed. It never returns.
func (b *Balancer) balance(work chan Request) {
	for {
		select {
		case incoming := <-work:
			// Received a Request, so send it to a Worker.
			b.dispatch(incoming)
		case finished := <-b.done:
			// A worker has finished, so update its info.
			b.completed(finished)
		}
	}
}
// Less reports whether the worker at index i has fewer pending tasks than the
// worker at index j.
func (p Pool) Less(i, j int) bool {
	a, b := p[i], p[j]
	return a.pending < b.pending
}
// Worker is one member of the pool: a queue of incoming requests plus the
// bookkeeping the balancer uses to keep it ordered in the heap.
type Worker struct {
	// requests carries the work to do. The original note describes it as a
	// buffered channel; its allocation is not shown in this part of the file.
	requests chan Request
	// pending counts tasks handed to this worker and not yet reported
	// complete (incremented by dispatch, decremented by completed).
	pending int
	// index is the worker's position in the heap.
	index int
}
// dispatch sends req to the least loaded worker and records the extra load.
//
// b.pool is a min-heap ordered by pending count, so its root, b.pool[0], is
// the worker with the fewest pending tasks. The worker is updated in place
// and the heap is repaired with heap.Fix, which container/heap documents as
// equivalent to, but cheaper than, removing the element and pushing it back.
//
// The send on w.requests blocks for as long as that channel cannot accept the
// request. dispatch panics if the pool is empty.
func (b *Balancer) dispatch(req Request) {
	// Grab the least loaded worker: the root of the heap.
	w := b.pool[0]
	// ...send it the task.
	w.requests <- req
	// One more in its work queue.
	w.pending++
	// Its key grew, so sift it down to its proper place in the heap.
	heap.Fix(&b.pool, 0)
}
// completed records that worker w has finished one request and restores the
// heap order of b.pool.
//
// w.index is the worker's current position in the heap. Because only the
// worker's key (pending) changed, heap.Fix at that position is enough; it is
// documented as equivalent to, but less expensive than, heap.Remove followed
// by heap.Push of the same element.
func (b *Balancer) completed(w *Worker) {
	// One fewer in the queue.
	w.pending--
	// Its key shrank, so move it to its proper place in the heap.
	heap.Fix(&b.pool, w.index)
}
// Pool.Less is declared once, earlier in this file. An identical second
// declaration stood here and was removed: Go requires the method names bound
// to a type to be unique, so the repeated declaration would not compile.