quic: inbound connection-level flow control

Track the peer's connection-level flow control window.
Update the window with MAX_DATA frames as data is consumed by the user.

Adjust shouldUpdateFlowControl so that we can use the same algorithm
for both stream-level and connection-level flow control.
The new algorithm is to send an update when doing so extends the
peer's window by at least 1/8 of the maximum window size.
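
For illustration (not part of the change itself): with the default
1 MiB maximum window, an update is sent only once the window can be
extended by at least 128 KiB:

	shouldUpdateFlowControl(1<<20, 128<<10)     // true: 128 KiB = (1 MiB)/8
	shouldUpdateFlowControl(1<<20, (128<<10)-1) // false: one byte short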

For golang/go#58547

Change-Id: I2d8d82d06f0cb4b2ac25b3396c3cf4126a96e9cc
Reviewed-on: https://go-review.googlesource.com/c/net/+/526716
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
diff --git a/internal/quic/config.go b/internal/quic/config.go
index d68e2c7..b390d69 100644
--- a/internal/quic/config.go
+++ b/internal/quic/config.go
@@ -41,6 +41,12 @@
 	// If zero, the default value of 1MiB is used.
 	// If negative, the limit is zero.
 	MaxStreamWriteBufferSize int64
+
+	// MaxConnReadBufferSize is the maximum amount of data sent by the peer that a
+	// connection will buffer for reading, across all streams.
+	// If zero, the default value of 1MiB is used.
+	// If negative, the limit is zero.
+	MaxConnReadBufferSize int64
 }
 
 func configDefault(v, def, limit int64) int64 {
@@ -69,3 +75,7 @@
 func (c *Config) maxStreamWriteBufferSize() int64 {
 	return configDefault(c.MaxStreamWriteBufferSize, 1<<20, maxVarint)
 }
+
+func (c *Config) maxConnReadBufferSize() int64 {
+	return configDefault(c.MaxConnReadBufferSize, 1<<20, maxVarint)
+}
diff --git a/internal/quic/config_test.go b/internal/quic/config_test.go
index b99ffef..d292854 100644
--- a/internal/quic/config_test.go
+++ b/internal/quic/config_test.go
@@ -10,6 +10,7 @@
 
 func TestConfigTransportParameters(t *testing.T) {
 	const (
+		wantInitialMaxData        = int64(1)
 		wantInitialMaxStreamData  = int64(2)
 		wantInitialMaxStreamsBidi = int64(3)
 		wantInitialMaxStreamsUni  = int64(4)
@@ -18,12 +19,16 @@
 		c.MaxBidiRemoteStreams = wantInitialMaxStreamsBidi
 		c.MaxUniRemoteStreams = wantInitialMaxStreamsUni
 		c.MaxStreamReadBufferSize = wantInitialMaxStreamData
+		c.MaxConnReadBufferSize = wantInitialMaxData
 	})
 	tc.handshake()
 	if tc.sentTransportParameters == nil {
 		t.Fatalf("conn didn't send transport parameters during handshake")
 	}
 	p := tc.sentTransportParameters
+	if got, want := p.initialMaxData, wantInitialMaxData; got != want {
+		t.Errorf("initial_max_data = %v, want %v", got, want)
+	}
 	if got, want := p.initialMaxStreamDataBidiLocal, wantInitialMaxStreamData; got != want {
 		t.Errorf("initial_max_stream_data_bidi_local = %v, want %v", got, want)
 	}
diff --git a/internal/quic/conn.go b/internal/quic/conn.go
index 117364f..0ab6f69 100644
--- a/internal/quic/conn.go
+++ b/internal/quic/conn.go
@@ -117,6 +117,7 @@
 		maxUDPPayloadSize:              maxUDPPayloadSize,
 		maxAckDelay:                    maxAckDelay,
 		disableActiveMigration:         true,
+		initialMaxData:                 config.maxConnReadBufferSize(),
 		initialMaxStreamDataBidiLocal:  config.maxStreamReadBufferSize(),
 		initialMaxStreamDataBidiRemote: config.maxStreamReadBufferSize(),
 		initialMaxStreamDataUni:        config.maxStreamReadBufferSize(),
diff --git a/internal/quic/conn_flow.go b/internal/quic/conn_flow.go
new file mode 100644
index 0000000..790210b
--- /dev/null
+++ b/internal/quic/conn_flow.go
@@ -0,0 +1,115 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+//go:build go1.21
+
+package quic
+
+import (
+	"sync/atomic"
+	"time"
+)
+
+// connInflow tracks connection-level flow control for data sent by the peer to us.
+//
+// There are four byte offsets of significance in the stream of data received from the peer,
+// each >= the previous:
+//
+//   - bytes read by the user
+//   - bytes received from the peer
+//   - limit sent to the peer in a MAX_DATA frame
+//   - potential new limit to send to the peer
+//
+// We maintain a flow control window, so as bytes are read by the user,
+// the potential limit is extended correspondingly.
+//
+// We keep an atomic counter of bytes read by the user and not yet applied to the
+// potential limit (credit). When this count grows large enough, we update the
+// new limit to send and mark that we need to send a new MAX_DATA frame.
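+//
+// For example, with a 64-byte maximum window (an illustrative value, not
+// the default), reads accumulate credit until it reaches 64/8 = 8 bytes;
+// the credit is then applied to the new limit and a MAX_DATA frame is scheduled.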
+type connInflow struct {
+	sent      sentVal // set when we need to send a MAX_DATA update to the peer
+	usedLimit int64   // total bytes sent by the peer, must not exceed sentLimit
+	sentLimit int64   // last MAX_DATA sent to the peer
+	newLimit  int64   // new MAX_DATA to send
+
+	credit atomic.Int64 // bytes read but not yet applied to extending the flow-control window
+}
+
+func (c *Conn) inflowInit() {
+	// The initial MAX_DATA limit is sent as a transport parameter.
+	c.streams.inflow.sentLimit = c.config.maxConnReadBufferSize()
+	c.streams.inflow.newLimit = c.streams.inflow.sentLimit
+}
+
+// handleStreamBytesReadOffLoop records that the user has consumed bytes from a stream.
+// We may extend the peer's flow control window.
+//
+// This is called indirectly by the user, via Read or CloseRead.
+func (c *Conn) handleStreamBytesReadOffLoop(n int64) {
+	if c.shouldUpdateFlowControl(c.streams.inflow.credit.Add(n)) {
+		// We should send a MAX_DATA update to the peer.
+		// Record this on the Conn's main loop.
+		c.sendMsg(func(now time.Time, c *Conn) {
+			c.sendMaxDataUpdate()
+		})
+	}
+}
+
+// handleStreamBytesReadOnLoop extends the peer's flow control window after
+// data has been discarded due to a RESET_STREAM frame.
+//
+// This is called on the conn's loop.
+func (c *Conn) handleStreamBytesReadOnLoop(n int64) {
+	if c.shouldUpdateFlowControl(c.streams.inflow.credit.Add(n)) {
+		c.sendMaxDataUpdate()
+	}
+}
+
+func (c *Conn) sendMaxDataUpdate() {
+	c.streams.inflow.sent.setUnsent()
+	// Apply current credit to the limit.
+	// We don't strictly need to do this here
+	// since appendMaxDataFrame will do so as well,
+	// but this avoids redundant trips down this path
+	// if the MAX_DATA frame doesn't go out right away.
+	c.streams.inflow.newLimit += c.streams.inflow.credit.Swap(0)
+}
+
+func (c *Conn) shouldUpdateFlowControl(credit int64) bool {
+	return shouldUpdateFlowControl(c.config.maxConnReadBufferSize(), credit)
+}
+
+// handleStreamBytesReceived records that the peer has sent us stream data.
+func (c *Conn) handleStreamBytesReceived(n int64) error {
+	c.streams.inflow.usedLimit += n
+	if c.streams.inflow.usedLimit > c.streams.inflow.sentLimit {
+		return localTransportError(errFlowControl)
+	}
+	return nil
+}
+
+// appendMaxDataFrame appends a MAX_DATA frame to the current packet.
+//
+// It returns true if no more frames need appending,
+// false if it could not fit a frame in the current packet.
+func (c *Conn) appendMaxDataFrame(w *packetWriter, pnum packetNumber, pto bool) bool {
+	if c.streams.inflow.sent.shouldSendPTO(pto) {
+		// Add any unapplied credit to the new limit now.
+		c.streams.inflow.newLimit += c.streams.inflow.credit.Swap(0)
+		if !w.appendMaxDataFrame(c.streams.inflow.newLimit) {
+			return false
+		}
+		c.streams.inflow.sent.setSent(pnum)
+	}
+	return true
+}
+
+// ackOrLossMaxData records the fate of a MAX_DATA frame.
+func (c *Conn) ackOrLossMaxData(pnum packetNumber, fate packetFate) {
+	c.streams.inflow.sent.ackLatestOrLoss(pnum, fate)
+}
diff --git a/internal/quic/conn_flow_test.go b/internal/quic/conn_flow_test.go
new file mode 100644
index 0000000..f01a738
--- /dev/null
+++ b/internal/quic/conn_flow_test.go
@@ -0,0 +1,187 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+//go:build go1.21
+
+package quic
+
+import "testing"
+
+func TestConnInflowReturnOnRead(t *testing.T) {
+	ctx := canceledContext()
+	tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream, func(c *Config) {
+		c.MaxConnReadBufferSize = 64
+	})
+	tc.writeFrames(packetType1RTT, debugFrameStream{
+		id:   s.id,
+		data: make([]byte, 64),
+	})
+	const readSize = 8
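+	// readSize is exactly the update threshold: MaxConnReadBufferSize/8 = 8 bytes.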
+	if n, err := s.ReadContext(ctx, make([]byte, readSize)); n != readSize || err != nil {
+		t.Fatalf("s.Read() = %v, %v; want %v, nil", n, err, readSize)
+	}
+	tc.wantFrame("available window increases, send a MAX_DATA",
+		packetType1RTT, debugFrameMaxData{
+			max: 64 + readSize,
+		})
+	if n, err := s.ReadContext(ctx, make([]byte, 64)); n != 64-readSize || err != nil {
+		t.Fatalf("s.Read() = %v, %v; want %v, nil", n, err, 64-readSize)
+	}
+	tc.wantFrame("available window increases, send a MAX_DATA",
+		packetType1RTT, debugFrameMaxData{
+			max: 128,
+		})
+}
+
+func TestConnInflowReturnOnClose(t *testing.T) {
+	tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream, func(c *Config) {
+		c.MaxConnReadBufferSize = 64
+	})
+	tc.ignoreFrame(frameTypeStopSending)
+	tc.writeFrames(packetType1RTT, debugFrameStream{
+		id:   s.id,
+		data: make([]byte, 64),
+	})
+	s.CloseRead()
+	tc.wantFrame("closing stream updates connection-level flow control",
+		packetType1RTT, debugFrameMaxData{
+			max: 128,
+		})
+}
+
+func TestConnInflowReturnOnReset(t *testing.T) {
+	tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream, func(c *Config) {
+		c.MaxConnReadBufferSize = 64
+	})
+	tc.ignoreFrame(frameTypeStopSending)
+	tc.writeFrames(packetType1RTT, debugFrameStream{
+		id:   s.id,
+		data: make([]byte, 32),
+	})
+	tc.writeFrames(packetType1RTT, debugFrameResetStream{
+		id:        s.id,
+		finalSize: 64,
+	})
+	s.CloseRead()
+	tc.wantFrame("receiving stream reseet updates connection-level flow control",
+		packetType1RTT, debugFrameMaxData{
+			max: 128,
+		})
+}
+
+func TestConnInflowStreamViolation(t *testing.T) {
+	tc := newTestConn(t, serverSide, func(c *Config) {
+		c.MaxConnReadBufferSize = 100
+	})
+	tc.handshake()
+	tc.ignoreFrame(frameTypeAck)
+	// Total MAX_DATA consumed: 50
+	tc.writeFrames(packetType1RTT, debugFrameStream{
+		id:   newStreamID(clientSide, bidiStream, 0),
+		data: make([]byte, 50),
+	})
+	// Total MAX_DATA consumed: 80
+	tc.writeFrames(packetType1RTT, debugFrameStream{
+		id:   newStreamID(clientSide, uniStream, 0),
+		off:  20,
+		data: make([]byte, 10),
+	})
+	// Total MAX_DATA consumed: 100
+	tc.writeFrames(packetType1RTT, debugFrameStream{
+		id:  newStreamID(clientSide, bidiStream, 0),
+		off: 70,
+		fin: true,
+	})
+	// This stream has already consumed quota for these bytes.
+	// Total MAX_DATA consumed: 100
+	tc.writeFrames(packetType1RTT, debugFrameStream{
+		id:   newStreamID(clientSide, uniStream, 0),
+		data: make([]byte, 20),
+	})
+	tc.wantIdle("peer has consumed all MAX_DATA quota")
+
+	// Total MAX_DATA consumed: 101
+	tc.writeFrames(packetType1RTT, debugFrameStream{
+		id:   newStreamID(clientSide, bidiStream, 2),
+		data: make([]byte, 1),
+	})
+	tc.wantFrame("peer violates MAX_DATA limit",
+		packetType1RTT, debugFrameConnectionCloseTransport{
+			code: errFlowControl,
+		})
+}
+
+func TestConnInflowResetViolation(t *testing.T) {
+	tc := newTestConn(t, serverSide, func(c *Config) {
+		c.MaxConnReadBufferSize = 100
+	})
+	tc.handshake()
+	tc.ignoreFrame(frameTypeAck)
+	tc.writeFrames(packetType1RTT, debugFrameStream{
+		id:   newStreamID(clientSide, bidiStream, 0),
+		data: make([]byte, 100),
+	})
+	tc.wantIdle("peer has consumed all MAX_DATA quota")
+
+	tc.writeFrames(packetType1RTT, debugFrameResetStream{
+		id:        newStreamID(clientSide, uniStream, 0),
+		finalSize: 0,
+	})
+	tc.wantIdle("stream reset does not consume MAX_DATA quota, no error")
+
+	tc.writeFrames(packetType1RTT, debugFrameResetStream{
+		id:        newStreamID(clientSide, uniStream, 1),
+		finalSize: 1,
+	})
+	tc.wantFrame("RESET_STREAM final size violates MAX_DATA limit",
+		packetType1RTT, debugFrameConnectionCloseTransport{
+			code: errFlowControl,
+		})
+}
+
+func TestConnInflowMultipleStreams(t *testing.T) {
+	ctx := canceledContext()
+	tc := newTestConn(t, serverSide, func(c *Config) {
+		c.MaxConnReadBufferSize = 128
+	})
+	tc.handshake()
+	tc.ignoreFrame(frameTypeAck)
+
+	var streams []*Stream
+	for _, id := range []streamID{
+		newStreamID(clientSide, uniStream, 0),
+		newStreamID(clientSide, uniStream, 1),
+		newStreamID(clientSide, bidiStream, 0),
+		newStreamID(clientSide, bidiStream, 1),
+	} {
+		tc.writeFrames(packetType1RTT, debugFrameStream{
+			id:   id,
+			data: make([]byte, 32),
+		})
+		s, err := tc.conn.AcceptStream(ctx)
+		if err != nil {
+			t.Fatalf("AcceptStream() = %v", err)
+		}
+		streams = append(streams, s)
+		if n, err := s.ReadContext(ctx, make([]byte, 1)); err != nil || n != 1 {
+			t.Fatalf("s.Read() = %v, %v; want 1, nil", n, err)
+		}
+	}
+	tc.wantIdle("streams have read data, but not enough to update MAX_DATA")
+
+	if n, err := streams[0].ReadContext(ctx, make([]byte, 32)); err != nil || n != 31 {
+		t.Fatalf("s.Read() = %v, %v; want 31, nil", n, err)
+	}
+	tc.wantFrame("read enough data to trigger a MAX_DATA update",
+		packetType1RTT, debugFrameMaxData{
+			max: 128 + 32 + 1 + 1 + 1,
+		})
+
+	streams[2].CloseRead()
+	tc.wantFrame("closed stream triggers another MAX_DATA update",
+		packetType1RTT, debugFrameMaxData{
+			max: 128 + 32 + 1 + 32 + 1,
+		})
+}
diff --git a/internal/quic/conn_loss.go b/internal/quic/conn_loss.go
index b8146a4..85bda31 100644
--- a/internal/quic/conn_loss.go
+++ b/internal/quic/conn_loss.go
@@ -44,6 +44,8 @@
 		case frameTypeCrypto:
 			start, end := sent.nextRange()
 			c.crypto[space].ackOrLoss(start, end, fate)
+		case frameTypeMaxData:
+			c.ackOrLossMaxData(sent.num, fate)
 		case frameTypeResetStream,
 			frameTypeStopSending,
 			frameTypeMaxStreamData,
diff --git a/internal/quic/conn_loss_test.go b/internal/quic/conn_loss_test.go
index f74ec7e..9b88462 100644
--- a/internal/quic/conn_loss_test.go
+++ b/internal/quic/conn_loss_test.go
@@ -289,18 +289,58 @@
 	tc.wantIdle("no more frames sent after packet loss")
 }
 
+func TestLostMaxDataFrame(t *testing.T) {
+	// "An updated value is sent in a MAX_DATA frame if the packet
+	// containing the most recently sent MAX_DATA frame is declared lost [...]"
+	// https://www.rfc-editor.org/rfc/rfc9000#section-13.3-3.7
+	lostFrameTest(t, func(t *testing.T, pto bool) {
+		const maxWindowSize = 32
+		buf := make([]byte, maxWindowSize)
+		tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream, func(c *Config) {
+			c.MaxConnReadBufferSize = maxWindowSize
+		})
+
+		// We send MAX_DATA = 63.
+		tc.writeFrames(packetType1RTT, debugFrameStream{
+			id:   s.id,
+			off:  0,
+			data: make([]byte, maxWindowSize),
+		})
+		if n, err := s.Read(buf[:maxWindowSize-1]); err != nil || n != maxWindowSize-1 {
+			t.Fatalf("Read() = %v, %v; want %v, nil", n, err, maxWindowSize-1)
+		}
+		tc.wantFrame("conn window is extended after reading data",
+			packetType1RTT, debugFrameMaxData{
+				max: (maxWindowSize * 2) - 1,
+			})
+
+		// MAX_DATA = 64, which is only one more byte, so we don't send the frame.
+		if n, err := s.Read(buf); err != nil || n != 1 {
+			t.Fatalf("Read() = %v, %v; want %v, nil", n, err, 1)
+		}
+		tc.wantIdle("read doesn't extend window enough to send another MAX_DATA")
+
+		// The MAX_DATA = 63 packet was lost, so we send 64.
+		tc.triggerLossOrPTO(packetType1RTT, pto)
+		tc.wantFrame("resent MAX_DATA includes most current value",
+			packetType1RTT, debugFrameMaxData{
+				max: maxWindowSize * 2,
+			})
+	})
+}
+
 func TestLostMaxStreamDataFrame(t *testing.T) {
 	// "[...] an updated value is sent when the packet containing
 	// the most recent MAX_STREAM_DATA frame for a stream is lost"
 	// https://www.rfc-editor.org/rfc/rfc9000#section-13.3-3.8
 	lostFrameTest(t, func(t *testing.T, pto bool) {
-		const maxWindowSize = 10
+		const maxWindowSize = 32
 		buf := make([]byte, maxWindowSize)
 		tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream, func(c *Config) {
 			c.MaxStreamReadBufferSize = maxWindowSize
 		})
 
-		// We send MAX_STREAM_DATA = 19.
+		// We send MAX_STREAM_DATA = 63.
 		tc.writeFrames(packetType1RTT, debugFrameStream{
 			id:   s.id,
 			off:  0,
@@ -315,13 +355,13 @@
 				max: (maxWindowSize * 2) - 1,
 			})
 
-		// MAX_STREAM_DATA = 20, which is only one more byte, so we don't send the frame.
+		// MAX_STREAM_DATA = 64, which is only one more byte, so we don't send the frame.
 		if n, err := s.Read(buf); err != nil || n != 1 {
 			t.Fatalf("Read() = %v, %v; want %v, nil", n, err, 1)
 		}
 		tc.wantIdle("read doesn't extend window enough to send another MAX_STREAM_DATA")
 
-		// The MAX_STREAM_DATA = 19 packet was lost, so we send 20.
+		// The MAX_STREAM_DATA = 63 packet was lost, so we send 64.
 		tc.triggerLossOrPTO(packetType1RTT, pto)
 		tc.wantFrame("resent MAX_STREAM_DATA includes most current value",
 			packetType1RTT, debugFrameMaxStreamData{
diff --git a/internal/quic/conn_streams.go b/internal/quic/conn_streams.go
index 76e9bf9..0a72d26 100644
--- a/internal/quic/conn_streams.go
+++ b/internal/quic/conn_streams.go
@@ -27,6 +27,9 @@
 	peerInitialMaxStreamDataRemote    [streamTypeCount]int64 // streams opened by us
 	peerInitialMaxStreamDataBidiLocal int64                  // streams opened by them
 
+	// Connection-level flow control.
+	inflow connInflow
+
 	// Streams with frames to send are stored in a circular linked list.
 	// sendHead is the next stream to write, or nil if there are no streams
 	// with data to send. sendTail is the last stream to write.
@@ -43,6 +46,7 @@
 	c.streams.localLimit[uniStream].init()
 	c.streams.remoteLimit[bidiStream].init(c.config.maxBidiRemoteStreams())
 	c.streams.remoteLimit[uniStream].init(c.config.maxUniRemoteStreams())
+	c.inflowInit()
 }
 
 // AcceptStream waits for and returns the next stream created by the peer.
@@ -212,6 +216,11 @@
 // It returns true if no more frames need appending,
 // false if not everything fit in the current packet.
 func (c *Conn) appendStreamFrames(w *packetWriter, pnum packetNumber, pto bool) bool {
+	// MAX_DATA
+	if !c.appendMaxDataFrame(w, pnum, pto) {
+		return false
+	}
+
 	// MAX_STREAM_DATA
 	if !c.streams.remoteLimit[uniStream].appendFrame(w, uniStream, pnum, pto) {
 		return false
diff --git a/internal/quic/stream.go b/internal/quic/stream.go
index fbc3633..84c437d 100644
--- a/internal/quic/stream.go
+++ b/internal/quic/stream.go
@@ -156,9 +156,10 @@
 	start := s.in.start
 	end := start + int64(len(b))
 	s.in.copy(start, b)
+	s.conn.handleStreamBytesReadOffLoop(int64(len(b)))
 	s.in.discardBefore(end)
 	if s.insize == -1 || s.insize > s.inwin {
-		if shouldUpdateFlowControl(s.inwin-s.in.start, s.inmaxbuf) {
+		if shouldUpdateFlowControl(s.inmaxbuf, s.in.start+s.inmaxbuf-s.inwin) {
 			// Update stream flow control with a MAX_STREAM_DATA frame.
 			s.insendmax.setUnsent()
 		}
@@ -173,10 +174,10 @@
 //
 // We want to balance keeping the peer well-supplied with flow control with not sending
 // many small updates.
-func shouldUpdateFlowControl(curwin, maxwin int64) bool {
-	// Update flow control if doing so gives the peer at least 64k tokens,
-	// or if it will double the current window.
-	return maxwin-curwin >= 64<<10 || curwin*2 < maxwin
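+// An update is sent when it extends the peer's window by at least 1/8
+// of the maximum window size (e.g. 128 KiB for the default 1 MiB window).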
+func shouldUpdateFlowControl(maxWindow, addedWindow int64) bool {
+	return addedWindow >= maxWindow/8
 }
 
 // Write writes data to the stream.
@@ -295,6 +296,7 @@
 	} else {
 		s.inclosed.set()
 	}
+	s.conn.handleStreamBytesReadOffLoop(s.in.end - s.in.start)
 	s.in.discardBefore(s.in.end)
 }
 
@@ -470,6 +472,12 @@
 		// Either way, we can discard this frame.
 		return nil
 	}
+	if s.insize == -1 && end > s.in.end {
+		added := end - s.in.end
+		if err := s.conn.handleStreamBytesReceived(added); err != nil {
+			return err
+		}
+	}
 	s.in.writeAt(b, off)
 	s.inset.add(off, end)
 	if fin {
@@ -492,6 +500,13 @@
 		// The stream was already reset.
 		return nil
 	}
+	if s.insize == -1 {
+		added := finalSize - s.in.end
+		if err := s.conn.handleStreamBytesReceived(added); err != nil {
+			return err
+		}
+	}
+	s.conn.handleStreamBytesReadOnLoop(finalSize - s.in.start)
 	s.in.discardBefore(s.in.end)
 	s.inresetcode = int64(code)
 	s.insize = finalSize