quic: send and receive stream data
Send and receive data in STREAM frames.
Write-close streams and communicate the final size in a
STREAM frame with the FIN bit. Return io.EOF on reads
at the end of a stream.
Handle stream-level flow control. Send window updates
in MAX_STREAM_DATA frames, send STREAM_DATA_BLOCKED
when flow control is not available.
Does not include connection-level flow control,
read-closing, aborting, or removing streams from
a conn after both sides have closed the stream.
For golang/go#58547
Change-Id: Ib2b449bf54eb6cf200c4f6e2dd2c33274dda3387
Reviewed-on: https://go-review.googlesource.com/c/net/+/515815
Run-TryBot: Damien Neil <dneil@google.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
diff --git a/internal/quic/config.go b/internal/quic/config.go
index 7d1b743..df49357 100644
--- a/internal/quic/config.go
+++ b/internal/quic/config.go
@@ -17,4 +17,29 @@
// TLSConfig is the endpoint's TLS configuration.
// It must be non-nil and include at least one certificate or else set GetCertificate.
TLSConfig *tls.Config
+
+ // StreamReadBufferSize is the maximum amount of data sent by the peer that a
+ // stream will buffer for reading.
+ // If zero, the default value of 1MiB is used.
+ // If negative, the limit is zero.
+ StreamReadBufferSize int64
+
+ // StreamWriteBufferSize is the maximum amount of data a stream will buffer for
+ // sending to the peer.
+ // If zero, the default value of 1MiB is used.
+ // If negative, the limit is zero.
+ StreamWriteBufferSize int64
}
+
+func configDefault(v, def int64) int64 {
+ switch v {
+ case -1:
+ return 0
+ case 0:
+ return def
+ }
+ return v
+}
+
+func (c *Config) streamReadBufferSize() int64 { return configDefault(c.StreamReadBufferSize, 1<<20) }
+func (c *Config) streamWriteBufferSize() int64 { return configDefault(c.StreamWriteBufferSize, 1<<20) }
diff --git a/internal/quic/conn.go b/internal/quic/conn.go
index 90e6739..0952a79 100644
--- a/internal/quic/conn.go
+++ b/internal/quic/conn.go
@@ -160,6 +160,9 @@
// receiveTransportParameters applies transport parameters sent by the peer.
func (c *Conn) receiveTransportParameters(p transportParameters) error {
+ c.streams.peerInitialMaxStreamDataBidiLocal = p.initialMaxStreamDataBidiLocal
+ c.streams.peerInitialMaxStreamDataRemote[bidiStream] = p.initialMaxStreamDataBidiRemote
+ c.streams.peerInitialMaxStreamDataRemote[uniStream] = p.initialMaxStreamDataUni
c.peerAckDelayExponent = p.ackDelayExponent
c.loss.setMaxAckDelay(p.maxAckDelay)
if err := c.connIDState.setPeerActiveConnIDLimit(p.activeConnIDLimit, c.newConnIDFunc()); err != nil {
diff --git a/internal/quic/conn_loss.go b/internal/quic/conn_loss.go
index ca17808..f42f7e5 100644
--- a/internal/quic/conn_loss.go
+++ b/internal/quic/conn_loss.go
@@ -44,6 +44,14 @@
case frameTypeCrypto:
start, end := sent.nextRange()
c.crypto[space].ackOrLoss(start, end, fate)
+ case frameTypeMaxStreamData,
+ frameTypeStreamDataBlocked:
+ id := streamID(sent.nextInt())
+ s := c.streamForID(id)
+ if s == nil {
+ continue
+ }
+ s.ackOrLoss(sent.num, f, fate)
case frameTypeStreamBase,
frameTypeStreamBase | streamFinBit:
id := streamID(sent.nextInt())
diff --git a/internal/quic/conn_loss_test.go b/internal/quic/conn_loss_test.go
index e3d16a7..d944515 100644
--- a/internal/quic/conn_loss_test.go
+++ b/internal/quic/conn_loss_test.go
@@ -7,7 +7,9 @@
package quic
import (
+ "context"
"crypto/tls"
+ "fmt"
"testing"
)
@@ -145,7 +147,275 @@
data: []byte{},
})
})
+}
+func TestLostStreamWithData(t *testing.T) {
+ // "Application data sent in STREAM frames is retransmitted in new STREAM
+ // frames unless the endpoint has sent a RESET_STREAM for that stream."
+ // https://www.rfc-editor.org/rfc/rfc9000#section-13.3-3.2
+ //
+ // TODO: Lost stream frame after RESET_STREAM
+ lostFrameTest(t, func(t *testing.T, pto bool) {
+ data := []byte{0, 1, 2, 3, 4, 5, 6, 7}
+ tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, func(p *transportParameters) {
+ p.initialMaxStreamsUni = 1
+ p.initialMaxData = 1 << 20
+ p.initialMaxStreamDataUni = 1 << 20
+ })
+ s.Write(data[:4])
+ tc.wantFrame("send [0,4)",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: 0,
+ data: data[:4],
+ })
+ s.Write(data[4:8])
+ tc.wantFrame("send [4,8)",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: 4,
+ data: data[4:8],
+ })
+ s.Close()
+ tc.wantFrame("send FIN",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: 8,
+ fin: true,
+ data: []byte{},
+ })
+
+ tc.triggerLossOrPTO(packetType1RTT, pto)
+ tc.wantFrame("resend data",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: 0,
+ fin: true,
+ data: data[:8],
+ })
+ })
+}
+
+func TestLostStreamPartialLoss(t *testing.T) {
+ // Conn sends four STREAM packets.
+ // ACKs are received for the packets containing bytes 0 and 2.
+ // The remaining packets are declared lost.
+ // The Conn resends only the lost data.
+ //
+ // This test doesn't have a PTO mode, because the ACK for the packet containing byte 2
+ // starts the loss timer for the packet containing byte 1, and the PTO timer is not
+ // armed when the loss timer is.
+ data := []byte{0, 1, 2, 3}
+ tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, func(p *transportParameters) {
+ p.initialMaxStreamsUni = 1
+ p.initialMaxData = 1 << 20
+ p.initialMaxStreamDataUni = 1 << 20
+ })
+ for i := range data {
+ s.Write(data[i : i+1])
+ tc.wantFrame(fmt.Sprintf("send STREAM frame with byte %v", i),
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: int64(i),
+ data: data[i : i+1],
+ })
+ if i%2 == 0 {
+ num := tc.sentFramePacket.num
+ tc.writeFrames(packetType1RTT, debugFrameAck{
+ ranges: []i64range[packetNumber]{
+ {num, num + 1},
+ },
+ })
+ }
+ }
+ const pto = false
+ tc.triggerLossOrPTO(packetType1RTT, pto)
+ tc.wantFrame("resend byte 1",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: 1,
+ data: data[1:2],
+ })
+ tc.wantFrame("resend byte 3",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: 3,
+ data: data[3:4],
+ })
+ tc.wantIdle("no more frames sent after packet loss")
+}
+
+func TestLostMaxStreamDataFrame(t *testing.T) {
+ // "[...] an updated value is sent when the packet containing
+ // the most recent MAX_STREAM_DATA frame for a stream is lost"
+ // https://www.rfc-editor.org/rfc/rfc9000#section-13.3-3.8
+ lostFrameTest(t, func(t *testing.T, pto bool) {
+ const maxWindowSize = 10
+ buf := make([]byte, maxWindowSize)
+ tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream, func(c *Config) {
+ c.StreamReadBufferSize = maxWindowSize
+ })
+
+ // We send MAX_STREAM_DATA = 19.
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: 0,
+ data: make([]byte, maxWindowSize),
+ })
+ if n, err := s.Read(buf[:maxWindowSize-1]); err != nil || n != maxWindowSize-1 {
+ t.Fatalf("Read() = %v, %v; want %v, nil", n, err, maxWindowSize-1)
+ }
+ tc.wantFrame("stream window is extended after reading data",
+ packetType1RTT, debugFrameMaxStreamData{
+ id: s.id,
+ max: (maxWindowSize * 2) - 1,
+ })
+
+ // MAX_STREAM_DATA = 20, which is only one more byte, so we don't send the frame.
+ if n, err := s.Read(buf); err != nil || n != 1 {
+ t.Fatalf("Read() = %v, %v; want %v, nil", n, err, 1)
+ }
+ tc.wantIdle("read doesn't extend window enough to send another MAX_STREAM_DATA")
+
+ // The MAX_STREAM_DATA = 19 packet was lost, so we send 20.
+ tc.triggerLossOrPTO(packetType1RTT, pto)
+ tc.wantFrame("resent MAX_STREAM_DATA includes most current value",
+ packetType1RTT, debugFrameMaxStreamData{
+ id: s.id,
+ max: maxWindowSize * 2,
+ })
+ })
+}
+
+func TestLostMaxStreamDataFrameAfterStreamFinReceived(t *testing.T) {
+ // "An endpoint SHOULD stop sending MAX_STREAM_DATA frames when
+ // the receiving part of the stream enters a "Size Known" or "Reset Recvd" state."
+ // https://www.rfc-editor.org/rfc/rfc9000#section-13.3-3.8
+ lostFrameTest(t, func(t *testing.T, pto bool) {
+ const maxWindowSize = 10
+ buf := make([]byte, maxWindowSize)
+ tc, s := newTestConnAndRemoteStream(t, serverSide, uniStream, func(c *Config) {
+ c.StreamReadBufferSize = maxWindowSize
+ })
+
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: 0,
+ data: make([]byte, maxWindowSize),
+ })
+ if n, err := s.Read(buf); err != nil || n != maxWindowSize {
+ t.Fatalf("Read() = %v, %v; want %v, nil", n, err, maxWindowSize)
+ }
+ tc.wantFrame("stream window is extended after reading data",
+ packetType1RTT, debugFrameMaxStreamData{
+ id: s.id,
+ max: 2 * maxWindowSize,
+ })
+
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: maxWindowSize,
+ fin: true,
+ })
+
+ tc.ignoreFrame(frameTypePing)
+ tc.triggerLossOrPTO(packetType1RTT, pto)
+ tc.wantIdle("lost MAX_STREAM_DATA not resent for stream in 'size known'")
+ })
+}
+
+func TestLostStreamDataBlockedFrame(t *testing.T) {
+ // "A new [STREAM_DATA_BLOCKED] frame is sent if a packet containing
+ // the most recent frame for a scope is lost [...]"
+ // https://www.rfc-editor.org/rfc/rfc9000#section-13.3-3.10
+ lostFrameTest(t, func(t *testing.T, pto bool) {
+ tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, func(p *transportParameters) {
+ p.initialMaxStreamsUni = 1
+ p.initialMaxData = 1 << 20
+ })
+
+ w := runAsync(tc, func(ctx context.Context) (int, error) {
+ return s.WriteContext(ctx, []byte{0, 1, 2, 3})
+ })
+ defer w.cancel()
+ tc.wantFrame("write is blocked by flow control",
+ packetType1RTT, debugFrameStreamDataBlocked{
+ id: s.id,
+ max: 0,
+ })
+
+ tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
+ id: s.id,
+ max: 1,
+ })
+ tc.wantFrame("write makes some progress, but is still blocked by flow control",
+ packetType1RTT, debugFrameStreamDataBlocked{
+ id: s.id,
+ max: 1,
+ })
+ tc.wantFrame("write consuming available window",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: 0,
+ data: []byte{0},
+ })
+
+ tc.triggerLossOrPTO(packetType1RTT, pto)
+ tc.wantFrame("STREAM_DATA_BLOCKED is resent",
+ packetType1RTT, debugFrameStreamDataBlocked{
+ id: s.id,
+ max: 1,
+ })
+ tc.wantFrame("STREAM is resent as well",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: 0,
+ data: []byte{0},
+ })
+ })
+}
+
+func TestLostStreamDataBlockedFrameAfterStreamUnblocked(t *testing.T) {
+ // "A new [STREAM_DATA_BLOCKED] frame is sent [...] only while
+ // the endpoint is blocked on the corresponding limit."
+ // https://www.rfc-editor.org/rfc/rfc9000#section-13.3-3.10
+ lostFrameTest(t, func(t *testing.T, pto bool) {
+ tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, func(p *transportParameters) {
+ p.initialMaxStreamsUni = 1
+ p.initialMaxData = 1 << 20
+ })
+
+ data := []byte{0, 1, 2, 3}
+ w := runAsync(tc, func(ctx context.Context) (int, error) {
+ return s.WriteContext(ctx, data)
+ })
+ defer w.cancel()
+ tc.wantFrame("write is blocked by flow control",
+ packetType1RTT, debugFrameStreamDataBlocked{
+ id: s.id,
+ max: 0,
+ })
+
+ tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
+ id: s.id,
+ max: 10,
+ })
+ tc.wantFrame("write completes after flow control available",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: 0,
+ data: data,
+ })
+
+ tc.triggerLossOrPTO(packetType1RTT, pto)
+ tc.wantFrame("STREAM data is resent",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: 0,
+ data: data,
+ })
+ tc.wantIdle("STREAM_DATA_BLOCKED is not resent, since the stream is not blocked")
+ })
}
func TestLostNewConnectionIDFrame(t *testing.T) {
diff --git a/internal/quic/conn_recv.go b/internal/quic/conn_recv.go
index 45ef384..00985b6 100644
--- a/internal/quic/conn_recv.go
+++ b/internal/quic/conn_recv.go
@@ -191,7 +191,7 @@
if !frameOK(c, ptype, __01) {
return
}
- _, _, n = consumeMaxStreamDataFrame(payload)
+ n = c.handleMaxStreamDataFrame(now, payload)
case frameTypeMaxStreamsBidi, frameTypeMaxStreamsUni:
if !frameOK(c, ptype, __01) {
return
@@ -280,6 +280,17 @@
return n
}
+func (c *Conn) handleMaxStreamDataFrame(now time.Time, payload []byte) int {
+ id, maxStreamData, n := consumeMaxStreamDataFrame(payload)
+ if s := c.streamForFrame(now, id, sendStream); s != nil {
+ if err := s.handleMaxStreamData(maxStreamData); err != nil {
+ c.abort(now, err)
+ return -1
+ }
+ }
+ return n
+}
+
func (c *Conn) handleCryptoFrame(now time.Time, space numberSpace, payload []byte) int {
off, data, n := consumeCryptoFrame(payload)
err := c.handleCrypto(now, space, off, data)
diff --git a/internal/quic/conn_streams.go b/internal/quic/conn_streams.go
index f626323..7a531f5 100644
--- a/internal/quic/conn_streams.go
+++ b/internal/quic/conn_streams.go
@@ -20,6 +20,10 @@
streams map[streamID]*Stream
opened [streamTypeCount]int64 // number of streams opened by us
+ // Peer configuration provided in transport parameters.
+ peerInitialMaxStreamDataRemote [streamTypeCount]int64 // streams opened by us
+ peerInitialMaxStreamDataBidiLocal int64 // streams opened by them
+
// Streams with frames to send are stored in a circular linked list.
// sendHead is the next stream to write, or nil if there are no streams
// with data to send. sendTail is the last stream to write.
@@ -55,15 +59,24 @@
return c.newLocalStream(ctx, uniStream)
}
-func (c *Conn) newLocalStream(ctx context.Context, typ streamType) (*Stream, error) {
+func (c *Conn) newLocalStream(ctx context.Context, styp streamType) (*Stream, error) {
// TODO: Stream limits.
c.streams.streamsMu.Lock()
defer c.streams.streamsMu.Unlock()
- num := c.streams.opened[typ]
- c.streams.opened[typ]++
+ num := c.streams.opened[styp]
+ c.streams.opened[styp]++
- s := newStream(c, newStreamID(c.side, typ, num))
+ s := newStream(c, newStreamID(c.side, styp, num))
+ s.outmaxbuf = c.config.streamWriteBufferSize()
+ s.outwin = c.streams.peerInitialMaxStreamDataRemote[styp]
+ if styp == bidiStream {
+ s.inmaxbuf = c.config.streamReadBufferSize()
+ s.inwin = c.config.streamReadBufferSize()
+ }
+ s.inUnlock()
+ s.outUnlock()
+
c.streams.streams[s.id] = s
return s, nil
}
@@ -117,7 +130,17 @@
c.abort(now, localTransportError(errStreamState))
return nil
}
+
s := newStream(c, id)
+ s.inmaxbuf = c.config.streamReadBufferSize()
+ s.inwin = c.config.streamReadBufferSize()
+ if id.streamType() == bidiStream {
+ s.outmaxbuf = c.config.streamWriteBufferSize()
+ s.outwin = c.streams.peerInitialMaxStreamDataBidiLocal
+ }
+ s.inUnlock()
+ s.outUnlock()
+
c.streams.streams[id] = s
c.streams.queue.put(s)
return s
diff --git a/internal/quic/conn_test.go b/internal/quic/conn_test.go
index 5aad69f..2480f9c 100644
--- a/internal/quic/conn_test.go
+++ b/internal/quic/conn_test.go
@@ -179,6 +179,8 @@
peerProvidedParams := defaultTransportParameters()
for _, o := range opts {
switch o := o.(type) {
+ case func(*Config):
+ o(config)
case func(*tls.Config):
o(config.TLSConfig)
case func(p *transportParameters):
diff --git a/internal/quic/crypto_stream.go b/internal/quic/crypto_stream.go
index 6cda657..75dea87 100644
--- a/internal/quic/crypto_stream.go
+++ b/internal/quic/crypto_stream.go
@@ -118,28 +118,7 @@
// copy the data it wants into position.
func (s *cryptoStream) dataToSend(pto bool, f func(off, size int64) (sent int64)) {
for {
- var off, size int64
- if pto {
- // On PTO, resend unacked data that fits in the probe packet.
- // For simplicity, we send the range starting at s.out.start
- // (which is definitely unacked, or else we would have discarded it)
- // up to the next acked byte (if any).
- //
- // This may miss unacked data starting after that acked byte,
- // but avoids resending data the peer has acked.
- off = s.out.start
- end := s.out.end
- for _, r := range s.outacked {
- if r.start > off {
- end = r.start
- break
- }
- }
- size = end - s.out.start
- } else if s.outunsent.numRanges() > 0 {
- off = s.outunsent.min()
- size = s.outunsent[0].size()
- }
+ off, size := dataToSend(s.out, s.outunsent, s.outacked, pto)
if size == 0 {
return
}
diff --git a/internal/quic/gate.go b/internal/quic/gate.go
index efb28da..27ab07a 100644
--- a/internal/quic/gate.go
+++ b/internal/quic/gate.go
@@ -20,13 +20,19 @@
unset chan struct{}
}
+// newGate returns a new, unlocked gate with the condition unset.
func newGate() gate {
- g := gate{
+ g := newLockedGate()
+ g.unlock(false)
+ return g
+}
+
+// newLocked gate returns a new, locked gate.
+func newLockedGate() gate {
+ return gate{
set: make(chan struct{}, 1),
unset: make(chan struct{}, 1),
}
- g.unset <- struct{}{}
- return g
}
// lock acquires the gate unconditionally.
diff --git a/internal/quic/quic_test.go b/internal/quic/quic_test.go
new file mode 100644
index 0000000..1281b54
--- /dev/null
+++ b/internal/quic/quic_test.go
@@ -0,0 +1,37 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+//go:build go1.21
+
+package quic
+
+import (
+ "testing"
+)
+
+func testSides(t *testing.T, name string, f func(*testing.T, connSide)) {
+ if name != "" {
+ name += "/"
+ }
+ t.Run(name+"server", func(t *testing.T) { f(t, serverSide) })
+ t.Run(name+"client", func(t *testing.T) { f(t, clientSide) })
+}
+
+func testStreamTypes(t *testing.T, name string, f func(*testing.T, streamType)) {
+ if name != "" {
+ name += "/"
+ }
+ t.Run(name+"bidi", func(t *testing.T) { f(t, bidiStream) })
+ t.Run(name+"uni", func(t *testing.T) { f(t, uniStream) })
+}
+
+func testSidesAndStreamTypes(t *testing.T, name string, f func(*testing.T, connSide, streamType)) {
+ if name != "" {
+ name += "/"
+ }
+ t.Run(name+"server/bidi", func(t *testing.T) { f(t, serverSide, bidiStream) })
+ t.Run(name+"client/bidi", func(t *testing.T) { f(t, clientSide, bidiStream) })
+ t.Run(name+"server/uni", func(t *testing.T) { f(t, serverSide, uniStream) })
+ t.Run(name+"client/uni", func(t *testing.T) { f(t, clientSide, uniStream) })
+}
diff --git a/internal/quic/stream.go b/internal/quic/stream.go
index b55f927..83215df 100644
--- a/internal/quic/stream.go
+++ b/internal/quic/stream.go
@@ -9,34 +9,57 @@
import (
"context"
"errors"
+ "io"
)
type Stream struct {
id streamID
conn *Conn
+ // ingate's lock guards all receive-related state.
+ //
+ // The gate condition is set if a read from the stream will not block,
+ // either because the stream has available data or because the read will fail.
+ ingate gate
+ in pipe // received data
+ inwin int64 // last MAX_STREAM_DATA sent to the peer
+ insendmax sentVal // set when we should send MAX_STREAM_DATA to the peer
+ inmaxbuf int64 // maximum amount of data we will buffer
+ insize int64 // stream final size; -1 before this is known
+ inset rangeset[int64] // received ranges
+
// outgate's lock guards all send-related state.
//
// The gate condition is set if a write to the stream will not block,
// either because the stream has available flow control or because
// the write will fail.
- outgate gate
- outopened sentVal // set if we should open the stream
+ outgate gate
+ out pipe // buffered data to send
+ outwin int64 // maximum MAX_STREAM_DATA received from the peer
+ outmaxbuf int64 // maximum amount of data we will buffer
+ outunsent rangeset[int64] // ranges buffered but not yet sent
+ outacked rangeset[int64] // ranges sent and acknowledged
+ outopened sentVal // set if we should open the stream
+ outclosed sentVal // set by CloseWrite
+ outblocked sentVal // set when a write to the stream is blocked by flow control
prev, next *Stream // guarded by streamsState.sendMu
}
+// newStream returns a new stream.
+//
+// The stream's ingate and outgate are locked.
+// (We create the stream with locked gates so after the caller
+// initializes the flow control window,
+// unlocking outgate will set the stream writability state.)
func newStream(c *Conn, id streamID) *Stream {
s := &Stream{
conn: c,
id: id,
- outgate: newGate(),
+ insize: -1, // -1 indicates the stream size is unknown
+ ingate: newLockedGate(),
+ outgate: newLockedGate(),
}
-
- // Lock and unlock outgate to update the stream writability state.
- s.outgate.lock()
- s.outUnlock()
-
return s
}
@@ -66,8 +89,48 @@
// returning all data sent by the peer.
// If the peer terminates reads abruptly, ReadContext returns StreamResetError.
func (s *Stream) ReadContext(ctx context.Context, b []byte) (n int, err error) {
- // TODO: implement
- return 0, errors.New("unimplemented")
+ if s.IsWriteOnly() {
+ return 0, errors.New("read from write-only stream")
+ }
+ // Wait until data is available.
+ if err := s.conn.waitAndLockGate(ctx, &s.ingate); err != nil {
+ return 0, err
+ }
+ defer s.inUnlock()
+ if s.insize == s.in.start {
+ return 0, io.EOF
+ }
+ // Getting here indicates the stream contains data to be read.
+ if len(s.inset) < 1 || s.inset[0].start != 0 || s.inset[0].end <= s.in.start {
+ panic("BUG: inconsistent input stream state")
+ }
+ if size := int(s.inset[0].end - s.in.start); size < len(b) {
+ b = b[:size]
+ }
+ start := s.in.start
+ end := start + int64(len(b))
+ s.in.copy(start, b)
+ s.in.discardBefore(end)
+ if s.insize == -1 || s.insize > s.inwin {
+ if shouldUpdateFlowControl(s.inwin-s.in.start, s.inmaxbuf) {
+ // Update stream flow control with a STREAM_MAX_DATA frame.
+ s.insendmax.setUnsent()
+ }
+ }
+ if end == s.insize {
+ return len(b), io.EOF
+ }
+ return len(b), nil
+}
+
+// shouldUpdateFlowControl determines whether to send a flow control window update.
+//
+// We want to balance keeping the peer well-supplied with flow control with not sending
+// many small updates.
+func shouldUpdateFlowControl(curwin, maxwin int64) bool {
+ // Update flow control if doing so gives the peer at least 64k tokens,
+ // or if it will double the current window.
+ return maxwin-curwin >= 64<<10 || curwin*2 < maxwin
}
// Write writes data to the stream.
@@ -87,65 +150,330 @@
if s.IsReadOnly() {
return 0, errors.New("write to read-only stream")
}
- if len(b) > 0 {
- // TODO: implement
- return 0, errors.New("unimplemented")
+ canWrite := s.outgate.lock()
+ if s.outclosed.isSet() {
+ s.outUnlock()
+ return 0, errors.New("write to closed stream")
}
- if err := s.outgate.waitAndLockContext(ctx); err != nil {
- return 0, err
+ if len(b) == 0 {
+ // We aren't writing any data, but send a STREAM frame to open the stream
+ // if we haven't done so already.
+ s.outopened.set()
}
- defer s.outUnlock()
-
- // Set outopened to send a STREAM frame with no data,
- // opening the stream on the peer.
- s.outopened.set()
-
+ for len(b) > 0 {
+ // The first time through this loop, we may or may not be write blocked.
+ // We exit the loop after writing all data, so on subsequent passes through
+ // the loop we are always write blocked.
+ if !canWrite {
+ // We're blocked, either by flow control or by our own buffer limit.
+ // We either need the peer to extend our flow control window,
+ // or ack some of our outstanding packets.
+ if s.out.end == s.outwin {
+ // We're blocked by flow control.
+ // Send a STREAM_DATA_BLOCKED frame to let the peer know.
+ s.outblocked.setUnsent()
+ }
+ s.outUnlock()
+ if err := s.conn.waitAndLockGate(ctx, &s.outgate); err != nil {
+ return n, err
+ }
+ // Successfully returning from waitAndLockGate means we are no longer
+ // write blocked. (Unlike traditional condition variables, gates do not
+ // have spurious wakeups.)
+ }
+ s.outblocked.clear()
+ // Write limit is min(our own buffer limit, the peer-provided flow control window).
+ // This is a stream offset.
+ lim := min(s.out.start+s.outmaxbuf, s.outwin)
+ // Amount to write is min(the full buffer, data up to the write limit).
+ // This is a number of bytes.
+ nn := min(int64(len(b)), lim-s.out.end)
+ // Copy the data into the output buffer and mark it as unsent.
+ s.outunsent.add(s.out.end, s.out.end+nn)
+ s.out.writeAt(b[:nn], s.out.end)
+ s.outopened.set()
+ b = b[nn:]
+ n += int(nn)
+ // If we have bytes left to send, we're blocked.
+ canWrite = false
+ }
+ s.outUnlock()
return n, nil
}
+// Close closes the stream.
+// See CloseContext for more details.
+func (s *Stream) Close() error {
+ return s.CloseContext(context.Background())
+}
+
+// CloseContext closes the stream.
+// Any blocked stream operations will be unblocked and return errors.
+//
+// CloseContext flushes any data in the stream write buffer and waits for the peer to
+// acknowledge receipt of the data.
+// If the stream has been reset, it waits for the peer to acknowledge the reset.
+// If the context expires before the peer receives the stream's data,
+// CloseContext discards the buffer and returns the context error.
+func (s *Stream) CloseContext(ctx context.Context) error {
+ s.CloseRead()
+ s.CloseWrite()
+ // TODO: wait for peer to acknowledge data
+ // TODO: Return code from peer's RESET_STREAM frame?
+ return nil
+}
+
+// CloseRead aborts reads on the stream.
+// Any blocked reads will be unblocked and return errors.
+//
+// CloseRead notifies the peer that the stream has been closed for reading.
+// It does not wait for the peer to acknowledge the closure.
+// Use CloseContext to wait for the peer's acknowledgement.
+func (s *Stream) CloseRead() {
+ if s.IsWriteOnly() {
+ return
+ }
+ // TODO: support read-closing streams with a STOP_SENDING frame
+}
+
+// CloseWrite aborts writes on the stream.
+// Any blocked writes will be unblocked and return errors.
+//
+// CloseWrite sends any data in the stream write buffer to the peer.
+// It does not wait for the peer to acknowledge receipt of the data.
+// Use CloseContext to wait for the peer's acknowledgement.
+func (s *Stream) CloseWrite() {
+ if s.IsReadOnly() {
+ return
+ }
+ s.outgate.lock()
+ defer s.outUnlock()
+ s.outclosed.set()
+}
+
+// inUnlock unlocks s.ingate.
+// It sets the gate condition if reads from s will not block.
+// If s has receive-related frames to write, it notifies the Conn.
+func (s *Stream) inUnlock() {
+ if s.inUnlockNoQueue() {
+ s.conn.queueStreamForSend(s)
+ }
+}
+
+// inUnlockNoQueue is inUnlock,
+// but reports whether s has frames to write rather than notifying the Conn.
+func (s *Stream) inUnlockNoQueue() (shouldSend bool) {
+ // TODO: STOP_SENDING
+ canRead := s.inset.contains(s.in.start) || // data available to read
+ s.insize == s.in.start // at EOF
+ s.ingate.unlock(canRead)
+ return s.insendmax.shouldSend() // STREAM_MAX_DATA
+}
+
// outUnlock unlocks s.outgate.
// It sets the gate condition if writes to s will not block.
-// If s has frames to write, it notifies the Conn.
+// If s has send-related frames to write, it notifies the Conn.
func (s *Stream) outUnlock() {
- if s.outopened.shouldSend() {
+ if s.outUnlockNoQueue() {
s.conn.queueStreamForSend(s)
}
- canSend := true // TODO: set sendability status based on flow control
- s.outgate.unlock(canSend)
+}
+
+// outUnlockNoQueue is outUnlock,
+// but reports whether s has frames to write rather than notifying the Conn.
+func (s *Stream) outUnlockNoQueue() (shouldSend bool) {
+ lim := min(s.out.start+s.outmaxbuf, s.outwin)
+ canWrite := lim > s.out.end || // available flow control
+ s.outclosed.isSet() // closed
+ s.outgate.unlock(canWrite)
+ return len(s.outunsent) > 0 || // STREAM frame with data
+ s.outclosed.shouldSend() || // STREAM frame with FIN bit
+ s.outopened.shouldSend() || // STREAM frame with no data
+ s.outblocked.shouldSend() // STREAM_DATA_BLOCKED
}
// handleData handles data received in a STREAM frame.
func (s *Stream) handleData(off int64, b []byte, fin bool) error {
- // TODO
+ s.ingate.lock()
+ defer s.inUnlock()
+ end := off + int64(len(b))
+ if end > s.inwin {
+ // The peer sent us data past the maximum flow control window we gave them.
+ return localTransportError(errFlowControl)
+ }
+ if s.insize != -1 && end > s.insize {
+ // The peer sent us data past the final size of the stream they previously gave us.
+ return localTransportError(errFinalSize)
+ }
+ s.in.writeAt(b, off)
+ s.inset.add(off, end)
+ if fin {
+ if s.insize != -1 && s.insize != end {
+ // The peer changed the final size of the stream.
+ return localTransportError(errFinalSize)
+ }
+ s.insize = end
+ // The peer has enough flow control window to send the entire stream.
+ s.insendmax.clear()
+ }
return nil
}
+// handleMaxStreamData handles an update received in a MAX_STREAM_DATA frame.
+func (s *Stream) handleMaxStreamData(maxStreamData int64) error {
+ s.outgate.lock()
+ defer s.outUnlock()
+ s.outwin = max(maxStreamData, s.outwin)
+ return nil
+}
+
+// ackOrLoss handles the fate of stream frames other than STREAM.
+func (s *Stream) ackOrLoss(pnum packetNumber, ftype byte, fate packetFate) {
+ // Frames which carry new information each time they are sent
+ // (MAX_STREAM_DATA, STREAM_DATA_BLOCKED) must only be marked
+ // as received if the most recent packet carrying this frame is acked.
+ //
+ // Frames which are always the same (STOP_SENDING, RESET_STREAM)
+ // can be marked as received if any packet carrying this frame is acked.
+ switch ftype {
+ case frameTypeMaxStreamData:
+ s.ingate.lock()
+ s.insendmax.ackLatestOrLoss(pnum, fate)
+ s.inUnlock()
+ case frameTypeStreamDataBlocked:
+ s.outgate.lock()
+ s.outblocked.ackLatestOrLoss(pnum, fate)
+ s.outUnlock()
+ default:
+ // TODO: Handle STOP_SENDING, RESET_STREAM.
+ panic("unhandled frame type")
+ }
+}
+
// ackOrLossData handles the fate of a STREAM frame.
func (s *Stream) ackOrLossData(pnum packetNumber, start, end int64, fin bool, fate packetFate) {
s.outgate.lock()
defer s.outUnlock()
s.outopened.ackOrLoss(pnum, fate)
+ if fin {
+ s.outclosed.ackOrLoss(pnum, fate)
+ }
+ switch fate {
+ case packetAcked:
+ s.outacked.add(start, end)
+ s.outunsent.sub(start, end)
+ // If this ack is for data at the start of the send buffer, we can now discard it.
+ if s.outacked.contains(s.out.start) {
+ s.out.discardBefore(s.outacked[0].end)
+ }
+ case packetLost:
+ // Mark everything lost, but not previously acked, as needing retransmission.
+ // We do this by adding all the lost bytes to outunsent, and then
+ // removing everything already acked.
+ s.outunsent.add(start, end)
+ for _, a := range s.outacked {
+ s.outunsent.sub(a.start, a.end)
+ }
+ }
}
+// appendInFrames appends STOP_SENDING and MAX_STREAM_DATA frames
+// to the current packet.
+//
+// It returns true if no more frames need appending,
+// false if not everything fit in the current packet.
func (s *Stream) appendInFrames(w *packetWriter, pnum packetNumber, pto bool) bool {
+ s.ingate.lock()
+ defer s.inUnlockNoQueue()
// TODO: STOP_SENDING
- // TODO: MAX_STREAM_DATA
+ if s.insendmax.shouldSendPTO(pto) {
+ // MAX_STREAM_DATA
+ maxStreamData := s.in.start + s.inmaxbuf
+ if !w.appendMaxStreamDataFrame(s.id, maxStreamData) {
+ return false
+ }
+ s.inwin = maxStreamData
+ s.insendmax.setSent(pnum)
+ }
return true
}
+// appendOutFrames appends RESET_STREAM, STREAM_DATA_BLOCKED, and STREAM frames
+// to the current packet.
+//
+// It returns true if no more frames need appending,
+// false if not everything fit in the current packet.
func (s *Stream) appendOutFrames(w *packetWriter, pnum packetNumber, pto bool) bool {
+ s.outgate.lock()
+ defer s.outUnlockNoQueue()
// TODO: RESET_STREAM
- // TODO: STREAM_DATA_BLOCKED
- // TODO: STREAM frames with data
- if s.outopened.shouldSendPTO(pto) {
- off := int64(0)
- size := 0
- fin := false
- _, added := w.appendStreamFrame(s.id, off, size, fin)
+ if s.outblocked.shouldSendPTO(pto) {
+ // STREAM_DATA_BLOCKED
+ if !w.appendStreamDataBlockedFrame(s.id, s.out.end) {
+ return false
+ }
+ s.outblocked.setSent(pnum)
+ s.frameOpensStream(pnum)
+ }
+ // STREAM
+ for {
+ off, size := dataToSend(s.out, s.outunsent, s.outacked, pto)
+ fin := s.outclosed.isSet() && off+size == s.out.end
+ shouldSend := size > 0 || // have data to send
+ s.outopened.shouldSendPTO(pto) || // should open the stream
+ (fin && s.outclosed.shouldSendPTO(pto)) // should close the stream
+ if !shouldSend {
+ return true
+ }
+ b, added := w.appendStreamFrame(s.id, off, int(size), fin)
if !added {
return false
}
+ s.out.copy(off, b)
+ s.outunsent.sub(off, off+int64(len(b)))
+ s.frameOpensStream(pnum)
+ if fin {
+ s.outclosed.setSent(pnum)
+ }
+ if pto {
+ return true
+ }
+ if int64(len(b)) < size {
+ return false
+ }
+ }
+}
+
+// frameOpensStream records that we're sending a frame that will open the stream.
+//
+// If we don't have an acknowledgement from the peer for a previous frame opening the stream,
+// record this packet as being the latest one to open it.
+func (s *Stream) frameOpensStream(pnum packetNumber) {
+ if !s.outopened.isReceived() {
s.outopened.setSent(pnum)
}
- return true
+}
+
+// dataToSend returns the next range of data to send in a STREAM or CRYPTO_STREAM.
+func dataToSend(out pipe, outunsent, outacked rangeset[int64], pto bool) (start, size int64) {
+ switch {
+ case pto:
+ // On PTO, resend unacked data that fits in the probe packet.
+ // For simplicity, we send the range starting at s.out.start
+ // (which is definitely unacked, or else we would have discarded it)
+ // up to the next acked byte (if any).
+ //
+ // This may miss unacked data starting after that acked byte,
+ // but avoids resending data the peer has acked.
+ for _, r := range outacked {
+ if r.start > out.start {
+ return out.start, r.start - out.start
+ }
+ }
+ return out.start, out.end - out.start
+ case outunsent.numRanges() > 0:
+ return outunsent.min(), outunsent[0].size()
+ default:
+ return out.end, 0
+ }
}
diff --git a/internal/quic/stream_test.go b/internal/quic/stream_test.go
index 8ae9dbc..d158e72 100644
--- a/internal/quic/stream_test.go
+++ b/internal/quic/stream_test.go
@@ -7,10 +7,703 @@
package quic
import (
+ "bytes"
+ "context"
+ "crypto/rand"
+ "fmt"
+ "io"
"reflect"
+ "strings"
"testing"
)
+func TestStreamWriteBlockedByStreamFlowControl(t *testing.T) {
+ testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+ ctx := canceledContext()
+ want := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
+ tc := newTestConn(t, clientSide, func(p *transportParameters) {
+ p.initialMaxStreamsBidi = 100
+ p.initialMaxStreamsUni = 100
+ p.initialMaxData = 1 << 20
+ })
+ tc.handshake()
+ tc.ignoreFrame(frameTypeAck)
+
+ // Non-blocking write with no flow control.
+ s, err := tc.conn.newLocalStream(ctx, styp)
+ if err != nil {
+ t.Fatal(err)
+ }
+ _, err = s.WriteContext(ctx, want)
+ if err != context.Canceled {
+ t.Fatalf("write to stream with no flow control: err = %v, want context.Canceled", err)
+ }
+ tc.wantFrame("write blocked by flow control triggers a STREAM_DATA_BLOCKED frame",
+ packetType1RTT, debugFrameStreamDataBlocked{
+ id: s.id,
+ max: 0,
+ })
+
+ // Blocking write waiting for flow control.
+ w := runAsync(tc, func(ctx context.Context) (int, error) {
+ return s.WriteContext(ctx, want)
+ })
+ tc.wantFrame("second blocked write triggers another STREAM_DATA_BLOCKED",
+ packetType1RTT, debugFrameStreamDataBlocked{
+ id: s.id,
+ max: 0,
+ })
+ tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
+ id: s.id,
+ max: 4,
+ })
+ tc.wantFrame("stream window extended, but still more data to write",
+ packetType1RTT, debugFrameStreamDataBlocked{
+ id: s.id,
+ max: 4,
+ })
+ tc.wantFrame("stream window extended to 4, expect blocked write to progress",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ data: want[:4],
+ })
+
+ tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
+ id: s.id,
+ max: int64(len(want)),
+ })
+ tc.wantFrame("stream window extended further, expect blocked write to finish",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: 4,
+ data: want[4:],
+ })
+ n, err := w.result()
+ if n != len(want) || err != nil {
+ t.Errorf("Write() = %v, %v; want %v, nil", n, err, len(want))
+ }
+ })
+}
+
+func TestStreamIgnoresMaxStreamDataReduction(t *testing.T) {
+ // "A sender MUST ignore any MAX_STREAM_DATA [...] frames that
+ // do not increase flow control limits."
+ // https://www.rfc-editor.org/rfc/rfc9000#section-4.1-9
+ testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+ ctx := canceledContext()
+ want := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
+ tc := newTestConn(t, clientSide, func(p *transportParameters) {
+ if styp == uniStream {
+ p.initialMaxStreamsUni = 1
+ p.initialMaxStreamDataUni = 4
+ } else {
+ p.initialMaxStreamsBidi = 1
+ p.initialMaxStreamDataBidiRemote = 4
+ }
+ p.initialMaxData = 1 << 20
+ })
+ tc.handshake()
+ tc.ignoreFrame(frameTypeAck)
+ tc.ignoreFrame(frameTypeStreamDataBlocked)
+
+ // Write [0,1).
+ s, err := tc.conn.newLocalStream(ctx, styp)
+ if err != nil {
+ t.Fatal(err)
+ }
+ s.WriteContext(ctx, want[:1])
+ tc.wantFrame("sent data (1 byte) fits within flow control limit",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: 0,
+ data: want[:1],
+ })
+
+ // MAX_STREAM_DATA tries to decrease limit, and is ignored.
+ tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
+ id: s.id,
+ max: 2,
+ })
+
+ // Write [1,4).
+ s.WriteContext(ctx, want[1:])
+ tc.wantFrame("stream limit is 4 bytes, ignoring decrease in MAX_STREAM_DATA",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: 1,
+ data: want[1:4],
+ })
+
+ // MAX_STREAM_DATA increases limit.
+ // Second MAX_STREAM_DATA decreases it, and is ignored.
+ tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
+ id: s.id,
+ max: 8,
+ })
+ tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
+ id: s.id,
+ max: 6,
+ })
+
+ // Write [1,4).
+ s.WriteContext(ctx, want[4:])
+ tc.wantFrame("stream limit is 8 bytes, ignoring decrease in MAX_STREAM_DATA",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: 4,
+ data: want[4:8],
+ })
+ })
+}
+
+func TestStreamWriteBlockedByWriteBufferLimit(t *testing.T) {
+ testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+ ctx := canceledContext()
+ want := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
+ const maxWriteBuffer = 4
+ tc := newTestConn(t, clientSide, func(p *transportParameters) {
+ p.initialMaxStreamsBidi = 100
+ p.initialMaxStreamsUni = 100
+ p.initialMaxData = 1 << 20
+ p.initialMaxStreamDataBidiRemote = 1 << 20
+ p.initialMaxStreamDataUni = 1 << 20
+ }, func(c *Config) {
+ c.StreamWriteBufferSize = maxWriteBuffer
+ })
+ tc.handshake()
+ tc.ignoreFrame(frameTypeAck)
+
+ // Write more data than StreamWriteBufferSize.
+ // The peer has given us plenty of flow control,
+ // so we're just blocked by our local limit.
+ s, err := tc.conn.newLocalStream(ctx, styp)
+ if err != nil {
+ t.Fatal(err)
+ }
+ w := runAsync(tc, func(ctx context.Context) (int, error) {
+ return s.WriteContext(ctx, want)
+ })
+ tc.wantFrame("stream write should send as much data as write buffer allows",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: 0,
+ data: want[:maxWriteBuffer],
+ })
+ tc.wantIdle("no STREAM_DATA_BLOCKED, we're blocked locally not by flow control")
+
+ // ACK for previously-sent data allows making more progress.
+ tc.writeFrames(packetType1RTT, debugFrameAck{
+ ranges: []i64range[packetNumber]{{0, tc.sentFramePacket.num + 1}},
+ })
+ tc.wantFrame("ACK for previous data allows making progress",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: maxWriteBuffer,
+ data: want[maxWriteBuffer:][:maxWriteBuffer],
+ })
+
+ // Cancel the write with data left to send.
+ w.cancel()
+ n, err := w.result()
+ if n != 2*maxWriteBuffer || err == nil {
+ t.Fatalf("WriteContext() = %v, %v; want %v bytes, error", n, err, 2*maxWriteBuffer)
+ }
+ })
+}
+
+func TestStreamReceive(t *testing.T) {
+ // "Endpoints MUST be able to deliver stream data to an application as
+ // an ordered byte stream."
+ // https://www.rfc-editor.org/rfc/rfc9000#section-2.2-2
+ want := make([]byte, 5000)
+ for i := range want {
+ want[i] = byte(i)
+ }
+ type frame struct {
+ start int64
+ end int64
+ fin bool
+ want int
+ wantEOF bool
+ }
+ for _, test := range []struct {
+ name string
+ frames []frame
+ }{{
+ name: "linear",
+ frames: []frame{{
+ start: 0,
+ end: 1000,
+ want: 1000,
+ }, {
+ start: 1000,
+ end: 2000,
+ want: 2000,
+ }, {
+ start: 2000,
+ end: 3000,
+ want: 3000,
+ fin: true,
+ wantEOF: true,
+ }},
+ }, {
+ name: "out of order",
+ frames: []frame{{
+ start: 1000,
+ end: 2000,
+ }, {
+ start: 2000,
+ end: 3000,
+ }, {
+ start: 0,
+ end: 1000,
+ want: 3000,
+ }},
+ }, {
+ name: "resent",
+ frames: []frame{{
+ start: 0,
+ end: 1000,
+ want: 1000,
+ }, {
+ start: 0,
+ end: 1000,
+ want: 1000,
+ }, {
+ start: 1000,
+ end: 2000,
+ want: 2000,
+ }, {
+ start: 0,
+ end: 1000,
+ want: 2000,
+ }, {
+ start: 1000,
+ end: 2000,
+ want: 2000,
+ }},
+ }, {
+ name: "overlapping",
+ frames: []frame{{
+ start: 0,
+ end: 1000,
+ want: 1000,
+ }, {
+ start: 3000,
+ end: 4000,
+ want: 1000,
+ }, {
+ start: 2000,
+ end: 3000,
+ want: 1000,
+ }, {
+ start: 1000,
+ end: 3000,
+ want: 4000,
+ }},
+ }, {
+ name: "early eof",
+ frames: []frame{{
+ start: 3000,
+ end: 3000,
+ fin: true,
+ want: 0,
+ }, {
+ start: 1000,
+ end: 2000,
+ want: 0,
+ }, {
+ start: 0,
+ end: 1000,
+ want: 2000,
+ }, {
+ start: 2000,
+ end: 3000,
+ want: 3000,
+ wantEOF: true,
+ }},
+ }, {
+ name: "empty eof",
+ frames: []frame{{
+ start: 0,
+ end: 1000,
+ want: 1000,
+ }, {
+ start: 1000,
+ end: 1000,
+ fin: true,
+ want: 1000,
+ wantEOF: true,
+ }},
+ }} {
+ testStreamTypes(t, test.name, func(t *testing.T, styp streamType) {
+ ctx := canceledContext()
+ tc := newTestConn(t, serverSide)
+ tc.handshake()
+ sid := newStreamID(clientSide, styp, 0)
+ var s *Stream
+ got := make([]byte, len(want))
+ var total int
+ for _, f := range test.frames {
+ t.Logf("receive [%v,%v)", f.start, f.end)
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: sid,
+ off: f.start,
+ data: want[f.start:f.end],
+ fin: f.fin,
+ })
+ if s == nil {
+ var err error
+ s, err = tc.conn.AcceptStream(ctx)
+ if err != nil {
+ tc.t.Fatalf("conn.AcceptStream() = %v", err)
+ }
+ }
+ for {
+ n, err := s.ReadContext(ctx, got[total:])
+ t.Logf("s.ReadContext() = %v, %v", n, err)
+ total += n
+ if f.wantEOF && err != io.EOF {
+ t.Fatalf("ReadContext() error = %v; want io.EOF", err)
+ }
+ if !f.wantEOF && err == io.EOF {
+ t.Fatalf("ReadContext() error = io.EOF, want something else")
+ }
+ if err != nil {
+ break
+ }
+ }
+ if total != f.want {
+ t.Fatalf("total bytes read = %v, want %v", total, f.want)
+ }
+ for i := 0; i < total; i++ {
+ if got[i] != want[i] {
+ t.Fatalf("byte %v differs: got %v, want %v", i, got[i], want[i])
+ }
+ }
+ }
+ })
+ }
+
+}
+
+func TestStreamReceiveExtendsStreamWindow(t *testing.T) {
+ testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+ const maxWindowSize = 20
+ ctx := canceledContext()
+ tc := newTestConn(t, serverSide, func(c *Config) {
+ c.StreamReadBufferSize = maxWindowSize
+ })
+ tc.handshake()
+ tc.ignoreFrame(frameTypeAck)
+ sid := newStreamID(clientSide, styp, 0)
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: sid,
+ off: 0,
+ data: make([]byte, maxWindowSize),
+ })
+ s, err := tc.conn.AcceptStream(ctx)
+ if err != nil {
+ t.Fatalf("AcceptStream: %v", err)
+ }
+ tc.wantIdle("stream window is not extended before data is read")
+ buf := make([]byte, maxWindowSize+1)
+ if n, err := s.ReadContext(ctx, buf); n != maxWindowSize || err != nil {
+ t.Fatalf("s.ReadContext() = %v, %v; want %v, nil", n, err, maxWindowSize)
+ }
+ tc.wantFrame("stream window is extended after reading data",
+ packetType1RTT, debugFrameMaxStreamData{
+ id: sid,
+ max: maxWindowSize * 2,
+ })
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: sid,
+ off: maxWindowSize,
+ data: make([]byte, maxWindowSize),
+ fin: true,
+ })
+ if n, err := s.ReadContext(ctx, buf); n != maxWindowSize || err != io.EOF {
+ t.Fatalf("s.ReadContext() = %v, %v; want %v, io.EOF", n, err, maxWindowSize)
+ }
+ tc.wantIdle("stream window is not extended after FIN")
+ })
+}
+
+func TestStreamReceiveViolatesStreamDataLimit(t *testing.T) {
+ // "A receiver MUST close the connection with an error of type FLOW_CONTROL_ERROR if
+ // the sender violates the advertised [...] stream data limits [...]"
+ // https://www.rfc-editor.org/rfc/rfc9000#section-4.1-8
+ testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+ const maxStreamData = 10
+ for _, test := range []struct {
+ off int64
+ size int64
+ }{{
+ off: maxStreamData,
+ size: 1,
+ }, {
+ off: 0,
+ size: maxStreamData + 1,
+ }, {
+ off: maxStreamData - 1,
+ size: 2,
+ }} {
+ tc := newTestConn(t, serverSide, func(c *Config) {
+ c.StreamReadBufferSize = maxStreamData
+ })
+ tc.handshake()
+ tc.ignoreFrame(frameTypeAck)
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: newStreamID(clientSide, styp, 0),
+ off: test.off,
+ data: make([]byte, test.size),
+ })
+ tc.wantFrame(
+ fmt.Sprintf("data [%v,%v) violates stream data limit and closes connection",
+ test.off, test.off+test.size),
+ packetType1RTT, debugFrameConnectionCloseTransport{
+ code: errFlowControl,
+ },
+ )
+ }
+ })
+}
+
+func TestStreamReceiveDuplicateDataDoesNotViolateLimits(t *testing.T) {
+ testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+ const maxData = 10
+ tc := newTestConn(t, serverSide, func(c *Config) {
+ // TODO: Add connection-level maximum data here as well.
+ c.StreamReadBufferSize = maxData
+ })
+ tc.handshake()
+ tc.ignoreFrame(frameTypeAck)
+ for i := 0; i < 3; i++ {
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: newStreamID(clientSide, styp, 0),
+ off: 0,
+ data: make([]byte, maxData),
+ })
+ tc.wantIdle(fmt.Sprintf("conn sends no frames after receiving data frame %v", i))
+ }
+ })
+}
+
+func TestStreamFinalSizeChangedByStreamFrame(t *testing.T) {
+ // "If a [...] STREAM frame is received indicating a change
+ // in the final size for the stream, an endpoint SHOULD
+ // respond with an error of type FINAL_SIZE_ERROR [...]"
+ // https://www.rfc-editor.org/rfc/rfc9000#section-4.5-5
+ testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+ tc := newTestConn(t, serverSide)
+ tc.handshake()
+ sid := newStreamID(clientSide, styp, 0)
+
+ const write1size = 4
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: sid,
+ off: 10,
+ fin: true,
+ })
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: sid,
+ off: 9,
+ fin: true,
+ })
+ tc.wantFrame("change in final size of stream is an error",
+ packetType1RTT, debugFrameConnectionCloseTransport{
+ code: errFinalSize,
+ },
+ )
+ })
+}
+
+func TestStreamDataBeyondFinalSize(t *testing.T) {
+ // "A receiver SHOULD treat receipt of data at or beyond
+ // the final size as an error of type FINAL_SIZE_ERROR [...]"
+ // https://www.rfc-editor.org/rfc/rfc9000#section-4.5-5
+ testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+ tc := newTestConn(t, serverSide)
+ tc.handshake()
+ sid := newStreamID(clientSide, styp, 0)
+
+ const write1size = 4
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: sid,
+ off: 0,
+ data: make([]byte, 16),
+ fin: true,
+ })
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: sid,
+ off: 16,
+ data: []byte{0},
+ })
+ tc.wantFrame("received data past final size of stream",
+ packetType1RTT, debugFrameConnectionCloseTransport{
+ code: errFinalSize,
+ },
+ )
+ })
+}
+
+func TestStreamReceiveUnblocksReader(t *testing.T) {
+ testStreamTypes(t, "", func(t *testing.T, styp streamType) {
+ tc := newTestConn(t, serverSide)
+ tc.handshake()
+ want := []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}
+ sid := newStreamID(clientSide, styp, 0)
+
+ // AcceptStream blocks until a STREAM frame is received.
+ accept := runAsync(tc, func(ctx context.Context) (*Stream, error) {
+ return tc.conn.AcceptStream(ctx)
+ })
+ const write1size = 4
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: sid,
+ off: 0,
+ data: want[:write1size],
+ })
+ s, err := accept.result()
+ if err != nil {
+ t.Fatalf("AcceptStream() = %v", err)
+ }
+
+ // ReadContext succeeds immediately, since we already have data.
+ got := make([]byte, len(want))
+ read := runAsync(tc, func(ctx context.Context) (int, error) {
+ return s.ReadContext(ctx, got)
+ })
+ if n, err := read.result(); n != write1size || err != nil {
+ t.Fatalf("ReadContext = %v, %v; want %v, nil", n, err, write1size)
+ }
+
+ // ReadContext blocks waiting for more data.
+ read = runAsync(tc, func(ctx context.Context) (int, error) {
+ return s.ReadContext(ctx, got[write1size:])
+ })
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: sid,
+ off: write1size,
+ data: want[write1size:],
+ fin: true,
+ })
+ if n, err := read.result(); n != len(want)-write1size || err != io.EOF {
+ t.Fatalf("ReadContext = %v, %v; want %v, io.EOF", n, err, len(want)-write1size)
+ }
+ if !bytes.Equal(got, want) {
+ t.Fatalf("read bytes %x, want %x", got, want)
+ }
+ })
+}
+
+// testStreamSendFrameInvalidState calls the test func with a stream ID for:
+//
+// - a remote bidirectional stream that the peer has not created
+// - a remote unidirectional stream
+//
+// It then sends the returned frame (STREAM, STREAM_DATA_BLOCKED, etc.)
+// to the conn and expects a STREAM_STATE_ERROR.
+func testStreamSendFrameInvalidState(t *testing.T, f func(sid streamID) debugFrame) {
+ testSides(t, "stream_not_created", func(t *testing.T, side connSide) {
+ tc := newTestConn(t, side)
+ tc.handshake()
+ tc.writeFrames(packetType1RTT, f(newStreamID(side, bidiStream, 0)))
+ tc.wantFrame("frame for local stream which has not been created",
+ packetType1RTT, debugFrameConnectionCloseTransport{
+ code: errStreamState,
+ })
+ })
+ testSides(t, "uni_stream", func(t *testing.T, side connSide) {
+ ctx := canceledContext()
+ tc := newTestConn(t, side)
+ tc.handshake()
+ sid := newStreamID(side, uniStream, 0)
+ s, err := tc.conn.NewSendOnlyStream(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ s.Write(nil) // open the stream
+ tc.wantFrame("new stream is opened",
+ packetType1RTT, debugFrameStream{
+ id: sid,
+ data: []byte{},
+ })
+ tc.writeFrames(packetType1RTT, f(sid))
+ tc.wantFrame("send-oriented frame for send-only stream",
+ packetType1RTT, debugFrameConnectionCloseTransport{
+ code: errStreamState,
+ })
+ })
+}
+
+func TestStreamStreamFrameInvalidState(t *testing.T) {
+ // "An endpoint MUST terminate the connection with error STREAM_STATE_ERROR
+ // if it receives a STREAM frame for a locally initiated stream
+ // that has not yet been created, or for a send-only stream."
+ // https://www.rfc-editor.org/rfc/rfc9000.html#section-19.8-3
+ testStreamSendFrameInvalidState(t, func(sid streamID) debugFrame {
+ return debugFrameStream{
+ id: sid,
+ }
+ })
+}
+
+func TestStreamDataBlockedInvalidState(t *testing.T) {
+ // "An endpoint MUST terminate the connection with error STREAM_STATE_ERROR
+ // if it receives a STREAM frame for a locally initiated stream
+ // that has not yet been created, or for a send-only stream."
+ // https://www.rfc-editor.org/rfc/rfc9000.html#section-19.8-3
+ testStreamSendFrameInvalidState(t, func(sid streamID) debugFrame {
+ return debugFrameStream{
+ id: sid,
+ }
+ })
+}
+
+// testStreamReceiveFrameInvalidState calls the test func with a stream ID for:
+//
+// - a remote bidirectional stream that the peer has not created
+// - a local unidirectional stream
+//
+// It then sends the returned frame (MAX_STREAM_DATA, STOP_SENDING, etc.)
+// to the conn and expects a STREAM_STATE_ERROR.
+func testStreamReceiveFrameInvalidState(t *testing.T, f func(sid streamID) debugFrame) {
+ testSides(t, "stream_not_created", func(t *testing.T, side connSide) {
+ tc := newTestConn(t, side)
+ tc.handshake()
+ tc.writeFrames(packetType1RTT, f(newStreamID(side, bidiStream, 0)))
+ tc.wantFrame("frame for local stream which has not been created",
+ packetType1RTT, debugFrameConnectionCloseTransport{
+ code: errStreamState,
+ })
+ })
+ testSides(t, "uni_stream", func(t *testing.T, side connSide) {
+ tc := newTestConn(t, side)
+ tc.handshake()
+ tc.writeFrames(packetType1RTT, f(newStreamID(side.peer(), uniStream, 0)))
+ tc.wantFrame("receive-oriented frame for receive-only stream",
+ packetType1RTT, debugFrameConnectionCloseTransport{
+ code: errStreamState,
+ })
+ })
+}
+
+func TestStreamMaxStreamDataInvalidState(t *testing.T) {
+ // "Receiving a MAX_STREAM_DATA frame for a locally initiated stream
+ // that has not yet been created MUST be treated as a connection error
+ // of type STREAM_STATE_ERROR. An endpoint that receives a MAX_STREAM_DATA
+ // frame for a receive-only stream MUST terminate the connection
+ // with error STREAM_STATE_ERROR."
+ // https://www.rfc-editor.org/rfc/rfc9000#section-19.10-2
+ testStreamReceiveFrameInvalidState(t, func(sid streamID) debugFrame {
+ return debugFrameMaxStreamData{
+ id: sid,
+ max: 1000,
+ }
+ })
+}
+
func TestStreamOffsetTooLarge(t *testing.T) {
// "Receipt of a frame that exceeds [2^62-1] MUST be treated as a
// connection error of type FRAME_ENCODING_ERROR or FLOW_CONTROL_ERROR."
@@ -31,3 +724,104 @@
t.Fatalf("STREAM offset exceeds 2^62-1\ngot: %v\nwant: %v\n or: %v", got, want1, want2)
}
}
+
+func TestStreamReadFromWriteOnlyStream(t *testing.T) {
+ _, s := newTestConnAndLocalStream(t, serverSide, uniStream)
+ buf := make([]byte, 10)
+ wantErr := "read from write-only stream"
+ if n, err := s.Read(buf); err == nil || !strings.Contains(err.Error(), wantErr) {
+ t.Errorf("s.Read() = %v, %v; want error %q", n, err, wantErr)
+ }
+}
+
+func TestStreamWriteToReadOnlyStream(t *testing.T) {
+ _, s := newTestConnAndRemoteStream(t, serverSide, uniStream)
+ buf := make([]byte, 10)
+ wantErr := "write to read-only stream"
+ if n, err := s.Write(buf); err == nil || !strings.Contains(err.Error(), wantErr) {
+ t.Errorf("s.Write() = %v, %v; want error %q", n, err, wantErr)
+ }
+}
+
+func TestStreamWriteToClosedStream(t *testing.T) {
+ tc, s := newTestConnAndLocalStream(t, serverSide, bidiStream, func(p *transportParameters) {
+ p.initialMaxStreamsBidi = 1
+ p.initialMaxData = 1 << 20
+ p.initialMaxStreamDataBidiRemote = 1 << 20
+ })
+ s.Close()
+ tc.wantFrame("stream is opened after being closed",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: 0,
+ fin: true,
+ data: []byte{},
+ })
+ wantErr := "write to closed stream"
+ if n, err := s.Write([]byte{}); err == nil || !strings.Contains(err.Error(), wantErr) {
+ t.Errorf("s.Write() = %v, %v; want error %q", n, err, wantErr)
+ }
+}
+
+func TestStreamWriteMoreThanOnePacketOfData(t *testing.T) {
+ tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, func(p *transportParameters) {
+ p.initialMaxStreamsUni = 1
+ p.initialMaxData = 1 << 20
+ p.initialMaxStreamDataUni = 1 << 20
+ })
+ want := make([]byte, 4096)
+ rand.Read(want) // doesn't need to be crypto/rand, but non-deprecated and harmless
+ w := runAsync(tc, func(ctx context.Context) (int, error) {
+ return s.WriteContext(ctx, want)
+ })
+ got := make([]byte, 0, len(want))
+ for {
+ f, _ := tc.readFrame()
+ if f == nil {
+ break
+ }
+ sf, ok := f.(debugFrameStream)
+ if !ok {
+ t.Fatalf("unexpected frame: %v", sf)
+ }
+ if len(got) != int(sf.off) {
+ t.Fatalf("got frame: %v\nwant offset %v", sf, len(got))
+ }
+ got = append(got, sf.data...)
+ }
+ if n, err := w.result(); n != len(want) || err != nil {
+ t.Fatalf("s.WriteContext() = %v, %v; want %v, nil", n, err, len(want))
+ }
+ if !bytes.Equal(got, want) {
+ t.Fatalf("mismatch in received stream data")
+ }
+}
+
+func newTestConnAndLocalStream(t *testing.T, side connSide, styp streamType, opts ...any) (*testConn, *Stream) {
+ t.Helper()
+ ctx := canceledContext()
+ tc := newTestConn(t, side, opts...)
+ tc.handshake()
+ tc.ignoreFrame(frameTypeAck)
+ s, err := tc.conn.newLocalStream(ctx, styp)
+ if err != nil {
+ t.Fatalf("conn.newLocalStream(%v) = %v", styp, err)
+ }
+ return tc, s
+}
+
+func newTestConnAndRemoteStream(t *testing.T, side connSide, styp streamType, opts ...any) (*testConn, *Stream) {
+ t.Helper()
+ ctx := canceledContext()
+ tc := newTestConn(t, side, opts...)
+ tc.handshake()
+ tc.ignoreFrame(frameTypeAck)
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: newStreamID(side.peer(), styp, 0),
+ })
+ s, err := tc.conn.AcceptStream(ctx)
+ if err != nil {
+ t.Fatalf("conn.AcceptStream() = %v", err)
+ }
+ return tc, s
+}