// Copyright 2016 Google Inc. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package bundler supports bundling (batching) of items. Bundling amortizes an
// action with fixed costs over multiple items. For example, if an API provides
// an RPC that accepts a list of items as input, but clients would prefer
// adding items one at a time, then a Bundler can accept individual items from
// the client and bundle many of them into a single RPC.
//
// This package is experimental and subject to change without notice.
package bundler

import (
	"errors"
	"reflect"
	"sync"
	"time"
)

// Default values that NewBundler assigns to the corresponding Bundler
// fields: DelayThreshold, BundleCountThreshold, BundleByteThreshold and
// BufferedByteLimit respectively. The two byte values are written as
// untyped constants (1e6 and 1e9) and are stored into int fields.
const (
	DefaultDelayThreshold       = time.Second
	DefaultBundleCountThreshold = 10
	DefaultBundleByteThreshold  = 1e6 // 1M
	DefaultBufferedByteLimit    = 1e9 // 1G
)

var (
	// ErrOverflow is returned by Add when accepting the item would push the
	// Bundler's buffered bytes above its BufferedByteLimit.
	ErrOverflow = errors.New("bundler reached buffered byte limit")

	// ErrOversizedItem is returned by Add when a single item's size exceeds
	// the maximum bundle size (BundleByteLimit), so the item could never fit
	// in any bundle.
	ErrOversizedItem = errors.New("item size exceeds bundle byte limit")
)

// A Bundler collects items added to it into a bundle until one of its
// thresholds is reached (item count, byte size, or elapsed delay), then
// calls a user-provided function to handle the bundle.
//
// NOTE(review): Add reads the exported configuration fields without
// synchronizing against writers (BundleByteLimit is read before the mutex is
// taken), so presumably they should be set before the first call to Add —
// confirm.
type Bundler struct {
	// Starting from the time that the first item is added to a bundle, once
	// this delay has passed, handle the bundle. The default is DefaultDelayThreshold.
	DelayThreshold time.Duration

	// Once a bundle has this many items, handle the bundle. Since only one
	// item at a time is added to a bundle, no bundle will exceed this
	// threshold, so it also serves as a limit. The default is
	// DefaultBundleCountThreshold.
	BundleCountThreshold int

	// Once the number of bytes in current bundle reaches this threshold, handle
	// the bundle. The default is DefaultBundleByteThreshold. This triggers handling,
	// but does not cap the total size of a bundle.
	BundleByteThreshold int

	// The maximum size of a bundle, in bytes. Zero means unlimited.
	// An item larger than this limit is rejected by Add with ErrOversizedItem.
	BundleByteLimit int

	// The maximum number of bytes that the Bundler will keep in memory before
	// returning ErrOverflow. The default is DefaultBufferedByteLimit.
	BufferedByteLimit int

	handler       func(interface{}) // called to handle a bundle
	itemSliceZero reflect.Value     // nil (zero value) for slice of items
	donec         chan struct{}     // closed when the Bundler is closed
	handlec       chan int          // sent to when a bundle is ready for handling
	timer         *time.Timer       // implements DelayThreshold

	// mu guards the fields below.
	mu            sync.Mutex
	bufferedSize  int           // total bytes buffered
	closedBundles []bundle      // bundles waiting to be handled
	curBundle     bundle        // incoming items added to this bundle
	calledc       chan struct{} // closed and re-created after handler is called
}

// A bundle is one batch of items together with the sum of the sizes that
// were reported for them when they were added.
type bundle struct {
	items reflect.Value // slice of item type
	size  int           // size in bytes of all items
}

// NewBundler returns a ready-to-use Bundler whose thresholds are set to the
// package defaults, and starts the goroutine that delivers bundles to
// handler. Call Stop on the returned Bundler once it is no longer needed.
//
// itemExample only conveys the item type: if you intend to bundle *Entry
// values, pass something like &Entry{}. Its value is otherwise unused.
//
// handler receives each completed bundle. When itemExample has type T, the
// argument passed to handler has dynamic type []T. Bundles are delivered one
// after another from a single goroutine, so handler is never invoked
// concurrently with itself.
func NewBundler(itemExample interface{}, handler func(interface{})) *Bundler {
	// A nil slice of the item type; every fresh bundle starts from it.
	emptyItems := reflect.Zero(reflect.SliceOf(reflect.TypeOf(itemExample)))

	bundler := new(Bundler)

	// Exported knobs start at their defaults. BundleByteLimit is left at
	// zero, which means "unlimited".
	bundler.DelayThreshold = DefaultDelayThreshold
	bundler.BundleCountThreshold = DefaultBundleCountThreshold
	bundler.BundleByteThreshold = DefaultBundleByteThreshold
	bundler.BufferedByteLimit = DefaultBufferedByteLimit

	// Internal plumbing.
	bundler.handler = handler
	bundler.itemSliceZero = emptyItems
	bundler.curBundle.items = emptyItems
	bundler.donec = make(chan struct{})
	bundler.handlec = make(chan int, 1) // one slot: a pending wake-up is enough
	bundler.calledc = make(chan struct{})
	// The timer must exist before the background goroutine selects on it.
	// The long initial timeout is a placeholder; Add resets it when a bundle
	// receives its first item.
	bundler.timer = time.NewTimer(1000 * time.Hour)

	go bundler.background()
	return bundler
}

