http2: properly discard data received after request/response body is closed

A server handler can close an inbound Request.Body to indicate that it
is not interested in the remainder of the request body.

Equivalently, a client can close a Response.Body indicate that it is
not interesed in the remainder of the response body.

In both cases, if we receive DATA frames from the peer for the stream,
we should return connection-level flow control credit for the discarded data.
We do not return stream-level flow control, since we don't want to unblock
further sends of data that we're just going to discard.

Closing either a Response.Body or an inbound Request.Body results in a
pipe.BreakWithError. Reads from a broken pipe fail immediately.

Previously, writes to a broken pipe would succeed, discarding the written
data and incrementing the pipe's unread count. Silently discarding
data written to a broken pipe results in both the Transport and Server
failing to detect the condition where data has been discarded.

Change pipes to return an error when writing to a broken pipe.

Change transportResponseBody.Close to break the response body before
returning flow control credit for unread data in the pipe, avoiding
a race condition where data is added to the pipe in between the
return of flow control credit and the pipe breaking.

Change the Server to treat an error writing to the inbound request
body as an expected condition (since this only happens when a
handler closes the request body), returning connection-level
flow control credit for the discarded data.

Fixes golang/go#57578

Change-Id: I1ed4ea9865818f9c7d7eb4500edfd7556e3cbcbf
Reviewed-on: https://go-review.googlesource.com/c/net/+/475135
Run-TryBot: Damien Neil <dneil@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Roland Shoemaker <roland@golang.org>
diff --git a/http2/pipe.go b/http2/pipe.go
index c15b8a7..684d984 100644
--- a/http2/pipe.go
+++ b/http2/pipe.go
@@ -88,13 +88,9 @@
 		p.c.L = &p.mu
 	}
 	defer p.c.Signal()
-	if p.err != nil {
+	if p.err != nil || p.breakErr != nil {
 		return 0, errClosedPipeWrite
 	}
-	if p.breakErr != nil {
-		p.unread += len(d)
-		return len(d), nil // discard when there is no reader
-	}
 	return p.b.Write(d)
 }
 
diff --git a/http2/pipe_test.go b/http2/pipe_test.go
index 83d2dfd..67562a9 100644
--- a/http2/pipe_test.go
+++ b/http2/pipe_test.go
@@ -125,14 +125,14 @@
 	if p.Len() != 3 {
 		t.Errorf("pipe should have 3 unread bytes")
 	}
-	// Write should succeed silently.
-	if n, err := p.Write([]byte("abc")); err != nil || n != 3 {
-		t.Errorf("Write(abc) after break\ngot %v, %v\nwant 0, nil", n, err)
+	// Write should fail.
+	if n, err := p.Write([]byte("abc")); err != errClosedPipeWrite || n != 0 {
+		t.Errorf("Write(abc) after break\ngot %v, %v\nwant 0, errClosedPipeWrite", n, err)
 	}
 	if p.b != nil {
 		t.Errorf("buffer should be nil after Write")
 	}
-	if p.Len() != 6 {
+	if p.Len() != 3 {
 		t.Errorf("pipe should have 6 unread bytes")
 	}
 	// Read should fail.
diff --git a/http2/server.go b/http2/server.go
index 8cb14f3..cd057f3 100644
--- a/http2/server.go
+++ b/http2/server.go
@@ -1822,15 +1822,18 @@
 		}
 
 		if len(data) > 0 {
+			st.bodyBytes += int64(len(data))
 			wrote, err := st.body.Write(data)
 			if err != nil {
+				// The handler has closed the request body.
+				// Return the connection-level flow control for the discarded data,
+				// but not the stream-level flow control.
 				sc.sendWindowUpdate(nil, int(f.Length)-wrote)
-				return sc.countError("body_write_err", streamError(id, ErrCodeStreamClosed))
+				return nil
 			}
 			if wrote != len(data) {
 				panic("internal error: bad Writer")
 			}
-			st.bodyBytes += int64(len(data))
 		}
 
 		// Return any padded flow control now, since we won't
diff --git a/http2/server_test.go b/http2/server_test.go
index d32b2d8..40ab750 100644
--- a/http2/server_test.go
+++ b/http2/server_test.go
@@ -3906,6 +3906,32 @@
 	}
 }
 
+func TestServerReturnsStreamAndConnFlowControlOnBodyClose(t *testing.T) {
+	unblockHandler := make(chan struct{})
+	defer close(unblockHandler)
+
+	st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
+		r.Body.Close()
+		w.WriteHeader(200)
+		w.(http.Flusher).Flush()
+		<-unblockHandler
+	})
+	defer st.Close()
+
+	st.greet()
+	st.writeHeaders(HeadersFrameParam{
+		StreamID:      1,
+		BlockFragment: st.encodeHeader(),
+		EndHeaders:    true,
+	})
+	st.wantHeaders()
+	const size = inflowMinRefresh // enough to trigger flow control return
+	st.writeData(1, false, make([]byte, size))
+	st.wantWindowUpdate(0, size) // conn-level flow control is returned
+	unblockHandler <- struct{}{}
+	st.wantData()
+}
+
 func TestServerIdleTimeout(t *testing.T) {
 	if testing.Short() {
 		t.Skip("skipping in short mode")
diff --git a/http2/transport.go b/http2/transport.go
index 05ba23d..c9e1115 100644
--- a/http2/transport.go
+++ b/http2/transport.go
@@ -2555,6 +2555,9 @@
 	cs := b.cs
 	cc := cs.cc
 
+	cs.bufPipe.BreakWithError(errClosedResponseBody)
+	cs.abortStream(errClosedResponseBody)
+
 	unread := cs.bufPipe.Len()
 	if unread > 0 {
 		cc.mu.Lock()
@@ -2573,9 +2576,6 @@
 		cc.wmu.Unlock()
 	}
 
-	cs.bufPipe.BreakWithError(errClosedResponseBody)
-	cs.abortStream(errClosedResponseBody)
-
 	select {
 	case <-cs.donec:
 	case <-cs.ctx.Done():