quic: fast path for stream writes
Similar to the fast-path for reads, writes are buffered in an
unsynchronized []byte allowing for lock-free small writes.
For golang/go#58547
Change-Id: I305cb5f91eff662a473f44a4bc051acc7c213e4c
Reviewed-on: https://go-review.googlesource.com/c/net/+/564496
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
diff --git a/internal/quic/pipe.go b/internal/quic/pipe.go
index 42a0049..75cf76d 100644
--- a/internal/quic/pipe.go
+++ b/internal/quic/pipe.go
@@ -148,6 +148,18 @@
return b[:min(int64(len(b)), n)]
}
+// availableBuffer returns the available contiguous, allocated buffer space
+// following the pipe window.
+//
+// This is used by the stream write fast path, which makes multiple writes into the pipe buffer
+// without a lock, and then adjusts p.end at a later time with a lock held.
+func (p *pipe) availableBuffer() []byte {
+ if p.tail == nil {
+ return nil
+ }
+ return p.tail.b[p.end-p.tail.off:]
+}
+
// discardBefore discards all data prior to off.
func (p *pipe) discardBefore(off int64) {
for p.head != nil && p.head.end() < off {
diff --git a/internal/quic/stream.go b/internal/quic/stream.go
index 17ca8b7..c5fafdf 100644
--- a/internal/quic/stream.go
+++ b/internal/quic/stream.go
@@ -58,8 +58,10 @@
outdone chan struct{} // closed when all data sent
// Unsynchronized buffers, used for lock-free fast path.
- inbuf []byte // received data
- inbufoff int // bytes of inbuf which have been consumed
+ inbuf []byte // received data
+ inbufoff int // bytes of inbuf which have been consumed
+ outbuf []byte // written data
+ outbufoff int // bytes of outbuf which contain data to write
// Atomic stream state bits.
//
@@ -313,7 +315,14 @@
if s.IsReadOnly() {
return 0, errors.New("write to read-only stream")
}
+ if len(b) > 0 && len(s.outbuf)-s.outbufoff >= len(b) {
+ // Fast path: The data to write fits in s.outbuf.
+ copy(s.outbuf[s.outbufoff:], b)
+ s.outbufoff += len(b)
+ return len(b), nil
+ }
canWrite := s.outgate.lock()
+ s.flushFastOutputBuffer()
for {
// The first time through this loop, we may or may not be write blocked.
// We exit the loop after writing all data, so on subsequent passes through
@@ -373,17 +382,51 @@
// If we have bytes left to send, we're blocked.
canWrite = false
}
+ if lim := s.out.start + s.outmaxbuf - s.out.end - 1; lim > 0 {
+ // If s.out has space allocated and available to be written into,
+ // then reference it in s.outbuf for fast-path writes.
+ //
+ // It's perhaps a bit pointless to limit s.outbuf to the send buffer limit.
+ // We've already allocated this buffer so we aren't saving any memory
+ // by not using it.
+ // For now, we limit it anyway to make it easier to reason about limits.
+ //
+ // We set the limit to one less than the send buffer limit (the -1 above)
+ // so that a write which completely fills the buffer will overflow
+ // s.outbuf and trigger a flush.
+ s.outbuf = s.out.availableBuffer()
+ if int64(len(s.outbuf)) > lim {
+ s.outbuf = s.outbuf[:lim]
+ }
+ }
s.outUnlock()
return n, nil
}
// WriteByte writes a single byte to the stream.
func (s *Stream) WriteByte(c byte) error {
+ if s.outbufoff < len(s.outbuf) {
+ s.outbuf[s.outbufoff] = c
+ s.outbufoff++
+ return nil
+ }
b := [1]byte{c}
_, err := s.Write(b[:])
return err
}
+func (s *Stream) flushFastOutputBuffer() {
+ if s.outbuf == nil {
+ return
+ }
+ // Commit data previously written to s.outbuf.
+ // s.outbuf is a reference to a buffer in s.out, so we just need to record
+ // that the output buffer has been extended.
+ s.out.end += int64(s.outbufoff)
+ s.outbuf = nil
+ s.outbufoff = 0
+}
+
// Flush flushes data written to the stream.
// It does not wait for the peer to acknowledge receipt of the data.
// Use Close to wait for the peer's acknowledgement.
@@ -394,6 +437,7 @@
}
func (s *Stream) flushLocked() {
+ s.flushFastOutputBuffer()
s.outopened.set()
if s.outflushed < s.outwin {
s.outunsent.add(s.outflushed, min(s.outwin, s.out.end))
@@ -509,6 +553,8 @@
// extra RESET_STREAM in this case is harmless.
s.outreset.set()
s.outresetcode = code
+ s.outbuf = nil
+ s.outbufoff = 0
s.out.discardBefore(s.out.end)
s.outunsent = rangeset[int64]{}
s.outblocked.clear()
diff --git a/internal/quic/stream_test.go b/internal/quic/stream_test.go
index d1cfb34..9f857f2 100644
--- a/internal/quic/stream_test.go
+++ b/internal/quic/stream_test.go
@@ -100,6 +100,7 @@
if err != nil {
t.Fatalf("write with available output buffer: unexpected error: %v", err)
}
+ s.Flush()
tc.wantFrame("write blocked by flow control triggers a STREAM_DATA_BLOCKED frame",
packetType1RTT, debugFrameStreamDataBlocked{
id: s.id,
@@ -111,6 +112,7 @@
if err != nil {
t.Fatalf("write with available output buffer: unexpected error: %v", err)
}
+ s.Flush()
tc.wantIdle("adding more blocked data does not trigger another STREAM_DATA_BLOCKED")
// Provide some flow control window.
@@ -1349,7 +1351,6 @@
id: s.id,
data: want[0:4],
})
-
})
}