// Add appends item, whose size in bytes is reported by the caller as size, to
// the bundle currently being built. If that pushes the bundle to one of the
// thresholds, the bundle is queued for handling and a new one is started.
//
// Errors:
//   - ErrOversizedItem when BundleByteLimit is set and size alone is larger
//     than it; such an item could never fit in any bundle.
//   - ErrOverflow when accepting the item would raise the total buffered
//     bytes above BufferedByteLimit.
//
// Add does not wait for the handler: the only thing it can wait on is the
// Bundler's internal mutex, which is not held while the handler runs.
func (b *Bundler) Add(item interface{}, size int) error {
	// Reject items that cannot fit in a bundle at all. This check needs no
	// lock because it touches only configuration and the argument.
	if b.BundleByteLimit > 0 && size > b.BundleByteLimit {
		return ErrOversizedItem
	}

	b.mu.Lock()
	defer b.mu.Unlock()

	// Enforce the overall memory budget.
	if b.bufferedSize+size > b.BufferedByteLimit {
		return ErrOverflow
	}

	cur := &b.curBundle

	// The item fits in a bundle, but not in this one: ship what we have and
	// let the item open a fresh bundle.
	if b.BundleByteLimit > 0 && cur.size+size > b.BundleByteLimit {
		b.closeAndHandleBundle()
	}

	cur.items = reflect.Append(cur.items, reflect.ValueOf(item))
	cur.size += size
	b.bufferedSize += size

	count := cur.items.Len()
	if count == 1 {
		// First item of a new bundle: the delay clock starts now.
		b.timer.Reset(b.DelayThreshold)
	}

	// Close the bundle when it has reached the count threshold or has
	// reached/passed the byte threshold.
	if count == b.BundleCountThreshold || cur.size >= b.BundleByteThreshold {
		b.closeAndHandleBundle()
	}
	return nil
}

// Flush closes the bundle under construction and blocks until everything
// held by the Bundler at that moment has been handled, i.e. until the
// handler invocation for the last such bundle has returned.
func (b *Bundler) Flush() {
	waitc := func() <-chan struct{} {
		b.mu.Lock()
		defer b.mu.Unlock()

		b.closeBundle()
		// Wake the background goroutine whether or not a bundle was closed,
		// so that calledc gets closed even when nothing is outstanding. The
		// send is non-blocking: if a wake-up is already pending, it suffices.
		select {
		case b.handlec <- 1:
		default:
		}
		// Capture the channel while holding the lock; the background
		// goroutine swaps in a new one on every pass.
		return b.calledc
	}()
	<-waitc
}

// Stop flushes the Bundler and then shuts it down: the delay timer is
// stopped and the background goroutine is told to exit. Always call Stop on
// a Bundler that is no longer needed.
//
// All calls to Add must have returned before Stop is called; an Add that
// runs concurrently with Stop may have its item ignored.
func (b *Bundler) Stop() {
	// Deferred calls run last-in-first-out, so donec is closed only after
	// the mutex below has been released.
	defer close(b.donec)

	b.Flush()

	b.mu.Lock()
	defer b.mu.Unlock()
	b.timer.Stop()
}

// closeAndHandleBundle closes the current bundle and, if that produced a
// closed bundle, nudges the background goroutine to handle it.
//
// The caller must hold b.mu.
func (b *Bundler) closeAndHandleBundle() {
	if !b.closeBundle() {
		// The current bundle was empty; there is nothing to hand over.
		return
	}
	// Non-blocking send: handlec has a one-element buffer, and a wake-up
	// that is already pending will cover this bundle too.
	select {
	case b.handlec <- 1:
	default:
	}
}

// closeBundle moves the current bundle onto the list of closed bundles and
// starts a fresh, empty current bundle. It reports whether a bundle was
// actually closed; an empty current bundle is left untouched and false is
// returned. It does not itself wake the background goroutine.
//
// The caller must hold b.mu.
func (b *Bundler) closeBundle() bool {
	finished := b.curBundle
	if finished.items.Len() == 0 {
		return false
	}
	b.closedBundles = append(b.closedBundles, finished)
	// Reset to an empty bundle: nil item slice, zero size.
	b.curBundle = bundle{items: b.itemSliceZero}
	return true
}

// background is the Bundler's worker loop and runs in its own goroutine. On
// each wake-up (a handle request, the delay timer firing, or shutdown) it
// takes every closed bundle, passes them to the handler one at a time, and
// then closes the calledc channel that was current when the pass began. It
// returns after the pass triggered by donec being closed.
func (b *Bundler) background() {
	for stopping := false; !stopping; {
		expired := false

		// Block until there is a reason to run a pass.
		select {
		case <-b.handlec:
		case <-b.donec:
			stopping = true // finish this pass, then exit the loop
		case <-b.timer.C:
			expired = true
		}

		// Under the lock, take ownership of all closed bundles and swap in a
		// new calledc. Items added after the lock is released belong to the
		// next pass and will be signalled through the new channel.
		b.mu.Lock()
		if expired {
			// The delay elapsed: whatever has accumulated goes out now.
			b.closeBundle()
		}
		pending := b.closedBundles
		b.closedBundles = nil
		finishedc := b.calledc
		b.calledc = make(chan struct{})
		b.mu.Unlock()

		// The handler is called without holding the lock.
		for i := range pending {
			b.handler(pending[i].items.Interface())
			// Release the items so they can be garbage collected.
			pending[i].items = reflect.Value{}
			// Give the bytes back right away so that Adds arriving during
			// this loop have a chance of succeeding.
			b.mu.Lock()
			b.bufferedSize -= pending[i].size
			b.mu.Unlock()
		}

		// Every bundle taken in this pass has been handled.
		close(finishedc)
	}
}
