quic: send and receive stream data

Send and receive data in STREAM frames.

Write-close streams and communicate the final size in a
STREAM frame with the FIN bit. Return io.EOF on reads
at the end of a stream.

Handle stream-level flow control. Send window updates
in MAX_STREAM_DATA frames, send STREAM_DATA_BLOCKED
when flow control is not available.

Does not include connection-level flow control,
read-closing, aborting, or removing streams from
a conn after both sides have closed the stream.

For golang/go#58547

Change-Id: Ib2b449bf54eb6cf200c4f6e2dd2c33274dda3387
Reviewed-on: https://go-review.googlesource.com/c/net/+/515815
Run-TryBot: Damien Neil <dneil@google.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
diff --git a/internal/quic/config.go b/internal/quic/config.go
index 7d1b743..df49357 100644
--- a/internal/quic/config.go
+++ b/internal/quic/config.go
@@ -17,4 +17,29 @@
 	// TLSConfig is the endpoint's TLS configuration.
 	// It must be non-nil and include at least one certificate or else set GetCertificate.
 	TLSConfig *tls.Config
+
+	// StreamReadBufferSize is the maximum amount of data sent by the peer that a
+	// stream will buffer for reading.
+	// If zero, the default value of 1MiB is used.
+	// If negative, the limit is zero.
+	StreamReadBufferSize int64
+
+	// StreamWriteBufferSize is the maximum amount of data a stream will buffer for
+	// sending to the peer.
+	// If zero, the default value of 1MiB is used.
+	// If negative, the limit is zero.
+	StreamWriteBufferSize int64
 }
+
+func configDefault(v, def int64) int64 {
+	switch v {
+	case -1:
+		return 0
+	case 0:
+		return def
+	}
+	return v
+}
+
+func (c *Config) streamReadBufferSize() int64  { return configDefault(c.StreamReadBufferSize, 1<<20) }
+func (c *Config) streamWriteBufferSize() int64 { return configDefault(c.StreamWriteBufferSize, 1<<20) }
diff --git a/internal/quic/conn.go b/internal/quic/conn.go
index 90e6739..0952a79 100644
--- a/internal/quic/conn.go
+++ b/internal/quic/conn.go
@@ -160,6 +160,9 @@
 
 // receiveTransportParameters applies transport parameters sent by the peer.
 func (c *Conn) receiveTransportParameters(p transportParameters) error {
+	c.streams.peerInitialMaxStreamDataBidiLocal = p.initialMaxStreamDataBidiLocal
+	c.streams.peerInitialMaxStreamDataRemote[bidiStream] = p.initialMaxStreamDataBidiRemote
+	c.streams.peerInitialMaxStreamDataRemote[uniStream] = p.initialMaxStreamDataUni
 	c.peerAckDelayExponent = p.ackDelayExponent
 	c.loss.setMaxAckDelay(p.maxAckDelay)
 	if err := c.connIDState.setPeerActiveConnIDLimit(p.activeConnIDLimit, c.newConnIDFunc()); err != nil {
diff --git a/internal/quic/conn_loss.go b/internal/quic/conn_loss.go
index ca17808..f42f7e5 100644
--- a/internal/quic/conn_loss.go
+++ b/internal/quic/conn_loss.go
@@ -44,6 +44,14 @@
 		case frameTypeCrypto:
 			start, end := sent.nextRange()
 			c.crypto[space].ackOrLoss(start, end, fate)
+		case frameTypeMaxStreamData,
+			frameTypeStreamDataBlocked:
+			id := streamID(sent.nextInt())
+			s := c.streamForID(id)
+			if s == nil {
+				continue
+			}
+			s.ackOrLoss(sent.num, f, fate)
 		case frameTypeStreamBase,
 			frameTypeStreamBase | streamFinBit:
 			id := streamID(sent.nextInt())
diff --git a/internal/quic/conn_loss_test.go b/internal/quic/conn_loss_test.go
index e3d16a7..d944515 100644
--- a/internal/quic/conn_loss_test.go
+++ b/internal/quic/conn_loss_test.go
@@ -7,7 +7,9 @@
 package quic
 
 import (
+	"context"
 	"crypto/tls"
+	"fmt"
 	"testing"
 )
 
@@ -145,7 +147,275 @@
 				data: []byte{},
 			})
 	})
+}
 
