blob: 62cfd21e3b93f55774518afa8d7bc9d2f6566d1e [file] [log] [blame]
// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package bundler supports bundling (batching) of items. Bundling amortizes an
// action with fixed costs over multiple items. For example, if an API provides
// an RPC that accepts a list of items as input, but clients would prefer
// adding items one at a time, then a Bundler can accept individual items from
// the client and bundle many of them into a single RPC.
//
// This package is experimental and subject to change without notice.
package bundler
import (
"errors"
"reflect"
"sync"
"time"
)
// Default values that NewBundler assigns to a Bundler's tuning fields.
const (
	// DefaultDelayThreshold is the initial value of Bundler.DelayThreshold.
	DefaultDelayThreshold = 1 * time.Second

	// DefaultBundleCountThreshold is the initial value of
	// Bundler.BundleCountThreshold.
	DefaultBundleCountThreshold = 10

	// DefaultBundleByteThreshold is the initial value of
	// Bundler.BundleByteThreshold (1 MB).
	DefaultBundleByteThreshold = 1e6

	// DefaultBufferedByteLimit is the initial value of
	// Bundler.BufferedByteLimit (1 GB).
	DefaultBufferedByteLimit = 1e9
)
// Errors returned by Bundler.Add.
var (
	// ErrOverflow is returned by Add when accepting the item would push the
	// Bundler's buffered bytes past its BufferedByteLimit.
	ErrOverflow = errors.New("bundler reached buffered byte limit")

	// ErrOversizedItem is returned by Add when a single item is larger than
	// BundleByteLimit and therefore could never fit in any bundle.
	ErrOversizedItem = errors.New("item size exceeds bundle byte limit")
)
// A Bundler accumulates items into bundles and passes each finished bundle to
// a user-supplied handler on a background goroutine.
//
// The current bundle is handed off when any of the following happens:
//   - it reaches BundleCountThreshold items;
//   - it reaches BundleByteThreshold bytes;
//   - adding the next item would push it past BundleByteLimit;
//   - DelayThreshold elapses after its first item was added.
//
// NOTE(review): Add reads BundleByteLimit before taking b.mu, so the exported
// fields are presumably meant to be configured before the Bundler is used —
// confirm.
type Bundler struct {
	// DelayThreshold is how long a bundle may stay open, measured from the
	// moment its first item is added, before it is handed off.
	// NewBundler sets it to DefaultDelayThreshold.
	DelayThreshold time.Duration

	// BundleCountThreshold is the item count at which a bundle is handed off.
	// Items are added one at a time, so a bundle never grows past this count
	// and the threshold doubles as a limit. NewBundler sets it to
	// DefaultBundleCountThreshold.
	BundleCountThreshold int

	// BundleByteThreshold is the byte size at which the current bundle is
	// handed off. It only triggers handling and does not cap the size: the
	// item that crosses it stays in the bundle. NewBundler sets it to
	// DefaultBundleByteThreshold.
	BundleByteThreshold int

	// BundleByteLimit caps the size of a single bundle in bytes. Add closes
	// the current bundle early rather than exceed it, and rejects any single
	// item larger than it with ErrOversizedItem. Zero means no cap.
	BundleByteLimit int

	// BufferedByteLimit is the most bytes the Bundler will hold (accepted by
	// Add but not yet handled) before Add returns ErrOverflow.
	// NewBundler sets it to DefaultBufferedByteLimit.
	BufferedByteLimit int

	handler       func(interface{}) // invoked with each bundle's item slice
	itemSliceZero reflect.Value     // nil slice of the item type; an empty bundle
	donec         chan struct{}     // closed by Stop to end the background goroutine
	handlec       chan int          // wakes background when closed bundles are waiting
	timer         *time.Timer       // re-armed with DelayThreshold on a bundle's first item

	mu            sync.Mutex    // guards the fields below
	bufferedSize  int           // bytes accepted by Add whose handler call has not returned
	closedBundles []bundle      // finished bundles queued for the handler
	curBundle     bundle        // the bundle Add is currently filling
	calledc       chan struct{} // closed, then replaced, after each batch of handler calls
}
// bundle is one batch of items together with their combined size.
type bundle struct {
	items reflect.Value // a slice value whose elements are the bundled items
	size  int           // sum of the sizes passed to Add for those items
}
// NewBundler returns a Bundler ready for use and starts its background
// goroutine. Call Stop once the Bundler is no longer needed.
//
// itemExample is any non-nil value of the item type; only its dynamic type is
// used (a nil interface makes reflect.SliceOf panic). To bundle *Entry values,
// for instance, pass &Entry{}.
//
// handler receives each bundle. When itemExample has type T, handler is passed
// a []T. All calls to handler come from the single background goroutine, one
// after another, never concurrently.
//
// The exported threshold fields are initialized from the Default* constants.
func NewBundler(itemExample interface{}, handler func(interface{})) *Bundler {
	// A nil []T: every new bundle starts from this and grows via reflect.Append.
	emptyItems := reflect.Zero(reflect.SliceOf(reflect.TypeOf(itemExample)))
	b := &Bundler{
		DelayThreshold:       DefaultDelayThreshold,
		BundleCountThreshold: DefaultBundleCountThreshold,
		BundleByteThreshold:  DefaultBundleByteThreshold,
		BufferedByteLimit:    DefaultBufferedByteLimit,

		handler:       handler,
		itemSliceZero: emptyItems,
		curBundle:     bundle{items: emptyItems},

		donec:   make(chan struct{}),
		handlec: make(chan int, 1),
		calledc: make(chan struct{}),

		// Parked far in the future; Add re-arms it with DelayThreshold when a
		// bundle receives its first item.
		timer: time.NewTimer(1000 * time.Hour),
	}
	go b.background()
	return b
}
// Add adds item to the current bundle. It marks the bundle for handling and
// starts a new one if any of the thresholds or limits are exceeded.
//
// size is the caller-supplied size of item in bytes; every byte threshold and
// limit is measured against these values.
//
// If the item's size exceeds the maximum bundle size (Bundler.BundleByteLimit), then
// the item can never be handled. Add returns ErrOversizedItem in this case.
//
// If adding the item would exceed the maximum memory allowed (Bundler.BufferedByteLimit),
// Add returns ErrOverflow.
//
// Add never blocks.
func (b *Bundler) Add(item interface{}, size int) error {
	// An item larger than a whole bundle could never be sent.
	if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
		return ErrOversizedItem
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	// Refuse items that would push buffered (accepted but not yet handled)
	// bytes past the memory budget.
	if b.bufferedSize+size > b.BufferedByteLimit {
		return ErrOverflow
	}
	// If this item would overflow the current bundle, close that bundle first
	// so the item starts a new one.
	if b.BundleByteLimit > 0 && b.curBundle.size+size > b.BundleByteLimit {
		b.closeAndHandleBundle()
	}
	b.curBundle.items = reflect.Append(b.curBundle.items, reflect.ValueOf(item))
	b.curBundle.size += size
	b.bufferedSize += size
	// The first item of a bundle starts its delay clock.
	if b.curBundle.items.Len() == 1 {
		// Stop the timer and discard any expiry still buffered in timer.C
		// before re-arming it. That expiry belongs to an earlier bundle. It
		// can be left there when the timer fired while background was busy
		// running the handler. If it stays, background would close this
		// brand-new bundle almost immediately. time.Timer.Reset requires a
		// stopped, drained timer for NewTimer timers.
		//
		// The receive is non-blocking because background may already have
		// taken the tick, leaving nothing to drain. If background took it and
		// is now waiting for b.mu, it will still close this bundle early;
		// that cannot be prevented here.
		if !b.timer.Stop() {
			select {
			case <-b.timer.C:
			default:
			}
		}
		b.timer.Reset(b.DelayThreshold)
	}
	// Close the bundle once it reaches the count threshold. This uses ">="
	// rather than "==" so that lowering BundleCountThreshold while a bundle is
	// open still closes it, keeping the threshold a true limit. A non-positive
	// threshold disables the check, exactly as with the former "==" test.
	if b.BundleCountThreshold > 0 && b.curBundle.items.Len() >= b.BundleCountThreshold {
		b.closeAndHandleBundle()
	}
	// Close the bundle once it reaches the byte threshold. If the count check
	// above already closed it, the bundle is empty and this is a no-op.
	if b.curBundle.size >= b.BundleByteThreshold {
		b.closeAndHandleBundle()
	}
	return nil
}
// Flush blocks until every item added before the call has been passed to
// handler and that handler call has returned. The current, possibly partial,
// bundle is closed so it is handled now instead of waiting for a threshold.
//
// NOTE(review): after Stop, the background goroutine may already have exited.
// In that case nothing will ever close the channel Flush waits on, so a Flush
// issued after Stop can block forever.
func (b *Bundler) Flush() {
	wait := func() chan struct{} {
		b.mu.Lock()
		defer b.mu.Unlock()
		b.closeBundle()
		// Wake background even when nothing was closed. Only background
		// closes calledc, so it must run an iteration for this Flush to
		// return. If a wake-up is already pending, that one suffices.
		select {
		case b.handlec <- 1:
		default:
		}
		// background swaps in a new calledc every iteration. Snapshot the one
		// that covers everything queued up to now.
		return b.calledc
	}()
	<-wait
}
// Stop flushes all buffered items (see Flush) and then shuts down the
// background goroutine. Every Bundler should be stopped once it is no longer
// needed. All calls to Add must have returned before Stop is called; items
// added concurrently with Stop may never be handled.
//
// NOTE(review): Stop closes donec unconditionally. A second call can block in
// Flush or panic on the double close, so call Stop exactly once.
func (b *Bundler) Stop() {
	b.Flush()
	func() {
		b.mu.Lock()
		defer b.mu.Unlock()
		b.timer.Stop()
	}()
	// Tells background to make one last pass over closed bundles and exit.
	close(b.donec)
}
// closeAndHandleBundle closes the current bundle. If that bundle actually held
// items, it also nudges the background goroutine to handle the closed
// bundles. The nudge never blocks: a signal already pending in handlec
// covers this bundle too.
//
// b.mu must be held.
func (b *Bundler) closeAndHandleBundle() {
	if !b.closeBundle() {
		return // the current bundle was empty; nothing new to hand off
	}
	select {
	case b.handlec <- 1:
	default:
	}
}
// closeBundle moves the current bundle onto closedBundles and starts a fresh,
// empty one. It reports whether anything was moved: an empty current bundle is
// left untouched and false is returned.
//
// closeBundle does not wake the background goroutine. Callers that need
// handling to start (closeAndHandleBundle, Flush) send on handlec themselves.
//
// b.mu must be held.
func (b *Bundler) closeBundle() bool {
	cur := b.curBundle
	if cur.items.Len() == 0 {
		return false
	}
	b.closedBundles = append(b.closedBundles, cur)
	b.curBundle = bundle{items: b.itemSliceZero}
	return true
}
// background is the Bundler's worker goroutine, started by NewBundler.
//
// Each iteration waits for one of three events: a wake-up on handlec, a stop
// request on donec, or the delay timer. A timer fire first closes the current
// bundle so that it is included. Every closed bundle is then passed to handler
// in order. Finally the calledc that was current when the iteration took the
// queue is closed, which releases any Flush waiting on it. The goroutine
// returns after the iteration triggered by donec.
func (b *Bundler) background() {
	for stopping := false; !stopping; {
		var timerFired bool
		select {
		case <-b.handlec:
		case <-b.donec:
			stopping = true
		case <-b.timer.C:
			timerFired = true
		}

		// Under the lock, take ownership of the queued bundles. Also install a
		// fresh calledc for anything queued while the handler runs below.
		b.mu.Lock()
		if timerFired {
			b.closeBundle()
		}
		pending := b.closedBundles
		b.closedBundles = nil
		finished := b.calledc
		b.calledc = make(chan struct{})
		b.mu.Unlock()

		// Outside the lock, run the handler. After each bundle, drop its
		// items and give back its share of bufferedSize right away, so Adds
		// racing with this loop see the freed space.
		for i := range pending {
			bun := &pending[i]
			b.handler(bun.items.Interface())
			bun.items = reflect.Value{}
			b.mu.Lock()
			b.bufferedSize -= bun.size
			b.mu.Unlock()
		}

		// Everything queued before this iteration has now been handled.
		close(finished)
	}
}