quic: limits on the number of open streams

Honor the peer's max stream limit. New stream creation blocks
until stream quota is available.

Enforce the number of open streams created by the peer.
Send updated stream quota as streams are closed locally.

Remove streams from the conn's set when they are fully closed.

For golang/go#58547

Change-Id: Iff969c5cb8e8e0c6ad91d217a92c38bceabef8ee
Reviewed-on: https://go-review.googlesource.com/c/net/+/524036
Reviewed-by: Jonathan Amsterdam <jba@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
diff --git a/internal/quic/config.go b/internal/quic/config.go
index df49357..f78e811 100644
--- a/internal/quic/config.go
+++ b/internal/quic/config.go
@@ -18,6 +18,18 @@
 	// It must be non-nil and include at least one certificate or else set GetCertificate.
 	TLSConfig *tls.Config
 
+	// MaxBidiRemoteStreams limits the number of simultaneous bidirectional streams
+	// a peer may open.
+	// If zero, the default value of 100 is used.
+	// If negative, the limit is zero.
+	MaxBidiRemoteStreams int64
+
+	// MaxUniRemoteStreams limits the number of simultaneous unidirectional streams
+	// a peer may open.
+	// If zero, the default value of 100 is used.
+	// If negative, the limit is zero.
+	MaxUniRemoteStreams int64
+
 	// StreamReadBufferSize is the maximum amount of data sent by the peer that a
 	// stream will buffer for reading.
 	// If zero, the default value of 1MiB is used.
@@ -31,15 +43,29 @@
 	StreamWriteBufferSize int64
 }
 
-func configDefault(v, def int64) int64 {
-	switch v {
-	case -1:
-		return 0
-	case 0:
+func configDefault(v, def, limit int64) int64 {
+	switch {
+	case v == 0:
 		return def
+	case v < 0:
+		return 0
+	default:
+		return min(v, limit)
 	}
-	return v
 }
 
-func (c *Config) streamReadBufferSize() int64  { return configDefault(c.StreamReadBufferSize, 1<<20) }
-func (c *Config) streamWriteBufferSize() int64 { return configDefault(c.StreamWriteBufferSize, 1<<20) }
+func (c *Config) maxBidiRemoteStreams() int64 {
+	return configDefault(c.MaxBidiRemoteStreams, 100, maxStreamsLimit)
+}
+
+func (c *Config) maxUniRemoteStreams() int64 {
+	return configDefault(c.MaxUniRemoteStreams, 100, maxStreamsLimit)
+}
+
+func (c *Config) streamReadBufferSize() int64 {
+	return configDefault(c.StreamReadBufferSize, 1<<20, maxVarint)
+}
+
+func (c *Config) streamWriteBufferSize() int64 {
+	return configDefault(c.StreamWriteBufferSize, 1<<20, maxVarint)
+}
diff --git a/internal/quic/config_test.go b/internal/quic/config_test.go
index cec57c5..8d67ef0 100644
--- a/internal/quic/config_test.go
+++ b/internal/quic/config_test.go
@@ -10,9 +10,13 @@
 
 func TestConfigTransportParameters(t *testing.T) {
 	const (
-		wantInitialMaxStreamData = int64(2)
+		wantInitialMaxStreamData  = int64(2)
+		wantInitialMaxStreamsBidi = int64(3)
+		wantInitialMaxStreamsUni  = int64(4)
 	)
 	tc := newTestConn(t, clientSide, func(c *Config) {
+		c.MaxBidiRemoteStreams = wantInitialMaxStreamsBidi
+		c.MaxUniRemoteStreams = wantInitialMaxStreamsUni
 		c.StreamReadBufferSize = wantInitialMaxStreamData
 	})
 	tc.handshake()
@@ -29,4 +33,10 @@
 	if got, want := p.initialMaxStreamDataUni, wantInitialMaxStreamData; got != want {
 		t.Errorf("initial_max_stream_data_uni = %v, want %v", got, want)
 	}
+	if got, want := p.initialMaxStreamsBidi, wantInitialMaxStreamsBidi; got != want {
+		t.Errorf("initial_max_stream_data_uni = %v, want %v", got, want)
+	}
+	if got, want := p.initialMaxStreamsUni, wantInitialMaxStreamsUni; got != want {
+		t.Errorf("initial_max_stream_data_uni = %v, want %v", got, want)
+	}
 }
diff --git a/internal/quic/conn.go b/internal/quic/conn.go
index 04dcd7b..642c507 100644
--- a/internal/quic/conn.go
+++ b/internal/quic/conn.go
@@ -121,6 +121,8 @@
 		initialMaxStreamDataBidiLocal:  config.streamReadBufferSize(),
 		initialMaxStreamDataBidiRemote: config.streamReadBufferSize(),
 		initialMaxStreamDataUni:        config.streamReadBufferSize(),
+		initialMaxStreamsBidi:          c.streams.remoteLimit[bidiStream].max,
+		initialMaxStreamsUni:           c.streams.remoteLimit[uniStream].max,
 		activeConnIDLimit:              activeConnIDLimit,
 	})
 
