quic: fix several bugs in flow control accounting
Connection-level flow control sets a bound on the total maximum
stream offset of all data sent, not the total number of bytes sent
in STREAM frames. For example, if we send the bytes [0,10) for a
stream, and then retransmit the same bytes due to packet loss,
that consumes 10 bytes of connection-level flow, not 20.
We were incorrectly tracking total bytes sent. Fix this.
We were blocking retransmission of data in lost STREAM frames
on availability of connection-level flow control.
We now place a stream with retransmitted data on queueMeta
(non-flow-controlled data), since we have already
accounted for the flow control window consumption of the
data.
We were incorrectly marking a stream as being able to send
an empty STREAM frame with a FIN bit, when the stream was
actually blocked on stream-level flow control. Fix this.
For golang/go#58547
Change-Id: Ib2ace94183750078a19d945256507060ea786735
Reviewed-on: https://go-review.googlesource.com/c/net/+/532716
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
diff --git a/internal/quic/conn_flow_test.go b/internal/quic/conn_flow_test.go
index d5ee74e..03e0757 100644
--- a/internal/quic/conn_flow_test.go
+++ b/internal/quic/conn_flow_test.go
@@ -394,3 +394,37 @@
data: data,
})
}
+
+func TestConnOutflowResentData(t *testing.T) {
+ tc, s := newTestConnAndLocalStream(t, clientSide, bidiStream,
+ permissiveTransportParameters,
+ func(p *transportParameters) {
+ p.initialMaxData = 10
+ })
+ tc.ignoreFrame(frameTypeAck)
+
+ data := makeTestData(15)
+ s.Write(data[:8])
+ tc.wantFrame("data is under MAX_DATA limit, all sent",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ data: data[:8],
+ })
+
+ // Lose the last STREAM packet.
+ const pto = false
+ tc.triggerLossOrPTO(packetType1RTT, pto)
+ tc.wantFrame("lost STREAM data is retransmitted",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ data: data[:8],
+ })
+
+ s.Write(data[8:])
+ tc.wantFrame("new data is sent up to the MAX_DATA limit",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: 8,
+ data: data[8:10],
+ })
+}
diff --git a/internal/quic/stream.go b/internal/quic/stream.go
index 9310811..89036b1 100644
--- a/internal/quic/stream.go
+++ b/internal/quic/stream.go
@@ -39,6 +39,7 @@
outgate gate
out pipe // buffered data to send
outwin int64 // maximum MAX_STREAM_DATA received from the peer
+ outmaxsent int64 // maximum data offset we've sent to the peer
outmaxbuf int64 // maximum amount of data we will buffer
outunsent rangeset[int64] // ranges buffered but not yet sent
outacked rangeset[int64] // ranges sent and acknowledged
@@ -494,8 +495,12 @@
case s.outblocked.shouldSend(): // STREAM_DATA_BLOCKED
state = streamOutSendMeta
case len(s.outunsent) > 0: // STREAM frame with data
- state = streamOutSendData
- case s.outclosed.shouldSend(): // STREAM frame with FIN bit, all data already sent
+ if s.outunsent.min() < s.outmaxsent {
+ state = streamOutSendMeta // resent data, will not consume flow control
+ } else {
+ state = streamOutSendData // new data, requires flow control
+ }
+ case s.outclosed.shouldSend() && s.out.end == s.outmaxsent: // empty STREAM frame with FIN bit
state = streamOutSendMeta
case s.outopened.shouldSend(): // STREAM frame with no data
state = streamOutSendMeta
@@ -725,7 +730,11 @@
for {
// STREAM
off, size := dataToSend(min(s.out.start, s.outwin), min(s.out.end, s.outwin), s.outunsent, s.outacked, pto)
- size = min(size, s.conn.streams.outflow.avail())
+ if end := off + size; end > s.outmaxsent {
+ // This will require connection-level flow control to send.
+ end = min(end, s.outmaxsent+s.conn.streams.outflow.avail())
+ size = end - off
+ }
fin := s.outclosed.isSet() && off+size == s.out.end
shouldSend := size > 0 || // have data to send
s.outopened.shouldSendPTO(pto) || // should open the stream
@@ -738,8 +747,12 @@
return false
}
s.out.copy(off, b)
- s.conn.streams.outflow.consume(int64(len(b)))
- s.outunsent.sub(off, off+int64(len(b)))
+ end := off + int64(len(b))
+ if end > s.outmaxsent {
+ s.conn.streams.outflow.consume(end - s.outmaxsent)
+ s.outmaxsent = end
+ }
+ s.outunsent.sub(off, end)
s.frameOpensStream(pnum)
if fin {
s.outclosed.setSent(pnum)
diff --git a/internal/quic/stream_test.go b/internal/quic/stream_test.go
index 86eebc6..7c1377f 100644
--- a/internal/quic/stream_test.go
+++ b/internal/quic/stream_test.go
@@ -1094,6 +1094,44 @@
}
}
+func TestStreamCloseWriteWhenBlockedByStreamFlowControl(t *testing.T) {
+ ctx := canceledContext()
+ tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters,
+ func(p *transportParameters) {
+ //p.initialMaxData = 0
+ p.initialMaxStreamDataUni = 0
+ })
+ tc.ignoreFrame(frameTypeStreamDataBlocked)
+ if _, err := s.WriteContext(ctx, []byte{0, 1}); err != nil {
+ t.Fatalf("s.WriteContext = %v", err)
+ }
+ s.CloseWrite()
+ tc.wantIdle("stream write is blocked by flow control")
+
+ tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
+ id: s.id,
+ max: 1,
+ })
+ tc.wantFrame("send data up to flow control limit",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ data: []byte{0},
+ })
+ tc.wantIdle("stream write is again blocked by flow control")
+
+ tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
+ id: s.id,
+ max: 2,
+ })
+ tc.wantFrame("send remaining data and FIN",
+ packetType1RTT, debugFrameStream{
+ id: s.id,
+ off: 1,
+ data: []byte{1},
+ fin: true,
+ })
+}
+
func TestStreamPeerResetsWithUnreadAndUnsentData(t *testing.T) {
testStreamTypes(t, "", func(t *testing.T, styp streamType) {
ctx := canceledContext()