http2: track unread bytes when the pipe is broken

Once the pipe is broken, any remaining data needs to be reported as well
as any data that is written but dropped.

The client side flow control can eventually run out of available bytes
to be sent since no WINDOW_UPDATE is sent to reflect the data that is
never read in the pipe.

Updates golang/go#28634

Change-Id: I83f3c9d3614cd92517af2687489d2ccbf3a65456
Reviewed-on: https://go-review.googlesource.com/c/net/+/187377
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
diff --git a/http2/pipe.go b/http2/pipe.go
index a614009..2a5399e 100644
--- a/http2/pipe.go
+++ b/http2/pipe.go
@@ -17,6 +17,7 @@
 	mu       sync.Mutex
 	c        sync.Cond     // c.L lazily initialized to &p.mu
 	b        pipeBuffer    // nil when done reading
+	unread   int           // bytes unread when done
 	err      error         // read error once empty. non-nil means closed.
 	breakErr error         // immediate read error (caller doesn't see rest of b)
 	donec    chan struct{} // closed on error
@@ -33,7 +34,7 @@
 	p.mu.Lock()
 	defer p.mu.Unlock()
 	if p.b == nil {
-		return 0
+		return p.unread
 	}
 	return p.b.Len()
 }
@@ -80,6 +81,7 @@
 		return 0, errClosedPipeWrite
 	}
 	if p.breakErr != nil {
+		p.unread += len(d)
 		return len(d), nil // discard when there is no reader
 	}
 	return p.b.Write(d)
@@ -117,6 +119,9 @@
 	}
 	p.readFn = fn
 	if dst == &p.breakErr {
+		if p.b != nil {
+			p.unread += p.b.Len()
+		}
 		p.b = nil
 	}
 	*dst = err
diff --git a/http2/pipe_test.go b/http2/pipe_test.go
index 1bf351f..83d2dfd 100644
--- a/http2/pipe_test.go
+++ b/http2/pipe_test.go
@@ -92,6 +92,9 @@
 	if err != a {
 		t.Logf("read error = %v, %v", err, a)
 	}
+	if p.Len() != 0 {
+		t.Errorf("pipe should have 0 unread bytes")
+	}
 	// Read and Write should fail.
 	if n, err := p.Write([]byte("abc")); err != errClosedPipeWrite || n != 0 {
 		t.Errorf("Write(abc) after close\ngot %v, %v\nwant 0, %v", n, err, errClosedPipeWrite)
@@ -99,6 +102,9 @@
 	if n, err := p.Read(make([]byte, 1)); err == nil || n != 0 {
 		t.Errorf("Read() after close\ngot %v, nil\nwant 0, %v", n, errClosedPipeWrite)
 	}
+	if p.Len() != 0 {
+		t.Errorf("pipe should have 0 unread bytes")
+	}
 }
 
 func TestPipeBreakWithError(t *testing.T) {
@@ -116,6 +122,9 @@
 	if p.b != nil {
 		t.Errorf("buffer should be nil after BreakWithError")
 	}
+	if p.Len() != 3 {
+		t.Errorf("pipe should have 3 unread bytes")
+	}
 	// Write should succeed silently.
 	if n, err := p.Write([]byte("abc")); err != nil || n != 3 {
 		t.Errorf("Write(abc) after break\ngot %v, %v\nwant 0, nil", n, err)
@@ -123,6 +132,9 @@
 	if p.b != nil {
 		t.Errorf("buffer should be nil after Write")
 	}
+	if p.Len() != 6 {
+		t.Errorf("pipe should have 6 unread bytes")
+	}
 	// Read should fail.
 	if n, err := p.Read(make([]byte, 1)); err == nil || n != 0 {
 		t.Errorf("Read() after close\ngot %v, nil\nwant 0, not nil", n)
diff --git a/http2/server_test.go b/http2/server_test.go
index 7115a3e..0c9fa3e 100644
--- a/http2/server_test.go
+++ b/http2/server_test.go
@@ -3658,37 +3658,73 @@
 // golang.org/issue/16481 -- return flow control when streams close with unread data.
 // (The Server version of the bug. See also TestUnreadFlowControlReturned_Transport)
 func TestUnreadFlowControlReturned_Server(t *testing.T) {
-	unblock := make(chan bool, 1)
-	defer close(unblock)
+	for _, tt := range []struct {
+		name  string
+		reqFn func(r *http.Request)
+	}{
+		{
+			"body-open",
+			func(r *http.Request) {},
+		},
+		{
+			"body-closed",
+			func(r *http.Request) {
+				r.Body.Close()
+			},
+		},
+		{
+			"read-1-byte-and-close",
+			func(r *http.Request) {
+				b := make([]byte, 1)
+				r.Body.Read(b)
+				r.Body.Close()
+			},
+		},
+	} {
+		t.Run(tt.name, func(t *testing.T) {
+			unblock := make(chan bool, 1)
+			defer close(unblock)
 
-	st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
-		// Don't read the 16KB request body. Wait until the client's
-		// done sending it and then return. This should cause the Server
-		// to then return those 16KB of flow control to the client.
-		<-unblock
-	}, optOnlyServer)
-	defer st.Close()
+			timeOut := time.NewTimer(5 * time.Second)
+			defer timeOut.Stop()
+			st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
+				// Don't read the 16KB request body. Wait until the client's
+				// done sending it and then return. This should cause the Server
+				// to then return those 16KB of flow control to the client.
+				tt.reqFn(r)
+				select {
+				case <-unblock:
+				case <-timeOut.C:
+					t.Fatal(tt.name, "timedout")
+				}
+			}, optOnlyServer)
+			defer st.Close()
 
-	tr := &Transport{TLSClientConfig: tlsConfigInsecure}
-	defer tr.CloseIdleConnections()
+			tr := &Transport{TLSClientConfig: tlsConfigInsecure}
+			defer tr.CloseIdleConnections()
 
-	// This previously hung on the 4th iteration.
-	for i := 0; i < 6; i++ {
-		body := io.MultiReader(
-			io.LimitReader(neverEnding('A'), 16<<10),
-			funcReader(func([]byte) (n int, err error) {
-				unblock <- true
-				return 0, io.EOF
-			}),
-		)
-		req, _ := http.NewRequest("POST", st.ts.URL, body)
-		res, err := tr.RoundTrip(req)
-		if err != nil {
-			t.Fatal(err)
-		}
-		res.Body.Close()
+			// This previously hung on the 4th iteration.
+			iters := 100
+			if testing.Short() {
+				iters = 20
+			}
+			for i := 0; i < iters; i++ {
+				body := io.MultiReader(
+					io.LimitReader(neverEnding('A'), 16<<10),
+					funcReader(func([]byte) (n int, err error) {
+						unblock <- true
+						return 0, io.EOF
+					}),
+				)
+				req, _ := http.NewRequest("POST", st.ts.URL, body)
+				res, err := tr.RoundTrip(req)
+				if err != nil {
+					t.Fatal(tt.name, err)
+				}
+				res.Body.Close()
+			}
+		})
 	}
-
 }
 
 func TestServerIdleTimeout(t *testing.T) {