@@ -167,6 +169,8 @@
 
 // receiveTransportParameters applies transport parameters sent by the peer.
 func (c *Conn) receiveTransportParameters(p transportParameters) error {
+	c.streams.localLimit[bidiStream].setMax(p.initialMaxStreamsBidi)
+	c.streams.localLimit[uniStream].setMax(p.initialMaxStreamsUni)
 	c.streams.peerInitialMaxStreamDataBidiLocal = p.initialMaxStreamDataBidiLocal
 	c.streams.peerInitialMaxStreamDataRemote[bidiStream] = p.initialMaxStreamDataBidiRemote
 	c.streams.peerInitialMaxStreamDataRemote[uniStream] = p.initialMaxStreamDataUni
diff --git a/internal/quic/conn_loss.go b/internal/quic/conn_loss.go
index 103db9f..b8146a4 100644
--- a/internal/quic/conn_loss.go
+++ b/internal/quic/conn_loss.go
@@ -64,6 +64,10 @@
 			}
 			fin := f&streamFinBit != 0
 			s.ackOrLossData(sent.num, start, end, fin, fate)
+		case frameTypeMaxStreamsBidi:
+			c.streams.remoteLimit[bidiStream].sendMax.ackLatestOrLoss(sent.num, fate)
+		case frameTypeMaxStreamsUni:
+			c.streams.remoteLimit[uniStream].sendMax.ackLatestOrLoss(sent.num, fate)
 		case frameTypeNewConnectionID:
 			seq := int64(sent.nextInt())
 			c.connIDState.ackOrLossNewConnectionID(sent.num, seq, fate)
diff --git a/internal/quic/conn_loss_test.go b/internal/quic/conn_loss_test.go
index bb43030..d426aa6 100644
--- a/internal/quic/conn_loss_test.go
+++ b/internal/quic/conn_loss_test.go
@@ -174,9 +174,7 @@
 	// be retransmitted if lost.
 	lostFrameTest(t, func(t *testing.T, pto bool) {
 		ctx := canceledContext()
-		tc := newTestConn(t, clientSide, func(p *transportParameters) {
-			p.initialMaxStreamDataBidiRemote = 100
-		})
+		tc := newTestConn(t, clientSide, permissiveTransportParameters)
 		tc.handshake()
 		tc.ignoreFrame(frameTypeAck)
 
@@ -370,6 +368,85 @@
 	})
 }
 
