http2: Send WindowUpdates when remaining bytes are below a threshold

This rolls-forward CL 150197 with an added fix for
TestProtocolErrorAfterGoAway.

Rather than send a WindowUpdate on every chunk of bytes read, allow them
to collect until we go past half the configured window size. Once the
threshold is reached, send a single WindowUpdate to reset the amount
back to the maximum amount configured.

Fixes golang/go#28732

Change-Id: Icee93dedf68d6166aa6fe0c3845d717e66586e73
Reviewed-on: https://go-review.googlesource.com/c/net/+/432038
Run-TryBot: Damien Neil <dneil@google.com>
Auto-Submit: Damien Neil <dneil@google.com>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Tatiana Bradley <tatiana@golang.org>
diff --git a/http2/server.go b/http2/server.go
index 967c9e7..a894e69 100644
--- a/http2/server.go
+++ b/http2/server.go
@@ -869,9 +869,7 @@
 
 	// Each connection starts with initialWindowSize inflow tokens.
 	// If a higher value is configured, we add more tokens.
-	if diff := sc.srv.initialConnRecvWindowSize() - initialWindowSize; diff > 0 {
-		sc.sendWindowUpdate(nil, int(diff))
-	}
+	sc.sendWindowUpdate(nil)
 
 	if err := sc.readPreface(); err != nil {
 		sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
@@ -1588,7 +1586,7 @@
 	if p := st.body; p != nil {
 		// Return any buffered unread bytes worth of conn-level flow control.
 		// See golang.org/issue/16481
-		sc.sendWindowUpdate(nil, p.Len())
+		sc.sendWindowUpdate(nil)
 
 		p.CloseWithError(err)
 	}
@@ -1736,7 +1734,7 @@
 		// sendWindowUpdate, which also schedules sending the
 		// frames.
 		sc.inflow.take(int32(f.Length))
-		sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
+		sc.sendWindowUpdate(nil) // conn-level
 
 		if st != nil && st.resetQueued {
 			// Already have a stream error in flight. Don't send another.
@@ -1754,7 +1752,7 @@
 			return sc.countError("data_flow", streamError(id, ErrCodeFlowControl))
 		}
 		sc.inflow.take(int32(f.Length))
-		sc.sendWindowUpdate(nil, int(f.Length)) // conn-level
+		sc.sendWindowUpdate(nil) // conn-level
 
 		st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
 		// RFC 7540, sec 8.1.2.6: A request or response is also malformed if the
@@ -1772,7 +1770,7 @@
 		if len(data) > 0 {
 			wrote, err := st.body.Write(data)
 			if err != nil {
-				sc.sendWindowUpdate(nil, int(f.Length)-wrote)
+				sc.sendWindowUpdate32(nil, int32(f.Length)-int32(wrote))
 				return sc.countError("body_write_err", streamError(id, ErrCodeStreamClosed))
 			}
 			if wrote != len(data) {
@@ -2324,17 +2322,32 @@
 
 func (sc *serverConn) noteBodyRead(st *stream, n int) {
 	sc.serveG.check()
-	sc.sendWindowUpdate(nil, n) // conn-level
+	sc.sendWindowUpdate(nil) // conn-level
 	if st.state != stateHalfClosedRemote && st.state != stateClosed {
 		// Don't send this WINDOW_UPDATE if the stream is closed
 		// remotely.
-		sc.sendWindowUpdate(st, n)
+		sc.sendWindowUpdate(st)
 	}
 }
 
 // st may be nil for conn-level
-func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
+func (sc *serverConn) sendWindowUpdate(st *stream) {
 	sc.serveG.check()
+
+	var n int32
+	if st == nil {
+		if avail, windowSize := sc.inflow.available(), sc.srv.initialConnRecvWindowSize(); avail > windowSize/2 {
+			return
+		} else {
+			n = windowSize - avail
+		}
+	} else {
+		if avail, windowSize := st.inflow.available(), sc.srv.initialStreamRecvWindowSize(); avail > windowSize/2 {
+			return
+		} else {
+			n = windowSize - avail
+		}
+	}
 	// "The legal range for the increment to the flow control
 	// window is 1 to 2^31-1 (2,147,483,647) octets."
 	// A Go Read call on 64-bit machines could in theory read
diff --git a/http2/server_test.go b/http2/server_test.go
index 654d53f..50ea4b7 100644
--- a/http2/server_test.go
+++ b/http2/server_test.go
@@ -809,9 +809,6 @@
 				EndHeaders: true,
 			})
 			st.writeData(1, true, []byte("12345"))
-			// Return flow control bytes back, since the data handler closed
-			// the stream.
-			st.wantWindowUpdate(0, 5)
 		})
 }
 
@@ -1253,20 +1250,51 @@
 		EndStream:     false, // data coming
 		EndHeaders:    true,
 	})