+func TestLostStreamWithData(t *testing.T) {
+	// "Application data sent in STREAM frames is retransmitted in new STREAM
+	// frames unless the endpoint has sent a RESET_STREAM for that stream."
+	// https://www.rfc-editor.org/rfc/rfc9000#section-13.3-3.2
+	//
+	// TODO: Lost stream frame after RESET_STREAM
+	lostFrameTest(t, func(t *testing.T, pto bool) {
+		data := []byte{0, 1, 2, 3, 4, 5, 6, 7}
+		tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, func(p *transportParameters) {
+			p.initialMaxStreamsUni = 1
+			p.initialMaxData = 1 << 20
+			p.initialMaxStreamDataUni = 1 << 20
+		})
+		s.Write(data[:4])
+		tc.wantFrame("send [0,4)",
+			packetType1RTT, debugFrameStream{
+				id:   s.id,
+				off:  0,
+				data: data[:4],
+			})
+		s.Write(data[4:8])
+		tc.wantFrame("send [4,8)",
+			packetType1RTT, debugFrameStream{
+				id:   s.id,
+				off:  4,
+				data: data[4:8],
+			})
+		s.Close()
+		tc.wantFrame("send FIN",
+			packetType1RTT, debugFrameStream{
+				id:   s.id,
+				off:  8,
+				fin:  true,
+				data: []byte{},
+			})
+
+		tc.triggerLossOrPTO(packetType1RTT, pto)
+		tc.wantFrame("resend data",
+			packetType1RTT, debugFrameStream{
+				id:   s.id,
+				off:  0,
+				fin:  true,
+				data: data[:8],
+			})
+	})
+}
+
+func TestLostStreamPartialLoss(t *testing.T) {
+	// Conn sends four STREAM packets.
+	// ACKs are received for the packets containing bytes 0 and 2.
+	// The remaining packets are declared lost.
+	// The Conn resends only the lost data.
+	//
+	// This test doesn't have a PTO mode, because the ACK for the packet containing byte 2
+	// starts the loss timer for the packet containing byte 1, and the PTO timer is not
+	// armed when the loss timer is.
+	data := []byte{0, 1, 2, 3}
+	tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, func(p *transportParameters) {
+		p.initialMaxStreamsUni = 1
+		p.initialMaxData = 1 << 20
+		p.initialMaxStreamDataUni = 1 << 20
+	})
+	for i := range data {
+		s.Write(data[i : i+1])
+		tc.wantFrame(fmt.Sprintf("send STREAM frame with byte %v", i),
+			packetType1RTT, debugFrameStream{
+				id:   s.id,
+				off:  int64(i),
+				data: data[i : i+1],
+			})
+		if i%2 == 0 {
+			num := tc.sentFramePacket.num
+			tc.writeFrames(packetType1RTT, debugFrameAck{
+				ranges: []i64range[packetNumber]{
+					{num, num + 1},
+				},
+			})
+		}
+	}
+	const pto = false
+	tc.triggerLossOrPTO(packetType1RTT, pto)
+	tc.wantFrame("resend byte 1",
+		packetType1RTT, debugFrameStream{
+			id:   s.id,
+			off:  1,
+			data: data[1:2],
+		})
+	tc.wantFrame("resend byte 3",
+		packetType1RTT, debugFrameStream{
+			id:   s.id,
+			off:  3,
+			data: data[3:4],
+		})
+	tc.wantIdle("no more frames sent after packet loss")
+}
+
+func TestLostMaxStreamDataFrame(t *testing.T) {
+	// "[...] an updated value is sent when the packet containing
+	// the most recent MAX_STREAM_DATA frame for a stream is lost"
+	// https://www.rfc-editor.org/rfc/rfc9000#section-13.3-3.8
+	lostFrameTest(t, func(t *testing.T, pto bool) {
+		const maxWindowSize = 10
+		buf := make([]byte, maxWindowSize)
+		tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream, func(c *Config) {
+			c.StreamReadBufferSize = maxWindowSize
+		})
+
+		// We send MAX_STREAM_DATA = 19.
+		tc.writeFrames(packetType1RTT, debugFrameStream{
+			id:   s.id,
+			off:  0,
+			data: make([]byte, maxWindowSize),
+		})
+		if n, err := s.Read(buf[:maxWindowSize-1]); err != nil || n != maxWindowSize-1 {
+			t.Fatalf("Read() = %v, %v; want %v, nil", n, err, maxWindowSize-1)
+		}
+		tc.wantFrame("stream window is extended after reading data",
+			packetType1RTT, debugFrameMaxStreamData{
+				id:  s.id,
+				max: (maxWindowSize * 2) - 1,
+			})
+
+		// MAX_STREAM_DATA = 20, which is only one more byte, so we don't send the frame.
+		if n, err := s.Read(buf); err != nil || n != 1 {
+			t.Fatalf("Read() = %v, %v; want %v, nil", n, err, 1)
+		}
+		tc.wantIdle("read doesn't extend window enough to send another MAX_STREAM_DATA")
+
+		// The MAX_STREAM_DATA = 19 packet was lost, so we send 20.
+		tc.triggerLossOrPTO(packetType1RTT, pto)
+		tc.wantFrame("resent MAX_STREAM_DATA includes most current value",
+			packetType1RTT, debugFrameMaxStreamData{
+				id:  s.id,
+				max: maxWindowSize * 2,
+			})
+	})
+}
+
+func TestLostMaxStreamDataFrameAfterStreamFinReceived(t *testing.T) {
+	// "An endpoint SHOULD stop sending MAX_STREAM_DATA frames when
+	// the receiving part of the stream enters a "Size Known" or "Reset Recvd" state."
+	// https://www.rfc-editor.org/rfc/rfc9000#section-13.3-3.8
+	lostFrameTest(t, func(t *testing.T, pto bool) {
+		const maxWindowSize = 10
+		buf := make([]byte, maxWindowSize)
+		tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream, func(c *Config) {
+			c.StreamReadBufferSize = maxWindowSize
+		})
+
+		tc.writeFrames(packetType1RTT, debugFrameStream{
+			id:   s.id,
+			off:  0,
+			data: make([]byte, maxWindowSize),
+		})
+		if n, err := s.Read(buf); err != nil || n != maxWindowSize {
+			t.Fatalf("Read() = %v, %v; want %v, nil", n, err, maxWindowSize)
+		}
+		tc.wantFrame("stream window is extended after reading data",
+			packetType1RTT, debugFrameMaxStreamData{
+				id:  s.id,
+				max: 2 * maxWindowSize,
+			})
+
+		tc.writeFrames(packetType1RTT, debugFrameStream{
+			id:  s.id,
+			off: maxWindowSize,
+			fin: true,
+		})
+
+		tc.ignoreFrame(frameTypePing)
+		tc.triggerLossOrPTO(packetType1RTT, pto)
+		tc.wantIdle("lost MAX_STREAM_DATA not resent for stream in 'size known'")
+	})
+}
+
+func TestLostStreamDataBlockedFrame(t *testing.T) {
+	// "A new [STREAM_DATA_BLOCKED] frame is sent if a packet containing
+	// the most recent frame for a scope is lost [...]"
+	// https://www.rfc-editor.org/rfc/rfc9000#section-13.3-3.10
+	lostFrameTest(t, func(t *testing.T, pto bool) {
+		tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, func(p *transportParameters) {
+			p.initialMaxStreamsUni = 1
+			p.initialMaxData = 1 << 20
+		})
+
+		w := runAsync(tc, func(ctx context.Context) (int, error) {
+			return s.WriteContext(ctx, []byte{0, 1, 2, 3})
+		})
+		defer w.cancel()
+		tc.wantFrame("write is blocked by flow control",
+			packetType1RTT, debugFrameStreamDataBlocked{
+				id:  s.id,
+				max: 0,
+			})
+
+		tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
+			id:  s.id,
+			max: 1,
+		})
+		tc.wantFrame("write makes some progress, but is still blocked by flow control",
+			packetType1RTT, debugFrameStreamDataBlocked{
+				id:  s.id,
+				max: 1,
+			})
+		tc.wantFrame("write consuming available window",
+			packetType1RTT, debugFrameStream{
+				id:   s.id,
+				off:  0,
+				data: []byte{0},
+			})
+
+		tc.triggerLossOrPTO(packetType1RTT, pto)
+		tc.wantFrame("STREAM_DATA_BLOCKED is resent",
+			packetType1RTT, debugFrameStreamDataBlocked{
+				id:  s.id,
+				max: 1,
+			})
+		tc.wantFrame("STREAM is resent as well",
+			packetType1RTT, debugFrameStream{
+				id:   s.id,
+				off:  0,
+				data: []byte{0},
+			})
+	})
+}
+
+func TestLostStreamDataBlockedFrameAfterStreamUnblocked(t *testing.T) {
+	// "A new [STREAM_DATA_BLOCKED] frame is sent [...] only while
+	// the endpoint is blocked on the corresponding limit."
+	// https://www.rfc-editor.org/rfc/rfc9000#section-13.3-3.10
+	lostFrameTest(t, func(t *testing.T, pto bool) {
+		tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, func(p *transportParameters) {
+			p.initialMaxStreamsUni = 1
+			p.initialMaxData = 1 << 20
+		})
+
+		data := []byte{0, 1, 2, 3}
+		w := runAsync(tc, func(ctx context.Context) (int, error) {
+			return s.WriteContext(ctx, data)
+		})
+		defer w.cancel()
+		tc.wantFrame("write is blocked by flow control",
+			packetType1RTT, debugFrameStreamDataBlocked{
+				id:  s.id,
+				max: 0,
+			})
+
+		tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
+			id:  s.id,
+			max: 10,
+		})
+		tc.wantFrame("write completes after flow control available",
+			packetType1RTT, debugFrameStream{
+				id:   s.id,
+				off:  0,
+				data: data,
+			})
+
+		tc.triggerLossOrPTO(packetType1RTT, pto)
+		tc.wantFrame("STREAM data is resent",
+			packetType1RTT, debugFrameStream{
+				id:   s.id,
+				off:  0,
+				data: data,
+			})
+		tc.wantIdle("STREAM_DATA_BLOCKED is not resent, since the stream is not blocked")
+	})
 }
 
 func TestLostNewConnectionIDFrame(t *testing.T) {
diff --git a/internal/quic/conn_recv.go b/internal/quic/conn_recv.go
index 45ef384..00985b6 100644
--- a/internal/quic/conn_recv.go
+++ b/internal/quic/conn_recv.go
@@ -191,7 +191,7 @@
 			if !frameOK(c, ptype, __01) {
 				return
 			}
-			_, _, n = consumeMaxStreamDataFrame(payload)
+			n = c.handleMaxStreamDataFrame(now, payload)
 		case frameTypeMaxStreamsBidi, frameTypeMaxStreamsUni:
 			if !frameOK(c, ptype, __01) {
 				return
@@ -280,6 +280,17 @@
 	return n
 }
 
+func (c *Conn) handleMaxStreamDataFrame(now time.Time, payload []byte) int {
+	id, maxStreamData, n := consumeMaxStreamDataFrame(payload)
+	if s := c.streamForFrame(now, id, sendStream); s != nil {
+		if err := s.handleMaxStreamData(maxStreamData); err != nil {
+			c.abort(now, err)
+			return -1
+		}
+	}
+	return n
+}
+
 func (c *Conn) handleCryptoFrame(now time.Time, space numberSpace, payload []byte) int {
 	off, data, n := consumeCryptoFrame(payload)
 	err := c.handleCrypto(now, space, off, data)
diff --git a/internal/quic/conn_streams.go b/internal/quic/conn_streams.go
index f626323..7a531f5 100644
--- a/internal/quic/conn_streams.go
+++ b/internal/quic/conn_streams.go
@@ -20,6 +20,10 @@
 	streams   map[streamID]*Stream
 	opened    [streamTypeCount]int64 // number of streams opened by us
 
+	// Peer configuration provided in transport parameters.
+	peerInitialMaxStreamDataRemote    [streamTypeCount]int64 // streams opened by us
+	peerInitialMaxStreamDataBidiLocal int64                  // streams opened by them
+
 	// Streams with frames to send are stored in a circular linked list.
 	// sendHead is the next stream to write, or nil if there are no streams
 	// with data to send. sendTail is the last stream to write.
@@ -55,15 +59,24 @@
 	return c.newLocalStream(ctx, uniStream)
 }
 
-func (c *Conn) newLocalStream(ctx context.Context, typ streamType) (*Stream, error) {
+func (c *Conn) newLocalStream(ctx context.Context, styp streamType) (*Stream, error) {
 	// TODO: Stream limits.
 	c.streams.streamsMu.Lock()
 	defer c.streams.streamsMu.Unlock()
 
-	num := c.streams.opened[typ]
-	c.streams.opened[typ]++
+	num := c.streams.opened[styp]
+	c.streams.opened[styp]++
 
-	s := newStream(c, newStreamID(c.side, typ, num))
+	s := newStream(c, newStreamID(c.side, styp, num))
+	s.outmaxbuf = c.config.streamWriteBufferSize()
+	s.outwin = c.streams.peerInitialMaxStreamDataRemote[styp]
+	if styp == bidiStream {
+		s.inmaxbuf = c.config.streamReadBufferSize()
+		s.inwin = c.config.streamReadBufferSize()
+	}
+	s.inUnlock()
+	s.outUnlock()
+
 	c.streams.streams[s.id] = s
 	return s, nil
 }
@@ -117,7 +130,17 @@
 		c.abort(now, localTransportError(errStreamState))
 		return nil
 	}
+
 	s := newStream(c, id)
+	s.inmaxbuf = c.config.streamReadBufferSize()
+	s.inwin = c.config.streamReadBufferSize()
+	if id.streamType() == bidiStream {
+		s.outmaxbuf = c.config.streamWriteBufferSize()
+		s.outwin = c.streams.peerInitialMaxStreamDataBidiLocal
+	}
+	s.inUnlock()
+	s.outUnlock()
+
 	c.streams.streams[id] = s
 	c.streams.queue.put(s)
 	return s
diff --git a/internal/quic/conn_test.go b/internal/quic/conn_test.go
index 5aad69f..2480f9c 100644
--- a/internal/quic/conn_test.go
+++ b/internal/quic/conn_test.go
@@ -179,6 +179,8 @@
 	peerProvidedParams := defaultTransportParameters()
 	for _, o := range opts {
 		switch o := o.(type) {
+		case func(*Config):
+			o(config)
 		case func(*tls.Config):
 			o(config.TLSConfig)
 		case func(p *transportParameters):
diff --git a/internal/quic/crypto_stream.go b/internal/quic/crypto_stream.go
index 6cda657..75dea87 100644
--- a/internal/quic/crypto_stream.go
+++ b/internal/quic/crypto_stream.go
@@ -118,28 +118,7 @@
 // copy the data it wants into position.
 func (s *cryptoStream) dataToSend(pto bool, f func(off, size int64) (sent int64)) {
 	for {
-		var off, size int64
-		if pto {
-			// On PTO, resend unacked data that fits in the probe packet.
-			// For simplicity, we send the range starting at s.out.start
-			// (which is definitely unacked, or else we would have discarded it)
-			// up to the next acked byte (if any).
-			//
-			// This may miss unacked data starting after that acked byte,
-			// but avoids resending data the peer has acked.
-			off = s.out.start
-			end := s.out.end
-			for _, r := range s.outacked {
-				if r.start > off {
-					end = r.start
-					break
-				}
-			}
-			size = end - s.out.start
-		} else if s.outunsent.numRanges() > 0 {
-			off = s.outunsent.min()
-			size = s.outunsent[0].size()
-		}
+		off, size := dataToSend(s.out, s.outunsent, s.outacked, pto)
 		if size == 0 {
 			return
 		}
diff --git a/internal/quic/gate.go b/internal/quic/gate.go
index efb28da..27ab07a 100644
--- a/internal/quic/gate.go
+++ b/internal/quic/gate.go
@@ -20,13 +20,19 @@
 	unset chan struct{}
 }
 
+// newGate returns a new, unlocked gate with the condition unset.
 func newGate() gate {
-	g := gate{
+	g := newLockedGate()
+	g.unlock(false)
+	return g
+}
+
+// newLocked gate returns a new, locked gate.
+func newLockedGate() gate {
+	return gate{
 		set:   make(chan struct{}, 1),
 		unset: make(chan struct{}, 1),
 	}
-	g.unset <- struct{}{}
-	return g
 }
 
 // lock acquires the gate unconditionally.
diff --git a/internal/quic/quic_test.go b/internal/quic/quic_test.go
new file mode 100644
index 0000000..1281b54
--- /dev/null
+++ b/internal/quic/quic_test.go
@@ -0,0 +1,37 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+//go:build go1.21
+
+package quic
+
+import (
+	"testing"
+)
+
+func testSides(t *testing.T, name string, f func(*testing.T, connSide)) {
+	if name != "" {
+		name += "/"
+	}
+	t.Run(name+"server", func(t *testing.T) { f(t, serverSide) })
+	t.Run(name+"client", func(t *testing.T) { f(t, clientSide) })
+}
+
+func testStreamTypes(t *testing.T, name string, f func(*testing.T, streamType)) {
+	if name != "" {
+		name += "/"
+	}
+	t.Run(name+"bidi", func(t *testing.T) { f(t, bidiStream) })
+	t.Run(name+"uni", func(t *testing.T) { f(t, uniStream) })
+}
+
+func testSidesAndStreamTypes(t *testing.T, name string, f func(*testing.T, connSide, streamType)) {
+	if name != "" {
+		name += "/"
+	}
+	t.Run(name+"server/bidi", func(t *testing.T) { f(t, serverSide, bidiStream) })
+	t.Run(name+"client/bidi", func(t *testing.T) { f(t, clientSide, bidiStream) })
+	t.Run(name+"server/uni", func(t *testing.T) { f(t, serverSide, uniStream) })
+	t.Run(name+"client/uni", func(t *testing.T) { f(t, clientSide, uniStream) })
+}
diff --git a/internal/quic/stream.go b/internal/quic/stream.go
index b55f927..83215df 100644
--- a/internal/quic/stream.go
+++ b/internal/quic/stream.go
@@ -9,34 +9,57 @@
 import (
 	"context"
 	"errors"
+	"io"
 )
 
 type Stream struct {
 	id   streamID
 	conn *Conn
 
+	// ingate's lock guards all receive-related state.
+	//
+	// The gate condition is set if a read from the stream will not block,
+	// either because the stream has available data or because the read will fail.
+	ingate    gate
+	in        pipe            // received data
+	inwin     int64           // last MAX_STREAM_DATA sent to the peer
+	insendmax sentVal         // set when we should send MAX_STREAM_DATA to the peer
+	inmaxbuf  int64           // maximum amount of data we will buffer
+	insize    int64           // stream final size; -1 before this is known
+	inset     rangeset[int64] // received ranges
+
 	// outgate's lock guards all send-related state.
 	//
 	// The gate condition is set if a write to the stream will not block,
 	// either because the stream has available flow control or because
 	// the write will fail.
-	outgate   gate
-	outopened sentVal // set if we should open the stream
+	outgate    gate
+	out        pipe            // buffered data to send
+	outwin     int64           // maximum MAX_STREAM_DATA received from the peer
+	outmaxbuf  int64           // maximum amount of data we will buffer
+	outunsent  rangeset[int64] // ranges buffered but not yet sent
+	outacked   rangeset[int64] // ranges sent and acknowledged
+	outopened  sentVal         // set if we should open the stream
+	outclosed  sentVal         // set by CloseWrite
+	outblocked sentVal         // set when a write to the stream is blocked by flow control
 
 	prev, next *Stream // guarded by streamsState.sendMu
 }
 
+// newStream returns a new stream.
+//
+// The stream's ingate and outgate are locked.
+// (We create the stream with locked gates so after the caller
+// initializes the flow control window,
+// unlocking outgate will set the stream writability state.)
 func newStream(c *Conn, id streamID) *Stream {
 	s := &Stream{
 		conn:    c,
 		id:      id,
-		outgate: newGate(),
+		insize:  -1, // -1 indicates the stream size is unknown
+		ingate:  newLockedGate(),
+		outgate: newLockedGate(),
 	}
-
-	// Lock and unlock outgate to update the stream writability state.
-	s.outgate.lock()
-	s.outUnlock()
-
 	return s
 }
 
@@ -66,8 +89,48 @@
 // returning all data sent by the peer.
 // If the peer terminates reads abruptly, ReadContext returns StreamResetError.
 func (s *Stream) ReadContext(ctx context.Context, b []byte) (n int, err error) {
-	// TODO: implement
-	return 0, errors.New("unimplemented")
+	if s.IsWriteOnly() {
+		return 0, errors.New("read from write-only stream")
+	}
+	// Wait until data is available.
+	if err := s.conn.waitAndLockGate(ctx, &s.ingate); err != nil {
+		return 0, err
+	}
+	defer s.inUnlock()
+	if s.insize == s.in.start {
+		return 0, io.EOF
+	}
+	// Getting here indicates the stream contains data to be read.
+	if len(s.inset) < 1 || s.inset[0].start != 0 || s.inset[0].end <= s.in.start {
+		panic("BUG: inconsistent input stream state")
+	}
+	if size := int(s.inset[0].end - s.in.start); size < len(b) {
+		b = b[:size]
+	}
+	start := s.in.start
+	end := start + int64(len(b))
+	s.in.copy(start, b)
+	s.in.discardBefore(end)
+	if s.insize == -1 || s.insize > s.inwin {
+		if shouldUpdateFlowControl(s.inwin-s.in.start, s.inmaxbuf) {
+			// Update stream flow control with a STREAM_MAX_DATA frame.
+			s.insendmax.setUnsent()
+		}
+	}
+	if end == s.insize {
+		return len(b), io.EOF
+	}
+	return len(b), nil
+}
+
+// shouldUpdateFlowControl determines whether to send a flow control window update.
+//
+// We want to balance keeping the peer well-supplied with flow control with not sending
+// many small updates.
+func shouldUpdateFlowControl(curwin, maxwin int64) bool {
+	// Update flow control if doing so gives the peer at least 64k tokens,
+	// or if it will double the current window.
+	return maxwin-curwin >= 64<<10 || curwin*2 < maxwin
 }
 
 // Write writes data to the stream.
@@ -87,65 +150,330 @@
 	if s.IsReadOnly() {
 		return 0, errors.New("write to read-only stream")
 	}
-	if len(b) > 0 {
-		// TODO: implement
-		return 0, errors.New("unimplemented")
+	canWrite := s.outgate.lock()
+	if s.outclosed.isSet() {
+		s.outUnlock()
+		return 0, errors.New("write to closed stream")
 	}
-	if err := s.outgate.waitAndLockContext(ctx); err != nil {
-		return 0, err
+	if len(b) == 0 {
+		// We aren't writing any data, but send a STREAM frame to open the stream
+		// if we haven't done so already.
+		s.outopened.set()
 	}
-	defer s.outUnlock()
-
-	// Set outopened to send a STREAM frame with no data,
-	// opening the stream on the peer.
-	s.outopened.set()
-
+	for len(b) > 0 {
+		// The first time through this loop, we may or may not be write blocked.
+		// We exit the loop after writing all data, so on subsequent passes through
+		// the loop we are always write blocked.
+		if !canWrite {
+			// We're blocked, either by flow control or by our own buffer limit.
+			// We either need the peer to extend our flow control window,
+			// or ack some of our outstanding packets.
+			if s.out.end == s.outwin {
+				// We're blocked by flow control.
+				// Send a STREAM_DATA_BLOCKED frame to let the peer know.
+				s.outblocked.setUnsent()
+			}
+			s.outUnlock()
+			if err := s.conn.waitAndLockGate(ctx, &s.outgate); err != nil {
+				return n, err
+			}
+			// Successfully returning from waitAndLockGate means we are no longer
+			// write blocked. (Unlike traditional condition variables, gates do not
+			// have spurious wakeups.)
+		}
+		s.outblocked.clear()
+		// Write limit is min(our own buffer limit, the peer-provided flow control window).
+		// This is a stream offset.
+		lim := min(s.out.start+s.outmaxbuf, s.outwin)
+		// Amount to write is min(the full buffer, data up to the write limit).
+		// This is a number of bytes.
+		nn := min(int64(len(b)), lim-s.out.end)
+		// Copy the data into the output buffer and mark it as unsent.
+		s.outunsent.add(s.out.end, s.out.end+nn)
+		s.out.writeAt(b[:nn], s.out.end)
+		s.outopened.set()
+		b = b[nn:]
+		n += int(nn)
+		// If we have bytes left to send, we're blocked.
+		canWrite = false
+	}
+	s.outUnlock()
 	return n, nil
 }
 
+// Close closes the stream.
+// See CloseContext for more details.
+func (s *Stream) Close() error {
+	return s.CloseContext(context.Background())
+}
+
+// CloseContext closes the stream.
+// Any blocked stream operations will be unblocked and return errors.
+//
+// CloseContext flushes any data in the stream write buffer and waits for the peer to
+// acknowledge receipt of the data.
+// If the stream has been reset, it waits for the peer to acknowledge the reset.
+// If the context expires before the peer receives the stream's data,
+// CloseContext discards the buffer and returns the context error.
+func (s *Stream) CloseContext(ctx context.Context) error {
+	s.CloseRead()
+	s.CloseWrite()
+	// TODO: wait for peer to acknowledge data
+	// TODO: Return code from peer's RESET_STREAM frame?
+	return nil
+}
+
+// CloseRead aborts reads on the stream.
+// Any blocked reads will be unblocked and return errors.
+//
+// CloseRead notifies the peer that the stream has been closed for reading.
+// It does not wait for the peer to acknowledge the closure.
+// Use CloseContext to wait for the peer's acknowledgement.
+func (s *Stream) CloseRead() {
+	if s.IsWriteOnly() {
+		return
+	}
+	// TODO: support read-closing streams with a STOP_SENDING frame
+}
+
+// CloseWrite aborts writes on the stream.
+// Any blocked writes will be unblocked and return errors.
+//
+// CloseWrite sends any data in the stream write buffer to the peer.
+// It does not wait for the peer to acknowledge receipt of the data.
+// Use CloseContext to wait for the peer's acknowledgement.
+func (s *Stream) CloseWrite() {
+	if s.IsReadOnly() {
+		return
+	}
+	s.outgate.lock()
+	defer s.outUnlock()
+	s.outclosed.set()
+}
+
+// inUnlock unlocks s.ingate.
+// It sets the gate condition if reads from s will not block.
+// If s has receive-related frames to write, it notifies the Conn.
+func (s *Stream) inUnlock() {
+	if s.inUnlockNoQueue() {
+		s.conn.queueStreamForSend(s)
+	}
+}
+
+// inUnlockNoQueue is inUnlock,
+// but reports whether s has frames to write rather than notifying the Conn.
+func (s *Stream) inUnlockNoQueue() (shouldSend bool) {
+	// TODO: STOP_SENDING
+	canRead := s.inset.contains(s.in.start) || // data available to read
+		s.insize == s.in.start // at EOF
+	s.ingate.unlock(canRead)
+	return s.insendmax.shouldSend() // STREAM_MAX_DATA
+}
+
 // outUnlock unlocks s.outgate.
 // It sets the gate condition if writes to s will not block.
-// If s has frames to write, it notifies the Conn.
+// If s has send-related frames to write, it notifies the Conn.
 func (s *Stream) outUnlock() {
-	if s.outopened.shouldSend() {
+	if s.outUnlockNoQueue() {
 		s.conn.queueStreamForSend(s)
 	}
-	canSend := true // TODO: set sendability status based on flow control
-	s.outgate.unlock(canSend)
+}
+
+// outUnlockNoQueue is outUnlock,
+// but reports whether s has frames to write rather than notifying the Conn.
+func (s *Stream) outUnlockNoQueue() (shouldSend bool) {
+	lim := min(s.out.start+s.outmaxbuf, s.outwin)
+	canWrite := lim > s.out.end || // available flow control
+		s.outclosed.isSet() // closed
+	s.outgate.unlock(canWrite)
+	return len(s.outunsent) > 0 || // STREAM frame with data
+		s.outclosed.shouldSend() || // STREAM frame with FIN bit
+		s.outopened.shouldSend() || // STREAM frame with no data
+		s.outblocked.shouldSend() // STREAM_DATA_BLOCKED
 }
 
 // handleData handles data received in a STREAM frame.
 func (s *Stream) handleData(off int64, b []byte, fin bool) error {
-	// TODO
+	s.ingate.lock()
+	defer s.inUnlock()
+	end := off + int64(len(b))
+	if end > s.inwin {
+		// The peer sent us data past the maximum flow control window we gave them.
+		return localTransportError(errFlowControl)
+	}
+	if s.insize != -1 && end > s.insize {
+		// The peer sent us data past the final size of the stream they previously gave us.
+		return localTransportError(errFinalSize)
+	}
+	s.in.writeAt(b, off)
+	s.inset.add(off, end)
+	if fin {
+		if s.insize != -1 && s.insize != end {
+			// The peer changed the final size of the stream.
+			return localTransportError(errFinalSize)
+		}
+		s.insize = end
+		// The peer has enough flow control window to send the entire stream.
+		s.insendmax.clear()
+	}
 	return nil
 }
 
+// handleMaxStreamData handles an update received in a MAX_STREAM_DATA frame.
+func (s *Stream) handleMaxStreamData(maxStreamData int64) error {
+	s.outgate.lock()
+	defer s.outUnlock()
+	s.outwin = max(maxStreamData, s.outwin)
+	return nil
+}
+
+// ackOrLoss handles the fate of stream frames other than STREAM.
+func (s *Stream) ackOrLoss(pnum packetNumber, ftype byte, fate packetFate) {
+	// Frames which carry new information each time they are sent
+	// (MAX_STREAM_DATA, STREAM_DATA_BLOCKED) must only be marked
+	// as received if the most recent packet carrying this frame is acked.
+	//
+	// Frames which are always the same (STOP_SENDING, RESET_STREAM)
+	// can be marked as received if any packet carrying this frame is acked.
+	switch ftype {
+	case frameTypeMaxStreamData:
+		s.ingate.lock()
+		s.insendmax.ackLatestOrLoss(pnum, fate)
+		s.inUnlock()
+	case frameTypeStreamDataBlocked:
+		s.outgate.lock()
+		s.outblocked.ackLatestOrLoss(pnum, fate)
+		s.outUnlock()
+	default:
+		// TODO: Handle STOP_SENDING, RESET_STREAM.
+		panic("unhandled frame type")
+	}
+}
+
 // ackOrLossData handles the fate of a STREAM frame.
 func (s *Stream) ackOrLossData(pnum packetNumber, start, end int64, fin bool, fate packetFate) {
 	s.outgate.lock()
 	defer s.outUnlock()
 	s.outopened.ackOrLoss(pnum, fate)
+	if fin {
+		s.outclosed.ackOrLoss(pnum, fate)
+	}
+	switch fate {
+	case packetAcked:
+		s.outacked.add(start, end)
+		s.outunsent.sub(start, end)
+		// If this ack is for data at the start of the send buffer, we can now discard it.
+		if s.outacked.contains(s.out.start) {
+			s.out.discardBefore(s.outacked[0].end)
+		}
+	case packetLost:
+		// Mark everything lost, but not previously acked, as needing retransmission.
+		// We do this by adding all the lost bytes to outunsent, and then
+		// removing everything already acked.
+		s.outunsent.add(start, end)
+		for _, a := range s.outacked {
+			s.outunsent.sub(a.start, a.end)
+		}
+	}
 }
 
+// appendInFrames appends STOP_SENDING and MAX_STREAM_DATA frames
+// to the current packet.
+//
+// It returns true if no more frames need appending,
+// false if not everything fit in the current packet.
 func (s *Stream) appendInFrames(w *packetWriter, pnum packetNumber, pto bool) bool {
+	s.ingate.lock()
+	defer s.inUnlockNoQueue()
 	// TODO: STOP_SENDING
-	// TODO: MAX_STREAM_DATA
+	if s.insendmax.shouldSendPTO(pto) {
+		// MAX_STREAM_DATA
+		maxStreamData := s.in.start + s.inmaxbuf
+		if !w.appendMaxStreamDataFrame(s.id, maxStreamData) {
+			return false
+		}
+		s.inwin = maxStreamData
+		s.insendmax.setSent(pnum)
+	}
 	return true
 }
 
+// appendOutFrames appends RESET_STREAM, STREAM_DATA_BLOCKED, and STREAM frames
+// to the current packet.
+//
+// It returns true if no more frames need appending,
+// false if not everything fit in the current packet.
 func (s *Stream) appendOutFrames(w *packetWriter, pnum packetNumber, pto bool) bool {
+	s.outgate.lock()
+	defer s.outUnlockNoQueue()
 	// TODO: RESET_STREAM
-	// TODO: STREAM_DATA_BLOCKED
-	// TODO: STREAM frames with data
-	if s.outopened.shouldSendPTO(pto) {
-		off := int64(0)
-		size := 0
-		fin := false
-		_, added := w.appendStreamFrame(s.id, off, size, fin)
+	if s.outblocked.shouldSendPTO(pto) {
+		// STREAM_DATA_BLOCKED
+		if !w.appendStreamDataBlockedFrame(s.id, s.out.end) {
+			return false
+		}
+		s.outblocked.setSent(pnum)
+		s.frameOpensStream(pnum)
+	}
+	// STREAM
+	for {
+		off, size := dataToSend(s.out, s.outunsent, s.outacked, pto)
+		fin := s.outclosed.isSet() && off+size == s.out.end
+		shouldSend := size > 0 || // have data to send
+			s.outopened.shouldSendPTO(pto) || // should open the stream
+			(fin && s.outclosed.shouldSendPTO(pto)) // should close the stream
+		if !shouldSend {
+			return true
+		}
+		b, added := w.appendStreamFrame(s.id, off, int(size), fin)
 		if !added {
 			return false
 		}
+		s.out.copy(off, b)
+		s.outunsent.sub(off, off+int64(len(b)))
+		s.frameOpensStream(pnum)
+		if fin {
+			s.outclosed.setSent(pnum)
+		}
+		if pto {
+			return true
+		}
+		if int64(len(b)) < size {
+			return false
+		}
+	}
+}
+
+// frameOpensStream records that we're sending a frame that will open the stream.
+//
+// If we don't have an acknowledgement from the peer for a previous frame opening the stream,
+// record this packet as being the latest one to open it.
+func (s *Stream) frameOpensStream(pnum packetNumber) {
+	if !s.outopened.isReceived() {
 		s.outopened.setSent(pnum)
 	}
-	return true
+}
+
+// dataToSend returns the next range of data to send in a STREAM or CRYPTO_STREAM.
+func dataToSend(out pipe, outunsent, outacked rangeset[int64], pto bool) (start, size int64) {
+	switch {
+	case pto:
+		// On PTO, resend unacked data that fits in the probe packet.
+		// For simplicity, we send the range starting at s.out.start
+		// (which is definitely unacked, or else we would have discarded it)
+		// up to the next acked byte (if any).
+		//
+		// This may miss unacked data starting after that acked byte,
+		// but avoids resending data the peer has acked.
+		for _, r := range outacked {
+			if r.start > out.start {
+				return out.start, r.start - out.start
+			}
+		}
+		return out.start, out.end - out.start
+	case outunsent.numRanges() > 0:
+		return outunsent.min(), outunsent[0].size()
+	default:
+		return out.end, 0
+	}
 }
diff --git a/internal/quic/stream_test.go b/internal/quic/stream_test.go
index 8ae9dbc..d158e72 100644
--- a/internal/quic/stream_test.go
+++ b/internal/quic/stream_test.go
@@ -7,10 +7,703 @@
 package quic
 
 import (
+	"bytes"
+	"context"
+	"crypto/rand"
+	"fmt"
+	"io"
 	"reflect"
+	"strings"
 	"testing"
 )
 
+func TestStreamWriteBlockedByStreamFlowControl(t *testing.T) {
+	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+		ctx := canceledContext()
+		want := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
+		tc := newTestConn(t, clientSide, func(p *transportParameters) {
+			p.initialMaxStreamsBidi = 100
+			p.initialMaxStreamsUni = 100
+			p.initialMaxData = 1 << 20
+		})
+		tc.handshake()
+		tc.ignoreFrame(frameTypeAck)
+
+		// Non-blocking write with no flow control.
+		s, err := tc.conn.newLocalStream(ctx, styp)
+		if err != nil {
+			t.Fatal(err)
+		}
+		_, err = s.WriteContext(ctx, want)
+		if err != context.Canceled {
+			t.Fatalf("write to stream with no flow control: err = %v, want context.Canceled", err)
+		}
+		tc.wantFrame("write blocked by flow control triggers a STREAM_DATA_BLOCKED frame",
+			packetType1RTT, debugFrameStreamDataBlocked{
+				id:  s.id,
+				max: 0,
+			})
+
+		// Blocking write waiting for flow control.
+		w := runAsync(tc, func(ctx context.Context) (int, error) {
+			return s.WriteContext(ctx, want)
+		})
+		tc.wantFrame("second blocked write triggers another STREAM_DATA_BLOCKED",
+			packetType1RTT, debugFrameStreamDataBlocked{
+				id:  s.id,
+				max: 0,
+			})
+		tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
+			id:  s.id,
+			max: 4,
+		})
+		tc.wantFrame("stream window extended, but still more data to write",
+			packetType1RTT, debugFrameStreamDataBlocked{
+				id:  s.id,
+				max: 4,
+			})
+		tc.wantFrame("stream window extended to 4, expect blocked write to progress",
+			packetType1RTT, debugFrameStream{
+				id:   s.id,
+				data: want[:4],
+			})
+
+		tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
+			id:  s.id,
+			max: int64(len(want)),
+		})
+		tc.wantFrame("stream window extended further, expect blocked write to finish",
+			packetType1RTT, debugFrameStream{
+				id:   s.id,
+				off:  4,
+				data: want[4:],
+			})
+		n, err := w.result()
+		if n != len(want) || err != nil {
+			t.Errorf("Write() = %v, %v; want %v, nil", n, err, len(want))
+		}
+	})
+}
+
+func TestStreamIgnoresMaxStreamDataReduction(t *testing.T) {
+	// "A sender MUST ignore any MAX_STREAM_DATA [...] frames that
+	// do not increase flow control limits."
+	// https://www.rfc-editor.org/rfc/rfc9000#section-4.1-9
+	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+		ctx := canceledContext()
+		want := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
+		tc := newTestConn(t, clientSide, func(p *transportParameters) {
+			if styp == uniStream {
+				p.initialMaxStreamsUni = 1
+				p.initialMaxStreamDataUni = 4
+			} else {
+				p.initialMaxStreamsBidi = 1
+				p.initialMaxStreamDataBidiRemote = 4
+			}
+			p.initialMaxData = 1 << 20
+		})
+		tc.handshake()
+		tc.ignoreFrame(frameTypeAck)
+		tc.ignoreFrame(frameTypeStreamDataBlocked)
+
+		// Write [0,1).
+		s, err := tc.conn.newLocalStream(ctx, styp)
+		if err != nil {
+			t.Fatal(err)
+		}
+		s.WriteContext(ctx, want[:1])
+		tc.wantFrame("sent data (1 byte) fits within flow control limit",
+			packetType1RTT, debugFrameStream{
+				id:   s.id,
+				off:  0,
+				data: want[:1],
+			})
+
+		// MAX_STREAM_DATA tries to decrease limit, and is ignored.
+		tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
+			id:  s.id,
+			max: 2,
+		})
+
+		// Write [1,4).
+		s.WriteContext(ctx, want[1:])
+		tc.wantFrame("stream limit is 4 bytes, ignoring decrease in MAX_STREAM_DATA",
+			packetType1RTT, debugFrameStream{
+				id:   s.id,
+				off:  1,
+				data: want[1:4],
+			})
+
+		// MAX_STREAM_DATA increases limit.
+		// Second MAX_STREAM_DATA decreases it, and is ignored.
+		tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
+			id:  s.id,
+			max: 8,
+		})
+		tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
+			id:  s.id,
+			max: 6,
+		})
+
+		// Write [1,4).
+		s.WriteContext(ctx, want[4:])
+		tc.wantFrame("stream limit is 8 bytes, ignoring decrease in MAX_STREAM_DATA",
+			packetType1RTT, debugFrameStream{
+				id:   s.id,
+				off:  4,
+				data: want[4:8],
+			})
+	})
+}
+
+func TestStreamWriteBlockedByWriteBufferLimit(t *testing.T) {
+	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+		ctx := canceledContext()
+		want := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
+		const maxWriteBuffer = 4
+		tc := newTestConn(t, clientSide, func(p *transportParameters) {
+			p.initialMaxStreamsBidi = 100
+			p.initialMaxStreamsUni = 100
+			p.initialMaxData = 1 << 20
+			p.initialMaxStreamDataBidiRemote = 1 << 20
+			p.initialMaxStreamDataUni = 1 << 20
+		}, func(c *Config) {
+			c.StreamWriteBufferSize = maxWriteBuffer
+		})
+		tc.handshake()
+		tc.ignoreFrame(frameTypeAck)
+
+		// Write more data than StreamWriteBufferSize.
+		// The peer has given us plenty of flow control,
+		// so we're just blocked by our local limit.
+		s, err := tc.conn.newLocalStream(ctx, styp)
+		if err != nil {
+			t.Fatal(err)
+		}
+		w := runAsync(tc, func(ctx context.Context) (int, error) {
+			return s.WriteContext(ctx, want)
+		})
+		tc.wantFrame("stream write should send as much data as write buffer allows",
+			packetType1RTT, debugFrameStream{
+				id:   s.id,
+				off:  0,
+				data: want[:maxWriteBuffer],
+			})
+		tc.wantIdle("no STREAM_DATA_BLOCKED, we're blocked locally not by flow control")
+
+		// ACK for previously-sent data allows making more progress.
+		tc.writeFrames(packetType1RTT, debugFrameAck{
+			ranges: []i64range[packetNumber]{{0, tc.sentFramePacket.num + 1}},
+		})
+		tc.wantFrame("ACK for previous data allows making progress",
+			packetType1RTT, debugFrameStream{
+				id:   s.id,
+				off:  maxWriteBuffer,
+				data: want[maxWriteBuffer:][:maxWriteBuffer],
+			})
+
+		// Cancel the write with data left to send.
+		w.cancel()
+		n, err := w.result()
+		if n != 2*maxWriteBuffer || err == nil {
+			t.Fatalf("WriteContext() = %v, %v; want %v bytes, error", n, err, 2*maxWriteBuffer)
+		}
+	})
+}
+
+func TestStreamReceive(t *testing.T) {
+	// "Endpoints MUST be able to deliver stream data to an application as
+	// an ordered byte stream."
+	// https://www.rfc-editor.org/rfc/rfc9000#section-2.2-2
+	want := make([]byte, 5000)
+	for i := range want {
+		want[i] = byte(i)
+	}
+	type frame struct {
+		start   int64
+		end     int64
+		fin     bool
+		want    int
+		wantEOF bool
+	}
+	for _, test := range []struct {
+		name   string
+		frames []frame
+	}{{
+		name: "linear",
+		frames: []frame{{
+			start: 0,
+			end:   1000,
+			want:  1000,
+		}, {
+			start: 1000,
+			end:   2000,
+			want:  2000,
+		}, {
+			start:   2000,
+			end:     3000,
+			want:    3000,
+			fin:     true,
+			wantEOF: true,
+		}},
+	}, {
+		name: "out of order",
+		frames: []frame{{
+			start: 1000,
+			end:   2000,
+		}, {
+			start: 2000,
+			end:   3000,
+		}, {
+			start: 0,
+			end:   1000,
+			want:  3000,
+		}},
+	}, {
+		name: "resent",
+		frames: []frame{{
+			start: 0,
+			end:   1000,
+			want:  1000,
+		}, {
+			start: 0,
+			end:   1000,
+			want:  1000,
+		}, {
+			start: 1000,
+			end:   2000,
+			want:  2000,
+		}, {
+			start: 0,
+			end:   1000,
+			want:  2000,
+		}, {
+			start: 1000,
+			end:   2000,
+			want:  2000,
+		}},
+	}, {
+		name: "overlapping",
+		frames: []frame{{
+			start: 0,
+			end:   1000,
+			want:  1000,
+		}, {
+			start: 3000,
+			end:   4000,
+			want:  1000,
+		}, {
+			start: 2000,
+			end:   3000,
+			want:  1000,
+		}, {
+			start: 1000,
+			end:   3000,
+			want:  4000,
+		}},
+	}, {
+		name: "early eof",
+		frames: []frame{{
+			start: 3000,
+			end:   3000,
+			fin:   true,
+			want:  0,
+		}, {
+			start: 1000,
+			end:   2000,
+			want:  0,
+		}, {
+			start: 0,
+			end:   1000,
+			want:  2000,
+		}, {
+			start:   2000,
+			end:     3000,
+			want:    3000,
+			wantEOF: true,
+		}},
+	}, {
+		name: "empty eof",
+		frames: []frame{{
+			start: 0,
+			end:   1000,
+			want:  1000,
+		}, {
+			start:   1000,
+			end:     1000,
+			fin:     true,
+			want:    1000,
+			wantEOF: true,
+		}},
+	}} {
+		testStreamTypes(t, test.name, func(t *testing.T, styp streamType) {
+			ctx := canceledContext()
+			tc := newTestConn(t, serverSide)
+			tc.handshake()
+			sid := newStreamID(clientSide, styp, 0)
+			var s *Stream
+			got := make([]byte, len(want))
+			var total int
+			for _, f := range test.frames {
+				t.Logf("receive [%v,%v)", f.start, f.end)
+				tc.writeFrames(packetType1RTT, debugFrameStream{
+					id:   sid,
+					off:  f.start,
+					data: want[f.start:f.end],
+					fin:  f.fin,
+				})
+				if s == nil {
+					var err error
+					s, err = tc.conn.AcceptStream(ctx)
+					if err != nil {
+						tc.t.Fatalf("conn.AcceptStream() = %v", err)
+					}
+				}
+				for {
+					n, err := s.ReadContext(ctx, got[total:])
+					t.Logf("s.ReadContext() = %v, %v", n, err)
+					total += n
+					if f.wantEOF && err != io.EOF {
+						t.Fatalf("ReadContext() error = %v; want io.EOF", err)
+					}
+					if !f.wantEOF && err == io.EOF {
+						t.Fatalf("ReadContext() error = io.EOF, want something else")
+					}
+					if err != nil {
+						break
+					}
+				}
+				if total != f.want {
+					t.Fatalf("total bytes read = %v, want %v", total, f.want)
+				}
+				for i := 0; i < total; i++ {
+					if got[i] != want[i] {
+						t.Fatalf("byte %v differs: got %v, want %v", i, got[i], want[i])
+					}
+				}
+			}
+		})
+	}
+
+}
+
+func TestStreamReceiveExtendsStreamWindow(t *testing.T) {
+	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+		const maxWindowSize = 20
+		ctx := canceledContext()
+		tc := newTestConn(t, serverSide, func(c *Config) {
+			c.StreamReadBufferSize = maxWindowSize
+		})
+		tc.handshake()
+		tc.ignoreFrame(frameTypeAck)
+		sid := newStreamID(clientSide, styp, 0)
+		tc.writeFrames(packetType1RTT, debugFrameStream{
+			id:   sid,
+			off:  0,
+			data: make([]byte, maxWindowSize),
+		})
+		s, err := tc.conn.AcceptStream(ctx)
+		if err != nil {
+			t.Fatalf("AcceptStream: %v", err)
+		}
+		tc.wantIdle("stream window is not extended before data is read")
+		buf := make([]byte, maxWindowSize+1)
+		if n, err := s.ReadContext(ctx, buf); n != maxWindowSize || err != nil {
+			t.Fatalf("s.ReadContext() = %v, %v; want %v, nil", n, err, maxWindowSize)
+		}
+		tc.wantFrame("stream window is extended after reading data",
+			packetType1RTT, debugFrameMaxStreamData{
+				id:  sid,
+				max: maxWindowSize * 2,
+			})
+		tc.writeFrames(packetType1RTT, debugFrameStream{
+			id:   sid,
+			off:  maxWindowSize,
+			data: make([]byte, maxWindowSize),
+			fin:  true,
+		})
+		if n, err := s.ReadContext(ctx, buf); n != maxWindowSize || err != io.EOF {
+			t.Fatalf("s.ReadContext() = %v, %v; want %v, io.EOF", n, err, maxWindowSize)
+		}
+		tc.wantIdle("stream window is not extended after FIN")
+	})
+}
+
+func TestStreamReceiveViolatesStreamDataLimit(t *testing.T) {
+	// "A receiver MUST close the connection with an error of type FLOW_CONTROL_ERROR if
+	// the sender violates the advertised [...] stream data limits [...]"
+	// https://www.rfc-editor.org/rfc/rfc9000#section-4.1-8
+	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+		const maxStreamData = 10
+		for _, test := range []struct {
+			off  int64
+			size int64
+		}{{
+			off:  maxStreamData,
+			size: 1,
+		}, {
+			off:  0,
+			size: maxStreamData + 1,
+		}, {
+			off:  maxStreamData - 1,
+			size: 2,
+		}} {
+			tc := newTestConn(t, serverSide, func(c *Config) {
+				c.StreamReadBufferSize = maxStreamData
+			})
+			tc.handshake()
+			tc.ignoreFrame(frameTypeAck)
+			tc.writeFrames(packetType1RTT, debugFrameStream{
+				id:   newStreamID(clientSide, styp, 0),
+				off:  test.off,
+				data: make([]byte, test.size),
+			})
+			tc.wantFrame(
+				fmt.Sprintf("data [%v,%v) violates stream data limit and closes connection",
+					test.off, test.off+test.size),
+				packetType1RTT, debugFrameConnectionCloseTransport{
+					code: errFlowControl,
+				},
+			)
+		}
+	})
+}
+
+func TestStreamReceiveDuplicateDataDoesNotViolateLimits(t *testing.T) {
+	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+		const maxData = 10
+		tc := newTestConn(t, serverSide, func(c *Config) {
+			// TODO: Add connection-level maximum data here as well.
+			c.StreamReadBufferSize = maxData
+		})
+		tc.handshake()
+		tc.ignoreFrame(frameTypeAck)
+		for i := 0; i < 3; i++ {
+			tc.writeFrames(packetType1RTT, debugFrameStream{
+				id:   newStreamID(clientSide, styp, 0),
+				off:  0,
+				data: make([]byte, maxData),
+			})
+			tc.wantIdle(fmt.Sprintf("conn sends no frames after receiving data frame %v", i))
+		}
+	})
+}
+
+func TestStreamFinalSizeChangedByStreamFrame(t *testing.T) {
+	// "If a [...] STREAM frame is received indicating a change
+	// in the final size for the stream, an endpoint SHOULD
+	// respond with an error of type FINAL_SIZE_ERROR [...]"
+	// https://www.rfc-editor.org/rfc/rfc9000#section-4.5-5
+	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+		tc := newTestConn(t, serverSide)
+		tc.handshake()
+		sid := newStreamID(clientSide, styp, 0)
+
+		const write1size = 4
+		tc.writeFrames(packetType1RTT, debugFrameStream{
+			id:  sid,
+			off: 10,
+			fin: true,
+		})
+		tc.writeFrames(packetType1RTT, debugFrameStream{
+			id:  sid,
+			off: 9,
+			fin: true,
+		})
+		tc.wantFrame("change in final size of stream is an error",
+			packetType1RTT, debugFrameConnectionCloseTransport{
+				code: errFinalSize,
+			},
+		)
+	})
+}
+
+func TestStreamDataBeyondFinalSize(t *testing.T) {
+	// "A receiver SHOULD treat receipt of data at or beyond
+	// the final size as an error of type FINAL_SIZE_ERROR [...]"
+	// https://www.rfc-editor.org/rfc/rfc9000#section-4.5-5
+	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+		tc := newTestConn(t, serverSide)
+		tc.handshake()
+		sid := newStreamID(clientSide, styp, 0)
+
+		const write1size = 4
+		tc.writeFrames(packetType1RTT, debugFrameStream{
+			id:   sid,
+			off:  0,
+			data: make([]byte, 16),
+			fin:  true,
+		})
+		tc.writeFrames(packetType1RTT, debugFrameStream{
+			id:   sid,
+			off:  16,
+			data: []byte{0},
+		})
+		tc.wantFrame("received data past final size of stream",
+			packetType1RTT, debugFrameConnectionCloseTransport{
+				code: errFinalSize,
+			},
+		)
+	})
+}
+
+func TestStreamReceiveUnblocksReader(t *testing.T) {
+	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+		tc := newTestConn(t, serverSide)
+		tc.handshake()
+		want := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
+		sid := newStreamID(clientSide, styp, 0)
+
+		// AcceptStream blocks until a STREAM frame is received.
+		accept := runAsync(tc, func(ctx context.Context) (*Stream, error) {
+			return tc.conn.AcceptStream(ctx)
+		})
+		const write1size = 4
+		tc.writeFrames(packetType1RTT, debugFrameStream{
+			id:   sid,
+			off:  0,
+			data: want[:write1size],
+		})
+		s, err := accept.result()
+		if err != nil {
+			t.Fatalf("AcceptStream() = %v", err)
+		}
+
+		// ReadContext succeeds immediately, since we already have data.
+		got := make([]byte, len(want))
+		read := runAsync(tc, func(ctx context.Context) (int, error) {
+			return s.ReadContext(ctx, got)
+		})
+		if n, err := read.result(); n != write1size || err != nil {
+			t.Fatalf("ReadContext = %v, %v; want %v, nil", n, err, write1size)
+		}
+
+		// ReadContext blocks waiting for more data.
+		read = runAsync(tc, func(ctx context.Context) (int, error) {
+			return s.ReadContext(ctx, got[write1size:])
+		})
+		tc.writeFrames(packetType1RTT, debugFrameStream{
+			id:   sid,
+			off:  write1size,
+			data: want[write1size:],
+			fin:  true,
+		})
+		if n, err := read.result(); n != len(want)-write1size || err != io.EOF {
+			t.Fatalf("ReadContext = %v, %v; want %v, io.EOF", n, err, len(want)-write1size)
+		}
+		if !bytes.Equal(got, want) {
+			t.Fatalf("read bytes %x, want %x", got, want)
+		}
+	})
+}
+
+// testStreamSendFrameInvalidState calls the test func with a stream ID for:
+//
+//   - a remote bidirectional stream that the peer has not created
+//   - a remote unidirectional stream
+//
+// It then sends the returned frame (STREAM, STREAM_DATA_BLOCKED, etc.)
+// to the conn and expects a STREAM_STATE_ERROR.
+func testStreamSendFrameInvalidState(t *testing.T, f func(sid streamID) debugFrame) {
+	testSides(t, "stream_not_created", func(t *testing.T, side connSide) {
+		tc := newTestConn(t, side)
+		tc.handshake()
+		tc.writeFrames(packetType1RTT, f(newStreamID(side, bidiStream, 0)))
+		tc.wantFrame("frame for local stream which has not been created",
+			packetType1RTT, debugFrameConnectionCloseTransport{
+				code: errStreamState,
+			})
+	})
+	testSides(t, "uni_stream", func(t *testing.T, side connSide) {
+		ctx := canceledContext()
+		tc := newTestConn(t, side)
+		tc.handshake()
+		sid := newStreamID(side, uniStream, 0)
+		s, err := tc.conn.NewSendOnlyStream(ctx)
+		if err != nil {
+			t.Fatal(err)
+		}
+		s.Write(nil) // open the stream
+		tc.wantFrame("new stream is opened",
+			packetType1RTT, debugFrameStream{
+				id:   sid,
+				data: []byte{},
+			})
+		tc.writeFrames(packetType1RTT, f(sid))
+		tc.wantFrame("send-oriented frame for send-only stream",
+			packetType1RTT, debugFrameConnectionCloseTransport{
+				code: errStreamState,
+			})
+	})
+}
+
+func TestStreamStreamFrameInvalidState(t *testing.T) {
+	// "An endpoint MUST terminate the connection with error STREAM_STATE_ERROR
+	// if it receives a STREAM frame for a locally initiated stream
+	// that has not yet been created, or for a send-only stream."
+	// https://www.rfc-editor.org/rfc/rfc9000.html#section-19.8-3
+	testStreamSendFrameInvalidState(t, func(sid streamID) debugFrame {
+		return debugFrameStream{
+			id: sid,
+		}
+	})
+}
+
+func TestStreamDataBlockedInvalidState(t *testing.T) {
+	// "An endpoint MUST terminate the connection with error STREAM_STATE_ERROR
+	// if it receives a STREAM frame for a locally initiated stream
+	// that has not yet been created, or for a send-only stream."
+	// https://www.rfc-editor.org/rfc/rfc9000.html#section-19.8-3
+	testStreamSendFrameInvalidState(t, func(sid streamID) debugFrame {
+		return debugFrameStream{
+			id: sid,
+		}
+	})
+}
+
+// testStreamReceiveFrameInvalidState calls the test func with a stream ID for:
+//
+//   - a remote bidirectional stream that the peer has not created
+//   - a local unidirectional stream
+//
+// It then sends the returned frame (MAX_STREAM_DATA, STOP_SENDING, etc.)
+// to the conn and expects a STREAM_STATE_ERROR.
+func testStreamReceiveFrameInvalidState(t *testing.T, f func(sid streamID) debugFrame) {
+	testSides(t, "stream_not_created", func(t *testing.T, side connSide) {
+		tc := newTestConn(t, side)
+		tc.handshake()
+		tc.writeFrames(packetType1RTT, f(newStreamID(side, bidiStream, 0)))
+		tc.wantFrame("frame for local stream which has not been created",
+			packetType1RTT, debugFrameConnectionCloseTransport{
+				code: errStreamState,
+			})
+	})
+	testSides(t, "uni_stream", func(t *testing.T, side connSide) {
+		tc := newTestConn(t, side)
+		tc.handshake()
+		tc.writeFrames(packetType1RTT, f(newStreamID(side.peer(), uniStream, 0)))
+		tc.wantFrame("receive-oriented frame for receive-only stream",
+			packetType1RTT, debugFrameConnectionCloseTransport{
+				code: errStreamState,
+			})
+	})
+}
+
+func TestStreamMaxStreamDataInvalidState(t *testing.T) {
+	// "Receiving a MAX_STREAM_DATA frame for a locally initiated stream
+	// that has not yet been created MUST be treated as a connection error
+	// of type STREAM_STATE_ERROR. An endpoint that receives a MAX_STREAM_DATA
+	// frame for a receive-only stream MUST terminate the connection
+	// with error STREAM_STATE_ERROR."
+	// https://www.rfc-editor.org/rfc/rfc9000#section-19.10-2
+	testStreamReceiveFrameInvalidState(t, func(sid streamID) debugFrame {
+		return debugFrameMaxStreamData{
+			id:  sid,
+			max: 1000,
+		}
+	})
+}
+
 func TestStreamOffsetTooLarge(t *testing.T) {
 	// "Receipt of a frame that exceeds [2^62-1] MUST be treated as a
 	// connection error of type FRAME_ENCODING_ERROR or FLOW_CONTROL_ERROR."
@@ -31,3 +724,104 @@
 		t.Fatalf("STREAM offset exceeds 2^62-1\ngot:  %v\nwant: %v\n  or: %v", got, want1, want2)
 	}
 }
