quic: fix several bugs in flow control accounting

Connection-level flow control sets a bound on the total maximum
stream offset of all data sent, not the total amount of bytes sent
in STREAM frames. For example, if we send the bytes [0,10) for a
stream, and then retransmit the same bytes due to packet loss,
that consumes 10 bytes of connection-level flow, not 20.

We were incorrectly tracking total bytes sent. Fix this.

We were blocking retransmission of data in lost STREAM frames
on availability of connection-level flow control.
We now place a stream with retransmitted data on queueMeta
(non-flow-controlled data), since we have already
accounted for the flow control window consumption of the
data.

We were incorrectly marking a stream as being able to send
an empty STREAM frame with a FIN bit, when the stream was
actually blocked on stream-level flow control. Fix this.

For golang/go#58547

Change-Id: Ib2ace94183750078a19d945256507060ea786735
Reviewed-on: https://go-review.googlesource.com/c/net/+/532716
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Jonathan Amsterdam <jba@google.com>
diff --git a/internal/quic/conn_flow_test.go b/internal/quic/conn_flow_test.go
index d5ee74e..03e0757 100644
--- a/internal/quic/conn_flow_test.go
+++ b/internal/quic/conn_flow_test.go
@@ -394,3 +394,37 @@
 			data: data,
 		})
 }
+
+func TestConnOutflowResentData(t *testing.T) {
+	tc, s := newTestConnAndLocalStream(t, clientSide, bidiStream,
+		permissiveTransportParameters,
+		func(p *transportParameters) {
+			p.initialMaxData = 10
+		})
+	tc.ignoreFrame(frameTypeAck)
+
+	data := makeTestData(15)
+	s.Write(data[:8])
+	tc.wantFrame("data is under MAX_DATA limit, all sent",
+		packetType1RTT, debugFrameStream{
+			id:   s.id,
+			data: data[:8],
+		})
+
+	// Lose the last STREAM packet.
+	const pto = false
+	tc.triggerLossOrPTO(packetType1RTT, false)
+	tc.wantFrame("lost STREAM data is retransmitted",
+		packetType1RTT, debugFrameStream{
+			id:   s.id,
+			data: data[:8],
+		})
+
+	s.Write(data[8:])
+	tc.wantFrame("new data is sent up to the MAX_DATA limit",
+		packetType1RTT, debugFrameStream{
+			id:   s.id,
+			off:  8,
+			data: data[8:10],
+		})
+}
diff --git a/internal/quic/stream.go b/internal/quic/stream.go
index 9310811..89036b1 100644
--- a/internal/quic/stream.go
+++ b/internal/quic/stream.go
@@ -39,6 +39,7 @@
 	outgate      gate
 	out          pipe            // buffered data to send
 	outwin       int64           // maximum MAX_STREAM_DATA received from the peer
+	outmaxsent   int64           // maximum data offset we've sent to the peer
 	outmaxbuf    int64           // maximum amount of data we will buffer
 	outunsent    rangeset[int64] // ranges buffered but not yet sent
 	outacked     rangeset[int64] // ranges sent and acknowledged
@@ -494,8 +495,12 @@
 	case s.outblocked.shouldSend(): // STREAM_DATA_BLOCKED
 		state = streamOutSendMeta
 	case len(s.outunsent) > 0: // STREAM frame with data
-		state = streamOutSendData
-	case s.outclosed.shouldSend(): // STREAM frame with FIN bit, all data already sent
+		if s.outunsent.min() < s.outmaxsent {
+			state = streamOutSendMeta // resent data, will not consume flow control
+		} else {
+			state = streamOutSendData // new data, requires flow control
+		}
+	case s.outclosed.shouldSend() && s.out.end == s.outmaxsent: // empty STREAM frame with FIN bit
 		state = streamOutSendMeta
 	case s.outopened.shouldSend(): // STREAM frame with no data
 		state = streamOutSendMeta
@@ -725,7 +730,11 @@
 	for {
 		// STREAM
 		off, size := dataToSend(min(s.out.start, s.outwin), min(s.out.end, s.outwin), s.outunsent, s.outacked, pto)
-		size = min(size, s.conn.streams.outflow.avail())
+		if end := off + size; end > s.outmaxsent {
+			// This will require connection-level flow control to send.
+			end = min(end, s.outmaxsent+s.conn.streams.outflow.avail())
+			size = end - off
+		}
 		fin := s.outclosed.isSet() && off+size == s.out.end
 		shouldSend := size > 0 || // have data to send
 			s.outopened.shouldSendPTO(pto) || // should open the stream
@@ -738,8 +747,12 @@
 			return false
 		}
 		s.out.copy(off, b)
-		s.conn.streams.outflow.consume(int64(len(b)))
-		s.outunsent.sub(off, off+int64(len(b)))
+		end := off + int64(len(b))
+		if end > s.outmaxsent {
+			s.conn.streams.outflow.consume(end - s.outmaxsent)
+			s.outmaxsent = end
+		}
+		s.outunsent.sub(off, end)
 		s.frameOpensStream(pnum)
 		if fin {
 			s.outclosed.setSent(pnum)
diff --git a/internal/quic/stream_test.go b/internal/quic/stream_test.go
index 86eebc6..7c1377f 100644
--- a/internal/quic/stream_test.go
+++ b/internal/quic/stream_test.go
@@ -1094,6 +1094,44 @@
 	}
 }
 
+func TestStreamCloseWriteWhenBlockedByStreamFlowControl(t *testing.T) {
+	ctx := canceledContext()
+	tc, s := newTestConnAndLocalStream(t, serverSide, uniStream, permissiveTransportParameters,
+		func(p *transportParameters) {
+			//p.initialMaxData = 0
+			p.initialMaxStreamDataUni = 0
+		})
+	tc.ignoreFrame(frameTypeStreamDataBlocked)
+	if _, err := s.WriteContext(ctx, []byte{0, 1}); err != nil {
+		t.Fatalf("s.Write = %v", err)
+	}
+	s.CloseWrite()
+	tc.wantIdle("stream write is blocked by flow control")
+
+	tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
+		id:  s.id,
+		max: 1,
+	})
+	tc.wantFrame("send data up to flow control limit",
+		packetType1RTT, debugFrameStream{
+			id:   s.id,
+			data: []byte{0},
+		})
+	tc.wantIdle("stream write is again blocked by flow control")
+
+	tc.writeFrames(packetType1RTT, debugFrameMaxStreamData{
+		id:  s.id,
+		max: 2,
+	})
+	tc.wantFrame("send remaining data and FIN",
+		packetType1RTT, debugFrameStream{
+			id:   s.id,
+			off:  1,
+			data: []byte{1},
+			fin:  true,
+		})
+}
+
 func TestStreamPeerResetsWithUnreadAndUnsentData(t *testing.T) {
 	testStreamTypes(t, "", func(t *testing.T, styp streamType) {
 		ctx := canceledContext()