http2: refund connection flow control on DATA frames received after reset

If the transport had previously sent a RST_STREAM but had not yet
deleted the stream from its list of active streams, we should refund
connection-level flow control for any DATA frame received as such
DATA frames will never be read.

We already refund connection-level flow control if a stream closes with
unread data in bufPipe. However, when we receive a DATA frame after
reset, we don't bother writing it to bufPipe, so we have to refund the
flow control separately.

Updates golang/go#20469

Change-Id: I5a9810a5d6b1bd7e291173af53646246545a6665
Reviewed-on: https://go-review.googlesource.com/46591
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
diff --git a/http2/transport.go b/http2/transport.go
index 24d0af8..850d7ae 100644
--- a/http2/transport.go
+++ b/http2/transport.go
@@ -1713,16 +1713,27 @@
 		}
 		// Return any padded flow control now, since we won't
 		// refund it later on body reads.
-		if pad := int32(f.Length) - int32(len(data)); pad > 0 {
-			cs.inflow.add(pad)
-			cc.inflow.add(pad)
+		var refund int
+		if pad := int(f.Length) - len(data); pad > 0 {
+			refund += pad
+		}
+		// Return len(data) now if the stream is already closed,
+		// since data will never be read.
+		didReset := cs.didReset
+		if didReset {
+			refund += len(data)
+		}
+		if refund > 0 {
+			cc.inflow.add(int32(refund))
 			cc.wmu.Lock()
-			cc.fr.WriteWindowUpdate(0, uint32(pad))
-			cc.fr.WriteWindowUpdate(cs.ID, uint32(pad))
+			cc.fr.WriteWindowUpdate(0, uint32(refund))
+			if !didReset {
+				cs.inflow.add(int32(refund))
+				cc.fr.WriteWindowUpdate(cs.ID, uint32(refund))
+			}
 			cc.bw.Flush()
 			cc.wmu.Unlock()
 		}
-		didReset := cs.didReset
 		cc.mu.Unlock()
 
 		if len(data) > 0 && !didReset {
diff --git a/http2/transport_test.go b/http2/transport_test.go
index bf34fc9..15dfa07 100644
--- a/http2/transport_test.go
+++ b/http2/transport_test.go
@@ -2210,12 +2210,11 @@
 	ct.run()
 }
 
-// See golang.org/issue/16481
-func TestTransportReturnsUnusedFlowControl(t *testing.T) {
+func testTransportReturnsUnusedFlowControl(t *testing.T, oneDataFrame bool) {
 	ct := newClientTester(t)
 
-	clientClosed := make(chan bool, 1)
-	serverWroteBody := make(chan bool, 1)
+	clientClosed := make(chan struct{})
+	serverWroteFirstByte := make(chan struct{})
 
 	ct.client = func() error {
 		req, _ := http.NewRequest("GET", "https://dummy.tld/", nil)
@@ -2223,13 +2222,13 @@
 		if err != nil {
 			return err
 		}
-		<-serverWroteBody
+		<-serverWroteFirstByte
 
 		if n, err := res.Body.Read(make([]byte, 1)); err != nil || n != 1 {
 			return fmt.Errorf("body read = %v, %v; want 1, nil", n, err)
 		}
 		res.Body.Close() // leaving 4999 bytes unread
-		clientClosed <- true
+		close(clientClosed)
 
 		return nil
 	}
@@ -2264,10 +2263,27 @@
 			EndStream:     false,
 			BlockFragment: buf.Bytes(),
 		})
-		ct.fr.WriteData(hf.StreamID, false, make([]byte, 5000)) // without ending stream
-		serverWroteBody <- true
 
-		<-clientClosed
+		// Two cases:
+		// - Send one DATA frame with 5000 bytes.
+		// - Send two DATA frames with 1 and 4999 bytes each.
+		//
+		// In both cases, the client should consume one byte of data,
+		// refund that byte, then refund the following 4999 bytes.
+		//
+		// In the second case, the server waits for the client connection to
+		// close before seconding the second DATA frame. This tests the case
+		// where the client receives a DATA frame after it has reset the stream.
+		if oneDataFrame {
+			ct.fr.WriteData(hf.StreamID, false /* don't end stream */, make([]byte, 5000))
+			close(serverWroteFirstByte)
+			<-clientClosed
+		} else {
+			ct.fr.WriteData(hf.StreamID, false /* don't end stream */, make([]byte, 1))
+			close(serverWroteFirstByte)
+			<-clientClosed
+			ct.fr.WriteData(hf.StreamID, false /* don't end stream */, make([]byte, 4999))
+		}
 
 		waitingFor := "RSTStreamFrame"
 		for {
@@ -2281,7 +2297,7 @@
 			switch waitingFor {
 			case "RSTStreamFrame":
 				if rf, ok := f.(*RSTStreamFrame); !ok || rf.ErrCode != ErrCodeCancel {
-					return fmt.Errorf("Expected a WindowUpdateFrame with code cancel; got %v", summarizeFrame(f))
+					return fmt.Errorf("Expected a RSTStreamFrame with code cancel; got %v", summarizeFrame(f))
 				}
 				waitingFor = "WindowUpdateFrame"
 			case "WindowUpdateFrame":
@@ -2295,6 +2311,16 @@
 	ct.run()
 }
 
+// See golang.org/issue/16481
+func TestTransportReturnsUnusedFlowControlSingleWrite(t *testing.T) {
+	testTransportReturnsUnusedFlowControl(t, true)
+}
+
+// See golang.org/issue/20469
+func TestTransportReturnsUnusedFlowControlMultipleWrites(t *testing.T) {
+	testTransportReturnsUnusedFlowControl(t, false)
+}
+
 // Issue 16612: adjust flow control on open streams when transport
 // receives SETTINGS with INITIAL_WINDOW_SIZE from server.
 func TestTransportAdjustsFlowControl(t *testing.T) {