quic: inbound connection-level flow control
Track the peer's connection level flow control window.
Update the window with MAX_DATA frames as data is consumed by the user.
Adjust shouldUpdateFlowControl so that we can use the same algorithm
for both stream-level and connection-level flow control.
The new algorithm is to send an update when doing so extends the
peer's window by at least 1/8 of the maximum window size.
For golang/go#58547
Change-Id: I2d8d82d06f0cb4b2ac25b3396c3cf4126a96e9cc
Reviewed-on: https://go-review.googlesource.com/c/net/+/526716
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
diff --git a/internal/quic/config.go b/internal/quic/config.go
index d68e2c7..b390d69 100644
--- a/internal/quic/config.go
+++ b/internal/quic/config.go
@@ -41,6 +41,12 @@
// If zero, the default value of 1MiB is used.
// If negative, the limit is zero.
MaxStreamWriteBufferSize int64
+
+ // MaxConnReadBufferSize is the maximum amount of data sent by the peer that a
+ // connection will buffer for reading, across all streams.
+ // If zero, the default value of 1MiB is used.
+ // If negative, the limit is zero.
+ MaxConnReadBufferSize int64
}
func configDefault(v, def, limit int64) int64 {
@@ -69,3 +75,7 @@
func (c *Config) maxStreamWriteBufferSize() int64 {
return configDefault(c.MaxStreamWriteBufferSize, 1<<20, maxVarint)
}
+
+func (c *Config) maxConnReadBufferSize() int64 {
+ return configDefault(c.MaxConnReadBufferSize, 1<<20, maxVarint)
+}
diff --git a/internal/quic/config_test.go b/internal/quic/config_test.go
index b99ffef..d292854 100644
--- a/internal/quic/config_test.go
+++ b/internal/quic/config_test.go
@@ -10,6 +10,7 @@
func TestConfigTransportParameters(t *testing.T) {
const (
+ wantInitialMaxData = int64(1)
wantInitialMaxStreamData = int64(2)
wantInitialMaxStreamsBidi = int64(3)
wantInitialMaxStreamsUni = int64(4)
@@ -18,12 +19,16 @@
c.MaxBidiRemoteStreams = wantInitialMaxStreamsBidi
c.MaxUniRemoteStreams = wantInitialMaxStreamsUni
c.MaxStreamReadBufferSize = wantInitialMaxStreamData
+ c.MaxConnReadBufferSize = wantInitialMaxData
})
tc.handshake()
if tc.sentTransportParameters == nil {
t.Fatalf("conn didn't send transport parameters during handshake")
}
p := tc.sentTransportParameters
+ if got, want := p.initialMaxData, wantInitialMaxData; got != want {
+ t.Errorf("initial_max_data = %v, want %v", got, want)
+ }
if got, want := p.initialMaxStreamDataBidiLocal, wantInitialMaxStreamData; got != want {
t.Errorf("initial_max_stream_data_bidi_local = %v, want %v", got, want)
}
diff --git a/internal/quic/conn.go b/internal/quic/conn.go
index 117364f..0ab6f69 100644
--- a/internal/quic/conn.go
+++ b/internal/quic/conn.go
@@ -117,6 +117,7 @@
maxUDPPayloadSize: maxUDPPayloadSize,
maxAckDelay: maxAckDelay,
disableActiveMigration: true,
+ initialMaxData: config.maxConnReadBufferSize(),
initialMaxStreamDataBidiLocal: config.maxStreamReadBufferSize(),
initialMaxStreamDataBidiRemote: config.maxStreamReadBufferSize(),
initialMaxStreamDataUni: config.maxStreamReadBufferSize(),
diff --git a/internal/quic/conn_flow.go b/internal/quic/conn_flow.go
new file mode 100644
index 0000000..790210b
--- /dev/null
+++ b/internal/quic/conn_flow.go
@@ -0,0 +1,111 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+//go:build go1.21
+
+package quic
+
+import (
+ "sync/atomic"
+ "time"
+)
+
+// connInflow tracks connection-level flow control for data sent by the peer to us.
+//
+// There are four byte offsets of significance in the stream of data received from the peer,
+// each >= the previous:
+//
+// - bytes read by the user
+// - bytes received from the peer
+// - limit sent to the peer in a MAX_DATA frame
+// - potential new limit to send to the peer
+//
+// We maintain a flow control window, so as bytes are read by the user
+// the potential limit is extended correspondingly.
+//
+// We keep an atomic counter of bytes read by the user and not yet applied to the
+// potential limit (credit). When this count grows large enough, we update the
+// new limit to send and mark that we need to send a new MAX_DATA frame.
+type connInflow struct {
+ sent sentVal // set when we need to send a MAX_DATA update to the peer
+ usedLimit int64 // total bytes sent by the peer, must be less than or equal to sentLimit
+ sentLimit int64 // last MAX_DATA sent to the peer
+ newLimit int64 // new MAX_DATA to send
+
+ credit atomic.Int64 // bytes read but not yet applied to extending the flow-control window
+}
+
+func (c *Conn) inflowInit() {
+ // The initial MAX_DATA limit is sent as a transport parameter.
+ c.streams.inflow.sentLimit = c.config.maxConnReadBufferSize()
+ c.streams.inflow.newLimit = c.streams.inflow.sentLimit
+}
+
+// handleStreamBytesReadOffLoop records that the user has consumed bytes from a stream.
+// We may extend the peer's flow control window.
+//
+// This is called indirectly by the user, via Read or CloseRead.
+func (c *Conn) handleStreamBytesReadOffLoop(n int64) {
+ if c.shouldUpdateFlowControl(c.streams.inflow.credit.Add(n)) {
+ // We should send a MAX_DATA update to the peer.
+ // Record this on the Conn's main loop.
+ c.sendMsg(func(now time.Time, c *Conn) {
+ c.sendMaxDataUpdate()
+ })
+ }
+}
+
+// handleStreamBytesReadOnLoop extends the peer's flow control window after
+// data has been discarded due to a RESET_STREAM frame.
+//
+// This is called on the conn's loop.
+func (c *Conn) handleStreamBytesReadOnLoop(n int64) {
+ if c.shouldUpdateFlowControl(c.streams.inflow.credit.Add(n)) {
+ c.sendMaxDataUpdate()
+ }
+}
+
+func (c *Conn) sendMaxDataUpdate() {
+ c.streams.inflow.sent.setUnsent()
+ // Apply current credit to the limit.
+ // We don't strictly need to do this here
+ // since appendMaxDataFrame will do so as well,
+ // but this avoids redundant trips down this path
+ // if the MAX_DATA frame doesn't go out right away.
+ c.streams.inflow.newLimit += c.streams.inflow.credit.Swap(0)
+}
+
+func (c *Conn) shouldUpdateFlowControl(credit int64) bool {
+ return shouldUpdateFlowControl(c.config.maxConnReadBufferSize(), credit)
+}
+
+// handleStreamBytesReceived records that the peer has sent us stream data.
+func (c *Conn) handleStreamBytesReceived(n int64) error {
+ c.streams.inflow.usedLimit += n
+ if c.streams.inflow.usedLimit > c.streams.inflow.sentLimit {
+ return localTransportError(errFlowControl)
+ }
+ return nil
+}
+
+// appendMaxDataFrame appends a MAX_DATA frame to the current packet.
+//
+// It returns true if no more frames need appending,
+// false if it could not fit a frame in the current packet.
+func (c *Conn) appendMaxDataFrame(w *packetWriter, pnum packetNumber, pto bool) bool {
+ if c.streams.inflow.sent.shouldSendPTO(pto) {
+ // Add any unapplied credit to the new limit now.
+ c.streams.inflow.newLimit += c.streams.inflow.credit.Swap(0)
+ if !w.appendMaxDataFrame(c.streams.inflow.newLimit) {
+ return false
+ }
+ c.streams.inflow.sent.setSent(pnum)
+ }
+ return true
+}
+
+// ackOrLossMaxData records the fate of a MAX_DATA frame.
+func (c *Conn) ackOrLossMaxData(pnum packetNumber, fate packetFate) {
+ c.streams.inflow.sent.ackLatestOrLoss(pnum, fate)
+}
diff --git a/internal/quic/conn_flow_test.go b/internal/quic/conn_flow_test.go
new file mode 100644
index 0000000..f01a738
--- /dev/null
+++ b/internal/quic/conn_flow_test.go
@@ -0,0 +1,186 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+//go:build go1.21
+
+package quic
+
+import "testing"
+
+func TestConnInflowReturnOnRead(t *testing.T) {
+ ctx := canceledContext()
+ tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream, func(c *Config) {
+ c.MaxConnReadBufferSize = 64
+ })
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: s.id,
+ data: make([]byte, 64),
+ })
+ const readSize = 8
+ if n, err := s.ReadContext(ctx, make([]byte, readSize)); n != readSize || err != nil {
+ t.Fatalf("s.Read() = %v, %v; want %v, nil", n, err, readSize)
+ }
+ tc.wantFrame("available window increases, send a MAX_DATA",
+ packetType1RTT, debugFrameMaxData{
+ max: 64 + readSize,
+ })
+ if n, err := s.ReadContext(ctx, make([]byte, 64)); n != 64-readSize || err != nil {
+ t.Fatalf("s.Read() = %v, %v; want %v, nil", n, err, 64-readSize)
+ }
+ tc.wantFrame("available window increases, send a MAX_DATA",
+ packetType1RTT, debugFrameMaxData{
+ max: 128,
+ })
+}
+
+func TestConnInflowReturnOnClose(t *testing.T) {
+ tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream, func(c *Config) {
+ c.MaxConnReadBufferSize = 64
+ })
+ tc.ignoreFrame(frameTypeStopSending)
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: s.id,
+ data: make([]byte, 64),
+ })
+ s.CloseRead()
+ tc.wantFrame("closing stream updates connection-level flow control",
+ packetType1RTT, debugFrameMaxData{
+ max: 128,
+ })
+}
+
+func TestConnInflowReturnOnReset(t *testing.T) {
+ tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream, func(c *Config) {
+ c.MaxConnReadBufferSize = 64
+ })
+ tc.ignoreFrame(frameTypeStopSending)
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: s.id,
+ data: make([]byte, 32),
+ })
+ tc.writeFrames(packetType1RTT, debugFrameResetStream{
+ id: s.id,
+ finalSize: 64,
+ })
+ s.CloseRead()
+ tc.wantFrame("receiving stream reset updates connection-level flow control",
+ packetType1RTT, debugFrameMaxData{
+ max: 128,
+ })
+}
+
+func TestConnInflowStreamViolation(t *testing.T) {
+ tc := newTestConn(t, serverSide, func(c *Config) {
+ c.MaxConnReadBufferSize = 100
+ })
+ tc.handshake()
+ tc.ignoreFrame(frameTypeAck)
+ // Total MAX_DATA consumed: 50
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: newStreamID(clientSide, bidiStream, 0),
+ data: make([]byte, 50),
+ })
+ // Total MAX_DATA consumed: 80
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: newStreamID(clientSide, uniStream, 0),
+ off: 20,
+ data: make([]byte, 10),
+ })
+ // Total MAX_DATA consumed: 100
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: newStreamID(clientSide, bidiStream, 0),
+ off: 70,
+ fin: true,
+ })
+ // This stream has already consumed quota for these bytes.
+ // Total MAX_DATA consumed: 100
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: newStreamID(clientSide, uniStream, 0),
+ data: make([]byte, 20),
+ })
+ tc.wantIdle("peer has consumed all MAX_DATA quota")
+
+ // Total MAX_DATA consumed: 101
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: newStreamID(clientSide, bidiStream, 2),
+ data: make([]byte, 1),
+ })
+ tc.wantFrame("peer violates MAX_DATA limit",
+ packetType1RTT, debugFrameConnectionCloseTransport{
+ code: errFlowControl,
+ })
+}
+
+func TestConnInflowResetViolation(t *testing.T) {
+ tc := newTestConn(t, serverSide, func(c *Config) {
+ c.MaxConnReadBufferSize = 100
+ })
+ tc.handshake()
+ tc.ignoreFrame(frameTypeAck)
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: newStreamID(clientSide, bidiStream, 0),
+ data: make([]byte, 100),
+ })
+ tc.wantIdle("peer has consumed all MAX_DATA quota")
+
+ tc.writeFrames(packetType1RTT, debugFrameResetStream{
+ id: newStreamID(clientSide, uniStream, 0),
+ finalSize: 0,
+ })
+ tc.wantIdle("stream reset does not consume MAX_DATA quota, no error")
+
+ tc.writeFrames(packetType1RTT, debugFrameResetStream{
+ id: newStreamID(clientSide, uniStream, 1),
+ finalSize: 1,
+ })
+ tc.wantFrame("RESET_STREAM final size violates MAX_DATA limit",
+ packetType1RTT, debugFrameConnectionCloseTransport{
+ code: errFlowControl,
+ })
+}
+
+func TestConnInflowMultipleStreams(t *testing.T) {
+ ctx := canceledContext()
+ tc := newTestConn(t, serverSide, func(c *Config) {
+ c.MaxConnReadBufferSize = 128
+ })
+ tc.handshake()
+ tc.ignoreFrame(frameTypeAck)
+
+ var streams []*Stream
+ for _, id := range []streamID{
+ newStreamID(clientSide, uniStream, 0),
+ newStreamID(clientSide, uniStream, 1),
+ newStreamID(clientSide, bidiStream, 0),
+ newStreamID(clientSide, bidiStream, 1),
+ } {
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: id,
+ data: make([]byte, 32),
+ })
+ s, err := tc.conn.AcceptStream(ctx)
+ if err != nil {
+ t.Fatalf("AcceptStream() = %v", err)
+ }
+ streams = append(streams, s)
+ if n, err := s.ReadContext(ctx, make([]byte, 1)); err != nil || n != 1 {
+ t.Fatalf("s.Read() = %v, %v; want 1, nil", n, err)
+ }
+ }
+ tc.wantIdle("streams have read data, but not enough to update MAX_DATA")
+
+ if n, err := streams[0].ReadContext(ctx, make([]byte, 32)); err != nil || n != 31 {
+ t.Fatalf("s.Read() = %v, %v; want 31, nil", n, err)
+ }
+ tc.wantFrame("read enough data to trigger a MAX_DATA update",
+ packetType1RTT, debugFrameMaxData{
+ max: 128 + 32 + 1 + 1 + 1,
+ })
+
+ streams[2].CloseRead()
+ tc.wantFrame("closed stream triggers another MAX_DATA update",
+ packetType1RTT, debugFrameMaxData{
+ max: 128 + 32 + 1 + 32 + 1,
+ })
+}
diff --git a/internal/quic/conn_loss.go b/internal/quic/conn_loss.go
index b8146a4..85bda31 100644
--- a/internal/quic/conn_loss.go
+++ b/internal/quic/conn_loss.go
@@ -44,6 +44,8 @@
case frameTypeCrypto:
start, end := sent.nextRange()
c.crypto[space].ackOrLoss(start, end, fate)
+ case frameTypeMaxData:
+ c.ackOrLossMaxData(sent.num, fate)
case frameTypeResetStream,
frameTypeStopSending,
frameTypeMaxStreamData,
diff --git a/internal/quic/conn_loss_test.go b/internal/quic/conn_loss_test.go
index f74ec7e..9b88462 100644
--- a/internal/quic/conn_loss_test.go
+++ b/internal/quic/conn_loss_test.go
@@ -289,18 +289,58 @@
tc.wantIdle("no more frames sent after packet loss")
}
+func TestLostMaxDataFrame(t *testing.T) {
+ // "An updated value is sent in a MAX_DATA frame if the packet
+ // containing the most recently sent MAX_DATA frame is declared lost [...]"
+ // https://www.rfc-editor.org/rfc/rfc9000#section-13.3-3.7
+ lostFrameTest(t, func(t *testing.T, pto bool) {
+ const maxWindowSize = 32
+ buf := make([]byte, maxWindowSize)
+ tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream, func(c *Config) {
+ c.MaxConnReadBufferSize = 32
+ })
+
+ // We send MAX_DATA = 63.
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: 0,
+ data: make([]byte, maxWindowSize),
+ })
+ if n, err := s.Read(buf[:maxWindowSize-1]); err != nil || n != maxWindowSize-1 {
+ t.Fatalf("Read() = %v, %v; want %v, nil", n, err, maxWindowSize-1)
+ }
+ tc.wantFrame("conn window is extended after reading data",
+ packetType1RTT, debugFrameMaxData{
+ max: (maxWindowSize * 2) - 1,
+ })
+
+ // MAX_DATA = 64, which is only one more byte, so we don't send the frame.
+ if n, err := s.Read(buf); err != nil || n != 1 {
+ t.Fatalf("Read() = %v, %v; want %v, nil", n, err, 1)
+ }
+ tc.wantIdle("read doesn't extend window enough to send another MAX_DATA")
+
+ // The MAX_DATA = 63 packet was lost, so we send 64.
+ tc.triggerLossOrPTO(packetType1RTT, pto)
+ tc.wantFrame("resent MAX_DATA includes most current value",
+ packetType1RTT, debugFrameMaxData{
+ max: maxWindowSize * 2,
+ })
+ })
+}
+
func TestLostMaxStreamDataFrame(t *testing.T) {
// "[...] an updated value is sent when the packet containing
// the most recent MAX_STREAM_DATA frame for a stream is lost"
// https://www.rfc-editor.org/rfc/rfc9000#section-13.3-3.8
lostFrameTest(t, func(t *testing.T, pto bool) {
- const maxWindowSize = 10
+ const maxWindowSize = 32
buf := make([]byte, maxWindowSize)
tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream, func(c *Config) {
c.MaxStreamReadBufferSize = maxWindowSize
})
- // We send MAX_STREAM_DATA = 19.
+ // We send MAX_STREAM_DATA = 63.
tc.writeFrames(packetType1RTT, debugFrameStream{
id: s.id,
off: 0,
@@ -315,13 +355,13 @@
max: (maxWindowSize * 2) - 1,
})
- // MAX_STREAM_DATA = 20, which is only one more byte, so we don't send the frame.
+ // MAX_STREAM_DATA = 64, which is only one more byte, so we don't send the frame.
if n, err := s.Read(buf); err != nil || n != 1 {
t.Fatalf("Read() = %v, %v; want %v, nil", n, err, 1)
}
tc.wantIdle("read doesn't extend window enough to send another MAX_STREAM_DATA")
- // The MAX_STREAM_DATA = 19 packet was lost, so we send 20.
+ // The MAX_STREAM_DATA = 63 packet was lost, so we send 64.
tc.triggerLossOrPTO(packetType1RTT, pto)
tc.wantFrame("resent MAX_STREAM_DATA includes most current value",
packetType1RTT, debugFrameMaxStreamData{
diff --git a/internal/quic/conn_streams.go b/internal/quic/conn_streams.go
index 76e9bf9..0a72d26 100644
--- a/internal/quic/conn_streams.go
+++ b/internal/quic/conn_streams.go
@@ -27,6 +27,9 @@
peerInitialMaxStreamDataRemote [streamTypeCount]int64 // streams opened by us
peerInitialMaxStreamDataBidiLocal int64 // streams opened by them
+ // Connection-level flow control.
+ inflow connInflow
+
// Streams with frames to send are stored in a circular linked list.
// sendHead is the next stream to write, or nil if there are no streams
// with data to send. sendTail is the last stream to write.
@@ -43,6 +46,7 @@
c.streams.localLimit[uniStream].init()
c.streams.remoteLimit[bidiStream].init(c.config.maxBidiRemoteStreams())
c.streams.remoteLimit[uniStream].init(c.config.maxUniRemoteStreams())
+ c.inflowInit()
}
// AcceptStream waits for and returns the next stream created by the peer.
@@ -212,6 +216,11 @@
// It returns true if no more frames need appending,
// false if not everything fit in the current packet.
func (c *Conn) appendStreamFrames(w *packetWriter, pnum packetNumber, pto bool) bool {
+ // MAX_DATA
+ if !c.appendMaxDataFrame(w, pnum, pto) {
+ return false
+ }
+
// MAX_STREAM_DATA
if !c.streams.remoteLimit[uniStream].appendFrame(w, uniStream, pnum, pto) {
return false
diff --git a/internal/quic/stream.go b/internal/quic/stream.go
index fbc3633..84c437d 100644
--- a/internal/quic/stream.go
+++ b/internal/quic/stream.go
@@ -156,9 +156,10 @@
start := s.in.start
end := start + int64(len(b))
s.in.copy(start, b)
+ s.conn.handleStreamBytesReadOffLoop(int64(len(b)))
s.in.discardBefore(end)
if s.insize == -1 || s.insize > s.inwin {
- if shouldUpdateFlowControl(s.inwin-s.in.start, s.inmaxbuf) {
+ if shouldUpdateFlowControl(s.inmaxbuf, s.in.start+s.inmaxbuf-s.inwin) {
// Update stream flow control with a STREAM_MAX_DATA frame.
s.insendmax.setUnsent()
}
@@ -173,10 +174,8 @@
//
// We want to balance keeping the peer well-supplied with flow control with not sending
// many small updates.
-func shouldUpdateFlowControl(curwin, maxwin int64) bool {
- // Update flow control if doing so gives the peer at least 64k tokens,
- // or if it will double the current window.
- return maxwin-curwin >= 64<<10 || curwin*2 < maxwin
+func shouldUpdateFlowControl(maxWindow, addedWindow int64) bool {
+ return addedWindow >= maxWindow/8
}
// Write writes data to the stream.
@@ -295,6 +294,7 @@
} else {
s.inclosed.set()
}
+ s.conn.handleStreamBytesReadOffLoop(s.in.end - s.in.start)
s.in.discardBefore(s.in.end)
}
@@ -470,6 +470,12 @@
// Either way, we can discard this frame.
return nil
}
+ if s.insize == -1 && end > s.in.end {
+ added := end - s.in.end
+ if err := s.conn.handleStreamBytesReceived(added); err != nil {
+ return err
+ }
+ }
s.in.writeAt(b, off)
s.inset.add(off, end)
if fin {
@@ -492,6 +498,13 @@
// The stream was already reset.
return nil
}
+ if s.insize == -1 {
+ added := finalSize - s.in.end
+ if err := s.conn.handleStreamBytesReceived(added); err != nil {
+ return err
+ }
+ }
+ s.conn.handleStreamBytesReadOnLoop(finalSize - s.in.start)
s.in.discardBefore(s.in.end)
s.inresetcode = int64(code)
s.insize = finalSize