http2: return flow control for closed streams
For both the server and the transport, return connection-level flow control in
two cases: 1) when a stream is closed with buffered data not read by the user,
or 2) when a DATA frame arrives but there the stream has since been closed.
Fixes golang/go#16481
Change-Id: Ic7404180ed04a2903e8fd6e9599a907f88b4f72e
Reviewed-on: https://go-review.googlesource.com/25231
Reviewed-by: Andrew Gerrand <adg@golang.org>
diff --git a/http2/server.go b/http2/server.go
index f368738..dbe6c87 100644
--- a/http2/server.go
+++ b/http2/server.go
@@ -1176,6 +1176,10 @@
}
delete(sc.streams, st.id)
if p := st.body; p != nil {
+ // Return any buffered unread bytes worth of conn-level flow control.
+ // See golang.org/issue/16481
+ sc.sendWindowUpdate(nil, p.Len())
+
p.CloseWithError(err)
}
st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc
@@ -1277,6 +1281,8 @@
func (sc *serverConn) processData(f *DataFrame) error {
sc.serveG.check()
+ data := f.Data()
+
// "If a DATA frame is received whose stream is not in "open"
// or "half closed (local)" state, the recipient MUST respond
// with a stream error (Section 5.4.2) of type STREAM_CLOSED."
@@ -1288,12 +1294,25 @@
// the http.Handler returned, so it's done reading &
// done writing). Try to stop the client from sending
// more DATA.
+
+ // But still enforce their connection-level flow control,
+ // and return any flow control bytes since we're not going
+ // to consume them.
+ if int(sc.inflow.available()) < len(data) {
+ return StreamError{id, ErrCodeFlowControl}
+ }
+ // Deduct the flow control from inflow, since we're
+ // going to immediately add it back in
+ // sendWindowUpdate, which also schedules sending the
+ // frames.
+ sc.inflow.take(int32(len(data)))
+ sc.sendWindowUpdate(nil, len(data)) // conn-level
+
return StreamError{id, ErrCodeStreamClosed}
}
if st.body == nil {
panic("internal error: should have a body in this state")
}
- data := f.Data()
// Sender sending more than they'd declared?
if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes {
diff --git a/http2/server_test.go b/http2/server_test.go
index a45905f..ac4d351 100644
--- a/http2/server_test.go
+++ b/http2/server_test.go
@@ -2167,6 +2167,9 @@
// it did before.
st.writeData(1, true, []byte("foo"))
+ // Get our flow control bytes back, since the handler didn't get them.
+ st.wantWindowUpdate(0, uint32(len("foo")))
+
// Sent after a peer sends data anyway (admittedly the
// previous RST_STREAM might've still been in-flight),
// but they'll get the more friendly 'cancel' code
@@ -3301,3 +3304,43 @@
t.Fatalf("second msg = %q; want %q", buf, msg2)
}
}
+
+type funcReader func([]byte) (n int, err error)
+
+func (f funcReader) Read(p []byte) (n int, err error) { return f(p) }
+
+// golang.org/issue/16481 -- return flow control when streams close with unread data.
+// (The Server version of the bug. See also TestUnreadFlowControlReturned_Transport)
+func TestUnreadFlowControlReturned_Server(t *testing.T) {
+ unblock := make(chan bool, 1)
+ defer close(unblock)
+
+ st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
+ // Don't read the 16KB request body. Wait until the client's
+ // done sending it and then return. This should cause the Server
+ // to then return those 16KB of flow control to the client.
+ <-unblock
+ }, optOnlyServer)
+ defer st.Close()
+
+ tr := &Transport{TLSClientConfig: tlsConfigInsecure}
+ defer tr.CloseIdleConnections()
+
+ // This previously hung on the 4th iteration.
+ for i := 0; i < 6; i++ {
+ body := io.MultiReader(
+ io.LimitReader(neverEnding('A'), 16<<10),
+ funcReader(func([]byte) (n int, err error) {
+ unblock <- true
+ return 0, io.EOF
+ }),
+ )
+ req, _ := http.NewRequest("POST", st.ts.URL, body)
+ res, err := tr.RoundTrip(req)
+ if err != nil {
+ t.Fatal(err)
+ }
+ res.Body.Close()
+ }
+
+}
diff --git a/http2/transport.go b/http2/transport.go
index 642f256..b6f6f95 100644
--- a/http2/transport.go
+++ b/http2/transport.go
@@ -1537,10 +1537,27 @@
func (b transportResponseBody) Close() error {
cs := b.cs
- if cs.bufPipe.Err() != io.EOF {
- // TODO: write test for this
- cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
+ cc := cs.cc
+
+ serverSentStreamEnd := cs.bufPipe.Err() == io.EOF
+ unread := cs.bufPipe.Len()
+
+ if unread > 0 || !serverSentStreamEnd {
+ cc.mu.Lock()
+ cc.wmu.Lock()
+ if !serverSentStreamEnd {
+ cc.fr.WriteRSTStream(cs.ID, ErrCodeCancel)
+ }
+ // Return connection-level flow control.
+ if unread > 0 {
+ cc.inflow.add(int32(unread))
+ cc.fr.WriteWindowUpdate(0, uint32(unread))
+ }
+ cc.bw.Flush()
+ cc.wmu.Unlock()
+ cc.mu.Unlock()
}
+
cs.bufPipe.BreakWithError(errClosedResponseBody)
return nil
}
@@ -1548,6 +1565,7 @@
func (rl *clientConnReadLoop) processData(f *DataFrame) error {
cc := rl.cc
cs := cc.streamByID(f.StreamID, f.StreamEnded())
+ data := f.Data()
if cs == nil {
cc.mu.Lock()
neverSent := cc.nextStreamID
@@ -1561,9 +1579,17 @@
// TODO: be stricter here? only silently ignore things which
// we canceled, but not things which were closed normally
// by the peer? Tough without accumulating too much state.
+
+ // But at least return their flow control:
+ if len(data) > 0 {
+ cc.wmu.Lock()
+ cc.fr.WriteWindowUpdate(0, uint32(len(data)))
+ cc.bw.Flush()
+ cc.wmu.Unlock()
+ }
return nil
}
- if data := f.Data(); len(data) > 0 {
+ if len(data) > 0 {
if cs.bufPipe.b == nil {
// Data frame after it's already closed?
cc.logf("http2: Transport received DATA frame for closed stream; closing connection")
@@ -1730,8 +1756,10 @@
}
func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, err error) {
- // TODO: do something with err? send it as a debug frame to the peer?
- // But that's only in GOAWAY. Invent a new frame type? Is there one already?
+ // TODO: map err to more interesting error codes, once the
+ // HTTP community comes up with some. But currently for
+ // RST_STREAM there's no equivalent to GOAWAY frame's debug
+ // data, and the error codes are all pretty vague ("cancel").
cc.wmu.Lock()
cc.fr.WriteRSTStream(streamID, code)
cc.bw.Flush()
diff --git a/http2/transport_test.go b/http2/transport_test.go
index 5887714..f22eeca 100644
--- a/http2/transport_test.go
+++ b/http2/transport_test.go
@@ -2097,3 +2097,83 @@
}
ct.run()
}
+
+// See golang.org/issue/16481
+func TestTransportReturnsUnusedFlowControl(t *testing.T) {
+ ct := newClientTester(t)
+
+ clientClosed := make(chan bool, 1)
+ serverWroteBody := make(chan bool, 1)
+
+ ct.client = func() error {
+ req, _ := http.NewRequest("GET", "https://dummy.tld/", nil)
+ res, err := ct.tr.RoundTrip(req)
+ if err != nil {
+ return err
+ }
+ <-serverWroteBody
+
+ if n, err := res.Body.Read(make([]byte, 1)); err != nil || n != 1 {
+ return fmt.Errorf("body read = %v, %v; want 1, nil", n, err)
+ }
+ res.Body.Close() // leaving 4999 bytes unread
+ clientClosed <- true
+
+ return nil
+ }
+ ct.server = func() error {
+ ct.greet()
+
+ var hf *HeadersFrame
+ for {
+ f, err := ct.fr.ReadFrame()
+ if err != nil {
+ return fmt.Errorf("ReadFrame while waiting for Headers: %v", err)
+ }
+ switch f.(type) {
+ case *WindowUpdateFrame, *SettingsFrame:
+ continue
+ }
+ var ok bool
+ hf, ok = f.(*HeadersFrame)
+ if !ok {
+ return fmt.Errorf("Got %T; want HeadersFrame", f)
+ }
+ break
+ }
+
+ var buf bytes.Buffer
+ enc := hpack.NewEncoder(&buf)
+ enc.WriteField(hpack.HeaderField{Name: ":status", Value: "200"})
+ enc.WriteField(hpack.HeaderField{Name: "content-length", Value: "5000"})
+ ct.fr.WriteHeaders(HeadersFrameParam{
+ StreamID: hf.StreamID,
+ EndHeaders: true,
+ EndStream: false,
+ BlockFragment: buf.Bytes(),
+ })
+ ct.fr.WriteData(hf.StreamID, false, make([]byte, 5000)) // without ending stream
+ serverWroteBody <- true
+
+ <-clientClosed
+
+ f, err := ct.fr.ReadFrame()
+ if err != nil {
+ return fmt.Errorf("ReadFrame while waiting for RSTStreamFrame: %v", err)
+ }
+ if rf, ok := f.(*RSTStreamFrame); !ok || rf.ErrCode != ErrCodeCancel {
+ return fmt.Errorf("Expected a WindowUpdateFrame with code cancel; got %v", summarizeFrame(f))
+ }
+
+ // And wait for our flow control tokens back:
+ f, err = ct.fr.ReadFrame()
+ if err != nil {
+ return fmt.Errorf("ReadFrame while waiting for WindowUpdateFrame: %v", err)
+ }
+ if wuf, ok := f.(*WindowUpdateFrame); !ok || wuf.Increment != 4999 {
+ return fmt.Errorf("Expected WindowUpdateFrame for 4999 bytes; got %v", summarizeFrame(f))
+ }
+ return nil
+ }
+ ct.run()
+}