http2: refund connection flow control on DATA frames received after reset
If the transport had previously sent a RST_STREAM but had not yet
deleted the stream from its list of active streams, we should refund
connection-level flow control for any DATA frame received as such
DATA frames will never be read.
We already refund connection-level flow control if a stream closes with
unread data in bufPipe. However, when we receive a DATA frame after
reset, we don't bother writing it to bufPipe, so we have to refund the
flow control separately.
Updates golang/go#20469
Change-Id: I5a9810a5d6b1bd7e291173af53646246545a6665
Reviewed-on: https://go-review.googlesource.com/46591
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
diff --git a/http2/transport.go b/http2/transport.go
index 24d0af8..850d7ae 100644
--- a/http2/transport.go
+++ b/http2/transport.go
@@ -1713,16 +1713,27 @@
}
// Return any padded flow control now, since we won't
// refund it later on body reads.
- if pad := int32(f.Length) - int32(len(data)); pad > 0 {
- cs.inflow.add(pad)
- cc.inflow.add(pad)
+ var refund int
+ if pad := int(f.Length) - len(data); pad > 0 {
+ refund += pad
+ }
+ // Return len(data) now if the stream is already closed,
+ // since data will never be read.
+ didReset := cs.didReset
+ if didReset {
+ refund += len(data)
+ }
+ if refund > 0 {
+ cc.inflow.add(int32(refund))
cc.wmu.Lock()
- cc.fr.WriteWindowUpdate(0, uint32(pad))
- cc.fr.WriteWindowUpdate(cs.ID, uint32(pad))
+ cc.fr.WriteWindowUpdate(0, uint32(refund))
+ if !didReset {
+ cs.inflow.add(int32(refund))
+ cc.fr.WriteWindowUpdate(cs.ID, uint32(refund))
+ }
cc.bw.Flush()
cc.wmu.Unlock()
}
- didReset := cs.didReset
cc.mu.Unlock()
if len(data) > 0 && !didReset {
diff --git a/http2/transport_test.go b/http2/transport_test.go
index bf34fc9..15dfa07 100644
--- a/http2/transport_test.go
+++ b/http2/transport_test.go
@@ -2210,12 +2210,11 @@
ct.run()
}
-// See golang.org/issue/16481
-func TestTransportReturnsUnusedFlowControl(t *testing.T) {
+func testTransportReturnsUnusedFlowControl(t *testing.T, oneDataFrame bool) {
ct := newClientTester(t)
- clientClosed := make(chan bool, 1)
- serverWroteBody := make(chan bool, 1)
+ clientClosed := make(chan struct{})
+ serverWroteFirstByte := make(chan struct{})
ct.client = func() error {
req, _ := http.NewRequest("GET", "https://dummy.tld/", nil)
@@ -2223,13 +2222,13 @@
if err != nil {
return err
}
- <-serverWroteBody
+ <-serverWroteFirstByte
if n, err := res.Body.Read(make([]byte, 1)); err != nil || n != 1 {
return fmt.Errorf("body read = %v, %v; want 1, nil", n, err)
}
res.Body.Close() // leaving 4999 bytes unread
- clientClosed <- true
+ close(clientClosed)
return nil
}
@@ -2264,10 +2263,27 @@
EndStream: false,
BlockFragment: buf.Bytes(),
})
- ct.fr.WriteData(hf.StreamID, false, make([]byte, 5000)) // without ending stream
- serverWroteBody <- true
- <-clientClosed
+ // Two cases:
+ // - Send one DATA frame with 5000 bytes.
+ // - Send two DATA frames with 1 and 4999 bytes each.
+ //
+ // In both cases, the client should consume one byte of data,
+ // refund that byte, then refund the following 4999 bytes.
+ //
+ // In the second case, the server waits for the client connection to
+ // close before seconding the second DATA frame. This tests the case
+ // where the client receives a DATA frame after it has reset the stream.
+ if oneDataFrame {
+ ct.fr.WriteData(hf.StreamID, false /* don't end stream */, make([]byte, 5000))
+ close(serverWroteFirstByte)
+ <-clientClosed
+ } else {
+ ct.fr.WriteData(hf.StreamID, false /* don't end stream */, make([]byte, 1))
+ close(serverWroteFirstByte)
+ <-clientClosed
+ ct.fr.WriteData(hf.StreamID, false /* don't end stream */, make([]byte, 4999))
+ }
waitingFor := "RSTStreamFrame"
for {
@@ -2281,7 +2297,7 @@
switch waitingFor {
case "RSTStreamFrame":
if rf, ok := f.(*RSTStreamFrame); !ok || rf.ErrCode != ErrCodeCancel {
- return fmt.Errorf("Expected a WindowUpdateFrame with code cancel; got %v", summarizeFrame(f))
+ return fmt.Errorf("Expected a RSTStreamFrame with code cancel; got %v", summarizeFrame(f))
}
waitingFor = "WindowUpdateFrame"
case "WindowUpdateFrame":
@@ -2295,6 +2311,16 @@
ct.run()
}
+// See golang.org/issue/16481
+func TestTransportReturnsUnusedFlowControlSingleWrite(t *testing.T) {
+ testTransportReturnsUnusedFlowControl(t, true)
+}
+
+// See golang.org/issue/20469
+func TestTransportReturnsUnusedFlowControlMultipleWrites(t *testing.T) {
+ testTransportReturnsUnusedFlowControl(t, false)
+}
+
// Issue 16612: adjust flow control on open streams when transport
// receives SETTINGS with INITIAL_WINDOW_SIZE from server.
func TestTransportAdjustsFlowControl(t *testing.T) {