quic: outbound connection-level flow control

Track the peer-provided flow control window.
Only send stream data when the window permits.

For golang/go#58547

Change-Id: I30c054346623e389b3d1cff1de629f1bbf918635
Reviewed-on: https://go-review.googlesource.com/c/net/+/527376
Reviewed-by: Jonathan Amsterdam <jba@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
diff --git a/internal/quic/conn.go b/internal/quic/conn.go
index 0ab6f69..c24e790 100644
--- a/internal/quic/conn.go
+++ b/internal/quic/conn.go
@@ -169,6 +169,7 @@
 
 // receiveTransportParameters applies transport parameters sent by the peer.
 func (c *Conn) receiveTransportParameters(p transportParameters) error {
+	c.streams.outflow.setMaxData(p.initialMaxData)
 	c.streams.localLimit[bidiStream].setMax(p.initialMaxStreamsBidi)
 	c.streams.localLimit[uniStream].setMax(p.initialMaxStreamsUni)
 	c.streams.peerInitialMaxStreamDataBidiLocal = p.initialMaxStreamDataBidiLocal
diff --git a/internal/quic/conn_flow.go b/internal/quic/conn_flow.go
index 790210b..265fdaf 100644
--- a/internal/quic/conn_flow.go
+++ b/internal/quic/conn_flow.go
@@ -109,3 +109,26 @@
 func (c *Conn) ackOrLossMaxData(pnum packetNumber, fate packetFate) {
 	c.streams.inflow.sent.ackLatestOrLoss(pnum, fate)
 }
+
+// connOutflow tracks connection-level flow control for data sent by us to the peer.
+type connOutflow struct {
+	max  int64 // largest MAX_DATA received from peer
+	used int64 // total bytes of STREAM data sent to peer
+}
+
+// setMaxData updates the connection-level flow control limit
+// with the initial limit conveyed in transport parameters
+// or an update from a MAX_DATA frame.
+func (f *connOutflow) setMaxData(maxData int64) {
+	f.max = max(f.max, maxData)
+}
+
+// avail returns the number of connection-level flow control bytes available.
+func (f *connOutflow) avail() int64 {
+	return f.max - f.used
+}
+
+// consume records consumption of n bytes of flow.
+func (f *connOutflow) consume(n int64) {
+	f.used += n
+}
diff --git a/internal/quic/conn_flow_test.go b/internal/quic/conn_flow_test.go
index f01a738..28559b4 100644
--- a/internal/quic/conn_flow_test.go
+++ b/internal/quic/conn_flow_test.go
@@ -6,7 +6,9 @@
 
 package quic
 
-import "testing"
+import (
+	"testing"
+)
 
 func TestConnInflowReturnOnRead(t *testing.T) {
 	ctx := canceledContext()
@@ -184,3 +186,149 @@
 			max: 128 + 32 + 1 + 32 + 1,
 		})
 }
