quic: read-closing and reset streams, wait on close

s.Close waits for the peer to acknowledge receipt of sent
data before returning.

s.ReadClose closes the receive end of a stream, discarding
buffered data and sending a STOP_SENDING frame to the peer.

s.Reset(code) closes the send end of a stream with an error,
which is sent to the peer in a RESET_STREAM frame.

Receipt of a STOP_SENDING frame resets the stream locally
and causes future writes to fail.

Receipt of a RESET_STREAM frame causes future reads to fail.

Stream state is currently retained even after a stream
has been completely closed. A future CL will add cleanup.

For golang/go#58547

Change-Id: I29088ae570db4079926ad426be6e85dace2122da
Reviewed-on: https://go-review.googlesource.com/c/net/+/518435
Run-TryBot: Damien Neil <dneil@google.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
diff --git a/internal/quic/conn.go b/internal/quic/conn.go
index 0952a79..ee8f011 100644
--- a/internal/quic/conn.go
+++ b/internal/quic/conn.go
@@ -73,6 +73,7 @@
 	handleTLSEvent(tls.QUICEvent)
 	newConnID(seq int64) ([]byte, error)
 	waitAndLockGate(ctx context.Context, g *gate) error
+	waitOnDone(ctx context.Context, ch <-chan struct{}) error
 }
 
 func newConn(now time.Time, side connSide, initialConnID []byte, peerAddr netip.AddrPort, config *Config, l connListener, hooks connTestHooks) (*Conn, error) {
@@ -311,6 +312,26 @@
 	return g.waitAndLockContext(ctx)
 }
 
+func (c *Conn) waitOnDone(ctx context.Context, ch <-chan struct{}) error {
+	if c.testHooks != nil {
+		return c.testHooks.waitOnDone(ctx, ch)
+	}
+	// Check the channel before the context.
+	// We always prefer to return results when available,
+	// even when provided with an already-canceled context.
+	select {
+	case <-ch:
+		return nil
+	default:
+	}
+	select {
+	case <-ch:
+	case <-ctx.Done():
+		return ctx.Err()
+	}
+	return nil
+}
+
 // abort terminates a connection with an error.
 func (c *Conn) abort(now time.Time, err error) {
 	if c.errForPeer == nil {
diff --git a/internal/quic/conn_async_test.go b/internal/quic/conn_async_test.go
index 2078325..0da3ddb 100644
--- a/internal/quic/conn_async_test.go
+++ b/internal/quic/conn_async_test.go
@@ -82,10 +82,11 @@
 }
 
 // A blockedAsync is a blocked async operation.
-//
-// Currently, the only type of blocked operation is one waiting on a gate.
 type blockedAsync struct {
-	g     *gate
+	// Exactly one of these will be set, depending on the type of blocked operation.
+	g  *gate
+	ch <-chan struct{}
+
 	donec chan struct{} // closed when the operation is unblocked
 }
 
@@ -133,6 +134,25 @@
 		// Gate can be acquired without blocking.
 		return nil
 	}
+	return as.block(ctx, &blockedAsync{
+		g: g,
+	})
+}
+
+// waitOnDone replaces receiving from a chan struct{} in tests.
+func (as *asyncTestState) waitOnDone(ctx context.Context, ch <-chan struct{}) error {
+	select {
+	case <-ch:
+		return nil // read without blocking
+	default:
+	}
+	return as.block(ctx, &blockedAsync{
+		ch: ch,
+	})
+}
+
+// block waits for a blocked async operation to complete.
+func (as *asyncTestState) block(ctx context.Context, b *blockedAsync) error {
 	if err := ctx.Err(); err != nil {
 		// Context has already expired.
 		return err
@@ -144,12 +164,9 @@
 		// which may have unpredictable results.
 		panic("blocking async point with unexpected Context")
 	}
+	b.donec = make(chan struct{})
 	// Record this as a pending blocking operation.
 	as.mu.Lock()
-	b := &blockedAsync{
-		g:     g,
-		donec: make(chan struct{}),
-	}
 	as.blocked[b] = struct{}{}
 	as.mu.Unlock()
 	// Notify the creator of the operation that we're blocked,
@@ -169,8 +186,19 @@
 	as.mu.Lock()
 	var woken *blockedAsync
 	for w := range as.blocked {
-		if w.g.lockIfSet() {
-			woken = w
+		switch {
+		case w.g != nil:
+			if w.g.lockIfSet() {
+				woken = w
+			}
+		case w.ch != nil:
+			select {
+			case <-w.ch:
+				woken = w
+			default:
+			}
+		}
+		if woken != nil {
 			delete(as.blocked, woken)
 			break
 		}
diff --git a/internal/quic/conn_loss.go b/internal/quic/conn_loss.go
index f42f7e5..103db9f 100644
--- a/internal/quic/conn_loss.go
+++ b/internal/quic/conn_loss.go
@@ -44,7 +44,9 @@
 		case frameTypeCrypto:
 			start, end := sent.nextRange()
 			c.crypto[space].ackOrLoss(start, end, fate)
-		case frameTypeMaxStreamData,
+		case frameTypeResetStream,
+			frameTypeStopSending,
+			frameTypeMaxStreamData,
 			frameTypeStreamDataBlocked:
 			id := streamID(sent.nextInt())
 			s := c.streamForID(id)
diff --git a/internal/quic/conn_loss_test.go b/internal/quic/conn_loss_test.go
index d944515..dc0dc6c 100644
--- a/internal/quic/conn_loss_test.go
+++ b/internal/quic/conn_loss_test.go
@@ -75,7 +75,58 @@
 	})
 }
 
