http2: Discard DATA frames from the server after the response body is closed

After a response body is closed, we keep writing to the bufPipe. This
accumulates bytes that will never be read, wasting memory. The fix is to
discard the buffer on pipe.BreakWithError.

Updates golang/go#20448

Change-Id: Ia2cf46cb8c401fd8091ef3785eb48fe7b188bb57
Reviewed-on: https://go-review.googlesource.com/43810
Run-TryBot: Tom Bergan <tombergan@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
diff --git a/http2/pipe.go b/http2/pipe.go
index 914aaf8..0b9848b 100644
--- a/http2/pipe.go
+++ b/http2/pipe.go
@@ -15,8 +15,8 @@
 // underlying buffer is an interface. (io.Pipe is always unbuffered)
 type pipe struct {
 	mu       sync.Mutex
-	c        sync.Cond // c.L lazily initialized to &p.mu
-	b        pipeBuffer
+	c        sync.Cond     // c.L lazily initialized to &p.mu
+	b        pipeBuffer    // nil when done reading
 	err      error         // read error once empty. non-nil means closed.
 	breakErr error         // immediate read error (caller doesn't see rest of b)
 	donec    chan struct{} // closed on error
@@ -32,6 +32,9 @@
 func (p *pipe) Len() int {
 	p.mu.Lock()
 	defer p.mu.Unlock()
+	if p.b == nil {
+		return 0
+	}
 	return p.b.Len()
 }
 
@@ -55,6 +58,7 @@
 				p.readFn()     // e.g. copy trailers
 				p.readFn = nil // not sticky like p.err
 			}
+			p.b = nil
 			return 0, p.err
 		}
 		p.c.Wait()
@@ -75,6 +79,9 @@
 	if p.err != nil {
 		return 0, errClosedPipeWrite
 	}
+	if p.breakErr != nil {
+		return len(d), nil // discard when there is no reader
+	}
 	return p.b.Write(d)
 }
 
@@ -109,6 +116,9 @@
 		return
 	}
 	p.readFn = fn
+	if dst == &p.breakErr {
+		p.b = nil
+	}
 	*dst = err
 	p.closeDoneLocked()
 }
diff --git a/http2/pipe_test.go b/http2/pipe_test.go
index 7632299..82bf9a3 100644
--- a/http2/pipe_test.go
+++ b/http2/pipe_test.go
@@ -92,6 +92,10 @@
 	if err != a {
 		t.Logf("read error = %v, %v", err, a)
 	}
+	// Write should fail.
+	if n, err := p.Write([]byte("abc")); err != errClosedPipeWrite || n != 0 {
+		t.Errorf("Write(abc) after close\ngot =%v, %v\nwant 0, %v", n, err, errClosedPipeWrite)
+	}
 }
 
 func TestPipeBreakWithError(t *testing.T) {
@@ -106,4 +110,14 @@
 	if err != a {
 		t.Logf("read error = %v, %v", err, a)
 	}
+	if p.b != nil {
+		t.Errorf("buffer should be nil after BreakWithError")
+	}
+	// Write should succeed silently.
+	if n, err := p.Write([]byte("abc")); err != nil || n != 3 {
+		t.Errorf("Write(abc) after break\ngot =%v, %v\nwant 0, nil", n, err)
+	}
+	if p.b != nil {
+		t.Errorf("buffer should be nil after Write")
+	}
 }
diff --git a/http2/transport.go b/http2/transport.go
index 84d042d..3a85f25 100644
--- a/http2/transport.go
+++ b/http2/transport.go
@@ -1655,6 +1655,7 @@
 		cc.wmu.Lock()
 		if !serverSentStreamEnd {
 			cc.fr.WriteRSTStream(cs.ID, ErrCodeCancel)
+			cs.didReset = true
 		}
 		// Return connection-level flow control.
 		if unread > 0 {
@@ -1702,12 +1703,6 @@
 		return nil
 	}
 	if f.Length > 0 {
-		if len(data) > 0 && cs.bufPipe.b == nil {
-			// Data frame after it's already closed?
-			cc.logf("http2: Transport received DATA frame for closed stream; closing connection")
-			return ConnectionError(ErrCodeProtocol)
-		}
-
 		// Check connection-level flow control.
 		cc.mu.Lock()
 		if cs.inflow.available() >= int32(f.Length) {
diff --git a/http2/transport_test.go b/http2/transport_test.go
index 0b4d375..128ed0d 100644
--- a/http2/transport_test.go
+++ b/http2/transport_test.go
@@ -2959,8 +2959,7 @@
 		t.Fatalf("res.Body = %T; want transportResponseBody", res.Body)
 	}
 	if trb.cs.bufPipe.b != nil {
-		// TODO(tombergan,bradfitz): turn this into an error:
-		t.Logf("response body pipe is still open")
+		t.Errorf("response body pipe is still open")
 	}
 
 	gotErr := <-writeErr