blob: 0f954af88e6c8f75ed4e3e2b9652913fba7282be [file] [log] [blame]
// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
4
// Package livelog provides a buffer that can be simultaneously written to by
// one writer and read from by many readers.
7package livelog
8
9import (
10 "io"
11 "sync"
12)
13
const (
	// MaxBufferSize caps the number of bytes a Buffer will hold. Reasonable
	// tests are not expected to produce more output than this.
	MaxBufferSize = 2 * (1 << 20) // 2 MB

	// truncationMessage is the note appended to the end of a log once it
	// hits the size cap.
	truncationMessage = "\n\n... log truncated ..."

	// maxUserSize is how much caller-supplied output fits in the buffer
	// with enough space left over for truncationMessage.
	maxUserSize = MaxBufferSize - len(truncationMessage)
)
27
// Buffer is an io.WriteCloser whose contents can be replayed by any number of
// Readers, each of which observes the same data.
//
// Writing to a Buffer while Readers are consuming it is safe. A Buffer holds
// at most MaxBufferSize bytes; once that limit is reached, Write silently
// discards further data and the buffer ends with a truncation note.
//
// The zero value is a ready-to-use buffer.
type Buffer struct {
	mu     sync.Mutex // Protects every field declared after it.
	wake   *sync.Cond // Lazily allocated by a reader that needs to block.
	buf    []byte     // Never longer than MaxBufferSize.
	eof    bool       // Set once Close has been called.
	lastID int        // ID given to the most recently created reader.
}
43
44// Write appends data to the Buffer.
45// It will wake any blocked Readers.
46func (b *Buffer) Write(b2 []byte) (int, error) {
47 b.mu.Lock()
48 defer b.mu.Unlock()
49
Michael Pratt7e22aac2021-05-05 10:15:25 -040050 needTrunc := false
Andrew Gerrand8cdbfaa2015-09-15 16:34:18 +100051 b2len := len(b2)
Michael Pratt7e22aac2021-05-05 10:15:25 -040052 if len(b.buf) == MaxBufferSize {
53 // b.buf is full and truncationMessage was written.
54 b2 = nil
55 } else if len(b.buf)+b2len > maxUserSize {
56 b2 = b2[:maxUserSize-len(b.buf)]
57 needTrunc = true
58 // After this write, b.buf will reach MaxBufferSize length.
Andrew Gerrand8cdbfaa2015-09-15 16:34:18 +100059 }
60 b.buf = append(b.buf, b2...)
Michael Pratt7e22aac2021-05-05 10:15:25 -040061 if needTrunc {
62 b.buf = append(b.buf, []byte(truncationMessage)...)
63 }
Andrew Gerrand8cdbfaa2015-09-15 16:34:18 +100064 b.wakeReaders()
65 return b2len, nil
66}
67
68// Close signals EOF to all Readers.
69func (b *Buffer) Close() error {
70 b.mu.Lock()
71 defer b.mu.Unlock()
72
73 b.eof = true
74 b.wakeReaders()
75 return nil
76}
77
Andrew Gerrandf1a914c2015-09-16 04:12:50 +000078// wakeReaders wakes any sleeping readers.
Andrew Gerrand8cdbfaa2015-09-15 16:34:18 +100079// b.mu must be held when calling.
80func (b *Buffer) wakeReaders() {
81 if b.wake != nil {
82 b.wake.Broadcast()
83 }
84}
85
86// Bytes returns a copy of the underlying buffer.
87func (b *Buffer) Bytes() []byte {
88 b.mu.Lock()
89 defer b.mu.Unlock()
90
91 return append([]byte(nil), b.buf...)
92}
93
Matt Drollettecd04c5a2015-09-16 11:27:02 -050094// String returns a copy of the underlying buffer as a string.
Andrew Gerrand8cdbfaa2015-09-15 16:34:18 +100095func (b *Buffer) String() string {
96 b.mu.Lock()
97 defer b.mu.Unlock()
98
99 return string(b.buf)
100}
101
102// Reader initializes and returns a ReadCloser that will emit the entire buffer.
103// It is safe to call Read and Close concurrently.
104func (b *Buffer) Reader() io.ReadCloser {
105 b.mu.Lock()
106 defer b.mu.Unlock()
107
108 b.lastID++
109 return &reader{buf: b, id: b.lastID}
110}
111
112type reader struct {
113 buf *Buffer
114 id int // Read-only.
115 read int // Bytes read; accessed by only the Read method.
116 closed bool // Guarded by buf.mu.
117}
118
119func (r *reader) Read(b []byte) (int, error) {
120 r.buf.mu.Lock()
121 defer r.buf.mu.Unlock()
122
123 // Wait for data or writer EOF or reader closed.
124 for len(r.buf.buf) == r.read && !r.buf.eof && !r.closed {
125 if r.buf.wake == nil {
126 r.buf.wake = sync.NewCond(&r.buf.mu)
127 }
128 r.buf.wake.Wait()
129 }
130 // Return EOF if writer reported EOF or this reader is closed.
131 if (len(r.buf.buf) == r.read && r.buf.eof) || r.closed {
132 return 0, io.EOF
133 }
134 // Emit some data.
135 n := copy(b, r.buf.buf[r.read:])
136 r.read += n
137 return n, nil
138}
139
140func (r *reader) Close() error {
141 r.buf.mu.Lock()
142 defer r.buf.mu.Unlock()
143
144 r.closed = true
145
146 // Wake any sleeping readers to unblock a pending read on this reader.
147 // (For other open readers this will be a no-op.)
148 r.buf.wakeReaders()
149
150 return nil
151}