quic: fast path for stream writes

Similar to the fast-path for reads, writes are buffered in an
unsynchronized []byte allowing for lock-free small writes.

For golang/go#58547

Change-Id: I305cb5f91eff662a473f44a4bc051acc7c213e4c
Reviewed-on: https://go-review.googlesource.com/c/net/+/564496
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
diff --git a/internal/quic/pipe.go b/internal/quic/pipe.go
index 42a0049..75cf76d 100644
--- a/internal/quic/pipe.go
+++ b/internal/quic/pipe.go
@@ -148,6 +148,18 @@
 	return b[:min(int64(len(b)), n)]
 }
 
+// availableBuffer returns the available contiguous, allocated buffer space
+// following the pipe window.
+//
+// This is used by the stream write fast path, which makes multiple writes into the pipe buffer
+// without a lock, and then adjusts p.end at a later time with a lock held.
+func (p *pipe) availableBuffer() []byte {
+	if p.tail == nil {
+		return nil
+	}
+	return p.tail.b[p.end-p.tail.off:]
+}
+
 // discardBefore discards all data prior to off.
 func (p *pipe) discardBefore(off int64) {
 	for p.head != nil && p.head.end() < off {
diff --git a/internal/quic/stream.go b/internal/quic/stream.go
index 17ca8b7..c5fafdf 100644
--- a/internal/quic/stream.go
+++ b/internal/quic/stream.go
@@ -58,8 +58,10 @@
 	outdone      chan struct{}   // closed when all data sent
 
 	// Unsynchronized buffers, used for lock-free fast path.
-	inbuf    []byte // received data
-	inbufoff int    // bytes of inbuf which have been consumed
+	inbuf     []byte // received data
+	inbufoff  int    // bytes of inbuf which have been consumed
+	outbuf    []byte // written data
+	outbufoff int    // bytes of outbuf which contain data to write
 
 	// Atomic stream state bits.
 	//
@@ -313,7 +315,14 @@
 	if s.IsReadOnly() {
 		return 0, errors.New("write to read-only stream")
 	}
+	if len(b) > 0 && len(s.outbuf)-s.outbufoff >= len(b) {
+		// Fast path: The data to write fits in s.outbuf.
+		copy(s.outbuf[s.outbufoff:], b)
+		s.outbufoff += len(b)
+		return len(b), nil
+	}
 	canWrite := s.outgate.lock()
+	s.flushFastOutputBuffer()
 	for {
 		// The first time through this loop, we may or may not be write blocked.
 		// We exit the loop after writing all data, so on subsequent passes through
@@ -373,17 +382,51 @@
 		// If we have bytes left to send, we're blocked.
 		canWrite = false
 	}
+	if lim := s.out.start + s.outmaxbuf - s.out.end - 1; lim > 0 {
+		// If s.out has space allocated and available to be written into,
+		// then reference it in s.outbuf for fast-path writes.
+		//
+		// It's perhaps a bit pointless to limit s.outbuf to the send buffer limit.
+		// We've already allocated this buffer so we aren't saving any memory
+		// by not using it.
+		// For now, we limit it anyway to make it easier to reason about limits.
+		//
+		// We set the limit to one less than the send buffer limit (the -1 above)
+		// so that a write which completely fills the buffer will overflow
+		// s.outbuf and trigger a flush.
+		s.outbuf = s.out.availableBuffer()
+		if int64(len(s.outbuf)) > lim {
+			s.outbuf = s.outbuf[:lim]
+		}
+	}
 	s.outUnlock()
 	return n, nil
 }
 
 // WriteBytes writes a single byte to the stream.
 func (s *Stream) WriteByte(c byte) error {
+	if s.outbufoff < len(s.outbuf) {
+		s.outbuf[s.outbufoff] = c
+		s.outbufoff++
+		return nil
+	}
 	b := [1]byte{c}
 	_, err := s.Write(b[:])
 	return err
 }
 
+func (s *Stream) flushFastOutputBuffer() {
+	if s.outbuf == nil {
+		return
+	}
+	// Commit data previously written to s.outbuf.
+	// s.outbuf is a reference to a buffer in s.out, so we just need to record
+	// that the output buffer has been extended.
+	s.out.end += int64(s.outbufoff)
+	s.outbuf = nil
+	s.outbufoff = 0
+}
+
 // Flush flushes data written to the stream.
 // It does not wait for the peer to acknowledge receipt of the data.
 // Use Close to wait for the peer's acknowledgement.
@@ -394,6 +437,7 @@
 }
 
 func (s *Stream) flushLocked() {
+	s.flushFastOutputBuffer()
 	s.outopened.set()
 	if s.outflushed < s.outwin {
 		s.outunsent.add(s.outflushed, min(s.outwin, s.out.end))
@@ -509,6 +553,8 @@
 	// extra RESET_STREAM in this case is harmless.
 	s.outreset.set()
 	s.outresetcode = code
+	s.outbuf = nil
+	s.outbufoff = 0
 	s.out.discardBefore(s.out.end)
 	s.outunsent = rangeset[int64]{}
 	s.outblocked.clear()
diff --git a/internal/quic/stream_test.go b/internal/quic/stream_test.go
index d1cfb34..9f857f2 100644
--- a/internal/quic/stream_test.go
+++ b/internal/quic/stream_test.go
@@ -100,6 +100,7 @@
 		if err != nil {
 			t.Fatalf("write with available output buffer: unexpected error: %v", err)
 		}
+		s.Flush()
 		tc.wantFrame("write blocked by flow control triggers a STREAM_DATA_BLOCKED frame",
 			packetType1RTT, debugFrameStreamDataBlocked{
 				id:  s.id,
@@ -111,6 +112,7 @@
 		if err != nil {
 			t.Fatalf("write with available output buffer: unexpected error: %v", err)
 		}
+		s.Flush()
 		tc.wantIdle("adding more blocked data does not trigger another STREAM_DATA_BLOCKED")
 
 		// Provide some flow control window.
@@ -1349,7 +1351,6 @@
 				id:   s.id,
 				data: want[0:4],
 			})
-
 	})
 }