-func TestLostCRYPTOFrame(t *testing.T) {
+func TestLostResetStreamFrame(t *testing.T) {
+	// "Cancellation of stream transmission, as carried in a RESET_STREAM frame,
+	// is sent until acknowledged or until all stream data is acknowledged by the peer [...]"
+	// https://www.rfc-editor.org/rfc/rfc9000.html#section-13.3-3.4
+	lostFrameTest(t, func(t *testing.T, pto bool) {
+		tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters)
+		tc.ignoreFrame(frameTypeAck)
+
+		s.Reset(1)
+		tc.wantFrame("reset stream",
+			packetType1RTT, debugFrameResetStream{
+				id:   s.id,
+				code: 1,
+			})
+
+		tc.triggerLossOrPTO(packetType1RTT, pto)
+		tc.wantFrame("resent RESET_STREAM frame",
+			packetType1RTT, debugFrameResetStream{
+				id:   s.id,
+				code: 1,
+			})
+	})
+}
+
+func TestLostStopSendingFrame(t *testing.T) {
+	// "[...] a request to cancel stream transmission, as encoded in a STOP_SENDING frame,
+	// is sent until the receiving part of the stream enters either a "Data Recvd" or
+	// "Reset Recvd" state [...]"
+	// https://www.rfc-editor.org/rfc/rfc9000.html#section-13.3-3.5
+	//
+	// Technically, we can stop sending a STOP_SENDING frame if the peer sends
+	// us all the data for the stream or resets it. We don't bother tracking this,
+	// however, so we'll keep sending the frame until it is acked. This is harmless.
+	lostFrameTest(t, func(t *testing.T, pto bool) {
+		tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream, permissiveTransportParameters)
+		tc.ignoreFrame(frameTypeAck)
+
+		s.CloseRead()
+		tc.wantFrame("stream is read-closed",
+			packetType1RTT, debugFrameStopSending{
+				id: s.id,
+			})
+
+		tc.triggerLossOrPTO(packetType1RTT, pto)
+		tc.wantFrame("resent STOP_SENDING frame",
+			packetType1RTT, debugFrameStopSending{
+				id: s.id,
+			})
+	})
+}
+
+func TestLostCryptoFrame(t *testing.T) {
 	// "Data sent in CRYPTO frames is retransmitted [...] until all data has been acknowledged."
 	// https://www.rfc-editor.org/rfc/rfc9000.html#section-13.3-3.1
 	lostFrameTest(t, func(t *testing.T, pto bool) {
@@ -176,7 +227,7 @@
 				off:  4,
 				data: data[4:8],
 			})
-		s.Close()
+		s.CloseWrite()
 		tc.wantFrame("send FIN",
 			packetType1RTT, debugFrameStream{
 				id:   s.id,
diff --git a/internal/quic/conn_recv.go b/internal/quic/conn_recv.go
index 00985b6..e0a91ab 100644
--- a/internal/quic/conn_recv.go
+++ b/internal/quic/conn_recv.go
@@ -161,12 +161,12 @@
 			if !frameOK(c, ptype, __01) {
 				return
 			}
-			_, _, _, n = consumeResetStreamFrame(payload)
+			n = c.handleResetStreamFrame(now, space, payload)
 		case frameTypeStopSending:
 			if !frameOK(c, ptype, __01) {
 				return
 			}
-			_, _, n = consumeStopSendingFrame(payload)
+			n = c.handleStopSendingFrame(now, space, payload)
 		case frameTypeCrypto:
 			if !frameOK(c, ptype, IH_1) {
 				return
@@ -291,6 +291,32 @@
 	return n
 }
 
+func (c *Conn) handleResetStreamFrame(now time.Time, space numberSpace, payload []byte) int {
+	id, code, finalSize, n := consumeResetStreamFrame(payload)
+	if n < 0 {
+		return -1
+	}
+	if s := c.streamForFrame(now, id, recvStream); s != nil {
+		if err := s.handleReset(code, finalSize); err != nil {
+			c.abort(now, err)
+		}
+	}
+	return n
+}
+
+func (c *Conn) handleStopSendingFrame(now time.Time, space numberSpace, payload []byte) int {
+	id, code, n := consumeStopSendingFrame(payload)
+	if n < 0 {
+		return -1
+	}
+	if s := c.streamForFrame(now, id, sendStream); s != nil {
+		if err := s.handleStopSending(code); err != nil {
+			c.abort(now, err)
+		}
+	}
+	return n
+}
+
 func (c *Conn) handleCryptoFrame(now time.Time, space numberSpace, payload []byte) int {
 	off, data, n := consumeCryptoFrame(payload)
 	err := c.handleCrypto(now, space, off, data)
diff --git a/internal/quic/errors.go b/internal/quic/errors.go
index 55d32f3..f156859 100644
--- a/internal/quic/errors.go
+++ b/internal/quic/errors.go
@@ -99,6 +99,14 @@
 	return fmt.Sprintf("peer closed connection: %v: %q", e.code, e.reason)
 }
 
+// A StreamErrorCode is an application protocol error code (RFC 9000, Section 20.2)
+// indicating whay a stream is being closed.
+type StreamErrorCode uint64
+
+func (e StreamErrorCode) Error() string {
+	return fmt.Sprintf("stream error code %v", uint64(e))
+}
+
 // An ApplicationError is an application protocol error code (RFC 9000, Section 20.2).
 // Application protocol errors may be sent when terminating a stream or connection.
 type ApplicationError struct {
diff --git a/internal/quic/stream.go b/internal/quic/stream.go
index 83215df..12117db 100644
--- a/internal/quic/stream.go
+++ b/internal/quic/stream.go
@@ -9,6 +9,7 @@
 import (
 	"context"
 	"errors"
+	"fmt"
 	"io"
 )
 
@@ -20,28 +21,33 @@
 	//
 	// The gate condition is set if a read from the stream will not block,
 	// either because the stream has available data or because the read will fail.
-	ingate    gate
-	in        pipe            // received data
-	inwin     int64           // last MAX_STREAM_DATA sent to the peer
-	insendmax sentVal         // set when we should send MAX_STREAM_DATA to the peer
-	inmaxbuf  int64           // maximum amount of data we will buffer
-	insize    int64           // stream final size; -1 before this is known
-	inset     rangeset[int64] // received ranges
+	ingate      gate
+	in          pipe            // received data
+	inwin       int64           // last MAX_STREAM_DATA sent to the peer
+	insendmax   sentVal         // set when we should send MAX_STREAM_DATA to the peer
+	inmaxbuf    int64           // maximum amount of data we will buffer
+	insize      int64           // stream final size; -1 before this is known
+	inset       rangeset[int64] // received ranges
+	inclosed    sentVal         // set by CloseRead
+	inresetcode int64           // RESET_STREAM code received from the peer; -1 if not reset
 
 	// outgate's lock guards all send-related state.
 	//
 	// The gate condition is set if a write to the stream will not block,
 	// either because the stream has available flow control or because
 	// the write will fail.
-	outgate    gate
-	out        pipe            // buffered data to send
-	outwin     int64           // maximum MAX_STREAM_DATA received from the peer
-	outmaxbuf  int64           // maximum amount of data we will buffer
-	outunsent  rangeset[int64] // ranges buffered but not yet sent
-	outacked   rangeset[int64] // ranges sent and acknowledged
-	outopened  sentVal         // set if we should open the stream
-	outclosed  sentVal         // set by CloseWrite
-	outblocked sentVal         // set when a write to the stream is blocked by flow control
+	outgate      gate
+	out          pipe            // buffered data to send
+	outwin       int64           // maximum MAX_STREAM_DATA received from the peer
+	outmaxbuf    int64           // maximum amount of data we will buffer
+	outunsent    rangeset[int64] // ranges buffered but not yet sent
+	outacked     rangeset[int64] // ranges sent and acknowledged
+	outopened    sentVal         // set if we should open the stream
+	outclosed    sentVal         // set by CloseWrite
+	outblocked   sentVal         // set when a write to the stream is blocked by flow control
+	outreset     sentVal         // set by Reset
+	outresetcode uint64          // reset code to send in RESET_STREAM
+	outdone      chan struct{}   // closed when all data sent
 
 	prev, next *Stream // guarded by streamsState.sendMu
 }
@@ -54,11 +60,13 @@
 // unlocking outgate will set the stream writability state.)
 func newStream(c *Conn, id streamID) *Stream {
 	s := &Stream{
-		conn:    c,
-		id:      id,
-		insize:  -1, // -1 indicates the stream size is unknown
-		ingate:  newLockedGate(),
-		outgate: newLockedGate(),
+		conn:        c,
+		id:          id,
+		insize:      -1, // -1 indicates the stream size is unknown
+		inresetcode: -1, // -1 indicates no RESET_STREAM received
+		ingate:      newLockedGate(),
+		outgate:     newLockedGate(),
+		outdone:     make(chan struct{}),
 	}
 	return s
 }
@@ -87,7 +95,8 @@
 //
 // If the peer closes the stream cleanly, ReadContext returns io.EOF after
 // returning all data sent by the peer.
-// If the peer terminates reads abruptly, ReadContext returns StreamResetError.
+// If the peer aborts reads on the stream, ReadContext returns
+// an error wrapping StreamResetCode.
 func (s *Stream) ReadContext(ctx context.Context, b []byte) (n int, err error) {
 	if s.IsWriteOnly() {
 		return 0, errors.New("read from write-only stream")
@@ -97,6 +106,12 @@
 		return 0, err
 	}
 	defer s.inUnlock()
+	if s.inresetcode != -1 {
+		return 0, fmt.Errorf("stream reset by peer: %w", StreamErrorCode(s.inresetcode))
+	}
+	if s.inclosed.isSet() {
+		return 0, errors.New("read from closed stream")
+	}
 	if s.insize == s.in.start {
 		return 0, io.EOF
 	}
@@ -145,26 +160,17 @@
 // Buffered data is only sent when the buffer is sufficiently full.
 // Call the Flush method to ensure buffered data is sent.
 //
-// If the peer aborts reads on the stream, ReadContext returns StreamResetError.
+// TODO: Implement Flush.
 func (s *Stream) WriteContext(ctx context.Context, b []byte) (n int, err error) {
 	if s.IsReadOnly() {
 		return 0, errors.New("write to read-only stream")
 	}
 	canWrite := s.outgate.lock()
-	if s.outclosed.isSet() {
-		s.outUnlock()
-		return 0, errors.New("write to closed stream")
-	}
-	if len(b) == 0 {
-		// We aren't writing any data, but send a STREAM frame to open the stream
-		// if we haven't done so already.
-		s.outopened.set()
-	}
-	for len(b) > 0 {
+	for {
 		// The first time through this loop, we may or may not be write blocked.
 		// We exit the loop after writing all data, so on subsequent passes through
 		// the loop we are always write blocked.
-		if !canWrite {
+		if len(b) > 0 && !canWrite {
 			// We're blocked, either by flow control or by our own buffer limit.
 			// We either need the peer to extend our flow control window,
 			// or ack some of our outstanding packets.
@@ -181,6 +187,21 @@
 			// write blocked. (Unlike traditional condition variables, gates do not
 			// have spurious wakeups.)
 		}
+		if s.outreset.isSet() {
+			s.outUnlock()
+			return n, errors.New("write to reset stream")
+		}
+		if s.outclosed.isSet() {
+			s.outUnlock()
+			return n, errors.New("write to closed stream")
+		}
+		// We set outopened here rather than below,
+		// so if this is a zero-length write we still
+		// open the stream despite not writing any data to it.
+		s.outopened.set()
+		if len(b) == 0 {
+			break
+		}
 		s.outblocked.clear()
 		// Write limit is min(our own buffer limit, the peer-provided flow control window).
 		// This is a stream offset.
@@ -191,7 +212,6 @@
 		// Copy the data into the output buffer and mark it as unsent.
 		s.outunsent.add(s.out.end, s.out.end+nn)
 		s.out.writeAt(b[:nn], s.out.end)
-		s.outopened.set()
 		b = b[nn:]
 		n += int(nn)
 		// If we have bytes left to send, we're blocked.
@@ -218,9 +238,8 @@
 func (s *Stream) CloseContext(ctx context.Context) error {
 	s.CloseRead()
 	s.CloseWrite()
-	// TODO: wait for peer to acknowledge data
 	// TODO: Return code from peer's RESET_STREAM frame?
-	return nil
+	return s.conn.waitOnDone(ctx, s.outdone)
 }
 
 // CloseRead aborts reads on the stream.
@@ -233,7 +252,17 @@
 	if s.IsWriteOnly() {
 		return
 	}
-	// TODO: support read-closing streams with a STOP_SENDING frame
+	s.ingate.lock()
+	defer s.inUnlock()
+	if s.inset.isrange(0, s.insize) || s.inresetcode != -1 {
+		// We've already received all data from the peer,
+		// so there's no need to send STOP_SENDING.
+		// This is the same as saying we sent one and they got it.
+		s.inclosed.setReceived()
+	} else {
+		s.inclosed.set()
+	}
+	s.in.discardBefore(s.in.end)
 }
 
 // CloseWrite aborts writes on the stream.
@@ -251,6 +280,29 @@
 	s.outclosed.set()
 }
 
+// Reset aborts writes on the stream and notifies the peer
+// that the stream was terminated abruptly.
+// Any blocked writes will be unblocked and return errors.
+//
+// Reset sends the application protocol error code to the peer.
+// It does not wait for the peer to acknowledge receipt of the error.
+// Use CloseContext to wait for the peer's acknowledgement.
+func (s *Stream) Reset(code uint64) {
+	s.outgate.lock()
+	defer s.outUnlock()
+	if s.outreset.isSet() {
+		return
+	}
+	// We could check here to see if the stream is closed and the
+	// peer has acked all the data and the FIN, but sending an
+	// extra RESET_STREAM in this case is harmless.
+	s.outreset.set()
+	s.outresetcode = code
+	s.out.discardBefore(s.out.end)
+	s.outunsent = rangeset[int64]{}
+	s.outblocked.clear()
+}
+
 // inUnlock unlocks s.ingate.
 // It sets the gate condition if reads from s will not block.
 // If s has receive-related frames to write, it notifies the Conn.
@@ -263,11 +315,13 @@
 // inUnlockNoQueue is inUnlock,
 // but reports whether s has frames to write rather than notifying the Conn.
 func (s *Stream) inUnlockNoQueue() (shouldSend bool) {
-	// TODO: STOP_SENDING
 	canRead := s.inset.contains(s.in.start) || // data available to read
-		s.insize == s.in.start // at EOF
-	s.ingate.unlock(canRead)
-	return s.insendmax.shouldSend() // STREAM_MAX_DATA
+		s.insize == s.in.start || // at EOF
+		s.inresetcode != -1 || // reset by peer
+		s.inclosed.isSet() // closed locally
+	defer s.ingate.unlock(canRead)
+	return s.insendmax.shouldSend() || // STREAM_MAX_DATA
+		s.inclosed.shouldSend() // STOP_SENDING
 }
 
 // outUnlock unlocks s.outgate.
@@ -282,10 +336,24 @@
 // outUnlockNoQueue is outUnlock,
 // but reports whether s has frames to write rather than notifying the Conn.
 func (s *Stream) outUnlockNoQueue() (shouldSend bool) {
+	isDone := s.outclosed.isReceived() && s.outacked.isrange(0, s.out.end) || // all data acked
+		s.outreset.isSet() // reset locally
+	if isDone {
+		select {
+		case <-s.outdone:
+		default:
+			close(s.outdone)
+		}
+	}
 	lim := min(s.out.start+s.outmaxbuf, s.outwin)
 	canWrite := lim > s.out.end || // available flow control
-		s.outclosed.isSet() // closed
-	s.outgate.unlock(canWrite)
+		s.outclosed.isSet() || // closed locally
+		s.outreset.isSet() // reset locally
+	defer s.outgate.unlock(canWrite)
+	if s.outreset.isSet() {
+		// If the stream is reset locally, the only frame we'll send is RESET_STREAM.
+		return s.outreset.shouldSend()
+	}
 	return len(s.outunsent) > 0 || // STREAM frame with data
 		s.outclosed.shouldSend() || // STREAM frame with FIN bit
 		s.outopened.shouldSend() || // STREAM frame with no data
@@ -297,6 +365,44 @@
 	s.ingate.lock()
 	defer s.inUnlock()
 	end := off + int64(len(b))
+	if err := s.checkStreamBounds(end, fin); err != nil {
+		return err
+	}
+	if s.inclosed.isSet() || s.inresetcode != -1 {
+		// The user read-closed the stream, or the peer reset it.
+		// Either way, we can discard this frame.
+		return nil
+	}
+	s.in.writeAt(b, off)
+	s.inset.add(off, end)
+	if fin {
+		s.insize = end
+		// The peer has enough flow control window to send the entire stream.
+		s.insendmax.clear()
+	}
+	return nil
+}
+
+// handleReset handles a RESET_STREAM frame.
+func (s *Stream) handleReset(code uint64, finalSize int64) error {
+	s.ingate.lock()
+	defer s.inUnlock()
+	const fin = true
+	if err := s.checkStreamBounds(finalSize, fin); err != nil {
+		return err
+	}
+	if s.inresetcode != -1 {
+		// The stream was already reset.
+		return nil
+	}
+	s.in.discardBefore(s.in.end)
+	s.inresetcode = int64(code)
+	s.insize = finalSize
+	return nil
+}
+
+// checkStreamBounds validates the stream offset in a STREAM or RESET_STREAM frame.
+func (s *Stream) checkStreamBounds(end int64, fin bool) error {
 	if end > s.inwin {
 		// The peer sent us data past the maximum flow control window we gave them.
 		return localTransportError(errFlowControl)
@@ -305,17 +411,22 @@
 		// The peer sent us data past the final size of the stream they previously gave us.
 		return localTransportError(errFinalSize)
 	}
-	s.in.writeAt(b, off)
-	s.inset.add(off, end)
-	if fin {
-		if s.insize != -1 && s.insize != end {
-			// The peer changed the final size of the stream.
-			return localTransportError(errFinalSize)
-		}
-		s.insize = end
-		// The peer has enough flow control window to send the entire stream.
-		s.insendmax.clear()
+	if fin && s.insize != -1 && end != s.insize {
+		// The peer changed the final size of the stream.
+		return localTransportError(errFinalSize)
 	}
+	if fin && end < s.in.end {
+		// The peer has previously sent us data past the final size.
+		return localTransportError(errFinalSize)
+	}
+	return nil
+}
+
+// handleStopSending handles a STOP_SENDING frame.
+func (s *Stream) handleStopSending(code uint64) error {
+	// Peer requests that we reset this stream.
+	// https://www.rfc-editor.org/rfc/rfc9000#section-3.5-4
+	s.Reset(code)
 	return nil
 }
 
@@ -336,6 +447,14 @@
 	// Frames which are always the same (STOP_SENDING, RESET_STREAM)
 	// can be marked as received if any packet carrying this frame is acked.
 	switch ftype {
+	case frameTypeResetStream:
+		s.outgate.lock()
+		s.outreset.ackOrLoss(pnum, fate)
+		s.outUnlock()
+	case frameTypeStopSending:
+		s.ingate.lock()
+		s.inclosed.ackOrLoss(pnum, fate)
+		s.inUnlock()
 	case frameTypeMaxStreamData:
 		s.ingate.lock()
 		s.insendmax.ackLatestOrLoss(pnum, fate)
@@ -345,7 +464,6 @@
 		s.outblocked.ackLatestOrLoss(pnum, fate)
 		s.outUnlock()
 	default:
-		// TODO: Handle STOP_SENDING, RESET_STREAM.
 		panic("unhandled frame type")
 	}
 }
@@ -358,6 +476,10 @@
 	if fin {
 		s.outclosed.ackOrLoss(pnum, fate)
 	}
+	if s.outreset.isSet() {
+		// If the stream has been reset, we don't care any more.
+		return
+	}
 	switch fate {
 	case packetAcked:
 		s.outacked.add(start, end)
@@ -385,6 +507,15 @@
 func (s *Stream) appendInFrames(w *packetWriter, pnum packetNumber, pto bool) bool {
 	s.ingate.lock()
 	defer s.inUnlockNoQueue()
+	if s.inclosed.shouldSendPTO(pto) {
+		// We don't currently have an API for setting the error code.
+		// Just send zero.
+		code := uint64(0)
+		if !w.appendStopSendingFrame(s.id, code) {
+			return false
+		}
+		s.inclosed.setSent(pnum)
+	}
 	// TODO: STOP_SENDING
 	if s.insendmax.shouldSendPTO(pto) {
 		// MAX_STREAM_DATA
@@ -406,7 +537,17 @@
 func (s *Stream) appendOutFrames(w *packetWriter, pnum packetNumber, pto bool) bool {
 	s.outgate.lock()
 	defer s.outUnlockNoQueue()
-	// TODO: RESET_STREAM
+	if s.outreset.isSet() {
+		// RESET_STREAM
+		if s.outreset.shouldSendPTO(pto) {
+			if !w.appendResetStreamFrame(s.id, s.outresetcode, s.out.end) {
+				return false
+			}
+			s.outreset.setSent(pnum)
+			s.frameOpensStream(pnum)
+		}
+		return true
+	}
 	if s.outblocked.shouldSendPTO(pto) {
 		// STREAM_DATA_BLOCKED
 		if !w.appendStreamDataBlockedFrame(s.id, s.out.end) {
diff --git a/internal/quic/stream_test.go b/internal/quic/stream_test.go
index d158e72..5904a93 100644
--- a/internal/quic/stream_test.go
+++ b/internal/quic/stream_test.go
@@ -10,6 +10,7 @@
 	"bytes"
 	"context"
 	"crypto/rand"
+	"errors"
 	"fmt"
 	"io"
 	"reflect"
@@ -489,32 +490,76 @@
 	})
 }
 
-func TestStreamFinalSizeChangedByStreamFrame(t *testing.T) {
-	// "If a [...] STREAM frame is received indicating a change
-	// in the final size for the stream, an endpoint SHOULD
-	// respond with an error of type FINAL_SIZE_ERROR [...]"
-	// https://www.rfc-editor.org/rfc/rfc9000#section-4.5-5
+func finalSizeTest(t *testing.T, wantErr transportError, f func(tc *testConn, sid streamID) (finalSize int64), opts ...any) {
 	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
-		tc := newTestConn(t, serverSide)
-		tc.handshake()
-		sid := newStreamID(clientSide, styp, 0)
+		for _, test := range []struct {
+			name       string
+			finalFrame func(tc *testConn, sid streamID, finalSize int64)
+		}{{
+			name: "FIN",
+			finalFrame: func(tc *testConn, sid streamID, finalSize int64) {
+				tc.writeFrames(packetType1RTT, debugFrameStream{
+					id:  sid,
+					off: finalSize,
+					fin: true,
+				})
+			},
+		}, {
+			name: "RESET_STREAM",
+			finalFrame: func(tc *testConn, sid streamID, finalSize int64) {
+				tc.writeFrames(packetType1RTT, debugFrameResetStream{
+					id:        sid,
+					finalSize: finalSize,
+				})
+			},
+		}} {
+			t.Run(test.name, func(t *testing.T) {
+				tc := newTestConn(t, serverSide, opts...)
+				tc.handshake()
+				sid := newStreamID(clientSide, styp, 0)
+				finalSize := f(tc, sid)
+				test.finalFrame(tc, sid, finalSize)
+				tc.wantFrame("change in final size of stream is an error",
+					packetType1RTT, debugFrameConnectionCloseTransport{
+						code: wantErr,
+					},
+				)
+			})
+		}
+	})
+}
 
-		const write1size = 4
+func TestStreamFinalSizeChangedAfterFin(t *testing.T) {
+	// "If a RESET_STREAM or STREAM frame is received indicating a change
+	// in the final size for the stream, an endpoint SHOULD respond with
+	// an error of type FINAL_SIZE_ERROR [...]"
+	// https://www.rfc-editor.org/rfc/rfc9000#section-4.5-5
+	finalSizeTest(t, errFinalSize, func(tc *testConn, sid streamID) (finalSize int64) {
 		tc.writeFrames(packetType1RTT, debugFrameStream{
 			id:  sid,
 			off: 10,
 			fin: true,
 		})
+		return 9
+	})
+}
+
+func TestStreamFinalSizeBeforePreviousData(t *testing.T) {
+	finalSizeTest(t, errFinalSize, func(tc *testConn, sid streamID) (finalSize int64) {
 		tc.writeFrames(packetType1RTT, debugFrameStream{
-			id:  sid,
-			off: 9,
-			fin: true,
+			id:   sid,
+			off:  10,
+			data: []byte{0},
 		})
-		tc.wantFrame("change in final size of stream is an error",
-			packetType1RTT, debugFrameConnectionCloseTransport{
-				code: errFinalSize,
-			},
-		)
+		return 9
+	})
+}
+
+func TestStreamFinalSizePastMaxStreamData(t *testing.T) {
+	finalSizeTest(t, errFlowControl, func(tc *testConn, sid streamID) (finalSize int64) {
+		return 11
+	}, func(c *Config) {
+		c.StreamReadBufferSize = 10
 	})
 }
 
@@ -637,6 +682,19 @@
 	})
 }
 
+func TestStreamResetStreamInvalidState(t *testing.T) {
+	// "An endpoint that receives a RESET_STREAM frame for a send-only
+	// stream MUST terminate the connection with error STREAM_STATE_ERROR."
+	// https://www.rfc-editor.org/rfc/rfc9000#section-19.4-3
+	testStreamSendFrameInvalidState(t, func(sid streamID) debugFrame {
+		return debugFrameResetStream{
+			id:        sid,
+			code:      0,
+			finalSize: 0,
+		}
+	})
+}
+
 func TestStreamStreamFrameInvalidState(t *testing.T) {
 	// "An endpoint MUST terminate the connection with error STREAM_STATE_ERROR
 	// if it receives a STREAM frame for a locally initiated stream
@@ -689,6 +747,20 @@
 	})
 }
 
+func TestStreamStopSendingInvalidState(t *testing.T) {
+	// "Receiving a STOP_SENDING frame for a locally initiated stream
+	// that has not yet been created MUST be treated as a connection error
+	// of type STREAM_STATE_ERROR. An endpoint that receives a STOP_SENDING
+	// frame for a receive-only stream MUST terminate the connection with
+	// error STREAM_STATE_ERROR."
+	// https://www.rfc-editor.org/rfc/rfc9000#section-19.5-2
+	testStreamReceiveFrameInvalidState(t, func(sid streamID) debugFrame {
+		return debugFrameStopSending{
+			id: sid,
+		}
+	})
+}
+
 func TestStreamMaxStreamDataInvalidState(t *testing.T) {
 	// "Receiving a MAX_STREAM_DATA frame for a locally initiated stream
 	// that has not yet been created MUST be treated as a connection error
@@ -743,13 +815,47 @@
 	}
 }
 
-func TestStreamWriteToClosedStream(t *testing.T) {
-	tc, s := newTestConnAndLocalStream(t, serverSide, bidiStream, func(p *transportParameters) {
-		p.initialMaxStreamsBidi = 1
-		p.initialMaxData = 1 << 20
-		p.initialMaxStreamDataBidiRemote = 1 << 20
+func TestStreamReadFromClosedStream(t *testing.T) {
+	tc, s := newTestConnAndRemoteStream(t, serverSide, bidiStream, permissiveTransportParameters)
+	s.CloseRead()
+	tc.wantFrame("CloseRead sends a STOP_SENDING frame",
+		packetType1RTT, debugFrameStopSending{
+			id: s.id,
+		})
+	wantErr := "read from closed stream"
+	if n, err := s.Read(make([]byte, 16)); err == nil || !strings.Contains(err.Error(), wantErr) {
+		t.Errorf("s.Read() = %v, %v; want error %q", n, err, wantErr)
+	}
+	// Data which shows up after STOP_SENDING is discarded.
+	tc.writeFrames(packetType1RTT, debugFrameStream{
+		id:   s.id,
+		data: []byte{1, 2, 3},
+		fin:  true,
 	})
-	s.Close()
+	if n, err := s.Read(make([]byte, 16)); err == nil || !strings.Contains(err.Error(), wantErr) {
+		t.Errorf("s.Read() = %v, %v; want error %q", n, err, wantErr)
+	}
+}
+
+func TestStreamCloseReadWithAllDataReceived(t *testing.T) {
+	tc, s := newTestConnAndRemoteStream(t, serverSide, bidiStream, permissiveTransportParameters)
+	tc.writeFrames(packetType1RTT, debugFrameStream{
+		id:   s.id,
+		data: []byte{1, 2, 3},
+		fin:  true,
+	})
+	s.CloseRead()
+	tc.wantIdle("CloseRead in Data Recvd state doesn't need to send STOP_SENDING")
+	// We had all the data for the stream, but CloseRead discarded it.
+	wantErr := "read from closed stream"
+	if n, err := s.Read(make([]byte, 16)); err == nil || !strings.Contains(err.Error(), wantErr) {
+		t.Errorf("s.Read() = %v, %v; want error %q", n, err, wantErr)
+	}
+}
+
+func TestStreamWriteToClosedStream(t *testing.T) {
+	tc, s := newTestConnAndLocalStream(t, serverSide, bidiStream, permissiveTransportParameters)
+	s.CloseWrite()
 	tc.wantFrame("stream is opened after being closed",
 		packetType1RTT, debugFrameStream{
 			id:   s.id,
@@ -763,6 +869,45 @@
 	}
 }
 
+func TestStreamResetBlockedStream(t *testing.T) {
+	tc, s := newTestConnAndLocalStream(t, serverSide, bidiStream, func(p *transportParameters) {
+		p.initialMaxStreamsBidi = 1
+		p.initialMaxData = 1 << 20
+		p.initialMaxStreamDataBidiRemote = 4
+	})
+	tc.ignoreFrame(frameTypeStreamDataBlocked)
+	writing := runAsync(tc, func(ctx context.Context) (int, error) {
+		return s.WriteContext(ctx, []byte{0, 1, 2, 3, 4, 5, 6, 7})
+	})
+	tc.wantFrame("stream writes data until blocked by flow control",
+		packetType1RTT, debugFrameStream{
+			id:   s.id,
+			off:  0,
+			data: []byte{0, 1, 2, 3},
+		})
+	s.Reset(42)
+	tc.wantFrame("stream is reset",
+		packetType1RTT, debugFrameResetStream{
+			id:        s.id,
+			code:      42,
+			finalSize: 4,
+		})
+	wantErr := "write to reset stream"
+	if n, err := writing.result(); n != 4 || !strings.Contains(err.Error(), wantErr) {
+		t.Errorf("s.Write() interrupted by Reset: %v, %q; want 4, %q", n, err, wantErr)
+	}
+	tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
+		id:  s.id,
+		max: 1 << 20,
+	})
+	tc.wantIdle("flow control is available, but stream has been reset")
+	s.Reset(100)
+	tc.wantIdle("resetting stream a second time has no effect")
+	if n, err := s.Write([]byte{}); err == nil || !strings.Contains(err.Error(), wantErr) {
+		t.Errorf("s.Write() = %v, %v; want error %q", n, err, wantErr)
+	}
+}
+
 func TestStreamWriteMoreThanOnePacketOfData(t *testing.T) {
 	tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, func(p *transportParameters) {
 		p.initialMaxStreamsUni = 1
@@ -797,6 +942,209 @@
 	}
 }
 
+func TestStreamCloseWaitsForAcks(t *testing.T) {
+	ctx := canceledContext()
+	tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters)
+	data := make([]byte, 100)
+	s.WriteContext(ctx, data)
+	tc.wantFrame("conn sends data for the stream",
+		packetType1RTT, debugFrameStream{
+			id:   s.id,
+			data: data,
+		})
+	if err := s.CloseContext(ctx); err != context.Canceled {
+		t.Fatalf("s.Close() = %v, want context.Canceled (data not acked yet)", err)
+	}
+	tc.wantFrame("conn sends FIN for closed stream",
+		packetType1RTT, debugFrameStream{
+			id:   s.id,
+			off:  int64(len(data)),
+			fin:  true,
+			data: []byte{},
+		})
+	closing := runAsync(tc, func(ctx context.Context) (struct{}, error) {
+		return struct{}{}, s.CloseContext(ctx)
+	})
+	if _, err := closing.result(); err != errNotDone {
+		t.Fatalf("s.CloseContext() = %v, want it to block waiting for acks", err)
+	}
+	tc.writeFrames(packetType1RTT, debugFrameAck{
+		ranges: []i64range[packetNumber]{{0, tc.sentFramePacket.num + 1}},
+	})
+	if _, err := closing.result(); err != nil {
+		t.Fatalf("s.CloseContext() = %v, want nil (all data acked)", err)
+	}
+}
+
+func TestStreamCloseUnblocked(t *testing.T) {
+	for _, test := range []struct {
+		name    string
+		unblock func(tc *testConn, s *Stream)
+	}{{
+		name: "data received",
+		unblock: func(tc *testConn, s *Stream) {
+			tc.writeFrames(packetType1RTT, debugFrameAck{
+				ranges: []i64range[packetNumber]{{0, tc.sentFramePacket.num + 1}},
+			})
+		},
+	}, {
+		name: "stop sending received",
+		unblock: func(tc *testConn, s *Stream) {
+			tc.writeFrames(packetType1RTT, debugFrameStopSending{
+				id: s.id,
+			})
+		},
+	}, {
+		name: "stream reset",
+		unblock: func(tc *testConn, s *Stream) {
+			s.Reset(0)
+			tc.wait() // wait for test conn to process the Reset
+		},
+	}} {
+		t.Run(test.name, func(t *testing.T) {
+			ctx := canceledContext()
+			tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters)
+			data := make([]byte, 100)
+			s.WriteContext(ctx, data)
+			tc.wantFrame("conn sends data for the stream",
+				packetType1RTT, debugFrameStream{
+					id:   s.id,
+					data: data,
+				})
+			if err := s.CloseContext(ctx); err != context.Canceled {
+				t.Fatalf("s.Close() = %v, want context.Canceled (data not acked yet)", err)
+			}
+			tc.wantFrame("conn sends FIN for closed stream",
+				packetType1RTT, debugFrameStream{
+					id:   s.id,
+					off:  int64(len(data)),
+					fin:  true,
+					data: []byte{},
+				})
+			closing := runAsync(tc, func(ctx context.Context) (struct{}, error) {
+				return struct{}{}, s.CloseContext(ctx)
+			})
+			if _, err := closing.result(); err != errNotDone {
+				t.Fatalf("s.CloseContext() = %v, want it to block waiting for acks", err)
+			}
+			test.unblock(tc, s)
+			if _, err := closing.result(); err != nil {
+				t.Fatalf("s.CloseContext() = %v, want nil (all data acked)", err)
+			}
+		})
+	}
+}
+
+func TestStreamPeerResetsWithUnreadAndUnsentData(t *testing.T) {
+	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+		ctx := canceledContext()
+		tc, s := newTestConnAndRemoteStream(t, serverSide, styp)
+		data := []byte{0, 1, 2, 3, 4, 5, 6, 7}
+		tc.writeFrames(packetType1RTT, debugFrameStream{
+			id:   s.id,
+			data: data,
+		})
+		got := make([]byte, 4)
+		if n, err := s.ReadContext(ctx, got); n != len(got) || err != nil {
+			t.Fatalf("Read start of stream: got %v, %v; want %v, nil", n, err, len(got))
+		}
+		const sentCode = 42
+		tc.writeFrames(packetType1RTT, debugFrameResetStream{
+			id:        s.id,
+			finalSize: 20,
+			code:      sentCode,
+		})
+		wantErr := StreamErrorCode(sentCode)
+		if n, err := s.ReadContext(ctx, got); n != 0 || !errors.Is(err, wantErr) {
+			t.Fatalf("Read reset stream: got %v, %v; want 0, %v", n, err, wantErr)
+		}
+	})
+}
+
+func TestStreamPeerResetWakesBlockedRead(t *testing.T) {
+	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+		tc, s := newTestConnAndRemoteStream(t, serverSide, styp)
+		reader := runAsync(tc, func(ctx context.Context) (int, error) {
+			got := make([]byte, 4)
+			return s.ReadContext(ctx, got)
+		})
+		const sentCode = 42
+		tc.writeFrames(packetType1RTT, debugFrameResetStream{
+			id:        s.id,
+			finalSize: 20,
+			code:      sentCode,
+		})
+		wantErr := StreamErrorCode(sentCode)
+		if n, err := reader.result(); n != 0 || !errors.Is(err, wantErr) {
+			t.Fatalf("Read reset stream: got %v, %v; want 0, %v", n, err, wantErr)
+		}
+	})
+}
+
+func TestStreamPeerResetFollowedByData(t *testing.T) {
+	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+		tc, s := newTestConnAndRemoteStream(t, serverSide, styp)
+		tc.writeFrames(packetType1RTT, debugFrameResetStream{
+			id:        s.id,
+			finalSize: 4,
+			code:      1,
+		})
+		tc.writeFrames(packetType1RTT, debugFrameStream{
+			id:   s.id,
+			data: []byte{0, 1, 2, 3},
+		})
+		// Another reset with a different code, for good measure.
+		tc.writeFrames(packetType1RTT, debugFrameResetStream{
+			id:        s.id,
+			finalSize: 4,
+			code:      2,
+		})
+		wantErr := StreamErrorCode(1)
+		if n, err := s.Read(make([]byte, 16)); n != 0 || !errors.Is(err, wantErr) {
+			t.Fatalf("Read from reset stream: got %v, %v; want 0, %v", n, err, wantErr)
+		}
+	})
+}
+
+func TestStreamPeerStopSendingForActiveStream(t *testing.T) {
+	// "An endpoint that receives a STOP_SENDING frame MUST send a RESET_STREAM frame if
+	// the stream is in the "Ready" or "Send" state."
+	// https://www.rfc-editor.org/rfc/rfc9000#section-3.5-4
+	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+		tc, s := newTestConnAndLocalStream(t, serverSide, styp, permissiveTransportParameters)
+		for i := 0; i < 4; i++ {
+			s.Write([]byte{byte(i)})
+			tc.wantFrame("write sends a STREAM frame to peer",
+				packetType1RTT, debugFrameStream{
+					id:   s.id,
+					off:  int64(i),
+					data: []byte{byte(i)},
+				})
+		}
+		tc.writeFrames(packetType1RTT, debugFrameStopSending{
+			id:   s.id,
+			code: 42,
+		})
+		tc.wantFrame("receiving STOP_SENDING causes stream reset",
+			packetType1RTT, debugFrameResetStream{
+				id:        s.id,
+				code:      42,
+				finalSize: 4,
+			})
+		if n, err := s.Write([]byte{0}); err == nil {
+			t.Errorf("s.Write() after STOP_SENDING = %v, %v; want error", n, err)
+		}
+		// This ack will result in some of the previous frames being marked as lost.
+		tc.writeFrames(packetType1RTT, debugFrameAck{
+			ranges: []i64range[packetNumber]{{
+				tc.sentFramePacket.num,
+				tc.sentFramePacket.num + 1,
+			}},
+		})
+		tc.wantIdle("lost STREAM frames for reset stream are not resent")
+	})
+}
+
 func newTestConnAndLocalStream(t *testing.T, side connSide, styp streamType, opts ...any) (*testConn, *Stream) {
 	t.Helper()
 	ctx := canceledContext()
@@ -825,3 +1173,13 @@
 	}
 	return tc, s
 }
+
+// permissiveTransportParameters may be passed as an option to newTestConn.
+func permissiveTransportParameters(p *transportParameters) {
+	p.initialMaxStreamsBidi = maxVarint
+	p.initialMaxStreamsUni = maxVarint
+	p.initialMaxData = maxVarint
+	p.initialMaxStreamDataBidiRemote = maxVarint
+	p.initialMaxStreamDataBidiLocal = maxVarint
+	p.initialMaxStreamDataUni = maxVarint
+}
diff --git a/internal/quic/wire.go b/internal/quic/wire.go
index f0643c9..8486029 100644
--- a/internal/quic/wire.go
+++ b/internal/quic/wire.go
@@ -8,7 +8,10 @@
 
 import "encoding/binary"
 
-const maxVarintSize = 8
+const (
+	maxVarintSize = 8 // encoded size in bytes
+	maxVarint     = (1 << 62) - 1
+)
 
 // consumeVarint parses a variable-length integer, reporting its length.
 // It returns a negative length upon an error.