+
+func TestConnOutflowBlocked(t *testing.T) {
+	tc, s := newTestConnAndLocalStream(t, clientSide, uniStream,
+		permissiveTransportParameters,
+		func(p *transportParameters) {
+			p.initialMaxData = 10
+		})
+	tc.ignoreFrame(frameTypeAck)
+
+	data := makeTestData(32)
+	n, err := s.Write(data)
+	if n != len(data) || err != nil {
+		t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(data))
+	}
+
+	tc.wantFrame("stream writes data up to MAX_DATA limit",
+		packetType1RTT, debugFrameStream{
+			id:   s.id,
+			data: data[:10],
+		})
+	tc.wantIdle("stream is blocked by MAX_DATA limit")
+
+	tc.writeFrames(packetType1RTT, debugFrameMaxData{
+		max: 20,
+	})
+	tc.wantFrame("stream writes data up to new MAX_DATA limit",
+		packetType1RTT, debugFrameStream{
+			id:   s.id,
+			off:  10,
+			data: data[10:20],
+		})
+	tc.wantIdle("stream is blocked by new MAX_DATA limit")
+
+	tc.writeFrames(packetType1RTT, debugFrameMaxData{
+		max: 100,
+	})
+	tc.wantFrame("stream writes remaining data",
+		packetType1RTT, debugFrameStream{
+			id:   s.id,
+			off:  20,
+			data: data[20:],
+		})
+}
+
+func TestConnOutflowMaxDataDecreases(t *testing.T) {
+	tc, s := newTestConnAndLocalStream(t, clientSide, uniStream,
+		permissiveTransportParameters,
+		func(p *transportParameters) {
+			p.initialMaxData = 10
+		})
+	tc.ignoreFrame(frameTypeAck)
+
+	// Decrease in MAX_DATA is ignored.
+	tc.writeFrames(packetType1RTT, debugFrameMaxData{
+		max: 5,
+	})
+
+	data := makeTestData(32)
+	n, err := s.Write(data)
+	if n != len(data) || err != nil {
+		t.Fatalf("s.Write() = %v, %v; want %v, nil", n, err, len(data))
+	}
+
+	tc.wantFrame("stream writes data up to MAX_DATA limit",
+		packetType1RTT, debugFrameStream{
+			id:   s.id,
+			data: data[:10],
+		})
+}
+
+func TestConnOutflowMaxDataRoundRobin(t *testing.T) {
+	ctx := canceledContext()
+	tc := newTestConn(t, clientSide, permissiveTransportParameters,
+		func(p *transportParameters) {
+			p.initialMaxData = 0
+		})
+	tc.handshake()
+	tc.ignoreFrame(frameTypeAck)
+
+	s1, err := tc.conn.newLocalStream(ctx, uniStream)
+	if err != nil {
+		t.Fatalf("conn.newLocalStream(%v) = %v", uniStream, err)
+	}
+	s2, err := tc.conn.newLocalStream(ctx, uniStream)
+	if err != nil {
+		t.Fatalf("conn.newLocalStream(%v) = %v", uniStream, err)
+	}
+
+	s1.Write(make([]byte, 10))
+	s2.Write(make([]byte, 10))
+
+	tc.writeFrames(packetType1RTT, debugFrameMaxData{
+		max: 1,
+	})
+	tc.wantFrame("stream 1 writes data up to MAX_DATA limit",
+		packetType1RTT, debugFrameStream{
+			id:   s1.id,
+			data: []byte{0},
+		})
+
+	tc.writeFrames(packetType1RTT, debugFrameMaxData{
+		max: 2,
+	})
+	tc.wantFrame("stream 2 writes data up to MAX_DATA limit",
+		packetType1RTT, debugFrameStream{
+			id:   s2.id,
+			data: []byte{0},
+		})
+
+	tc.writeFrames(packetType1RTT, debugFrameMaxData{
+		max: 3,
+	})
+	tc.wantFrame("stream 1 writes data up to MAX_DATA limit",
+		packetType1RTT, debugFrameStream{
+			id:   s1.id,
+			off:  1,
+			data: []byte{0},
+		})
+}
+
+func TestConnOutflowMetaAndData(t *testing.T) {
+	tc, s := newTestConnAndLocalStream(t, clientSide, bidiStream,
+		permissiveTransportParameters,
+		func(p *transportParameters) {
+			p.initialMaxData = 0
+		})
+	tc.ignoreFrame(frameTypeAck)
+
+	data := makeTestData(32)
+	s.Write(data)
+
+	s.CloseRead()
+	tc.wantFrame("CloseRead sends a STOP_SENDING, not flow controlled",
+		packetType1RTT, debugFrameStopSending{
+			id: s.id,
+		})
+
+	tc.writeFrames(packetType1RTT, debugFrameMaxData{
+		max: 100,
+	})
+	tc.wantFrame("unblocked MAX_DATA",
+		packetType1RTT, debugFrameStream{
+			id:   s.id,
+			data: data,
+		})
+}
diff --git a/internal/quic/conn_recv.go b/internal/quic/conn_recv.go
index faf3a37..07f17e3 100644
--- a/internal/quic/conn_recv.go
+++ b/internal/quic/conn_recv.go
@@ -186,7 +186,7 @@
 			if !frameOK(c, ptype, __01) {
 				return
 			}