+func TestLostMaxStreamsFrameMostRecent(t *testing.T) {
+	// "[...] an updated value is sent when a packet containing the
+	// most recent MAX_STREAMS for a stream type frame is declared lost [...]"
+	// https://www.rfc-editor.org/rfc/rfc9000#section-13.3-3.9
+	lostFrameTest(t, func(t *testing.T, pto bool) {
+		ctx := canceledContext()
+		tc := newTestConn(t, serverSide, func(c *Config) {
+			c.MaxUniRemoteStreams = 1
+		})
+		tc.handshake()
+		tc.ignoreFrame(frameTypeAck)
+		tc.writeFrames(packetType1RTT, debugFrameStream{
+			id:  newStreamID(clientSide, uniStream, 0),
+			fin: true,
+		})
+		s, err := tc.conn.AcceptStream(ctx)
+		if err != nil {
+			t.Fatalf("AcceptStream() = %v", err)
+		}
+		if err := s.CloseContext(ctx); err != nil {
+			t.Fatalf("stream.Close() = %v", err)
+		}
+		tc.wantFrame("closing stream updates peer's MAX_STREAMS",
+			packetType1RTT, debugFrameMaxStreams{
+				streamType: uniStream,
+				max:        2,
+			})
+
+		tc.triggerLossOrPTO(packetType1RTT, pto)
+		tc.wantFrame("lost MAX_STREAMS is resent",
+			packetType1RTT, debugFrameMaxStreams{
+				streamType: uniStream,
+				max:        2,
+			})
+	})
+}
+
+func TestLostMaxStreamsFrameNotMostRecent(t *testing.T) {
+	// Send two MAX_STREAMS frames, lose the first one.
+	//
+	// No PTO mode for this test: The ack that causes the first frame
+	// to be lost arms the loss timer for the second, so the PTO timer is not armed.
+	const pto = false
+	ctx := canceledContext()
+	tc := newTestConn(t, serverSide, func(c *Config) {
+		c.MaxUniRemoteStreams = 2
+	})
+	tc.handshake()
+	tc.ignoreFrame(frameTypeAck)
+	for i := int64(0); i < 2; i++ {
+		tc.writeFrames(packetType1RTT, debugFrameStream{
+			id:  newStreamID(clientSide, uniStream, i),
+			fin: true,
+		})
+		s, err := tc.conn.AcceptStream(ctx)
+		if err != nil {
+			t.Fatalf("AcceptStream() = %v", err)
+		}
+		if err := s.CloseContext(ctx); err != nil {
+			t.Fatalf("stream.Close() = %v", err)
+		}
+		tc.wantFrame("closing stream updates peer's MAX_STREAMS",
+			packetType1RTT, debugFrameMaxStreams{
+				streamType: uniStream,
+				max:        3 + i,
+			})
+	}
+
+	// The second MAX_STREAMS frame is acked.
+	tc.writeAckForLatest()
+
+	// The first MAX_STREAMS frame is lost.
+	tc.conn.ping(appDataSpace)
+	tc.wantFrame("connection should send a PING frame",
+		packetType1RTT, debugFramePing{})
+	tc.triggerLossOrPTO(packetType1RTT, pto)
+	tc.wantIdle("superseded MAX_DATA is not resent on loss")
+}
+
 func TestLostStreamDataBlockedFrame(t *testing.T) {
 	// "A new [STREAM_DATA_BLOCKED] frame is sent if a packet containing
 	// the most recent frame for a scope is lost [...]"
diff --git a/internal/quic/conn_recv.go b/internal/quic/conn_recv.go
index e0a91ab..faf3a37 100644
--- a/internal/quic/conn_recv.go
+++ b/internal/quic/conn_recv.go
@@ -196,7 +196,7 @@
 			if !frameOK(c, ptype, __01) {
 				return
 			}
-			_, _, n = consumeMaxStreamsFrame(payload)
+			n = c.handleMaxStreamsFrame(now, payload)
 		case frameTypeStreamsBlockedBidi, frameTypeStreamsBlockedUni:
 			if !frameOK(c, ptype, __01) {
 				return
@@ -282,6 +282,9 @@
 
 func (c *Conn) handleMaxStreamDataFrame(now time.Time, payload []byte) int {
 	id, maxStreamData, n := consumeMaxStreamDataFrame(payload)
+	if n < 0 {
+		return -1
+	}
 	if s := c.streamForFrame(now, id, sendStream); s != nil {
 		if err := s.handleMaxStreamData(maxStreamData); err != nil {
 			c.abort(now, err)
@@ -291,6 +294,15 @@
 	return n
 }
 
+func (c *Conn) handleMaxStreamsFrame(now time.Time, payload []byte) int {
+	styp, max, n := consumeMaxStreamsFrame(payload)
+	if n < 0 {
+		return -1
+	}
+	c.streams.localLimit[styp].setMax(max)
+	return n
+}
+
 func (c *Conn) handleResetStreamFrame(now time.Time, space numberSpace, payload []byte) int {
 	id, code, finalSize, n := consumeResetStreamFrame(payload)
 	if n < 0 {
diff --git a/internal/quic/conn_streams.go b/internal/quic/conn_streams.go
index 0ede284..716ed2d 100644
--- a/internal/quic/conn_streams.go
+++ b/internal/quic/conn_streams.go
@@ -18,7 +18,10 @@
 
 	streamsMu sync.Mutex
 	streams   map[streamID]*Stream
-	opened    [streamTypeCount]int64 // number of streams opened by us
+
+	// Limits on the number of streams, indexed by streamType.
+	localLimit  [streamTypeCount]localStreamLimits
+	remoteLimit [streamTypeCount]remoteStreamLimits
 
 	// Peer configuration provided in transport parameters.
 	peerInitialMaxStreamDataRemote    [streamTypeCount]int64 // streams opened by us
@@ -36,6 +39,10 @@
 func (c *Conn) streamsInit() {
 	c.streams.streams = make(map[streamID]*Stream)
 	c.streams.queue = newQueue[*Stream]()
+	c.streams.localLimit[bidiStream].init()
+	c.streams.localLimit[uniStream].init()
+	c.streams.remoteLimit[bidiStream].init(c.config.maxBidiRemoteStreams())
+	c.streams.remoteLimit[uniStream].init(c.config.maxUniRemoteStreams())
 }
 
 // AcceptStream waits for and returns the next stream created by the peer.
@@ -60,12 +67,13 @@
 }
 
 func (c *Conn) newLocalStream(ctx context.Context, styp streamType) (*Stream, error) {
-	// TODO: Stream limits.
 	c.streams.streamsMu.Lock()
 	defer c.streams.streamsMu.Unlock()
 
-	num := c.streams.opened[styp]
-	c.streams.opened[styp]++
+	num, err := c.streams.localLimit[styp].open(ctx, c)
+	if err != nil {
+		return nil, err
+	}
 
 	s := newStream(c, newStreamID(c.side, styp, num))
 	s.outmaxbuf = c.config.streamWriteBufferSize()
@@ -122,16 +130,46 @@
 
 	c.streams.streamsMu.Lock()
 	defer c.streams.streamsMu.Unlock()
-	if s := c.streams.streams[id]; s != nil {
+	s, isOpen := c.streams.streams[id]
+	if s != nil {
 		return s
 	}
-	// TODO: Check for closed streams, once we support closing streams.
+
+	num := id.num()
+	styp := id.streamType()
 	if id.initiator() == c.side {
+		if num < c.streams.localLimit[styp].opened {
+			// This stream was created by us, and has been closed.
+			return nil
+		}
+		// Received a frame for a stream that should be originated by us,
+		// but which we never created.
 		c.abort(now, localTransportError(errStreamState))
 		return nil
+	} else {
+		// if isOpen, this is a stream that was implicitly opened by a
+		// previous frame for a larger-numbered stream, but we haven't
+		// actually created it yet.
+		if !isOpen && num < c.streams.remoteLimit[styp].opened {
+			// This stream was created by the peer, and has been closed.
+			return nil
+		}
+	}
+
+	prevOpened := c.streams.remoteLimit[styp].opened
+	if err := c.streams.remoteLimit[styp].open(id); err != nil {
+		c.abort(now, err)
+		return nil
 	}
 
-	s := newStream(c, id)
+	// Receiving a frame for a stream implicitly creates all streams
+	// with the same initiator and type and a lower number.
+	// Add a nil entry to the streams map for each implicitly created stream.
+	for n := newStreamID(id.initiator(), id.streamType(), prevOpened); n < id; n += 4 {
+		c.streams.streams[n] = nil
+	}
+
+	s = newStream(c, id)
 	s.inmaxbuf = c.config.streamReadBufferSize()
 	s.inwin = c.config.streamReadBufferSize()
 	if id.streamType() == bidiStream {
@@ -174,6 +212,8 @@
 // It returns true if no more frames need appending,
 // false if not everything fit in the current packet.
 func (c *Conn) appendStreamFrames(w *packetWriter, pnum packetNumber, pto bool) bool {
+	c.streams.remoteLimit[uniStream].appendFrame(w, uniStream, pnum, pto)
+	c.streams.remoteLimit[bidiStream].appendFrame(w, bidiStream, pnum, pto)
 	if pto {
 		return c.appendStreamFramesPTO(w, pnum)
 	}
@@ -222,7 +262,11 @@
 			s.state.set(streamConnRemoved, streamConnRemoved)
 			delete(c.streams.streams, s.id)
 
-			// TODO: Provide the peer with additional stream quota (MAX_STREAMS).
+			// Record finalization of remote streams, to know when
+			// to extend the peer's stream limit.
+			if s.id.initiator() != c.side {
+				c.streams.remoteLimit[s.id.streamType()].close()
+			}
 		}
 
 		next := s.next
@@ -251,6 +295,7 @@
 func (c *Conn) appendStreamFramesPTO(w *packetWriter, pnum packetNumber) bool {
 	c.streams.sendMu.Lock()
 	defer c.streams.sendMu.Unlock()
+	const pto = true
 	for _, s := range c.streams.streams {
 		const pto = true
 		s.ingate.lock()
@@ -259,6 +304,7 @@
 		if !inOK {
 			return false
 		}
+
 		s.outgate.lock()
 		outOK := s.appendOutFramesLocked(w, pnum, pto)
 		s.outUnlockNoQueue()
diff --git a/internal/quic/conn_streams_test.go b/internal/quic/conn_streams_test.go
index 9bbc994..ab1df1a 100644
--- a/internal/quic/conn_streams_test.go
+++ b/internal/quic/conn_streams_test.go
@@ -10,15 +10,13 @@
 	"context"
 	"fmt"
 	"io"
+	"math"
 	"testing"
 )
 
 func TestStreamsCreate(t *testing.T) {
 	ctx := canceledContext()
-	tc := newTestConn(t, clientSide, func(p *transportParameters) {
-		p.initialMaxStreamDataBidiLocal = 100
-		p.initialMaxStreamDataBidiRemote = 100
-	})
+	tc := newTestConn(t, clientSide, permissiveTransportParameters)
 	tc.handshake()
 
 	c, err := tc.conn.NewStream(ctx)
@@ -126,7 +124,7 @@
 	}
 }
 
-func TestStreamsStreamNotCreated(t *testing.T) {
+func TestStreamsLocalStreamNotCreated(t *testing.T) {
 	// "An endpoint MUST terminate the connection with error STREAM_STATE_ERROR
 	// if it receives a STREAM frame for a locally initiated stream that has
 	// not yet been created [...]"
@@ -144,13 +142,39 @@
 		})
 }
 
+func TestStreamsLocalStreamClosed(t *testing.T) {
+	tc, s := newTestConnAndLocalStream(t, clientSide, uniStream, permissiveTransportParameters)
+	s.CloseWrite()
+	tc.wantFrame("FIN for closed stream",
+		packetType1RTT, debugFrameStream{
+			id:   newStreamID(clientSide, uniStream, 0),
+			fin:  true,
+			data: []byte{},
+		})
+	tc.writeAckForAll()
+
+	tc.writeFrames(packetType1RTT, debugFrameStopSending{
+		id: newStreamID(clientSide, uniStream, 0),
+	})
+	tc.wantIdle("frame for finalized stream is ignored")
+
+	// ACKing the last stream packet should have cleaned up the stream.
+	// Check that we don't have any state left.
+	if got := len(tc.conn.streams.streams); got != 0 {
+		t.Fatalf("after close, len(tc.conn.streams.streams) = %v, want 0", got)
+	}
+	if tc.conn.streams.sendHead != nil {
+		t.Fatalf("after close, stream send queue is not empty; should be")
+	}
+}
+
 func TestStreamsStreamSendOnly(t *testing.T) {
 	// "An endpoint MUST terminate the connection with error STREAM_STATE_ERROR
 	// if it receives a STREAM frame for a locally initiated stream that has
 	// not yet been created [...]"
 	// https://www.rfc-editor.org/rfc/rfc9000.html#section-19.8-3
 	ctx := canceledContext()
-	tc := newTestConn(t, serverSide)
+	tc := newTestConn(t, serverSide, permissiveTransportParameters)
 	tc.handshake()
 
 	c, err := tc.conn.NewSendOnlyStream(ctx)
@@ -342,3 +366,115 @@
 		})
 	}
 }
+
+func TestStreamsCreateAndCloseRemote(t *testing.T) {
+	// This test exercises creating new streams in response to frames
+	// from the peer, and cleaning up after streams are fully closed.
+	//
+	// It's overfitted to the current implementation, but works through
+	// a number of corner cases in that implementation.
+	//
+	// Disable verbose logging in this test: It sends a lot of packets,
+	// and they're not especially interesting on their own.
+	defer func(vv bool) {
+		*testVV = vv
+	}(*testVV)
+	*testVV = false
+	ctx := canceledContext()
+	tc := newTestConn(t, serverSide, permissiveTransportParameters)
+	tc.handshake()
+	tc.ignoreFrame(frameTypeAck)
+	type op struct {
+		id streamID
+	}
+	type streamOp op
+	type resetOp op
+	type acceptOp op
+	const noStream = math.MaxInt64
+	stringID := func(id streamID) string {
+		return fmt.Sprintf("%v/%v", id.streamType(), id.num())
+	}
+	for _, op := range []any{
+		"opening bidi/5 implicitly opens bidi/0-4",
+		streamOp{newStreamID(clientSide, bidiStream, 5)},
+		acceptOp{newStreamID(clientSide, bidiStream, 5)},
+		"bidi/3 was implicitly opened",
+		streamOp{newStreamID(clientSide, bidiStream, 3)},
+		acceptOp{newStreamID(clientSide, bidiStream, 3)},
+		resetOp{newStreamID(clientSide, bidiStream, 3)},
+		"bidi/3 is done, frames for it are discarded",
+		streamOp{newStreamID(clientSide, bidiStream, 3)},
+		"open and close some uni streams as well",
+		streamOp{newStreamID(clientSide, uniStream, 0)},
+		acceptOp{newStreamID(clientSide, uniStream, 0)},
+		streamOp{newStreamID(clientSide, uniStream, 1)},
+		acceptOp{newStreamID(clientSide, uniStream, 1)},
+		streamOp{newStreamID(clientSide, uniStream, 2)},
+		acceptOp{newStreamID(clientSide, uniStream, 2)},
+		resetOp{newStreamID(clientSide, uniStream, 1)},
+		resetOp{newStreamID(clientSide, uniStream, 0)},
+		resetOp{newStreamID(clientSide, uniStream, 2)},
+		"closing an implicitly opened stream causes us to accept it",
+		resetOp{newStreamID(clientSide, bidiStream, 0)},
+		acceptOp{newStreamID(clientSide, bidiStream, 0)},
+		resetOp{newStreamID(clientSide, bidiStream, 1)},
+		acceptOp{newStreamID(clientSide, bidiStream, 1)},
+		resetOp{newStreamID(clientSide, bidiStream, 2)},
+		acceptOp{newStreamID(clientSide, bidiStream, 2)},
+		"stream bidi/3 was reset previously",
+		resetOp{newStreamID(clientSide, bidiStream, 3)},
+		resetOp{newStreamID(clientSide, bidiStream, 4)},
+		acceptOp{newStreamID(clientSide, bidiStream, 4)},
+		"stream bidi/5 was reset previously",
+		resetOp{newStreamID(clientSide, bidiStream, 5)},
+		"stream bidi/6 was not implicitly opened",
+		resetOp{newStreamID(clientSide, bidiStream, 6)},
+		acceptOp{newStreamID(clientSide, bidiStream, 6)},
+	} {
+		if _, ok := op.(acceptOp); !ok {
+			if s, err := tc.conn.AcceptStream(ctx); err == nil {
+				t.Fatalf("accepted stream %v, want none", stringID(s.id))
+			}
+		}
+		switch op := op.(type) {
+		case string:
+			t.Log("# " + op)
+		case streamOp:
+			t.Logf("open stream %v", stringID(op.id))
+			tc.writeFrames(packetType1RTT, debugFrameStream{
+				id: streamID(op.id),
+			})
+		case resetOp:
+			t.Logf("reset stream %v", stringID(op.id))
+			tc.writeFrames(packetType1RTT, debugFrameResetStream{
+				id: op.id,
+			})
+		case acceptOp:
+			s, err := tc.conn.AcceptStream(ctx)
+			if err != nil {
+				t.Fatalf("AcceptStream() = %q; want stream %v", err, stringID(op.id))
+			}
+			if s.id != op.id {
+				t.Fatalf("accepted stram %v; want stream %v", err, stringID(op.id))
+			}
+			t.Logf("accepted stream %v", stringID(op.id))
+			// Immediately close the stream, so the stream becomes done when the
+			// peer closes its end.
+			s.CloseContext(ctx)
+		}
+		p := tc.readPacket()
+		if p != nil {
+			tc.writeFrames(p.ptype, debugFrameAck{
+				ranges: []i64range[packetNumber]{{0, p.num + 1}},
+			})
+		}
+	}
+	// Every stream should be fully closed now.
+	// Check that we don't have any state left.
+	if got := len(tc.conn.streams.streams); got != 0 {
+		t.Fatalf("after test, len(tc.conn.streams.streams) = %v, want 0", got)
+	}
+	if tc.conn.streams.sendHead != nil {
+		t.Fatalf("after test, stream send queue is not empty; should be")
+	}
+}
diff --git a/internal/quic/quic.go b/internal/quic/quic.go
index 71738e1..cf4137e 100644
--- a/internal/quic/quic.go
+++ b/internal/quic/quic.go
@@ -59,6 +59,13 @@
 // https://www.rfc-editor.org/rfc/rfc9000.html#section-4.6-2
 const maxStreamsLimit = 1 << 60
 
+// Maximum number of streams we will allow the peer to create implicitly.
+// A stream ID that is used out of order results in all streams of that type
+// with lower-numbered IDs also being opened. To limit the amount of work we
+// will do in response to a single frame, we cap the peer's stream limit to
+// this value.
+const implicitStreamLimit = 100
+
 // A connSide distinguishes between the client and server sides of a connection.
 type connSide int8
 
diff --git a/internal/quic/stream.go b/internal/quic/stream.go
index 2dbf446..b759e40 100644
--- a/internal/quic/stream.go
+++ b/internal/quic/stream.go
@@ -330,6 +330,10 @@
 	s.resetInternal(code, userClosed)
 }
 
+// resetInternal resets the send side of the stream.
+//
+// If userClosed is true, this is s.Reset.
+// If userClosed is false, this is a reaction to a STOP_SENDING frame.
 func (s *Stream) resetInternal(code uint64, userClosed bool) {
 	s.outgate.lock()
 	defer s.outUnlock()
diff --git a/internal/quic/stream_limits.go b/internal/quic/stream_limits.go
new file mode 100644
index 0000000..5ea7146
--- /dev/null
+++ b/internal/quic/stream_limits.go
@@ -0,0 +1,109 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+//go:build go1.21
+
+package quic
+
+import (
+	"context"
+)
+
+// Limits on the number of open streams.
+// Every connection has separate limits for bidirectional and unidirectional streams.
+//
+// Note that the MAX_STREAMS limit includes closed as well as open streams.
+// Closing a stream doesn't enable an endpoint to open a new one;
+// only an increase in the MAX_STREAMS limit does.
+
+// localStreamLimits are limits on the number of open streams created by us.
+type localStreamLimits struct {
+	gate   gate
+	max    int64 // peer-provided MAX_STREAMS
+	opened int64 // number of streams opened by us
+}
+
+func (lim *localStreamLimits) init() {
+	lim.gate = newGate()
+}
+
+// open creates a new local stream, blocking until MAX_STREAMS quota is available.
+func (lim *localStreamLimits) open(ctx context.Context, c *Conn) (num int64, err error) {
+	// TODO: Send a STREAMS_BLOCKED when blocked.
+	if err := c.waitAndLockGate(ctx, &lim.gate); err != nil {
+		return 0, err
+	}
+	n := lim.opened
+	lim.opened++
+	lim.gate.unlock(lim.opened < lim.max)
+	return n, nil
+}
+
+// setMax sets the MAX_STREAMS provided by the peer.
+func (lim *localStreamLimits) setMax(maxStreams int64) {
+	lim.gate.lock()
+	lim.max = max(lim.max, maxStreams)
+	lim.gate.unlock(lim.opened < lim.max)
+}
+
+// remoteStreamLimits are limits on the number of open streams created by the peer.
+type remoteStreamLimits struct {
+	max     int64   // last MAX_STREAMS sent to the peer
+	opened  int64   // number of streams opened by the peer (including subsequently closed ones)
+	closed  int64   // number of peer streams in the "closed" state
+	maxOpen int64   // how many streams we want to let the peer simultaneously open
+	sendMax sentVal // set when we should send MAX_STREAMS
+}
+
+func (lim *remoteStreamLimits) init(maxOpen int64) {
+	lim.maxOpen = maxOpen
+	lim.max = min(maxOpen, implicitStreamLimit) // initial limit sent in transport parameters
+	lim.opened = 0
+}
+
+// open handles the peer opening a new stream.
+func (lim *remoteStreamLimits) open(id streamID) error {
+	num := id.num()
+	if num >= lim.max {
+		return localTransportError(errStreamLimit)
+	}
+	if num >= lim.opened {
+		lim.opened = num + 1
+		lim.maybeUpdateMax()
+	}
+	return nil
+}
+
+// close handles the peer closing an open stream.
+func (lim *remoteStreamLimits) close() {
+	lim.closed++
+	lim.maybeUpdateMax()
+}
+
+// maybeUpdateMax updates the MAX_STREAMS value we will send to the peer.
+func (lim *remoteStreamLimits) maybeUpdateMax() {
+	newMax := min(
+		// Max streams the peer can have open at once.
+		lim.closed+lim.maxOpen,
+		// Max streams the peer can open with a single frame.
+		lim.opened+implicitStreamLimit,
+	)
+	avail := lim.max - lim.opened
+	if newMax > lim.max && (avail < 8 || newMax-lim.max >= 2*avail) {
+		// If the peer has less than 8 streams, or if increasing the peer's
+		// stream limit would double it, then send a MAX_STREAMS.
+		lim.max = newMax
+		lim.sendMax.setUnsent()
+	}
+}
+
+// appendFrame appends a MAX_DATA frame if necessary.
+func (lim *remoteStreamLimits) appendFrame(w *packetWriter, typ streamType, pnum packetNumber, pto bool) {
+	if !lim.sendMax.shouldSendPTO(pto) {
+		return
+	}
+	if w.appendMaxStreamsFrame(typ, lim.max) {
+		lim.sendMax.setSent(pnum)
+	}
+}
diff --git a/internal/quic/stream_limits_test.go b/internal/quic/stream_limits_test.go
new file mode 100644
index 0000000..3f291e9
--- /dev/null
+++ b/internal/quic/stream_limits_test.go
@@ -0,0 +1,269 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+//go:build go1.21
+
+package quic
+
+import (
+	"context"
+	"crypto/tls"
+	"testing"
+)
+
+func TestStreamLimitNewStreamBlocked(t *testing.T) {
+	// "An endpoint that receives a frame with a stream ID exceeding the limit
+	// it has sent MUST treat this as a connection error of type STREAM_LIMIT_ERROR [...]"
+	// https://www.rfc-editor.org/rfc/rfc9000#section-4.6-3
+	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+		ctx := canceledContext()
+		tc := newTestConn(t, clientSide,
+			permissiveTransportParameters,
+			func(p *transportParameters) {
+				p.initialMaxStreamsBidi = 0
+				p.initialMaxStreamsUni = 0
+			})
+		tc.handshake()
+		tc.ignoreFrame(frameTypeAck)
+		opening := runAsync(tc, func(ctx context.Context) (*Stream, error) {
+			return tc.conn.newLocalStream(ctx, styp)
+		})
+		if _, err := opening.result(); err != errNotDone {
+			t.Fatalf("new stream blocked by limit: %v, want errNotDone", err)
+		}
+		tc.writeFrames(packetType1RTT, debugFrameMaxStreams{
+			streamType: styp,
+			max:        1,
+		})
+		if _, err := opening.result(); err != nil {
+			t.Fatalf("new stream not created after limit raised: %v", err)
+		}
+		if _, err := tc.conn.newLocalStream(ctx, styp); err == nil {
+			t.Fatalf("new stream blocked by raised limit: %v, want error", err)
+		}
+	})
+}
+
+func TestStreamLimitMaxStreamsDecreases(t *testing.T) {
+	// "MAX_STREAMS frames that do not increase the stream limit MUST be ignored."
+	// https://www.rfc-editor.org/rfc/rfc9000#section-4.6-4
+	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+		ctx := canceledContext()
+		tc := newTestConn(t, clientSide,
+			permissiveTransportParameters,
+			func(p *transportParameters) {
+				p.initialMaxStreamsBidi = 0
+				p.initialMaxStreamsUni = 0
+			})
+		tc.handshake()
+		tc.ignoreFrame(frameTypeAck)
+		tc.writeFrames(packetType1RTT, debugFrameMaxStreams{
+			streamType: styp,
+			max:        2,
+		})
+		tc.writeFrames(packetType1RTT, debugFrameMaxStreams{
+			streamType: styp,
+			max:        1,
+		})
+		if _, err := tc.conn.newLocalStream(ctx, styp); err != nil {
+			t.Fatalf("open stream 1, limit 2, got error: %v", err)
+		}
+		if _, err := tc.conn.newLocalStream(ctx, styp); err != nil {
+			t.Fatalf("open stream 2, limit 2, got error: %v", err)
+		}
+		if _, err := tc.conn.newLocalStream(ctx, styp); err == nil {
+			t.Fatalf("open stream 3, limit 2, got error: %v", err)
+		}
+	})
+}
+
+func TestStreamLimitViolated(t *testing.T) {
+	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+		tc := newTestConn(t, serverSide,
+			func(c *Config) {
+				if styp == bidiStream {
+					c.MaxBidiRemoteStreams = 10
+				} else {
+					c.MaxUniRemoteStreams = 10
+				}
+			})
+		tc.handshake()
+		tc.ignoreFrame(frameTypeAck)
+		tc.writeFrames(packetType1RTT, debugFrameStream{
+			id: newStreamID(clientSide, styp, 9),
+		})
+		tc.wantIdle("stream number 9 is within the limit")
+		tc.writeFrames(packetType1RTT, debugFrameStream{
+			id: newStreamID(clientSide, styp, 10),
+		})
+		tc.wantFrame("stream number 10 is beyond the limit",
+			packetType1RTT, debugFrameConnectionCloseTransport{
+				code: errStreamLimit,
+			},
+		)
+	})
+}
+
+func TestStreamLimitImplicitStreams(t *testing.T) {
+	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+		tc := newTestConn(t, serverSide,
+			func(c *Config) {
+				c.MaxBidiRemoteStreams = 1 << 60
+				c.MaxUniRemoteStreams = 1 << 60
+			})
+		tc.handshake()
+		tc.ignoreFrame(frameTypeAck)
+		if got, want := tc.sentTransportParameters.initialMaxStreamsBidi, int64(implicitStreamLimit); got != want {
+			t.Errorf("sent initial_max_streams_bidi = %v, want %v", got, want)
+		}
+		if got, want := tc.sentTransportParameters.initialMaxStreamsUni, int64(implicitStreamLimit); got != want {
+			t.Errorf("sent initial_max_streams_uni = %v, want %v", got, want)
+		}
+
+		// Create stream 0.
+		tc.writeFrames(packetType1RTT, debugFrameStream{
+			id: newStreamID(clientSide, styp, 0),
+		})
+		tc.wantIdle("max streams not increased enough to send a new frame")
+
+		// Create streams [0, implicitStreamLimit).
+		tc.writeFrames(packetType1RTT, debugFrameStream{
+			id: newStreamID(clientSide, styp, implicitStreamLimit-1),
+		})
+		tc.wantFrame("max streams increases to implicit stream limit",
+			packetType1RTT, debugFrameMaxStreams{
+				streamType: styp,
+				max:        2 * implicitStreamLimit,
+			})
+
+		// Create a stream past the limit.
+		tc.writeFrames(packetType1RTT, debugFrameStream{
+			id: newStreamID(clientSide, styp, 2*implicitStreamLimit),
+		})
+		tc.wantFrame("stream is past the limit",
+			packetType1RTT, debugFrameConnectionCloseTransport{
+				code: errStreamLimit,
+			},
+		)
+	})
+}
+
+func TestStreamLimitMaxStreamsTransportParameterTooLarge(t *testing.T) {
+	// "If a max_streams transport parameter [...] is received with
+	// a value greater than 2^60 [...] the connection MUST be closed
+	// immediately with a connection error of type TRANSPORT_PARAMETER_ERROR [...]"
+	// https://www.rfc-editor.org/rfc/rfc9000#section-4.6-2
+	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+		tc := newTestConn(t, serverSide,
+			func(p *transportParameters) {
+				if styp == bidiStream {
+					p.initialMaxStreamsBidi = 1<<60 + 1
+				} else {
+					p.initialMaxStreamsUni = 1<<60 + 1
+				}
+			})
+		tc.writeFrames(packetTypeInitial, debugFrameCrypto{
+			data: tc.cryptoDataIn[tls.QUICEncryptionLevelInitial],
+		})
+		tc.wantFrame("max streams transport parameter is too large",
+			packetTypeInitial, debugFrameConnectionCloseTransport{
+				code: errTransportParameter,
+			},
+		)
+	})
+}
+
+func TestStreamLimitMaxStreamsFrameTooLarge(t *testing.T) {
+	// "If [...] a MAX_STREAMS frame is received with a value
+	// greater than 2^60 [...] the connection MUST be closed immediately
+	// with a connection error [...] of type FRAME_ENCODING_ERROR [...]"
+	// https://www.rfc-editor.org/rfc/rfc9000#section-4.6-2
+	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+		tc := newTestConn(t, serverSide)
+		tc.handshake()
+		tc.writeFrames(packetTypeInitial,
+			debugFrameCrypto{
+				data: tc.cryptoDataIn[tls.QUICEncryptionLevelInitial],
+			})
+		tc.writeFrames(packetType1RTT, debugFrameMaxStreams{
+			streamType: styp,
+			max:        1<<60 + 1,
+		})
+		tc.wantFrame("MAX_STREAMS value is too large",
+			packetType1RTT, debugFrameConnectionCloseTransport{
+				code: errFrameEncoding,
+			},
+		)
+	})
+}
+
+func TestStreamLimitSendUpdatesMaxStreams(t *testing.T) {
+	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+		ctx := canceledContext()
+		tc := newTestConn(t, serverSide, func(c *Config) {
+			if styp == uniStream {
+				c.MaxUniRemoteStreams = 4
+				c.MaxBidiRemoteStreams = 0
+			} else {
+				c.MaxUniRemoteStreams = 0
+				c.MaxBidiRemoteStreams = 4
+			}
+		})
+		tc.handshake()
+		tc.ignoreFrame(frameTypeAck)
+		var streams []*Stream
+		for i := 0; i < 4; i++ {
+			tc.writeFrames(packetType1RTT, debugFrameStream{
+				id:  newStreamID(clientSide, styp, int64(i)),
+				fin: true,
+			})
+			s, err := tc.conn.AcceptStream(ctx)
+			if err != nil {
+				t.Fatalf("AcceptStream = %v", err)
+			}
+			streams = append(streams, s)
+		}
+		streams[3].CloseContext(ctx)
+		if styp == bidiStream {
+			tc.wantFrame("stream is closed",
+				packetType1RTT, debugFrameStream{
+					id:   streams[3].id,
+					fin:  true,
+					data: []byte{},
+				})
+			tc.writeAckForAll()
+		}
+		tc.wantFrame("closing a stream when peer is at limit immediately extends the limit",
+			packetType1RTT, debugFrameMaxStreams{
+				streamType: styp,
+				max:        5,
+			})
+	})
+}
+
+func TestStreamLimitStopSendingDoesNotUpdateMaxStreams(t *testing.T) {
+	tc, s := newTestConnAndRemoteStream(t, serverSide, bidiStream, func(c *Config) {
+		c.MaxBidiRemoteStreams = 1
+	})
+	tc.writeFrames(packetType1RTT, debugFrameStream{
+		id:  s.id,
+		fin: true,
+	})
+	s.CloseRead()
+	tc.writeFrames(packetType1RTT, debugFrameStopSending{
+		id: s.id,
+	})
+	tc.wantFrame("recieved STOP_SENDING, send RESET_STREAM",
+		packetType1RTT, debugFrameResetStream{
+			id: s.id,
+		})
+	tc.writeAckForAll()
+	tc.wantIdle("MAX_STREAMS is not extended until the user fully closes the stream")
+	s.CloseWrite()
+	tc.wantFrame("user closing the stream triggers MAX_STREAMS update",
+		packetType1RTT, debugFrameMaxStreams{
+			streamType: bidiStream,
+			max:        2,
+		})
+}
diff --git a/internal/quic/stream_test.go b/internal/quic/stream_test.go
index e22e043..fb21255 100644
--- a/internal/quic/stream_test.go
+++ b/internal/quic/stream_test.go
@@ -649,7 +649,7 @@
 // to the conn and expects a STREAM_STATE_ERROR.
 func testStreamSendFrameInvalidState(t *testing.T, f func(sid streamID) debugFrame) {
 	testSides(t, "stream_not_created", func(t *testing.T, side connSide) {
-		tc := newTestConn(t, side)
+		tc := newTestConn(t, side, permissiveTransportParameters)
 		tc.handshake()
 		tc.writeFrames(packetType1RTT, f(newStreamID(side, bidiStream, 0)))
 		tc.wantFrame("frame for local stream which has not been created",
@@ -659,7 +659,7 @@
 	})
 	testSides(t, "uni_stream", func(t *testing.T, side connSide) {
 		ctx := canceledContext()
-		tc := newTestConn(t, side)
+		tc := newTestConn(t, side, permissiveTransportParameters)
 		tc.handshake()
 		sid := newStreamID(side, uniStream, 0)
 		s, err := tc.conn.NewSendOnlyStream(ctx)
@@ -796,7 +796,7 @@
 }
 
 func TestStreamReadFromWriteOnlyStream(t *testing.T) {
-	_, s := newTestConnAndLocalStream(t, serverSide, uniStream)
+	_, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters)
 	buf := make([]byte, 10)
 	wantErr := "read from write-only stream"
 	if n, err := s.Read(buf); err == nil || !strings.Contains(err.Error(), wantErr) {
@@ -1112,7 +1112,7 @@
 }
 
 func TestStreamResetInvalidCode(t *testing.T) {
-	tc, s := newTestConnAndLocalStream(t, serverSide, uniStream)
+	tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters)
 	s.Reset(1 << 62)
 	tc.wantFrame("reset with invalid code sends a RESET_STREAM anyway",
 		packetType1RTT, debugFrameResetStream{