+	updateSize := 1 << 20 / 2 // the conn & stream size before a WindowUpdate
+	st.writeData(1, false, bytes.Repeat([]byte("a"), updateSize-10))
+	st.writeData(1, false, bytes.Repeat([]byte("b"), 10))
+	puppet.do(readBodyHandler(t, strings.Repeat("a", updateSize-10)))
+	puppet.do(readBodyHandler(t, strings.Repeat("b", 10)))
+
+	st.wantWindowUpdate(0, uint32(updateSize))
+	st.wantWindowUpdate(1, uint32(updateSize))
+
+	st.writeData(1, false, bytes.Repeat([]byte("a"), updateSize-10))
+	st.writeData(1, true, bytes.Repeat([]byte("c"), 15)) // END_STREAM here
+	puppet.do(readBodyHandler(t, strings.Repeat("a", updateSize-10)))
+	puppet.do(readBodyHandler(t, strings.Repeat("c", 15)))
+
+	st.wantWindowUpdate(0, uint32(updateSize+5))
+}
+
+func TestServer_Handler_Sends_WindowUpdate_SmallStream(t *testing.T) {
+	puppet := newHandlerPuppet()
+	st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
+		puppet.act(w, r)
+	}, func(s *Server) {
+		s.MaxUploadBufferPerStream = 6
+	})
+	defer st.Close()
+	defer puppet.done()
+
+	st.greet()
+
+	st.writeHeaders(HeadersFrameParam{
+		StreamID:      1, // clients send odd numbers
+		BlockFragment: st.encodeHeader(":method", "POST"),
+		EndStream:     false, // data coming
+		EndHeaders:    true,
+	})
 	st.writeData(1, false, []byte("abcdef"))
 	puppet.do(readBodyHandler(t, "abc"))
-	st.wantWindowUpdate(0, 3)
-	st.wantWindowUpdate(1, 3)
+	puppet.do(readBodyHandler(t, "d"))
+	puppet.do(readBodyHandler(t, "ef"))
 
-	puppet.do(readBodyHandler(t, "def"))
-	st.wantWindowUpdate(0, 3)
-	st.wantWindowUpdate(1, 3)
+	st.wantWindowUpdate(1, 6)
 
 	st.writeData(1, true, []byte("ghijkl")) // END_STREAM here
 	puppet.do(readBodyHandler(t, "ghi"))
 	puppet.do(readBodyHandler(t, "jkl"))
-	st.wantWindowUpdate(0, 3)
-	st.wantWindowUpdate(0, 3) // no more stream-level, since END_STREAM
 }
 
 // the version of the TestServer_Handler_Sends_WindowUpdate with padding.
@@ -1295,12 +1323,7 @@
 	st.wantWindowUpdate(1, 5)
 
 	puppet.do(readBodyHandler(t, "abc"))
-	st.wantWindowUpdate(0, 3)
-	st.wantWindowUpdate(1, 3)
-
 	puppet.do(readBodyHandler(t, "def"))
-	st.wantWindowUpdate(0, 3)
-	st.wantWindowUpdate(1, 3)
 }
 
 func TestServer_Send_GoAway_After_Bogus_WindowUpdate(t *testing.T) {
@@ -2296,8 +2319,6 @@
 		// gigantic and/or sensitive "foo" payload now.
 		st.writeData(1, true, []byte(msg))
 
-		st.wantWindowUpdate(0, uint32(len(msg)))
-
 		hf = st.wantHeaders()
 		if hf.StreamEnded() {
 			t.Fatal("expected data to follow")
@@ -2485,9 +2506,6 @@
 		// it did before.
 		st.writeData(1, true, []byte("foo"))
 
-		// Get our flow control bytes back, since the handler didn't get them.
-		st.wantWindowUpdate(0, uint32(len("foo")))
-
 		// Sent after a peer sends data anyway (admittedly the
 		// previous RST_STREAM might've still been in-flight),
 		// but they'll get the more friendly 'cancel' code
@@ -3930,7 +3948,6 @@
 			EndHeaders: true,
 		})
 		st.writeData(1, true, []byte("12345"))
-		st.wantWindowUpdate(0, 5)
 		st.wantRSTStream(1, ErrCodeProtocol)
 	})
 }
@@ -4223,7 +4240,6 @@
 	st.writeData(1, false, []byte(content[5:]))
 	blockCh <- true
 
-	increments := len(content)
 	for {
 		f, err := st.readFrame()
 		if err == io.EOF {
@@ -4232,10 +4248,12 @@
 		if err != nil {
 			t.Fatal(err)
 		}
+		if rs, ok := f.(*RSTStreamFrame); ok && rs.StreamID == 1 {
+			break
+		}
 		if wu, ok := f.(*WindowUpdateFrame); ok && wu.StreamID == 0 {
-			increments -= int(wu.Increment)
-			if increments == 0 {
-				break
+			if e, a := uint32(3), wu.Increment; e != a {
+				t.Errorf("Increment=%d, want %d", a, e)
 			}
 		}
 	}
@@ -4378,22 +4396,22 @@
 
 func TestProtocolErrorAfterGoAway(t *testing.T) {
 	st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
+		w.WriteHeader(200)
+		w.(http.Flusher).Flush()
 		io.Copy(io.Discard, r.Body)
 	})
 	defer st.Close()
 
 	st.greet()
-	content := "some content"
 	st.writeHeaders(HeadersFrameParam{
 		StreamID: 1,
 		BlockFragment: st.encodeHeader(
 			":method", "POST",
-			"content-length", strconv.Itoa(len(content)),
+			"content-length", "1",
 		),
 		EndStream:  false,
 		EndHeaders: true,
 	})
-	st.writeData(1, false, []byte(content[:5]))
 
 	_, err := st.readFrame()
 	if err != nil {