-			_, n = consumeMaxDataFrame(payload)
+			n = c.handleMaxDataFrame(now, payload)
 		case frameTypeMaxStreamData:
 			if !frameOK(c, ptype, __01) {
 				return
@@ -280,6 +280,15 @@
 	return n
 }
 
+func (c *Conn) handleMaxDataFrame(now time.Time, payload []byte) int {
+	maxData, n := consumeMaxDataFrame(payload)
+	if n < 0 {
+		return -1
+	}
+	c.streams.outflow.setMaxData(maxData)
+	return n
+}
+
 func (c *Conn) handleMaxStreamDataFrame(now time.Time, payload []byte) int {
 	id, maxStreamData, n := consumeMaxStreamDataFrame(payload)
 	if n < 0 {
diff --git a/internal/quic/conn_streams.go b/internal/quic/conn_streams.go
index 0a72d26..7c6c8be 100644
--- a/internal/quic/conn_streams.go
+++ b/internal/quic/conn_streams.go
@@ -28,15 +28,15 @@
 	peerInitialMaxStreamDataBidiLocal int64                  // streams opened by them
 
 	// Connection-level flow control.
-	inflow connInflow
+	inflow  connInflow
+	outflow connOutflow
 
-	// Streams with frames to send are stored in a circular linked list.
-	// sendHead is the next stream to write, or nil if there are no streams
-	// with data to send. sendTail is the last stream to write.
-	needSend atomic.Bool
-	sendMu   sync.Mutex
-	sendHead *Stream
-	sendTail *Stream
+	// Streams with frames to send are stored in one of two circular linked lists,
+	// depending on whether they require connection-level flow control.
+	needSend  atomic.Bool
+	sendMu    sync.Mutex
+	queueMeta streamRing // streams with any non-flow-controlled frames
+	queueData streamRing // streams with only flow-controlled frames
 }
 
 func (c *Conn) streamsInit() {
@@ -188,29 +188,67 @@
 	return s
 }
 
-// queueStreamForSend marks a stream as containing frames that need sending.
-func (c *Conn) queueStreamForSend(s *Stream) {
+// maybeQueueStreamForSend puts s on the send queue its state calls for (if it is not already there) and wakes the conn.
+func (c *Conn) maybeQueueStreamForSend(s *Stream, state streamState) {
+	if state.wantQueue() == state.inQueue() {
+		return // already on the right queue
+	}
 	c.streams.sendMu.Lock()
 	defer c.streams.sendMu.Unlock()
-	if s.next != nil {
-		// Already in the queue.
-		return
-	}
-	if c.streams.sendHead == nil {
-		// The queue was empty.
-		c.streams.sendHead = s
-		c.streams.sendTail = s
-		s.next = s
-	} else {
-		// Insert this stream at the end of the queue.
-		c.streams.sendTail.next = s
-		c.streams.sendTail = s
-		s.next = c.streams.sendHead
-	}
+	state = s.state.load() // may have changed while waiting
+	c.queueStreamForSendLocked(s, state)
+
 	c.streams.needSend.Store(true)
 	c.wake()
 }
 
+// queueStreamForSendLocked moves a stream to the correct send queue,
+// or removes it from all queues.
+//
+// state is the last known stream state.
+func (c *Conn) queueStreamForSendLocked(s *Stream, state streamState) {
+	for {
+		wantQueue := state.wantQueue()
+		inQueue := state.inQueue()
+		if inQueue == wantQueue {
+			return // already on the right queue
+		}
+
+		switch inQueue {
+		case metaQueue:
+			c.streams.queueMeta.remove(s)
+		case dataQueue:
+			c.streams.queueData.remove(s)
+		}
+
+		switch wantQueue {
+		case metaQueue:
+			c.streams.queueMeta.append(s)
+			state = s.state.set(streamQueueMeta, streamQueueMeta|streamQueueData)
+		case dataQueue:
+			c.streams.queueData.append(s)
+			state = s.state.set(streamQueueData, streamQueueMeta|streamQueueData)
+		case noQueue:
+			state = s.state.set(0, streamQueueMeta|streamQueueData)
+		}
+
+		// If the stream state changed while we were moving the stream,
+		// we might now be on the wrong queue.
+		//
+		// For example:
+		//   - stream has data to send: streamOutSendData|streamQueueData
+		//   - appendStreamFrames sends all the data: streamQueueData
+		//   - concurrently, more data is written: streamOutSendData|streamQueueData
+		//   - appendStreamFrames calls us with the last state it observed
+		//     (streamQueueData).
+		//   - We remove the stream from the queue and observe the updated state:
+		//     streamOutSendData
+		//   - We realize that the stream needs to go back on the data queue.
+		//
+		// Go back around the loop to confirm we're on the correct queue.
+	}
+}
+
 // appendStreamFrames writes stream-related frames to the current packet.
 //
 // It returns true if no more frames need appending,
@@ -237,44 +275,45 @@
 	}
 	c.streams.sendMu.Lock()
 	defer c.streams.sendMu.Unlock()
-	for {
-		s := c.streams.sendHead
-		const pto = false
-
+	// queueMeta contains streams with non-flow-controlled frames to send.
+	for c.streams.queueMeta.head != nil {
+		s := c.streams.queueMeta.head
 		state := s.state.load()
-		if state&streamInSend != 0 {
+		if state&(streamQueueMeta|streamConnRemoved) != streamQueueMeta {
+			panic("BUG: queueMeta stream is not streamQueueMeta")
+		}
+		if state&streamInSendMeta != 0 {
 			s.ingate.lock()
 			ok := s.appendInFramesLocked(w, pnum, pto)
 			state = s.inUnlockNoQueue()
 			if !ok {
 				return false
 			}
-		}
-
-		if state&streamOutSend != 0 {
-			avail := w.avail()
-			s.outgate.lock()
-			ok := s.appendOutFramesLocked(w, pnum, pto)
-			state = s.outUnlockNoQueue()
-			if !ok {
-				// We've sent some data for this stream, but it still has more to send.
-				// If the stream got a reasonable chance to put data in a packet,
-				// advance sendHead to the next stream in line, to avoid starvation.
-				// We'll come back to this stream after going through the others.
-				//
-				// If the packet was already mostly out of space, leave sendHead alone
-				// and come back to this stream again on the next packet.
-				if avail > 512 {
-					c.streams.sendHead = s.next
-					c.streams.sendTail = s
-				}
-				return false
+			if state&streamInSendMeta != 0 {
+				panic("BUG: streamInSendMeta set after successfully appending frames")
 			}
 		}
-
-		if state == streamInDone|streamOutDone {
+		if state&streamOutSendMeta != 0 {
+			s.outgate.lock()
+			// This might also append flow-controlled frames if we have any
+			// and available conn-level quota. That's fine.
+			ok := s.appendOutFramesLocked(w, pnum, pto)
+			state = s.outUnlockNoQueue()
+			// We're checking both ok and state, because appendOutFramesLocked
+			// might have filled up the packet with flow-controlled data.
+			// If so, we want to move the stream to queueData for any remaining frames.
+			if !ok && state&streamOutSendMeta != 0 {
+				return false
+			}
+			if state&streamOutSendMeta != 0 {
+				panic("BUG: streamOutSendMeta set after successfully appending frames")
+			}
+		}
+		// We've sent all frames for this stream, so remove it from the send queue.
+		c.streams.queueMeta.remove(s)
+		if state&(streamInDone|streamOutDone) == streamInDone|streamOutDone {
 			// Stream is finished, remove it from the conn.
-			s.state.set(streamConnRemoved, streamConnRemoved)
+			state = s.state.set(streamConnRemoved, streamQueueMeta|streamConnRemoved)
 			delete(c.streams.streams, s.id)
 
 			// Record finalization of remote streams, to know when
@@ -282,24 +321,59 @@
 			if s.id.initiator() != c.side {
 				c.streams.remoteLimit[s.id.streamType()].close()
 			}
+		} else {
+			state = s.state.set(0, streamQueueMeta|streamConnRemoved)
 		}
-
-		next := s.next
-		s.next = nil
-		if (next == s) != (s == c.streams.sendTail) {
-			panic("BUG: sendable stream list state is inconsistent")
+		// The stream may have flow-controlled data to send,
+		// or something might have added non-flow-controlled frames after we
+		// unlocked the stream.
+		// If so, put the stream back on a queue.
+		c.queueStreamForSendLocked(s, state)
+	}
+	// queueData contains streams with flow-controlled frames.
+	for c.streams.queueData.head != nil {
+		avail := c.streams.outflow.avail()
+		if avail == 0 {
+			break // no flow control quota available
 		}
-		if s == c.streams.sendTail {
-			// This was the last stream.
-			c.streams.sendHead = nil
-			c.streams.sendTail = nil
-			c.streams.needSend.Store(false)
+		s := c.streams.queueData.head
+		s.outgate.lock()
+		ok := s.appendOutFramesLocked(w, pnum, pto)
+		state := s.outUnlockNoQueue()
+		if !ok {
+			// We've sent some data for this stream, but it still has more to send.
+			// If the stream got a reasonable chance to put data in a packet,
+			// advance queueData.head to the next stream in line, to avoid starvation.
+			// We'll come back to this stream after going through the others.
+			//
+			// If the packet was already mostly out of space, leave queueData.head alone
+			// and come back to this stream again on the next packet.
+			if avail > 512 {
+				c.streams.queueData.head = s.next
+			}
+			return false
+		}
+		if state&streamQueueData == 0 {
+			panic("BUG: queueData stream is not streamQueueData")
+		}
+		if state&streamOutSendData != 0 {
+			// We must have run out of connection-level flow control:
+			// appendOutFramesLocked says it wrote all it can, but there's
+			// still data to send.
+			//
+			// Advance queueData.head to the next stream in line to avoid starvation.
+			if c.streams.outflow.avail() != 0 {
+				panic("BUG: streamOutSendData set and flow control available after send")
+			}
+			c.streams.queueData.head = s.next
 			return true
 		}
-		// We've sent all data for this stream, so remove it from the list.
-		c.streams.sendTail.next = next
-		c.streams.sendHead = next
+		c.streams.queueData.remove(s)
+		state = s.state.set(0, streamQueueData)
+		c.queueStreamForSendLocked(s, state)
 	}
+	c.streams.needSend.Store(c.streams.queueData.head != nil)
+	return true
 }
 
 // appendStreamFramesPTO writes stream-related frames to the current packet
@@ -329,3 +403,37 @@
 	}
 	return true
 }
+
+// A streamRing is a circular linked list of streams.
+type streamRing struct {
+	head *Stream
+}
+
+// remove removes s from the ring.
+// s must be on the ring.
+func (r *streamRing) remove(s *Stream) {
+	if s.next == s {
+		r.head = nil // s was the last stream in the ring
+	} else {
+		s.prev.next = s.next
+		s.next.prev = s.prev
+		if r.head == s {
+			r.head = s.next
+		}
+	}
+}
+
+// append places s at the last position in the ring.
+// s must not be attached to any ring.
+func (r *streamRing) append(s *Stream) {
+	if r.head == nil {
+		r.head = s
+		s.next = s
+		s.prev = s
+	} else {
+		s.prev = r.head.prev
+		s.next = r.head
+		s.prev.next = s
+		s.next.prev = s
+	}
+}
diff --git a/internal/quic/conn_streams_test.go b/internal/quic/conn_streams_test.go
index 8ae007c..69f982c 100644
--- a/internal/quic/conn_streams_test.go
+++ b/internal/quic/conn_streams_test.go
@@ -163,7 +163,7 @@
 	if got := len(tc.conn.streams.streams); got != 0 {
 		t.Fatalf("after close, len(tc.conn.streams.streams) = %v, want 0", got)
 	}
-	if tc.conn.streams.sendHead != nil {
+	if tc.conn.streams.queueMeta.head != nil {
 		t.Fatalf("after close, stream send queue is not empty; should be")
 	}
 }
@@ -474,7 +474,7 @@
 	if got := len(tc.conn.streams.streams); got != 0 {
 		t.Fatalf("after test, len(tc.conn.streams.streams) = %v, want 0", got)
 	}
-	if tc.conn.streams.sendHead != nil {
+	if tc.conn.streams.queueMeta.head != nil {
 		t.Fatalf("after test, stream send queue is not empty; should be")
 	}
 }
diff --git a/internal/quic/stream.go b/internal/quic/stream.go
index 84c437d..923ff23 100644
--- a/internal/quic/stream.go
+++ b/internal/quic/stream.go
@@ -57,6 +57,7 @@
 	// streamIn* bits must be set with ingate held.
 	// streamOut* bits must be set with outgate held.
 	// streamConn* bits are set by the conn's loop.
+	// streamQueue* bits must be set with streamsState.sendMu held.
 	state atomicBits[streamState]
 
 	prev, next *Stream // guarded by streamsState.sendMu
@@ -65,11 +66,19 @@
 type streamState uint32
 
 const (
-	// streamInSend and streamOutSend are set when there are
-	// frames to send for the inbound or outbound sides of the stream.
-	// For example, MAX_STREAM_DATA or STREAM_DATA_BLOCKED.
-	streamInSend = streamState(1 << iota)
-	streamOutSend
+	// streamInSendMeta is set when there are frames to send for the
+	// inbound side of the stream. For example, MAX_STREAM_DATA.
+	// Inbound frames are never flow-controlled.
+	streamInSendMeta = streamState(1 << iota)
+
+	// streamOutSendMeta is set when there are non-flow-controlled frames
+	// to send for the outbound side of the stream. For example, STREAM_DATA_BLOCKED.
+	// streamOutSendData is set when there are no non-flow-controlled outbound frames
+	// and the stream has data to send.
+	//
+	// At most one of streamOutSendMeta and streamOutSendData is set at any time.
+	streamOutSendMeta
+	streamOutSendData
 
 	// streamInDone and streamOutDone are set when the inbound or outbound
 	// sides of the stream are finished. When both are set, the stream
@@ -79,8 +88,48 @@
 
 	// streamConnRemoved is set when the stream has been removed from the conn.
 	streamConnRemoved
+
+	// streamQueueMeta and streamQueueData indicate which of the streamsState
+	// send queues the stream is currently on.
+	streamQueueMeta
+	streamQueueData
 )
 
+type streamQueue int
+
+const (
+	noQueue   = streamQueue(iota)
+	metaQueue // streamsState.queueMeta
+	dataQueue // streamsState.queueData
+)
+
+// wantQueue returns the send queue the stream should be on.
+func (s streamState) wantQueue() streamQueue {
+	switch {
+	case s&(streamInSendMeta|streamOutSendMeta) != 0:
+		return metaQueue
+	case s&(streamInDone|streamOutDone|streamConnRemoved) == streamInDone|streamOutDone:
+		return metaQueue
+	case s&streamOutSendData != 0:
+		// The stream has no non-flow-controlled frames to send,
+		// but does have data. Put it on the data queue, which is only
+		// processed when flow control is available.
+		return dataQueue
+	}
+	return noQueue
+}
+
+// inQueue returns the send queue the stream is currently on.
+func (s streamState) inQueue() streamQueue {
+	switch {
+	case s&streamQueueMeta != 0:
+		return metaQueue
+	case s&streamQueueData != 0:
+		return dataQueue
+	}
+	return noQueue
+}
+
 // newStream returns a new stream.
 //
 // The stream's ingate and outgate are locked.
@@ -365,9 +414,7 @@
 // are done and the stream should be removed, it notifies the Conn.
 func (s *Stream) inUnlock() {
 	state := s.inUnlockNoQueue()
-	if state&streamInSend != 0 || state == streamInDone|streamOutDone {
-		s.conn.queueStreamForSend(s)
-	}
+	s.conn.maybeQueueStreamForSend(s, state)
 }
 
 // inUnlockNoQueue is inUnlock,
@@ -391,11 +438,11 @@
 			state = streamInDone
 		}
 	case s.insendmax.shouldSend(): // STREAM_MAX_DATA
-		state = streamInSend
+		state = streamInSendMeta
 	case s.inclosed.shouldSend(): // STOP_SENDING
-		state = streamInSend
+		state = streamInSendMeta
 	}
-	const mask = streamInDone | streamInSend
+	const mask = streamInDone | streamInSendMeta
 	return s.state.set(state, mask)
 }
 
@@ -405,9 +452,7 @@
 // are done and the stream should be removed, it notifies the Conn.
 func (s *Stream) outUnlock() {
 	state := s.outUnlockNoQueue()
-	if state&streamOutSend != 0 || state == streamInDone|streamOutDone {
-		s.conn.queueStreamForSend(s)
-	}
+	s.conn.maybeQueueStreamForSend(s, state)
 }
 
 // outUnlockNoQueue is outUnlock,
@@ -442,18 +487,18 @@
 			state = streamOutDone
 		}
 	case s.outreset.shouldSend(): // RESET_STREAM
