blob: 3f69cbf549b64be7a1576dd0e2e75d8d9798f492 [file] [log] [blame]
// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package pump provides an infinitely buffered event channel.
package pump
// Make returns a new Pump. Call Release to stop pumping events.
func Make() Pump {
p := Pump{
in: make(chan interface{}),
out: make(chan interface{}),
release: make(chan struct{}),
}
go p.run()
return p
}
// Pump is an event pump, such that calling Send(e) will eventually send e on
// the event channel, in order, but Send will always complete soon, even if
// nothing is receiving on the event channel. It is effectively an infinitely
// buffered channel.
//
// In particular, goroutine A calling p.Send will not deadlock even if
// goroutine B that's responsible for receiving on p.Events() is currently
// blocked trying to send to A on a separate channel.
type Pump struct {
in chan interface{}
out chan interface{}
release chan struct{}
}
// Events returns the event channel.
func (p *Pump) Events() <-chan interface{} {
return p.out
}
// Send sends an event on the event channel.
func (p *Pump) Send(event interface{}) {
select {
case p.in <- event:
case <-p.release:
}
}
// Release stops the event pump. Pending events may or may not be delivered on
// the event channel. Calling Release will not close the event channel.
func (p *Pump) Release() {
close(p.release)
}
func (p *Pump) run() {
// initialSize is the initial size of the circular buffer. It must be a
// power of 2.
const initialSize = 16
i, j, buf, mask := 0, 0, make([]interface{}, initialSize), initialSize-1
for {
maybeOut := p.out
if i == j {
maybeOut = nil
}
select {
case maybeOut <- buf[i&mask]:
buf[i&mask] = nil
i++
case e := <-p.in:
// Allocate a bigger buffer if necessary.
if i+len(buf) == j {
b := make([]interface{}, 2*len(buf))
n := copy(b, buf[j&mask:])
copy(b[n:], buf[:j&mask])
i, j = 0, len(buf)
buf, mask = b, len(b)-1
}
buf[j&mask] = e
j++
case <-p.release:
return
}
}
}