quic: read-closing and reset streams, wait on close
s.Close waits for the peer to acknowledge receipt of sent
data before returning.
s.ReadClose closes the receive end of a stream, discarding
buffered data and sending a STOP_SENDING frame to the peer.
s.Reset(code) closes the send end of a stream with an error,
which is sent to the peer in a RESET_STREAM frame.
Receipt of a STOP_SENDING frame resets the stream locally
and causes future writes to fail.
Receipt of a RESET_STREAM frame causes future reads to fail.
Stream state is currently retained even after a stream
has been completely closed. A future CL will add cleanup.
For golang/go#58547
Change-Id: I29088ae570db4079926ad426be6e85dace2122da
Reviewed-on: https://go-review.googlesource.com/c/net/+/518435
Run-TryBot: Damien Neil <dneil@google.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
diff --git a/internal/quic/conn.go b/internal/quic/conn.go
index 0952a79..ee8f011 100644
--- a/internal/quic/conn.go
+++ b/internal/quic/conn.go
@@ -73,6 +73,7 @@
handleTLSEvent(tls.QUICEvent)
newConnID(seq int64) ([]byte, error)
waitAndLockGate(ctx context.Context, g *gate) error
+ waitOnDone(ctx context.Context, ch <-chan struct{}) error
}
func newConn(now time.Time, side connSide, initialConnID []byte, peerAddr netip.AddrPort, config *Config, l connListener, hooks connTestHooks) (*Conn, error) {
@@ -311,6 +312,26 @@
return g.waitAndLockContext(ctx)
}
+func (c *Conn) waitOnDone(ctx context.Context, ch <-chan struct{}) error {
+ if c.testHooks != nil {
+ return c.testHooks.waitOnDone(ctx, ch)
+ }
+ // Check the channel before the context.
+ // We always prefer to return results when available,
+ // even when provided with an already-canceled context.
+ select {
+ case <-ch:
+ return nil
+ default:
+ }
+ select {
+ case <-ch:
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ return nil
+}
+
// abort terminates a connection with an error.
func (c *Conn) abort(now time.Time, err error) {
if c.errForPeer == nil {
diff --git a/internal/quic/conn_async_test.go b/internal/quic/conn_async_test.go
index 2078325..0da3ddb 100644
--- a/internal/quic/conn_async_test.go
+++ b/internal/quic/conn_async_test.go
@@ -82,10 +82,11 @@
}
// A blockedAsync is a blocked async operation.
-//
-// Currently, the only type of blocked operation is one waiting on a gate.
type blockedAsync struct {
- g *gate
+ // Exactly one of these will be set, depending on the type of blocked operation.
+ g *gate
+ ch <-chan struct{}
+
donec chan struct{} // closed when the operation is unblocked
}
@@ -133,6 +134,25 @@
// Gate can be acquired without blocking.
return nil
}
+ return as.block(ctx, &blockedAsync{
+ g: g,
+ })
+}
+
+// waitOnDone replaces receiving from a chan struct{} in tests.
+func (as *asyncTestState) waitOnDone(ctx context.Context, ch <-chan struct{}) error {
+ select {
+ case <-ch:
+ return nil // read without blocking
+ default:
+ }
+ return as.block(ctx, &blockedAsync{
+ ch: ch,
+ })
+}
+
+// block waits for a blocked async operation to complete.
+func (as *asyncTestState) block(ctx context.Context, b *blockedAsync) error {
if err := ctx.Err(); err != nil {
// Context has already expired.
return err
@@ -144,12 +164,9 @@
// which may have unpredictable results.
panic("blocking async point with unexpected Context")
}
+ b.donec = make(chan struct{})
// Record this as a pending blocking operation.
as.mu.Lock()
- b := &blockedAsync{
- g: g,
- donec: make(chan struct{}),
- }
as.blocked[b] = struct{}{}
as.mu.Unlock()
// Notify the creator of the operation that we're blocked,
@@ -169,8 +186,19 @@
as.mu.Lock()
var woken *blockedAsync
for w := range as.blocked {
- if w.g.lockIfSet() {
- woken = w
+ switch {
+ case w.g != nil:
+ if w.g.lockIfSet() {
+ woken = w
+ }
+ case w.ch != nil:
+ select {
+ case <-w.ch:
+ woken = w
+ default:
+ }
+ }
+ if woken != nil {
delete(as.blocked, woken)
break
}
diff --git a/internal/quic/conn_loss.go b/internal/quic/conn_loss.go
index f42f7e5..103db9f 100644
--- a/internal/quic/conn_loss.go
+++ b/internal/quic/conn_loss.go
@@ -44,7 +44,9 @@
case frameTypeCrypto:
start, end := sent.nextRange()
c.crypto[space].ackOrLoss(start, end, fate)
- case frameTypeMaxStreamData,
+ case frameTypeResetStream,
+ frameTypeStopSending,
+ frameTypeMaxStreamData,
frameTypeStreamDataBlocked:
id := streamID(sent.nextInt())
s := c.streamForID(id)
diff --git a/internal/quic/conn_loss_test.go b/internal/quic/conn_loss_test.go
index d944515..dc0dc6c 100644
--- a/internal/quic/conn_loss_test.go
+++ b/internal/quic/conn_loss_test.go
@@ -75,7 +75,58 @@
})
}
-func TestLostCRYPTOFrame(t *testing.T) {
+func TestLostResetStreamFrame(t *testing.T) {
+ // "Cancellation of stream transmission, as carried in a RESET_STREAM frame,
+ // is sent until acknowledged or until all stream data is acknowledged by the peer [...]"
+ // https://www.rfc-editor.org/rfc/rfc9000.html#section-13.3-3.4
+ lostFrameTest(t, func(t *testing.T, pto bool) {
+ tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters)
+ tc.ignoreFrame(frameTypeAck)
+
+ s.Reset(1)
+ tc.wantFrame("reset stream",
+ packetType1RTT, debugFrameResetStream{
+ id: s.id,
+ code: 1,
+ })
+
+ tc.triggerLossOrPTO(packetType1RTT, pto)
+ tc.wantFrame("resent RESET_STREAM frame",
+ packetType1RTT, debugFrameResetStream{
+ id: s.id,
+ code: 1,
+ })
+ })
+}
+
+func TestLostStopSendingFrame(t *testing.T) {
+ // "[...] a request to cancel stream transmission, as encoded in a STOP_SENDING frame,
+ // is sent until the receiving part of the stream enters either a "Data Recvd" or
+ // "Reset Recvd" state [...]"
+ // https://www.rfc-editor.org/rfc/rfc9000.html#section-13.3-3.5
+ //
+ // Technically, we can stop sending a STOP_SENDING frame if the peer sends
+ // us all the data for the stream or resets it. We don't bother tracking this,
+ // however, so we'll keep sending the frame until it is acked. This is harmless.
+ lostFrameTest(t, func(t *testing.T, pto bool) {
+ tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream, permissiveTransportParameters)
+ tc.ignoreFrame(frameTypeAck)
+
+ s.CloseRead()
+ tc.wantFrame("stream is read-closed",
+ packetType1RTT, debugFrameStopSending{
+ id: s.id,
+ })
+
+ tc.triggerLossOrPTO(packetType1RTT, pto)
+ tc.wantFrame("resent STOP_SENDING frame",
+ packetType1RTT, debugFrameStopSending{
+ id: s.id,
+ })
+ })
+}
+
+func TestLostCryptoFrame(t *testing.T) {
// "Data sent in CRYPTO frames is retransmitted [...] until all data has been acknowledged."
// https://www.rfc-editor.org/rfc/rfc9000.html#section-13.3-3.1
lostFrameTest(t, func(t *testing.T, pto bool) {
@@ -176,7 +227,7 @@
off: 4,
data: data[4:8],
})
- s.Close()
+ s.CloseWrite()
tc.wantFrame("send FIN",
packetType1RTT, debugFrameStream{
id: s.id,
diff --git a/internal/quic/conn_recv.go b/internal/quic/conn_recv.go
index 00985b6..e0a91ab 100644
--- a/internal/quic/conn_recv.go
+++ b/internal/quic/conn_recv.go
@@ -161,12 +161,12 @@
if !frameOK(c, ptype, __01) {
return
}
- _, _, _, n = consumeResetStreamFrame(payload)
+ n = c.handleResetStreamFrame(now, space, payload)
case frameTypeStopSending:
if !frameOK(c, ptype, __01) {
return
}
- _, _, n = consumeStopSendingFrame(payload)
+ n = c.handleStopSendingFrame(now, space, payload)
case frameTypeCrypto:
if !frameOK(c, ptype, IH_1) {
return
@@ -291,6 +291,32 @@
return n
}
+func (c *Conn) handleResetStreamFrame(now time.Time, space numberSpace, payload []byte) int {
+ id, code, finalSize, n := consumeResetStreamFrame(payload)
+ if n < 0 {
+ return -1
+ }
+ if s := c.streamForFrame(now, id, recvStream); s != nil {
+ if err := s.handleReset(code, finalSize); err != nil {
+ c.abort(now, err)
+ }
+ }
+ return n
+}
+
+func (c *Conn) handleStopSendingFrame(now time.Time, space numberSpace, payload []byte) int {
+ id, code, n := consumeStopSendingFrame(payload)
+ if n < 0 {
+ return -1
+ }
+ if s := c.streamForFrame(now, id, sendStream); s != nil {
+ if err := s.handleStopSending(code); err != nil {
+ c.abort(now, err)
+ }
+ }
+ return n
+}
+
func (c *Conn) handleCryptoFrame(now time.Time, space numberSpace, payload []byte) int {
off, data, n := consumeCryptoFrame(payload)
err := c.handleCrypto(now, space, off, data)
diff --git a/internal/quic/errors.go b/internal/quic/errors.go
index 55d32f3..f156859 100644
--- a/internal/quic/errors.go
+++ b/internal/quic/errors.go
@@ -99,6 +99,14 @@
return fmt.Sprintf("peer closed connection: %v: %q", e.code, e.reason)
}
+// A StreamErrorCode is an application protocol error code (RFC 9000, Section 20.2)
+// indicating whay a stream is being closed.
+type StreamErrorCode uint64
+
+func (e StreamErrorCode) Error() string {
+ return fmt.Sprintf("stream error code %v", uint64(e))
+}
+
// An ApplicationError is an application protocol error code (RFC 9000, Section 20.2).
// Application protocol errors may be sent when terminating a stream or connection.
type ApplicationError struct {
diff --git a/internal/quic/stream.go b/internal/quic/stream.go
index 83215df..12117db 100644
--- a/internal/quic/stream.go
+++ b/internal/quic/stream.go
@@ -9,6 +9,7 @@
import (
"context"
"errors"
+ "fmt"
"io"
)
@@ -20,28 +21,33 @@
//
// The gate condition is set if a read from the stream will not block,
// either because the stream has available data or because the read will fail.
- ingate gate
- in pipe // received data
- inwin int64 // last MAX_STREAM_DATA sent to the peer
- insendmax sentVal // set when we should send MAX_STREAM_DATA to the peer
- inmaxbuf int64 // maximum amount of data we will buffer
- insize int64 // stream final size; -1 before this is known
- inset rangeset[int64] // received ranges
+ ingate gate
+ in pipe // received data
+ inwin int64 // last MAX_STREAM_DATA sent to the peer
+ insendmax sentVal // set when we should send MAX_STREAM_DATA to the peer
+ inmaxbuf int64 // maximum amount of data we will buffer
+ insize int64 // stream final size; -1 before this is known
+ inset rangeset[int64] // received ranges
+ inclosed sentVal // set by CloseRead
+ inresetcode int64 // RESET_STREAM code received from the peer; -1 if not reset
// outgate's lock guards all send-related state.
//
// The gate condition is set if a write to the stream will not block,
// either because the stream has available flow control or because
// the write will fail.
- outgate gate
- out pipe // buffered data to send
- outwin int64 // maximum MAX_STREAM_DATA received from the peer
- outmaxbuf int64 // maximum amount of data we will buffer
- outunsent rangeset[int64] // ranges buffered but not yet sent
- outacked rangeset[int64] // ranges sent and acknowledged
- outopened sentVal // set if we should open the stream
- outclosed sentVal // set by CloseWrite
- outblocked sentVal // set when a write to the stream is blocked by flow control
+ outgate gate
+ out pipe // buffered data to send
+ outwin int64 // maximum MAX_STREAM_DATA received from the peer
+ outmaxbuf int64 // maximum amount of data we will buffer
+ outunsent rangeset[int64] // ranges buffered but not yet sent
+ outacked rangeset[int64] // ranges sent and acknowledged
+ outopened sentVal // set if we should open the stream
+ outclosed sentVal // set by CloseWrite
+ outblocked sentVal // set when a write to the stream is blocked by flow control
+ outreset sentVal // set by Reset
+ outresetcode uint64 // reset code to send in RESET_STREAM
+ outdone chan struct{} // closed when all data sent
prev, next *Stream // guarded by streamsState.sendMu
}
@@ -54,11 +60,13 @@
// unlocking outgate will set the stream writability state.)
func newStream(c *Conn, id streamID) *Stream {
s := &Stream{
- conn: c,
- id: id,
- insize: -1, // -1 indicates the stream size is unknown
- ingate: newLockedGate(),
- outgate: newLockedGate(),
+ conn: c,
+ id: id,
+ insize: -1, // -1 indicates the stream size is unknown
+ inresetcode: -1, // -1 indicates no RESET_STREAM received
+ ingate: newLockedGate(),
+ outgate: newLockedGate(),
+ outdone: make(chan struct{}),
}
return s
}
@@ -87,7 +95,8 @@
//
// If the peer closes the stream cleanly, ReadContext returns io.EOF after
// returning all data sent by the peer.
-// If the peer terminates reads abruptly, ReadContext returns StreamResetError.
+// If the peer aborts reads on the stream, ReadContext returns
+// an error wrapping StreamResetCode.
func (s *Stream) ReadContext(ctx context.Context, b []byte) (n int, err error) {
if s.IsWriteOnly() {
return 0, errors.New("read from write-only stream")
@@ -97,6 +106,12 @@
return 0, err
}
defer s.inUnlock()
+ if s.inresetcode != -1 {
+ return 0, fmt.Errorf("stream reset by peer: %w", StreamErrorCode(s.inresetcode))
+ }
+ if s.inclosed.isSet() {
+ return 0, errors.New("read from closed stream")
+ }
if s.insize == s.in.start {
return 0, io.EOF
}
@@ -145,26 +160,17 @@
// Buffered data is only sent when the buffer is sufficiently full.
// Call the Flush method to ensure buffered data is sent.
//
-// If the peer aborts reads on the stream, ReadContext returns StreamResetError.
+// TODO: Implement Flush.
func (s *Stream) WriteContext(ctx context.Context, b []byte) (n int, err error) {
if s.IsReadOnly() {
return 0, errors.New("write to read-only stream")
}
canWrite := s.outgate.lock()
- if s.outclosed.isSet() {
- s.outUnlock()
- return 0, errors.New("write to closed stream")
- }
- if len(b) == 0 {
- // We aren't writing any data, but send a STREAM frame to open the stream
- // if we haven't done so already.
- s.outopened.set()
- }
- for len(b) > 0 {
+ for {
// The first time through this loop, we may or may not be write blocked.
// We exit the loop after writing all data, so on subsequent passes through
// the loop we are always write blocked.
- if !canWrite {
+ if len(b) > 0 && !canWrite {
// We're blocked, either by flow control or by our own buffer limit.
// We either need the peer to extend our flow control window,
// or ack some of our outstanding packets.
@@ -181,6 +187,21 @@
// write blocked. (Unlike traditional condition variables, gates do not
// have spurious wakeups.)
}
+ if s.outreset.isSet() {
+ s.outUnlock()
+ return n, errors.New("write to reset stream")
+ }
+ if s.outclosed.isSet() {
+ s.outUnlock()
+ return n, errors.New("write to closed stream")
+ }
+ // We set outopened here rather than below,
+ // so if this is a zero-length write we still
+ // open the stream despite not writing any data to it.
+ s.outopened.set()
+ if len(b) == 0 {
+ break
+ }
s.outblocked.clear()
// Write limit is min(our own buffer limit, the peer-provided flow control window).
// This is a stream offset.
@@ -191,7 +212,6 @@
// Copy the data into the output buffer and mark it as unsent.
s.outunsent.add(s.out.end, s.out.end+nn)
s.out.writeAt(b[:nn], s.out.end)
- s.outopened.set()
b = b[nn:]
n += int(nn)
// If we have bytes left to send, we're blocked.
@@ -218,9 +238,8 @@
func (s *Stream) CloseContext(ctx context.Context) error {
s.CloseRead()
s.CloseWrite()
- // TODO: wait for peer to acknowledge data
// TODO: Return code from peer's RESET_STREAM frame?
- return nil
+ return s.conn.waitOnDone(ctx, s.outdone)
}
// CloseRead aborts reads on the stream.
@@ -233,7 +252,17 @@
if s.IsWriteOnly() {
return
}
- // TODO: support read-closing streams with a STOP_SENDING frame
+ s.ingate.lock()
+ defer s.inUnlock()
+ if s.inset.isrange(0, s.insize) || s.inresetcode != -1 {
+ // We've already received all data from the peer,
+ // so there's no need to send STOP_SENDING.
+ // This is the same as saying we sent one and they got it.
+ s.inclosed.setReceived()
+ } else {
+ s.inclosed.set()
+ }
+ s.in.discardBefore(s.in.end)
}
// CloseWrite aborts writes on the stream.
@@ -251,6 +280,29 @@
s.outclosed.set()
}
+// Reset aborts writes on the stream and notifies the peer
+// that the stream was terminated abruptly.
+// Any blocked writes will be unblocked and return errors.
+//
+// Reset sends the application protocol error code to the peer.
+// It does not wait for the peer to acknowledge receipt of the error.
+// Use CloseContext to wait for the peer's acknowledgement.
+func (s *Stream) Reset(code uint64) {
+ s.outgate.lock()
+ defer s.outUnlock()
+ if s.outreset.isSet() {
+ return
+ }
+ // We could check here to see if the stream is closed and the
+ // peer has acked all the data and the FIN, but sending an
+ // extra RESET_STREAM in this case is harmless.
+ s.outreset.set()
+ s.outresetcode = code
+ s.out.discardBefore(s.out.end)
+ s.outunsent = rangeset[int64]{}
+ s.outblocked.clear()
+}
+
// inUnlock unlocks s.ingate.
// It sets the gate condition if reads from s will not block.
// If s has receive-related frames to write, it notifies the Conn.
@@ -263,11 +315,13 @@
// inUnlockNoQueue is inUnlock,
// but reports whether s has frames to write rather than notifying the Conn.
func (s *Stream) inUnlockNoQueue() (shouldSend bool) {
- // TODO: STOP_SENDING
canRead := s.inset.contains(s.in.start) || // data available to read
- s.insize == s.in.start // at EOF
- s.ingate.unlock(canRead)
- return s.insendmax.shouldSend() // STREAM_MAX_DATA
+ s.insize == s.in.start || // at EOF
+ s.inresetcode != -1 || // reset by peer
+ s.inclosed.isSet() // closed locally
+ defer s.ingate.unlock(canRead)
+ return s.insendmax.shouldSend() || // STREAM_MAX_DATA
+ s.inclosed.shouldSend() // STOP_SENDING
}
// outUnlock unlocks s.outgate.
@@ -282,10 +336,24 @@
// outUnlockNoQueue is outUnlock,
// but reports whether s has frames to write rather than notifying the Conn.
func (s *Stream) outUnlockNoQueue() (shouldSend bool) {
+ isDone := s.outclosed.isReceived() && s.outacked.isrange(0, s.out.end) || // all data acked
+ s.outreset.isSet() // reset locally
+ if isDone {
+ select {
+ case <-s.outdone:
+ default:
+ close(s.outdone)
+ }
+ }
lim := min(s.out.start+s.outmaxbuf, s.outwin)
canWrite := lim > s.out.end || // available flow control
- s.outclosed.isSet() // closed
- s.outgate.unlock(canWrite)
+ s.outclosed.isSet() || // closed locally
+ s.outreset.isSet() // reset locally
+ defer s.outgate.unlock(canWrite)
+ if s.outreset.isSet() {
+ // If the stream is reset locally, the only frame we'll send is RESET_STREAM.
+ return s.outreset.shouldSend()
+ }
return len(s.outunsent) > 0 || // STREAM frame with data
s.outclosed.shouldSend() || // STREAM frame with FIN bit
s.outopened.shouldSend() || // STREAM frame with no data
@@ -297,6 +365,44 @@
s.ingate.lock()
defer s.inUnlock()
end := off + int64(len(b))
+ if err := s.checkStreamBounds(end, fin); err != nil {
+ return err
+ }
+ if s.inclosed.isSet() || s.inresetcode != -1 {
+ // The user read-closed the stream, or the peer reset it.
+ // Either way, we can discard this frame.
+ return nil
+ }
+ s.in.writeAt(b, off)
+ s.inset.add(off, end)
+ if fin {
+ s.insize = end
+ // The peer has enough flow control window to send the entire stream.
+ s.insendmax.clear()
+ }
+ return nil
+}
+
+// handleReset handles a RESET_STREAM frame.
+func (s *Stream) handleReset(code uint64, finalSize int64) error {
+ s.ingate.lock()
+ defer s.inUnlock()
+ const fin = true
+ if err := s.checkStreamBounds(finalSize, fin); err != nil {
+ return err
+ }
+ if s.inresetcode != -1 {
+ // The stream was already reset.
+ return nil
+ }
+ s.in.discardBefore(s.in.end)
+ s.inresetcode = int64(code)
+ s.insize = finalSize
+ return nil
+}
+
+// checkStreamBounds validates the stream offset in a STREAM or RESET_STREAM frame.
+func (s *Stream) checkStreamBounds(end int64, fin bool) error {
if end > s.inwin {
// The peer sent us data past the maximum flow control window we gave them.
return localTransportError(errFlowControl)
@@ -305,17 +411,22 @@
// The peer sent us data past the final size of the stream they previously gave us.
return localTransportError(errFinalSize)
}
- s.in.writeAt(b, off)
- s.inset.add(off, end)
- if fin {
- if s.insize != -1 && s.insize != end {
- // The peer changed the final size of the stream.
- return localTransportError(errFinalSize)
- }
- s.insize = end
- // The peer has enough flow control window to send the entire stream.
- s.insendmax.clear()
+ if fin && s.insize != -1 && end != s.insize {
+ // The peer changed the final size of the stream.
+ return localTransportError(errFinalSize)
}
+ if fin && end < s.in.end {
+ // The peer has previously sent us data past the final size.
+ return localTransportError(errFinalSize)
+ }
+ return nil
+}
+
+// handleStopSending handles a STOP_SENDING frame.
+func (s *Stream) handleStopSending(code uint64) error {
+ // Peer requests that we reset this stream.
+ // https://www.rfc-editor.org/rfc/rfc9000#section-3.5-4
+ s.Reset(code)
return nil
}
@@ -336,6 +447,14 @@
// Frames which are always the same (STOP_SENDING, RESET_STREAM)
// can be marked as received if any packet carrying this frame is acked.
switch ftype {
+ case frameTypeResetStream:
+ s.outgate.lock()
+ s.outreset.ackOrLoss(pnum, fate)
+ s.outUnlock()
+ case frameTypeStopSending:
+ s.ingate.lock()
+ s.inclosed.ackOrLoss(pnum, fate)
+ s.inUnlock()
case frameTypeMaxStreamData:
s.ingate.lock()
s.insendmax.ackLatestOrLoss(pnum, fate)
@@ -345,7 +464,6 @@
s.outblocked.ackLatestOrLoss(pnum, fate)
s.outUnlock()
default:
- // TODO: Handle STOP_SENDING, RESET_STREAM.
panic("unhandled frame type")
}
}
@@ -358,6 +476,10 @@
if fin {
s.outclosed.ackOrLoss(pnum, fate)
}
+ if s.outreset.isSet() {
+ // If the stream has been reset, we don't care any more.
+ return
+ }
switch fate {
case packetAcked:
s.outacked.add(start, end)
@@ -385,6 +507,15 @@
func (s *Stream) appendInFrames(w *packetWriter, pnum packetNumber, pto bool) bool {
s.ingate.lock()
defer s.inUnlockNoQueue()
+ if s.inclosed.shouldSendPTO(pto) {
+ // We don't currently have an API for setting the error code.
+ // Just send zero.
+ code := uint64(0)
+ if !w.appendStopSendingFrame(s.id, code) {
+ return false
+ }
+ s.inclosed.setSent(pnum)
+ }
// TODO: STOP_SENDING
if s.insendmax.shouldSendPTO(pto) {
// MAX_STREAM_DATA
@@ -406,7 +537,17 @@
func (s *Stream) appendOutFrames(w *packetWriter, pnum packetNumber, pto bool) bool {
s.outgate.lock()
defer s.outUnlockNoQueue()
- // TODO: RESET_STREAM
+ if s.outreset.isSet() {
+ // RESET_STREAM
+ if s.outreset.shouldSendPTO(pto) {
+ if !w.appendResetStreamFrame(s.id, s.outresetcode, s.out.end) {
+ return false
+ }
+ s.outreset.setSent(pnum)
+ s.frameOpensStream(pnum)
+ }
+ return true
+ }
if s.outblocked.shouldSendPTO(pto) {
// STREAM_DATA_BLOCKED
if !w.appendStreamDataBlockedFrame(s.id, s.out.end) {
diff --git a/internal/quic/stream_test.go b/internal/quic/stream_test.go
index d158e72..5904a93 100644
--- a/internal/quic/stream_test.go
+++ b/internal/quic/stream_test.go
@@ -10,6 +10,7 @@
"bytes"
"context"
"crypto/rand"
+ "errors"
"fmt"
"io"
"reflect"
@@ -489,32 +490,76 @@
})
}
-func TestStreamFinalSizeChangedByStreamFrame(t *testing.T) {
- // "If a [...] STREAM frame is received indicating a change
- // in the final size for the stream, an endpoint SHOULD
- // respond with an error of type FINAL_SIZE_ERROR [...]"
- // https://www.rfc-editor.org/rfc/rfc9000#section-4.5-5
+func finalSizeTest(t *testing.T, wantErr transportError, f func(tc *testConn, sid streamID) (finalSize int64), opts ...any) {
testStreamTypes(t, "", func(t *testing.T, styp streamType) {
- tc := newTestConn(t, serverSide)
- tc.handshake()
- sid := newStreamID(clientSide, styp, 0)
+ for _, test := range []struct {
+ name string
+ finalFrame func(tc *testConn, sid streamID, finalSize int64)
+ }{{
+ name: "FIN",
+ finalFrame: func(tc *testConn, sid streamID, finalSize int64) {
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: sid,
+ off: finalSize,
+ fin: true,
+ })
+ },
+ }, {
+ name: "RESET_STREAM",
+ finalFrame: func(tc *testConn, sid streamID, finalSize int64) {
+ tc.writeFrames(packetType1RTT, debugFrameResetStream{
+ id: sid,
+ finalSize: finalSize,
+ })
+ },
+ }} {
+ t.Run(test.name, func(t *testing.T) {
+ tc := newTestConn(t, serverSide, opts...)
+ tc.handshake()
+ sid := newStreamID(clientSide, styp, 0)
+ finalSize := f(tc, sid)
+ test.finalFrame(tc, sid, finalSize)
+ tc.wantFrame("change in final size of stream is an error",
+ packetType1RTT, debugFrameConnectionCloseTransport{
+ code: wantErr,
+ },
+ )
+ })
+ }
+ })
+}
- const write1size = 4
+func TestStreamFinalSizeChangedAfterFin(t *testing.T) {
+ // "If a RESET_STREAM or STREAM frame is received indicating a change
+ // in the final size for the stream, an endpoint SHOULD respond with
+ // an error of type FINAL_SIZE_ERROR [...]"
+ // https://www.rfc-editor.org/rfc/rfc9000#section-4.5-5
+ finalSizeTest(t, errFinalSize, func(tc *testConn, sid streamID) (finalSize int64) {
tc.writeFrames(packetType1RTT, debugFrameStream{
id: sid,
off: 10,
fin: true,
})
+ return 9
+ })
+}
+
+func TestStreamFinalSizeBeforePreviousData(t *testing.T) {
+ finalSizeTest(t, errFinalSize, func(tc *testConn, sid streamID) (finalSize int64) {
tc.writeFrames(packetType1RTT, debugFrameStream{
- id: sid,
- off: 9,
- fin: true,
+ id: sid,
+ off: 10,
+ data: []byte{0},
})
- tc.wantFrame("change in final size of stream is an error",
- packetType1RTT, debugFrameConnectionCloseTransport{
- code: errFinalSize,
- },
- )
+ return 9
+ })
+}
+
+func TestStreamFinalSizePastMaxStreamData(t *testing.T) {
+ finalSizeTest(t, errFlowControl, func(tc *testConn, sid streamID) (finalSize int64) {
+ return 11
+ }, func(c *Config) {
+ c.StreamReadBufferSize = 10
})
}
@@ -637,6 +682,19 @@
})
}
+func TestStreamResetStreamInvalidState(t *testing.T) {
+ // "An endpoint that receives a RESET_STREAM frame for a send-only
+ // stream MUST terminate the connection with error STREAM_STATE_ERROR."
+ // https://www.rfc-editor.org/rfc/rfc9000#section-19.4-3
+ testStreamSendFrameInvalidState(t, func(sid streamID) debugFrame {
+ return debugFrameResetStream{
+ id: sid,
+ code: 0,
+ finalSize: 0,
+ }
+ })
+}
+
func TestStreamStreamFrameInvalidState(t *testing.T) {
// "An endpoint MUST terminate the connection with error STREAM_STATE_ERROR
// if it receives a STREAM frame for a locally initiated stream
@@ -689,6 +747,20 @@
})
}
+func TestStreamStopSendingInvalidState(t *testing.T) {
+ // "Receiving a STOP_SENDING frame for a locally initiated stream
+ // that has not yet been created MUST be treated as a connection error
+ // of type STREAM_STATE_ERROR. An endpoint that receives a STOP_SENDING
+ // frame for a receive-only stream MUST terminate the connection with
+ // error STREAM_STATE_ERROR."
+ // https://www.rfc-editor.org/rfc/rfc9000#section-19.5-2
+ testStreamReceiveFrameInvalidState(t, func(sid streamID) debugFrame {
+ return debugFrameStopSending{
+ id: sid,
+ }
+ })
+}
+
func TestStreamMaxStreamDataInvalidState(t *testing.T) {
// "Receiving a MAX_STREAM_DATA frame for a locally initiated stream
// that has not yet been created MUST be treated as a connection error
@@ -743,13 +815,47 @@
}
}
-func TestStreamWriteToClosedStream(t *testing.T) {
- tc, s := newTestConnAndLocalStream(t, serverSide, bidiStream, func(p *transportParameters) {
- p.initialMaxStreamsBidi = 1
- p.initialMaxData = 1 << 20
- p.initialMaxStreamDataBidiRemote = 1 << 20
+func TestStreamReadFromClosedStream(t *testing.T) {
+ tc, s := newTestConnAndRemoteStream(t, serverSide, bidiStream, permissiveTransportParameters)
+ s.CloseRead()
+ tc.wantFrame("CloseRead sends a STOP_SENDING frame",
+ packetType1RTT, debugFrameStopSending{
+ id: s.id,
+ })
+ wantErr := "read from closed stream"
+ if n, err := s.Read(make([]byte, 16)); err == nil || !strings.Contains(err.Error(), wantErr) {
+ t.Errorf("s.Read() = %v, %v; want error %q", n, err, wantErr)
+ }
+ // Data which shows up after STOP_SENDING is discarded.
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: s.id,
+ data: []byte{1, 2, 3},
+ fin: true,
})
- s.Close()
+ if n, err := s.Read(make([]byte, 16)); err == nil || !strings.Contains(err.Error(), wantErr) {
+ t.Errorf("s.Read() = %v, %v; want error %q", n, err, wantErr)
+ }
+}
+
+func TestStreamCloseReadWithAllDataReceived(t *testing.T) {
+ tc, s := newTestConnAndRemoteStream(t, serverSide, bidiStream, permissiveTransportParameters)
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: s.id,
+ data: []byte{1, 2, 3},
+ fin: true,
+ })
+ s.CloseRead()
+ tc.wantIdle("CloseRead in Data Recvd state doesn't need to send STOP_SENDING")
+ // We had all the data for the stream, but CloseRead discarded it.
+ wantErr := "read from closed stream"
+ if n, err := s.Read(make([]byte, 16)); err == nil || !strings.Contains(err.Error(), wantErr) {
+ t.Errorf("s.Read() = %v, %v; want error %q", n, err, wantErr)
+ }
+}
+
+func TestStreamWriteToClosedStream(t *testing.T) {
+ tc, s := newTestConnAndLocalStream(t, serverSide, bidiStream, permissiveTransportParameters)
+ s.CloseWrite()
tc.wantFrame("stream is opened after being closed",
packetType1RTT, debugFrameStream{
id: s.id,
@@ -763,6 +869,45 @@
}
}
+func TestStreamResetBlockedStream(t *testing.T) {
+ tc, s := newTestConnAndLocalStream(t, serverSide, bidiStream, func(p *transportParameters) {
+ p.initialMaxStreamsBidi = 1
+ p.initialMaxData = 1 << 20
+ p.initialMaxStreamDataBidiRemote = 4
+ })
+ tc.ignoreFrame(frameTypeStreamDataBlocked)
+ writing := runAsync(tc, func(ctx context.Context) (int, error) {
+ return s.WriteContext(ctx, []byte{0, 1, 2, 3, 4, 5, 6, 7})
+ })
+ tc.wantFrame("stream writes data until blocked by flow control",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: 0,
+ data: []byte{0, 1, 2, 3},
+ })
+ s.Reset(42)
+ tc.wantFrame("stream is reset",
+ packetType1RTT, debugFrameResetStream{
+ id: s.id,
+ code: 42,
+ finalSize: 4,
+ })
+ wantErr := "write to reset stream"
+ if n, err := writing.result(); n != 4 || !strings.Contains(err.Error(), wantErr) {
+ t.Errorf("s.Write() interrupted by Reset: %v, %q; want 4, %q", n, err, wantErr)
+ }
+ tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
+ id: s.id,
+ max: 1 << 20,
+ })
+ tc.wantIdle("flow control is available, but stream has been reset")
+ s.Reset(100)
+ tc.wantIdle("resetting stream a second time has no effect")
+ if n, err := s.Write([]byte{}); err == nil || !strings.Contains(err.Error(), wantErr) {
+ t.Errorf("s.Write() = %v, %v; want error %q", n, err, wantErr)
+ }
+}
+
func TestStreamWriteMoreThanOnePacketOfData(t *testing.T) {
tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, func(p *transportParameters) {
p.initialMaxStreamsUni = 1
@@ -797,6 +942,209 @@
}
}
+func TestStreamCloseWaitsForAcks(t *testing.T) {
+ ctx := canceledContext()
+ tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters)
+ data := make([]byte, 100)
+ s.WriteContext(ctx, data)
+ tc.wantFrame("conn sends data for the stream",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ data: data,
+ })
+ if err := s.CloseContext(ctx); err != context.Canceled {
+ t.Fatalf("s.Close() = %v, want context.Canceled (data not acked yet)", err)
+ }
+ tc.wantFrame("conn sends FIN for closed stream",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: int64(len(data)),
+ fin: true,
+ data: []byte{},
+ })
+ closing := runAsync(tc, func(ctx context.Context) (struct{}, error) {
+ return struct{}{}, s.CloseContext(ctx)
+ })
+ if _, err := closing.result(); err != errNotDone {
+ t.Fatalf("s.CloseContext() = %v, want it to block waiting for acks", err)
+ }
+ tc.writeFrames(packetType1RTT, debugFrameAck{
+ ranges: []i64range[packetNumber]{{0, tc.sentFramePacket.num + 1}},
+ })
+ if _, err := closing.result(); err != nil {
+ t.Fatalf("s.CloseContext() = %v, want nil (all data acked)", err)
+ }
+}
+
+func TestStreamCloseUnblocked(t *testing.T) {
+ for _, test := range []struct {
+ name string
+ unblock func(tc *testConn, s *Stream)
+ }{{
+ name: "data received",
+ unblock: func(tc *testConn, s *Stream) {
+ tc.writeFrames(packetType1RTT, debugFrameAck{
+ ranges: []i64range[packetNumber]{{0, tc.sentFramePacket.num + 1}},
+ })
+ },
+ }, {
+ name: "stop sending received",
+ unblock: func(tc *testConn, s *Stream) {
+ tc.writeFrames(packetType1RTT, debugFrameStopSending{
+ id: s.id,
+ })
+ },
+ }, {
+ name: "stream reset",
+ unblock: func(tc *testConn, s *Stream) {
+ s.Reset(0)
+ tc.wait() // wait for test conn to process the Reset
+ },
+ }} {
+ t.Run(test.name, func(t *testing.T) {
+ ctx := canceledContext()
+ tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters)
+ data := make([]byte, 100)
+ s.WriteContext(ctx, data)
+ tc.wantFrame("conn sends data for the stream",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ data: data,
+ })
+ if err := s.CloseContext(ctx); err != context.Canceled {
+ t.Fatalf("s.Close() = %v, want context.Canceled (data not acked yet)", err)
+ }
+ tc.wantFrame("conn sends FIN for closed stream",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: int64(len(data)),
+ fin: true,
+ data: []byte{},
+ })
+ closing := runAsync(tc, func(ctx context.Context) (struct{}, error) {
+ return struct{}{}, s.CloseContext(ctx)
+ })
+ if _, err := closing.result(); err != errNotDone {
+ t.Fatalf("s.CloseContext() = %v, want it to block waiting for acks", err)
+ }
+ test.unblock(tc, s)
+ if _, err := closing.result(); err != nil {
+ t.Fatalf("s.CloseContext() = %v, want nil (all data acked)", err)
+ }
+ })
+ }
+}
+
+func TestStreamPeerResetsWithUnreadAndUnsentData(t *testing.T) {
+ testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+ ctx := canceledContext()
+ tc, s := newTestConnAndRemoteStream(t, serverSide, styp)
+ data := []byte{0, 1, 2, 3, 4, 5, 6, 7}
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: s.id,
+ data: data,
+ })
+ got := make([]byte, 4)
+ if n, err := s.ReadContext(ctx, got); n != len(got) || err != nil {
+ t.Fatalf("Read start of stream: got %v, %v; want %v, nil", n, err, len(got))
+ }
+ const sentCode = 42
+ tc.writeFrames(packetType1RTT, debugFrameResetStream{
+ id: s.id,
+ finalSize: 20,
+ code: sentCode,
+ })
+ wantErr := StreamErrorCode(sentCode)
+ if n, err := s.ReadContext(ctx, got); n != 0 || !errors.Is(err, wantErr) {
+ t.Fatalf("Read reset stream: got %v, %v; want 0, %v", n, err, wantErr)
+ }
+ })
+}
+
+func TestStreamPeerResetWakesBlockedRead(t *testing.T) {
+ testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+ tc, s := newTestConnAndRemoteStream(t, serverSide, styp)
+ reader := runAsync(tc, func(ctx context.Context) (int, error) {
+ got := make([]byte, 4)
+ return s.ReadContext(ctx, got)
+ })
+ const sentCode = 42
+ tc.writeFrames(packetType1RTT, debugFrameResetStream{
+ id: s.id,
+ finalSize: 20,
+ code: sentCode,
+ })
+ wantErr := StreamErrorCode(sentCode)
+ if n, err := reader.result(); n != 0 || !errors.Is(err, wantErr) {
+ t.Fatalf("Read reset stream: got %v, %v; want 0, %v", n, err, wantErr)
+ }
+ })
+}
+
+func TestStreamPeerResetFollowedByData(t *testing.T) {
+ testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+ tc, s := newTestConnAndRemoteStream(t, serverSide, styp)
+ tc.writeFrames(packetType1RTT, debugFrameResetStream{
+ id: s.id,
+ finalSize: 4,
+ code: 1,
+ })
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: s.id,
+ data: []byte{0, 1, 2, 3},
+ })
+ // Another reset with a different code, for good measure.
+ tc.writeFrames(packetType1RTT, debugFrameResetStream{
+ id: s.id,
+ finalSize: 4,
+ code: 2,
+ })
+ wantErr := StreamErrorCode(1)
+ if n, err := s.Read(make([]byte, 16)); n != 0 || !errors.Is(err, wantErr) {
+ t.Fatalf("Read from reset stream: got %v, %v; want 0, %v", n, err, wantErr)
+ }
+ })
+}
+
+func TestStreamPeerStopSendingForActiveStream(t *testing.T) {
+ // "An endpoint that receives a STOP_SENDING frame MUST send a RESET_STREAM frame if
+ // the stream is in the "Ready" or "Send" state."
+ // https://www.rfc-editor.org/rfc/rfc9000#section-3.5-4
+ testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+ tc, s := newTestConnAndLocalStream(t, serverSide, styp, permissiveTransportParameters)
+ for i := 0; i < 4; i++ {
+ s.Write([]byte{byte(i)})
+ tc.wantFrame("write sends a STREAM frame to peer",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: int64(i),
+ data: []byte{byte(i)},
+ })
+ }
+ tc.writeFrames(packetType1RTT, debugFrameStopSending{
+ id: s.id,
+ code: 42,
+ })
+ tc.wantFrame("receiving STOP_SENDING causes stream reset",
+ packetType1RTT, debugFrameResetStream{
+ id: s.id,
+ code: 42,
+ finalSize: 4,
+ })
+ if n, err := s.Write([]byte{0}); err == nil {
+ t.Errorf("s.Write() after STOP_SENDING = %v, %v; want error", n, err)
+ }
+ // This ack will result in some of the previous frames being marked as lost.
+ tc.writeFrames(packetType1RTT, debugFrameAck{
+ ranges: []i64range[packetNumber]{{
+ tc.sentFramePacket.num,
+ tc.sentFramePacket.num + 1,
+ }},
+ })
+ tc.wantIdle("lost STREAM frames for reset stream are not resent")
+ })
+}
+
func newTestConnAndLocalStream(t *testing.T, side connSide, styp streamType, opts ...any) (*testConn, *Stream) {
t.Helper()
ctx := canceledContext()
@@ -825,3 +1173,13 @@
}
return tc, s
}
+
+// permissiveTransportParameters may be passed as an option to newTestConn.
+func permissiveTransportParameters(p *transportParameters) {
+ p.initialMaxStreamsBidi = maxVarint
+ p.initialMaxStreamsUni = maxVarint
+ p.initialMaxData = maxVarint
+ p.initialMaxStreamDataBidiRemote = maxVarint
+ p.initialMaxStreamDataBidiLocal = maxVarint
+ p.initialMaxStreamDataUni = maxVarint
+}
diff --git a/internal/quic/wire.go b/internal/quic/wire.go
index f0643c9..8486029 100644
--- a/internal/quic/wire.go
+++ b/internal/quic/wire.go
@@ -8,7 +8,10 @@
import "encoding/binary"
-const maxVarintSize = 8
+const (
+ maxVarintSize = 8 // encoded size in bytes
+ maxVarint = (1 << 62) - 1
+)
// consumeVarint parses a variable-length integer, reporting its length.
// It returns a negative length upon an error.