-		state = streamOutSend
+		state = streamOutSendMeta
 	case s.outreset.isSet(): // RESET_STREAM sent but not acknowledged
-	case len(s.outunsent) > 0: // STREAM frame with data
-		state = streamOutSend
-	case s.outclosed.shouldSend(): // STREAM frame with FIN bit
-		state = streamOutSend
-	case s.outopened.shouldSend(): // STREAM frame with no data
-		state = streamOutSend
 	case s.outblocked.shouldSend(): // STREAM_DATA_BLOCKED
-		state = streamOutSend
+		state = streamOutSendMeta
+	case len(s.outunsent) > 0: // STREAM frame with data
+		state = streamOutSendData
+	case s.outclosed.shouldSend(): // STREAM frame with FIN bit, all data already sent
+		state = streamOutSendMeta
+	case s.outopened.shouldSend(): // STREAM frame with no data
+		state = streamOutSendMeta
 	}
-	const mask = streamOutDone | streamOutSend
+	const mask = streamOutDone | streamOutSendMeta | streamOutSendData
 	return s.state.set(state, mask)
 }
 
@@ -678,6 +723,7 @@
 	for {
 		// STREAM
 		off, size := dataToSend(min(s.out.start, s.outwin), min(s.out.end, s.outwin), s.outunsent, s.outacked, pto)
+		size = min(size, s.conn.streams.outflow.avail())
 		fin := s.outclosed.isSet() && off+size == s.out.end
 		shouldSend := size > 0 || // have data to send
 			s.outopened.shouldSendPTO(pto) || // should open the stream
@@ -690,6 +736,7 @@
 			return false
 		}
 		s.out.copy(off, b)
+		s.conn.streams.outflow.consume(int64(len(b)))
 		s.outunsent.sub(off, off+int64(len(b)))
 		s.frameOpensStream(pnum)
 		if fin {
diff --git a/internal/quic/stream_test.go b/internal/quic/stream_test.go
index b014852..7501196 100644
--- a/internal/quic/stream_test.go
+++ b/internal/quic/stream_test.go
@@ -1270,3 +1270,11 @@
 	p.initialMaxStreamDataBidiLocal = maxVarint
 	p.initialMaxStreamDataUni = maxVarint
 }
+
+func makeTestData(n int) []byte {
+	b := make([]byte, n)
+	for i := 0; i < n; i++ {
+		b[i] = byte(i)
+	}
+	return b
+}