blob: 0f954af88e6c8f75ed4e3e2b9652913fba7282be [file] [log] [blame]
// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package livelog provides a buffer that can be simultaneously written to by
// one writer and read from by many readers.
package livelog
import (
"io"
"sync"
)
const (
// MaxBufferSize is the maximum buffer size, as it is more output than
// we expect from reasonable tests.
MaxBufferSize = 2 << 20 // 2 MB
// truncationMessage is added to the end of the log when it reaches the
// maximum size.
truncationMessage = "\n\n... log truncated ..."
// maxUserSize is the total user output we can place in the buffer
// while still leaving room for the truncation message.
maxUserSize = MaxBufferSize - len(truncationMessage)
)
// Buffer is an io.WriteCloser that provides multiple Readers that each yield
// the same data.
//
// It is safe to Write to a Buffer while Readers consume data. A Buffer has a
// maximum size of MaxBufferSize, after which Write will silently drop
// additional data and the buffer will contain a truncation note at the end.
//
// The zero value is a ready-to-use buffer.
type Buffer struct {
mu sync.Mutex // Guards the fields below.
wake *sync.Cond // Created on demand by reader.
buf []byte // Length is in the range [0, MaxBufferSize].
eof bool
lastID int
}
// Write appends data to the Buffer.
// It will wake any blocked Readers.
func (b *Buffer) Write(b2 []byte) (int, error) {
b.mu.Lock()
defer b.mu.Unlock()
needTrunc := false
b2len := len(b2)
if len(b.buf) == MaxBufferSize {
// b.buf is full and truncationMessage was written.
b2 = nil
} else if len(b.buf)+b2len > maxUserSize {
b2 = b2[:maxUserSize-len(b.buf)]
needTrunc = true
// After this write, b.buf will reach MaxBufferSize length.
}
b.buf = append(b.buf, b2...)
if needTrunc {
b.buf = append(b.buf, []byte(truncationMessage)...)
}
b.wakeReaders()
return b2len, nil
}
// Close signals EOF to all Readers.
func (b *Buffer) Close() error {
b.mu.Lock()
defer b.mu.Unlock()
b.eof = true
b.wakeReaders()
return nil
}
// wakeReaders wakes any sleeping readers.
// b.mu must be held when calling.
func (b *Buffer) wakeReaders() {
if b.wake != nil {
b.wake.Broadcast()
}
}
// Bytes returns a copy of the underlying buffer.
func (b *Buffer) Bytes() []byte {
b.mu.Lock()
defer b.mu.Unlock()
return append([]byte(nil), b.buf...)
}
// String returns a copy of the underlying buffer as a string.
func (b *Buffer) String() string {
b.mu.Lock()
defer b.mu.Unlock()
return string(b.buf)
}
// Reader initializes and returns a ReadCloser that will emit the entire buffer.
// It is safe to call Read and Close concurrently.
func (b *Buffer) Reader() io.ReadCloser {
b.mu.Lock()
defer b.mu.Unlock()
b.lastID++
return &reader{buf: b, id: b.lastID}
}
type reader struct {
buf *Buffer
id int // Read-only.
read int // Bytes read; accessed by only the Read method.
closed bool // Guarded by buf.mu.
}
func (r *reader) Read(b []byte) (int, error) {
r.buf.mu.Lock()
defer r.buf.mu.Unlock()
// Wait for data or writer EOF or reader closed.
for len(r.buf.buf) == r.read && !r.buf.eof && !r.closed {
if r.buf.wake == nil {
r.buf.wake = sync.NewCond(&r.buf.mu)
}
r.buf.wake.Wait()
}
// Return EOF if writer reported EOF or this reader is closed.
if (len(r.buf.buf) == r.read && r.buf.eof) || r.closed {
return 0, io.EOF
}
// Emit some data.
n := copy(b, r.buf.buf[r.read:])
r.read += n
return n, nil
}
func (r *reader) Close() error {
r.buf.mu.Lock()
defer r.buf.mu.Unlock()
r.closed = true
// Wake any sleeping readers to unblock a pending read on this reader.
// (For other open readers this will be a no-op.)
r.buf.wakeReaders()
return nil
}