quic: outbound connection-level flow control
Track the peer-provided flow control window.
Only send stream data when the window permits.
For golang/go#58547
Change-Id: I30c054346623e389b3d1cff1de629f1bbf918635
Reviewed-on: https://go-review.googlesource.com/c/net/+/527376
Reviewed-by: Jonathan Amsterdam <jba@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
diff --git a/internal/quic/conn.go b/internal/quic/conn.go
index 0ab6f69..c24e790 100644
--- a/internal/quic/conn.go
+++ b/internal/quic/conn.go
@@ -169,6 +169,7 @@
// receiveTransportParameters applies transport parameters sent by the peer.
func (c *Conn) receiveTransportParameters(p transportParameters) error {
+ c.streams.outflow.setMaxData(p.initialMaxData)
c.streams.localLimit[bidiStream].setMax(p.initialMaxStreamsBidi)
c.streams.localLimit[uniStream].setMax(p.initialMaxStreamsUni)
c.streams.peerInitialMaxStreamDataBidiLocal = p.initialMaxStreamDataBidiLocal
diff --git a/internal/quic/conn_flow.go b/internal/quic/conn_flow.go
index 790210b..265fdaf 100644
--- a/internal/quic/conn_flow.go
+++ b/internal/quic/conn_flow.go
@@ -109,3 +109,26 @@
func (c *Conn) ackOrLossMaxData(pnum packetNumber, fate packetFate) {
c.streams.inflow.sent.ackLatestOrLoss(pnum, fate)
}
+
+// connOutflow tracks connection-level flow control for data sent by us to the peer.
+type connOutflow struct {
+ max int64 // largest MAX_DATA received from peer
+ used int64 // total bytes of STREAM data sent to peer
+}
+
+// setMaxData updates the connection-level flow control limit
+// with the initial limit conveyed in transport parameters
+// or an update from a MAX_DATA frame.
+func (f *connOutflow) setMaxData(maxData int64) {
+ f.max = max(f.max, maxData)
+}
+
+// avail returns the number of connection-level flow control bytes available.
+func (f *connOutflow) avail() int64 {
+ return f.max - f.used
+}
+
+// consume records consumption of n bytes of flow.
+func (f *connOutflow) consume(n int64) {
+ f.used += n
+}
diff --git a/internal/quic/conn_flow_test.go b/internal/quic/conn_flow_test.go
index f01a738..28559b4 100644
--- a/internal/quic/conn_flow_test.go
+++ b/internal/quic/conn_flow_test.go
@@ -6,7 +6,9 @@
package quic
-import "testing"
+import (
+ "testing"
+)
func TestConnInflowReturnOnRead(t *testing.T) {
ctx := canceledContext()
@@ -184,3 +186,149 @@
max: 128 + 32 + 1 + 32 + 1,
})
}
+
+func TestConnOutflowBlocked(t *testing.T) {
+ tc, s := newTestConnAndLocalStream(t, clientSide, uniStream,
+ permissiveTransportParameters,
+ func(p *transportParameters) {
+ p.initialMaxData = 10
+ })
+ tc.ignoreFrame(frameTypeAck)
+
+ data := makeTestData(32)
+ n, err := s.Write(data)
+ if n != len(data) || err != nil {
+ t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(data))
+ }
+
+ tc.wantFrame("stream writes data up to MAX_DATA limit",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ data: data[:10],
+ })
+ tc.wantIdle("stream is blocked by MAX_DATA limit")
+
+ tc.writeFrames(packetType1RTT, debugFrameMaxData{
+ max: 20,
+ })
+ tc.wantFrame("stream writes data up to new MAX_DATA limit",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: 10,
+ data: data[10:20],
+ })
+ tc.wantIdle("stream is blocked by new MAX_DATA limit")
+
+ tc.writeFrames(packetType1RTT, debugFrameMaxData{
+ max: 100,
+ })
+ tc.wantFrame("stream writes remaining data",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: 20,
+ data: data[20:],
+ })
+}
+
+func TestConnOutflowMaxDataDecreases(t *testing.T) {
+ tc, s := newTestConnAndLocalStream(t, clientSide, uniStream,
+ permissiveTransportParameters,
+ func(p *transportParameters) {
+ p.initialMaxData = 10
+ })
+ tc.ignoreFrame(frameTypeAck)
+
+ // Decrease in MAX_DATA is ignored.
+ tc.writeFrames(packetType1RTT, debugFrameMaxData{
+ max: 5,
+ })
+
+ data := makeTestData(32)
+ n, err := s.Write(data)
+ if n != len(data) || err != nil {
+ t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(data))
+ }
+
+ tc.wantFrame("stream writes data up to MAX_DATA limit",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ data: data[:10],
+ })
+}
+
+func TestConnOutflowMaxDataRoundRobin(t *testing.T) {
+ ctx := canceledContext()
+ tc := newTestConn(t, clientSide, permissiveTransportParameters,
+ func(p *transportParameters) {
+ p.initialMaxData = 0
+ })
+ tc.handshake()
+ tc.ignoreFrame(frameTypeAck)
+
+ s1, err := tc.conn.newLocalStream(ctx, uniStream)
+ if err != nil {
+ t.Fatalf("conn.newLocalStream(%v) = %v", uniStream, err)
+ }
+ s2, err := tc.conn.newLocalStream(ctx, uniStream)
+ if err != nil {
+ t.Fatalf("conn.newLocalStream(%v) = %v", uniStream, err)
+ }
+
+ s1.Write(make([]byte, 10))
+ s2.Write(make([]byte, 10))
+
+ tc.writeFrames(packetType1RTT, debugFrameMaxData{
+ max: 1,
+ })
+ tc.wantFrame("stream 1 writes data up to MAX_DATA limit",
+ packetType1RTT, debugFrameStream{
+ id: s1.id,
+ data: []byte{0},
+ })
+
+ tc.writeFrames(packetType1RTT, debugFrameMaxData{
+ max: 2,
+ })
+ tc.wantFrame("stream 2 writes data up to MAX_DATA limit",
+ packetType1RTT, debugFrameStream{
+ id: s2.id,
+ data: []byte{0},
+ })
+
+ tc.writeFrames(packetType1RTT, debugFrameMaxData{
+ max: 3,
+ })
+ tc.wantFrame("stream 1 writes data up to MAX_DATA limit",
+ packetType1RTT, debugFrameStream{
+ id: s1.id,
+ off: 1,
+ data: []byte{0},
+ })
+}
+
+func TestConnOutflowMetaAndData(t *testing.T) {
+ tc, s := newTestConnAndLocalStream(t, clientSide, bidiStream,
+ permissiveTransportParameters,
+ func(p *transportParameters) {
+ p.initialMaxData = 0
+ })
+ tc.ignoreFrame(frameTypeAck)
+
+ data := makeTestData(32)
+ s.Write(data)
+
+ s.CloseRead()
+ tc.wantFrame("CloseRead sends a STOP_SENDING, not flow controlled",
+ packetType1RTT, debugFrameStopSending{
+ id: s.id,
+ })
+
+ tc.writeFrames(packetType1RTT, debugFrameMaxData{
+ max: 100,
+ })
+ tc.wantFrame("unblocked MAX_DATA",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ data: data,
+ })
+}
diff --git a/internal/quic/conn_recv.go b/internal/quic/conn_recv.go
index faf3a37..07f17e3 100644
--- a/internal/quic/conn_recv.go
+++ b/internal/quic/conn_recv.go
@@ -186,7 +186,7 @@
if !frameOK(c, ptype, __01) {
return
}
- _, n = consumeMaxDataFrame(payload)
+ n = c.handleMaxDataFrame(now, payload)
case frameTypeMaxStreamData:
if !frameOK(c, ptype, __01) {
return
@@ -280,6 +280,15 @@
return n
}
+func (c *Conn) handleMaxDataFrame(now time.Time, payload []byte) int {
+ maxData, n := consumeMaxDataFrame(payload)
+ if n < 0 {
+ return -1
+ }
+ c.streams.outflow.setMaxData(maxData)
+ return n
+}
+
func (c *Conn) handleMaxStreamDataFrame(now time.Time, payload []byte) int {
id, maxStreamData, n := consumeMaxStreamDataFrame(payload)
if n < 0 {
diff --git a/internal/quic/conn_streams.go b/internal/quic/conn_streams.go
index 0a72d26..7c6c8be 100644
--- a/internal/quic/conn_streams.go
+++ b/internal/quic/conn_streams.go
@@ -28,15 +28,15 @@
peerInitialMaxStreamDataBidiLocal int64 // streams opened by them
// Connection-level flow control.
- inflow connInflow
+ inflow connInflow
+ outflow connOutflow
- // Streams with frames to send are stored in a circular linked list.
- // sendHead is the next stream to write, or nil if there are no streams
- // with data to send. sendTail is the last stream to write.
- needSend atomic.Bool
- sendMu sync.Mutex
- sendHead *Stream
- sendTail *Stream
+ // Streams with frames to send are stored in one of two circular linked lists,
+ // depending on whether they require connection-level flow control.
+ needSend atomic.Bool
+ sendMu sync.Mutex
+ queueMeta streamRing // streams with any non-flow-controlled frames
+ queueData streamRing // streams with only flow-controlled frames
}
func (c *Conn) streamsInit() {
@@ -188,29 +188,67 @@
return s
}
-// queueStreamForSend marks a stream as containing frames that need sending.
-func (c *Conn) queueStreamForSend(s *Stream) {
+// maybeQueueStreamForSend marks a stream as containing frames that need sending.
+func (c *Conn) maybeQueueStreamForSend(s *Stream, state streamState) {
+ if state.wantQueue() == state.inQueue() {
+ return // already on the right queue
+ }
c.streams.sendMu.Lock()
defer c.streams.sendMu.Unlock()
- if s.next != nil {
- // Already in the queue.
- return
- }
- if c.streams.sendHead == nil {
- // The queue was empty.
- c.streams.sendHead = s
- c.streams.sendTail = s
- s.next = s
- } else {
- // Insert this stream at the end of the queue.
- c.streams.sendTail.next = s
- c.streams.sendTail = s
- s.next = c.streams.sendHead
- }
+ state = s.state.load() // may have changed while waiting
+ c.queueStreamForSendLocked(s, state)
+
c.streams.needSend.Store(true)
c.wake()
}
+// queueStreamForSendLocked moves a stream to the correct send queue,
+// or removes it from all queues.
+//
+// state is the last known stream state.
+func (c *Conn) queueStreamForSendLocked(s *Stream, state streamState) {
+ for {
+ wantQueue := state.wantQueue()
+ inQueue := state.inQueue()
+ if inQueue == wantQueue {
+ return // already on the right queue
+ }
+
+ switch inQueue {
+ case metaQueue:
+ c.streams.queueMeta.remove(s)
+ case dataQueue:
+ c.streams.queueData.remove(s)
+ }
+
+ switch wantQueue {
+ case metaQueue:
+ c.streams.queueMeta.append(s)
+ state = s.state.set(streamQueueMeta, streamQueueMeta|streamQueueData)
+ case dataQueue:
+ c.streams.queueData.append(s)
+ state = s.state.set(streamQueueData, streamQueueMeta|streamQueueData)
+ case noQueue:
+ state = s.state.set(0, streamQueueMeta|streamQueueData)
+ }
+
+ // If the stream state changed while we were moving the stream,
+ // we might now be on the wrong queue.
+ //
+ // For example:
+ // - stream has data to send: streamOutSendData|streamQueueData
+ // - appendStreamFrames sends all the data: streamQueueData
+ // - concurrently, more data is written: streamOutSendData|streamQueueData
+ // - appendStreamFrames calls us with the last state it observed
+ // (streamQueueData).
+ // - We remove the stream from the queue and observe the updated state:
+ // streamOutSendData
+ // - We realize that the stream needs to go back on the data queue.
+ //
+ // Go back around the loop to confirm we're on the correct queue.
+ }
+}
+
// appendStreamFrames writes stream-related frames to the current packet.
//
// It returns true if no more frames need appending,
@@ -237,44 +275,45 @@
}
c.streams.sendMu.Lock()
defer c.streams.sendMu.Unlock()
- for {
- s := c.streams.sendHead
- const pto = false
-
+ // queueMeta contains streams with non-flow-controlled frames to send.
+ for c.streams.queueMeta.head != nil {
+ s := c.streams.queueMeta.head
state := s.state.load()
- if state&streamInSend != 0 {
+ if state&(streamQueueMeta|streamConnRemoved) != streamQueueMeta {
+ panic("BUG: queueMeta stream is not streamQueueMeta")
+ }
+ if state&streamInSendMeta != 0 {
s.ingate.lock()
ok := s.appendInFramesLocked(w, pnum, pto)
state = s.inUnlockNoQueue()
if !ok {
return false
}
- }
-
- if state&streamOutSend != 0 {
- avail := w.avail()
- s.outgate.lock()
- ok := s.appendOutFramesLocked(w, pnum, pto)
- state = s.outUnlockNoQueue()
- if !ok {
- // We've sent some data for this stream, but it still has more to send.
- // If the stream got a reasonable chance to put data in a packet,
- // advance sendHead to the next stream in line, to avoid starvation.
- // We'll come back to this stream after going through the others.
- //
- // If the packet was already mostly out of space, leave sendHead alone
- // and come back to this stream again on the next packet.
- if avail > 512 {
- c.streams.sendHead = s.next
- c.streams.sendTail = s
- }
- return false
+ if state&streamInSendMeta != 0 {
+ panic("BUG: streamInSendMeta set after successfully appending frames")
}
}
-
- if state == streamInDone|streamOutDone {
+ if state&streamOutSendMeta != 0 {
+ s.outgate.lock()
+ // This might also append flow-controlled frames if we have any
+ // and available conn-level quota. That's fine.
+ ok := s.appendOutFramesLocked(w, pnum, pto)
+ state = s.outUnlockNoQueue()
+ // We're checking both ok and state, because appendOutFramesLocked
+ // might have filled up the packet with flow-controlled data.
+ // If so, we want to move the stream to queueData for any remaining frames.
+ if !ok && state&streamOutSendMeta != 0 {
+ return false
+ }
+ if state&streamOutSendMeta != 0 {
+ panic("BUG: streamOutSendMeta set after successfully appending frames")
+ }
+ }
+ // We've sent all frames for this stream, so remove it from the send queue.
+ c.streams.queueMeta.remove(s)
+ if state&(streamInDone|streamOutDone) == streamInDone|streamOutDone {
// Stream is finished, remove it from the conn.
- s.state.set(streamConnRemoved, streamConnRemoved)
+ state = s.state.set(streamConnRemoved, streamQueueMeta|streamConnRemoved)
delete(c.streams.streams, s.id)
// Record finalization of remote streams, to know when
@@ -282,24 +321,59 @@
if s.id.initiator() != c.side {
c.streams.remoteLimit[s.id.streamType()].close()
}
+ } else {
+ state = s.state.set(0, streamQueueMeta|streamConnRemoved)
}
-
- next := s.next
- s.next = nil
- if (next == s) != (s == c.streams.sendTail) {
- panic("BUG: sendable stream list state is inconsistent")
+ // The stream may have flow-controlled data to send,
+ // or something might have added non-flow-controlled frames after we
+ // unlocked the stream.
+ // If so, put the stream back on a queue.
+ c.queueStreamForSendLocked(s, state)
+ }
+ // queueData contains streams with flow-controlled frames.
+ for c.streams.queueData.head != nil {
+ avail := c.streams.outflow.avail()
+ if avail == 0 {
+ break // no flow control quota available
}
- if s == c.streams.sendTail {
- // This was the last stream.
- c.streams.sendHead = nil
- c.streams.sendTail = nil
- c.streams.needSend.Store(false)
+ s := c.streams.queueData.head
+ s.outgate.lock()
+ ok := s.appendOutFramesLocked(w, pnum, pto)
+ state := s.outUnlockNoQueue()
+ if !ok {
+ // We've sent some data for this stream, but it still has more to send.
+ // If the stream got a reasonable chance to put data in a packet,
+ // advance sendHead to the next stream in line, to avoid starvation.
+ // We'll come back to this stream after going through the others.
+ //
+ // If the packet was already mostly out of space, leave sendHead alone
+ // and come back to this stream again on the next packet.
+ if avail > 512 {
+ c.streams.queueData.head = s.next
+ }
+ return false
+ }
+ if state&streamQueueData == 0 {
+ panic("BUG: queueData stream is not streamQueueData")
+ }
+ if state&streamOutSendData != 0 {
+ // We must have run out of connection-level flow control:
+ // appendOutFramesLocked says it wrote all it can, but there's
+ // still data to send.
+ //
+ // Advance sendHead to the next stream in line to avoid starvation.
+ if c.streams.outflow.avail() != 0 {
+ panic("BUG: streamOutSendData set and flow control available after send")
+ }
+ c.streams.queueData.head = s.next
return true
}
- // We've sent all data for this stream, so remove it from the list.
- c.streams.sendTail.next = next
- c.streams.sendHead = next
+ c.streams.queueData.remove(s)
+ state = s.state.set(0, streamQueueData)
+ c.queueStreamForSendLocked(s, state)
}
+ c.streams.needSend.Store(c.streams.queueData.head != nil)
+ return true
}
// appendStreamFramesPTO writes stream-related frames to the current packet
@@ -329,3 +403,37 @@
}
return true
}
+
+// A streamRing is a circular linked list of streams.
+type streamRing struct {
+ head *Stream
+}
+
+// remove removes s from the ring.
+// s must be on the ring.
+func (r *streamRing) remove(s *Stream) {
+ if s.next == s {
+ r.head = nil // s was the last stream in the ring
+ } else {
+ s.prev.next = s.next
+ s.next.prev = s.prev
+ if r.head == s {
+ r.head = s.next
+ }
+ }
+}
+
+// append places s at the last position in the ring.
+// s must not be attached to any ring.
+func (r *streamRing) append(s *Stream) {
+ if r.head == nil {
+ r.head = s
+ s.next = s
+ s.prev = s
+ } else {
+ s.prev = r.head.prev
+ s.next = r.head
+ s.prev.next = s
+ s.next.prev = s
+ }
+}
diff --git a/internal/quic/conn_streams_test.go b/internal/quic/conn_streams_test.go
index 8ae007c..69f982c 100644
--- a/internal/quic/conn_streams_test.go
+++ b/internal/quic/conn_streams_test.go
@@ -163,7 +163,7 @@
if got := len(tc.conn.streams.streams); got != 0 {
t.Fatalf("after close, len(tc.conn.streams.streams) = %v, want 0", got)
}
- if tc.conn.streams.sendHead != nil {
+ if tc.conn.streams.queueMeta.head != nil {
t.Fatalf("after close, stream send queue is not empty; should be")
}
}
@@ -474,7 +474,7 @@
if got := len(tc.conn.streams.streams); got != 0 {
t.Fatalf("after test, len(tc.conn.streams.streams) = %v, want 0", got)
}
- if tc.conn.streams.sendHead != nil {
+ if tc.conn.streams.queueMeta.head != nil {
t.Fatalf("after test, stream send queue is not empty; should be")
}
}
diff --git a/internal/quic/stream.go b/internal/quic/stream.go
index 84c437d..923ff23 100644
--- a/internal/quic/stream.go
+++ b/internal/quic/stream.go
@@ -57,6 +57,7 @@
// streamIn* bits must be set with ingate held.
// streamOut* bits must be set with outgate held.
// streamConn* bits are set by the conn's loop.
+ // streamQueue* bits must be set with streamsState.sendMu held.
state atomicBits[streamState]
prev, next *Stream // guarded by streamsState.sendMu
@@ -65,11 +66,19 @@
type streamState uint32
const (
- // streamInSend and streamOutSend are set when there are
- // frames to send for the inbound or outbound sides of the stream.
- // For example, MAX_STREAM_DATA or STREAM_DATA_BLOCKED.
- streamInSend = streamState(1 << iota)
- streamOutSend
+ // streamInSendMeta is set when there are frames to send for the
+ // inbound side of the stream. For example, MAX_STREAM_DATA.
+ // Inbound frames are never flow-controlled.
+ streamInSendMeta = streamState(1 << iota)
+
+ // streamOutSendMeta is set when there are non-flow-controlled frames
+ // to send for the outbound side of the stream. For example, STREAM_DATA_BLOCKED.
+ // streamOutSendData is set when there are no non-flow-controlled outbound frames
+ // and the stream has data to send.
+ //
+ // At most one of streamOutSendMeta and streamOutSendData is set at any time.
+ streamOutSendMeta
+ streamOutSendData
// streamInDone and streamOutDone are set when the inbound or outbound
// sides of the stream are finished. When both are set, the stream
@@ -79,8 +88,48 @@
// streamConnRemoved is set when the stream has been removed from the conn.
streamConnRemoved
+
+ // streamQueueMeta and streamQueueData indicate which of the streamsState
+ // send queues the conn is currently on.
+ streamQueueMeta
+ streamQueueData
)
+type streamQueue int
+
+const (
+ noQueue = streamQueue(iota)
+ metaQueue // streamsState.queueMeta
+ dataQueue // streamsState.queueData
+)
+
+// wantQueue returns the send queue the stream should be on.
+func (s streamState) wantQueue() streamQueue {
+ switch {
+ case s&(streamInSendMeta|streamOutSendMeta) != 0:
+ return metaQueue
+ case s&(streamInDone|streamOutDone|streamConnRemoved) == streamInDone|streamOutDone:
+ return metaQueue
+ case s&streamOutSendData != 0:
+ // The stream has no non-flow-controlled frames to send,
+ // but does have data. Put it on the data queue, which is only
+ // processed when flow control is available.
+ return dataQueue
+ }
+ return noQueue
+}
+
+// inQueue returns the send queue the stream is currently on.
+func (s streamState) inQueue() streamQueue {
+ switch {
+ case s&streamQueueMeta != 0:
+ return metaQueue
+ case s&streamQueueData != 0:
+ return dataQueue
+ }
+ return noQueue
+}
+
// newStream returns a new stream.
//
// The stream's ingate and outgate are locked.
@@ -365,9 +414,7 @@
// are done and the stream should be removed, it notifies the Conn.
func (s *Stream) inUnlock() {
state := s.inUnlockNoQueue()
- if state&streamInSend != 0 || state == streamInDone|streamOutDone {
- s.conn.queueStreamForSend(s)
- }
+ s.conn.maybeQueueStreamForSend(s, state)
}
// inUnlockNoQueue is inUnlock,
@@ -391,11 +438,11 @@
state = streamInDone
}
case s.insendmax.shouldSend(): // STREAM_MAX_DATA
- state = streamInSend
+ state = streamInSendMeta
case s.inclosed.shouldSend(): // STOP_SENDING
- state = streamInSend
+ state = streamInSendMeta
}
- const mask = streamInDone | streamInSend
+ const mask = streamInDone | streamInSendMeta
return s.state.set(state, mask)
}
@@ -405,9 +452,7 @@
// are done and the stream should be removed, it notifies the Conn.
func (s *Stream) outUnlock() {
state := s.outUnlockNoQueue()
- if state&streamOutSend != 0 || state == streamInDone|streamOutDone {
- s.conn.queueStreamForSend(s)
- }
+ s.conn.maybeQueueStreamForSend(s, state)
}
// outUnlockNoQueue is outUnlock,
@@ -442,18 +487,18 @@
state = streamOutDone
}
case s.outreset.shouldSend(): // RESET_STREAM
- state = streamOutSend
+ state = streamOutSendMeta
case s.outreset.isSet(): // RESET_STREAM sent but not acknowledged
- case len(s.outunsent) > 0: // STREAM frame with data
- state = streamOutSend
- case s.outclosed.shouldSend(): // STREAM frame with FIN bit
- state = streamOutSend
- case s.outopened.shouldSend(): // STREAM frame with no data
- state = streamOutSend
case s.outblocked.shouldSend(): // STREAM_DATA_BLOCKED
- state = streamOutSend
+ state = streamOutSendMeta
+ case len(s.outunsent) > 0: // STREAM frame with data
+ state = streamOutSendData
+ case s.outclosed.shouldSend(): // STREAM frame with FIN bit, all data already sent
+ state = streamOutSendMeta
+ case s.outopened.shouldSend(): // STREAM frame with no data
+ state = streamOutSendMeta
}
- const mask = streamOutDone | streamOutSend
+ const mask = streamOutDone | streamOutSendMeta | streamOutSendData
return s.state.set(state, mask)
}
@@ -678,6 +723,7 @@
for {
// STREAM
off, size := dataToSend(min(s.out.start, s.outwin), min(s.out.end, s.outwin), s.outunsent, s.outacked, pto)
+ size = min(size, s.conn.streams.outflow.avail())
fin := s.outclosed.isSet() && off+size == s.out.end
shouldSend := size > 0 || // have data to send
s.outopened.shouldSendPTO(pto) || // should open the stream
@@ -690,6 +736,7 @@
return false
}
s.out.copy(off, b)
+ s.conn.streams.outflow.consume(int64(len(b)))
s.outunsent.sub(off, off+int64(len(b)))
s.frameOpensStream(pnum)
if fin {
diff --git a/internal/quic/stream_test.go b/internal/quic/stream_test.go
index b014852..7501196 100644
--- a/internal/quic/stream_test.go
+++ b/internal/quic/stream_test.go
@@ -1270,3 +1270,11 @@
p.initialMaxStreamDataBidiLocal = maxVarint
p.initialMaxStreamDataUni = maxVarint
}
+
+func makeTestData(n int) []byte {
+ b := make([]byte, n)
+ for i := 0; i < n; i++ {
+ b[i] = byte(i)
+ }
+ return b
+}