+
+func TestStreamReadFromWriteOnlyStream(t *testing.T) {
+	_, s := newTestConnAndLocalStream(t, serverSide, uniStream)
+	buf := make([]byte, 10)
+	wantErr := "read from write-only stream"
+	if n, err := s.Read(buf); err == nil || !strings.Contains(err.Error(), wantErr) {
+		t.Errorf("s.Read() = %v, %v; want error %q", n, err, wantErr)
+	}
+}
+
+func TestStreamWriteToReadOnlyStream(t *testing.T) {
+	_, s := newTestConnAndRemoteStream(t, serverSide, uniStream)
+	buf := make([]byte, 10)
+	wantErr := "write to read-only stream"
+	if n, err := s.Write(buf); err == nil || !strings.Contains(err.Error(), wantErr) {
+		t.Errorf("s.Write() = %v, %v; want error %q", n, err, wantErr)
+	}
+}
+
+func TestStreamWriteToClosedStream(t *testing.T) {
+	tc, s := newTestConnAndLocalStream(t, serverSide, bidiStream, func(p *transportParameters) {
+		p.initialMaxStreamsBidi = 1
+		p.initialMaxData = 1 << 20
+		p.initialMaxStreamDataBidiRemote = 1 << 20
+	})
+	s.Close()
+	tc.wantFrame("stream is opened after being closed",
+		packetType1RTT, debugFrameStream{
+			id:   s.id,
+			off:  0,
+			fin:  true,
+			data: []byte{},
+		})
+	wantErr := "write to closed stream"
+	if n, err := s.Write([]byte{}); err == nil || !strings.Contains(err.Error(), wantErr) {
+		t.Errorf("s.Write() = %v, %v; want error %q", n, err, wantErr)
+	}
+}
+
+func TestStreamWriteMoreThanOnePacketOfData(t *testing.T) {
+	tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, func(p *transportParameters) {
+		p.initialMaxStreamsUni = 1
+		p.initialMaxData = 1 << 20
+		p.initialMaxStreamDataUni = 1 << 20
+	})
+	want := make([]byte, 4096)
+	rand.Read(want) // doesn't need to be crypto/rand, but non-deprecated and harmless
+	w := runAsync(tc, func(ctx context.Context) (int, error) {
+		return s.WriteContext(ctx, want)
+	})
+	got := make([]byte, 0, len(want))
+	for {
+		f, _ := tc.readFrame()
+		if f == nil {
+			break
+		}
+		sf, ok := f.(debugFrameStream)
+		if !ok {
+			t.Fatalf("unexpected frame: %v", sf)
+		}
+		if len(got) != int(sf.off) {
+			t.Fatalf("got frame: %v\nwant offset %v", sf, len(got))
+		}
+		got = append(got, sf.data...)
+	}
+	if n, err := w.result(); n != len(want) || err != nil {
+		t.Fatalf("s.WriteContext() = %v, %v; want %v, nil", n, err, len(want))
+	}
+	if !bytes.Equal(got, want) {
+		t.Fatalf("mismatch in received stream data")
+	}
+}
+
+func newTestConnAndLocalStream(t *testing.T, side connSide, styp streamType, opts ...any) (*testConn, *Stream) {
+	t.Helper()
+	ctx := canceledContext()
+	tc := newTestConn(t, side, opts...)
+	tc.handshake()
+	tc.ignoreFrame(frameTypeAck)
+	s, err := tc.conn.newLocalStream(ctx, styp)
+	if err != nil {
+		t.Fatalf("conn.newLocalStream(%v) = %v", styp, err)
+	}
+	return tc, s
+}
+
+func newTestConnAndRemoteStream(t *testing.T, side connSide, styp streamType, opts ...any) (*testConn, *Stream) {
+	t.Helper()
+	ctx := canceledContext()
+	tc := newTestConn(t, side, opts...)
+	tc.handshake()
+	tc.ignoreFrame(frameTypeAck)
+	tc.writeFrames(packetType1RTT, debugFrameStream{
+		id: newStreamID(side.peer(), styp, 0),
+	})
+	s, err := tc.conn.AcceptStream(ctx)
+	if err != nil {
+		t.Fatalf("conn.AcceptStream() = %v", err)
+	}
+	return tc, s
+}