// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
| 4 | |
// Package livelog provides a buffer that can be simultaneously written to by
// one writer and read from by many readers.
| 7 | package livelog |
| 8 | |
| 9 | import ( |
| 10 | "io" |
| 11 | "sync" |
| 12 | ) |
| 13 | |
const (
	// MaxBufferSize caps how many bytes a Buffer will ever hold. Reasonable
	// tests are not expected to produce more output than this.
	MaxBufferSize = 2 * 1024 * 1024 // 2 MiB

	// truncationMessage is the note appended to a log once it has hit the
	// size cap.
	truncationMessage = "\n\n... log truncated ..."

	// maxUserSize is how much caller-supplied output fits in the buffer
	// while still reserving space for truncationMessage at the end.
	maxUserSize = MaxBufferSize - len(truncationMessage)
)
| 27 | |
// Buffer is an io.WriteCloser whose contents can be streamed by any number of
// Readers, each of which observes the same bytes from the beginning.
//
// Writing to a Buffer while Readers are consuming it is safe. Once a Buffer
// holds MaxBufferSize bytes, Write silently discards further data and the
// buffer ends with a truncation note.
//
// The zero value is a ready-to-use buffer.
type Buffer struct {
	mu sync.Mutex // Guards every field below.

	buf    []byte     // Accumulated output; length stays within [0, MaxBufferSize].
	eof    bool       // Set by Close; readers get io.EOF once they have drained buf.
	lastID int        // ID given to the most recently created reader.
	wake   *sync.Cond // Created on demand by the first reader that has to wait.
}
| 43 | |
| 44 | // Write appends data to the Buffer. |
| 45 | // It will wake any blocked Readers. |
| 46 | func (b *Buffer) Write(b2 []byte) (int, error) { |
| 47 | b.mu.Lock() |
| 48 | defer b.mu.Unlock() |
| 49 | |
Michael Pratt | 7e22aac | 2021-05-05 10:15:25 -0400 | [diff] [blame] | 50 | needTrunc := false |
Andrew Gerrand | 8cdbfaa | 2015-09-15 16:34:18 +1000 | [diff] [blame] | 51 | b2len := len(b2) |
Michael Pratt | 7e22aac | 2021-05-05 10:15:25 -0400 | [diff] [blame] | 52 | if len(b.buf) == MaxBufferSize { |
| 53 | // b.buf is full and truncationMessage was written. |
| 54 | b2 = nil |
| 55 | } else if len(b.buf)+b2len > maxUserSize { |
| 56 | b2 = b2[:maxUserSize-len(b.buf)] |
| 57 | needTrunc = true |
| 58 | // After this write, b.buf will reach MaxBufferSize length. |
Andrew Gerrand | 8cdbfaa | 2015-09-15 16:34:18 +1000 | [diff] [blame] | 59 | } |
| 60 | b.buf = append(b.buf, b2...) |
Michael Pratt | 7e22aac | 2021-05-05 10:15:25 -0400 | [diff] [blame] | 61 | if needTrunc { |
| 62 | b.buf = append(b.buf, []byte(truncationMessage)...) |
| 63 | } |
Andrew Gerrand | 8cdbfaa | 2015-09-15 16:34:18 +1000 | [diff] [blame] | 64 | b.wakeReaders() |
| 65 | return b2len, nil |
| 66 | } |
| 67 | |
| 68 | // Close signals EOF to all Readers. |
| 69 | func (b *Buffer) Close() error { |
| 70 | b.mu.Lock() |
| 71 | defer b.mu.Unlock() |
| 72 | |
| 73 | b.eof = true |
| 74 | b.wakeReaders() |
| 75 | return nil |
| 76 | } |
| 77 | |
Andrew Gerrand | f1a914c | 2015-09-16 04:12:50 +0000 | [diff] [blame] | 78 | // wakeReaders wakes any sleeping readers. |
Andrew Gerrand | 8cdbfaa | 2015-09-15 16:34:18 +1000 | [diff] [blame] | 79 | // b.mu must be held when calling. |
| 80 | func (b *Buffer) wakeReaders() { |
| 81 | if b.wake != nil { |
| 82 | b.wake.Broadcast() |
| 83 | } |
| 84 | } |
| 85 | |
| 86 | // Bytes returns a copy of the underlying buffer. |
| 87 | func (b *Buffer) Bytes() []byte { |
| 88 | b.mu.Lock() |
| 89 | defer b.mu.Unlock() |
| 90 | |
| 91 | return append([]byte(nil), b.buf...) |
| 92 | } |
| 93 | |
Matt Drollette | cd04c5a | 2015-09-16 11:27:02 -0500 | [diff] [blame] | 94 | // String returns a copy of the underlying buffer as a string. |
Andrew Gerrand | 8cdbfaa | 2015-09-15 16:34:18 +1000 | [diff] [blame] | 95 | func (b *Buffer) String() string { |
| 96 | b.mu.Lock() |
| 97 | defer b.mu.Unlock() |
| 98 | |
| 99 | return string(b.buf) |
| 100 | } |
| 101 | |
| 102 | // Reader initializes and returns a ReadCloser that will emit the entire buffer. |
| 103 | // It is safe to call Read and Close concurrently. |
| 104 | func (b *Buffer) Reader() io.ReadCloser { |
| 105 | b.mu.Lock() |
| 106 | defer b.mu.Unlock() |
| 107 | |
| 108 | b.lastID++ |
| 109 | return &reader{buf: b, id: b.lastID} |
| 110 | } |
| 111 | |
| 112 | type reader struct { |
| 113 | buf *Buffer |
| 114 | id int // Read-only. |
| 115 | read int // Bytes read; accessed by only the Read method. |
| 116 | closed bool // Guarded by buf.mu. |
| 117 | } |
| 118 | |
| 119 | func (r *reader) Read(b []byte) (int, error) { |
| 120 | r.buf.mu.Lock() |
| 121 | defer r.buf.mu.Unlock() |
| 122 | |
| 123 | // Wait for data or writer EOF or reader closed. |
| 124 | for len(r.buf.buf) == r.read && !r.buf.eof && !r.closed { |
| 125 | if r.buf.wake == nil { |
| 126 | r.buf.wake = sync.NewCond(&r.buf.mu) |
| 127 | } |
| 128 | r.buf.wake.Wait() |
| 129 | } |
| 130 | // Return EOF if writer reported EOF or this reader is closed. |
| 131 | if (len(r.buf.buf) == r.read && r.buf.eof) || r.closed { |
| 132 | return 0, io.EOF |
| 133 | } |
| 134 | // Emit some data. |
| 135 | n := copy(b, r.buf.buf[r.read:]) |
| 136 | r.read += n |
| 137 | return n, nil |
| 138 | } |
| 139 | |
| 140 | func (r *reader) Close() error { |
| 141 | r.buf.mu.Lock() |
| 142 | defer r.buf.mu.Unlock() |
| 143 | |
| 144 | r.closed = true |
| 145 | |
| 146 | // Wake any sleeping readers to unblock a pending read on this reader. |
| 147 | // (For other open readers this will be a no-op.) |
| 148 | r.buf.wakeReaders() |
| 149 | |
| 150 | return nil |
| 151 | } |