http2: make Server reuse 64k request body buffer between requests

name           old time/op    new time/op    delta
ServerPosts-4     192µs ± 1%     164µs ± 1%  -14.16%  (p=0.000 n=17+19)

name           old alloc/op   new alloc/op   delta
ServerPosts-4    69.8kB ± 0%     2.8kB ± 0%  -95.95%  (p=0.000 n=18+18)

name           old allocs/op  new allocs/op  delta
ServerPosts-4      42.0 ± 0%      40.0 ± 0%   -4.76%  (p=0.000 n=20+20)
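
These tables are benchstat output; assuming the usual workflow, a
comparison like this comes from something along these lines (old.txt
and new.txt are illustrative names):

	go test -run='^$' -bench=ServerPosts -count=20 ./http2 > old.txt
	(apply the change)
	go test -run='^$' -bench=ServerPosts -count=20 ./http2 > new.txt
	benchstat old.txt new.txt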

This is a redo of git rev e7da8eda (golang.org/cl/20542), which had a
data race and was reverted in golang.org/cl/21160.
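
The crux of the fix: instead of stashing the free buffer in a
serverConn field (which raced with handler goroutines still reading
the request body), the buffer is recycled through a small buffered
channel, and only after the handler goroutine observes io.EOF on the
body. A minimal standalone sketch of that channel-as-free-list
pattern (the names bufPool, getBuf, and putBuf are illustrative, not
part of this change):

	package main

	import "fmt"

	const bufSize = 65535 // http2's initialWindowSize

	// A buffered channel acts as a bounded, lock-free free list:
	// at most 8 idle buffers are retained between requests.
	var bufPool = make(chan []byte, 8)

	// getBuf reuses an idle buffer if one is available and
	// otherwise allocates a fresh one.
	func getBuf() []byte {
		select {
		case b := <-bufPool:
			return b
		default:
			return make([]byte, bufSize)
		}
	}

	// putBuf returns a buffer to the free list; if the list is
	// full, the buffer is dropped and left for the GC.
	func putBuf(b []byte) {
		select {
		case bufPool <- b:
		default:
		}
	}

	func main() {
		b := getBuf()
		putBuf(b)
		fmt.Println(len(getBuf())) // 65535: the same buffer, reused
	}

Both selects have a default case, so neither function ever blocks;
the channel's capacity only bounds how many idle buffers stay alive.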

Updates golang/go#14960
Updates grpc/grpc-go#604

Change-Id: Ie216e45730dce4fc0c58f295bcbc669973145599
Reviewed-on: https://go-review.googlesource.com/31447
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Ian Lance Taylor <iant@golang.org>
diff --git a/http2/server.go b/http2/server.go
index c986bc1..808b011 100644
--- a/http2/server.go
+++ b/http2/server.go
@@ -390,7 +390,6 @@
 	goAwayCode            ErrCode
 	shutdownTimerCh       <-chan time.Time // nil until used
 	shutdownTimer         *time.Timer      // nil until used
-	freeRequestBodyBuf    []byte           // if non-nil, a free initialWindowSize buffer for getRequestBodyBuf
 
 	// Owned by the writeFrameAsync goroutine:
 	headerWriteBuf bytes.Buffer
@@ -434,11 +433,11 @@
 	numTrailerValues int64
 	weight           uint8
 	state            streamState
-	sentReset        bool // only true once detached from streams map
-	gotReset         bool // only true once detacted from streams map
-	gotTrailerHeader bool // HEADER frame for trailers was seen
-	wroteHeaders     bool // whether we wrote headers (not status 100)
-	reqBuf           []byte
+	sentReset        bool   // only true once detached from streams map
+	gotReset         bool   // only true once detached from streams map
+	gotTrailerHeader bool   // HEADER frame for trailers was seen
+	wroteHeaders     bool   // whether we wrote headers (not status 100)
+	reqBuf           []byte // if non-nil, body pipe buffer to return later at EOF
 
 	trailer    http.Header // accumulated trailers
 	reqTrailer http.Header // handler's Request.Trailer
@@ -916,11 +915,11 @@
 			// Here we would go to stateHalfClosedLocal in
 			// theory, but since our handler is done and
 			// the net/http package provides no mechanism
-			// for finishing writing to a ResponseWriter
-			// while still reading data (see possible TODO
-			// at top of this file), we go into closed
-			// state here anyway, after telling the peer
-			// we're hanging up on them.
+			// for closing a ResponseWriter while still
+			// reading data (see possible TODO at top of
+			// this file), we go into closed state here
+			// anyway, after telling the peer we're
+			// hanging up on them.
 			st.state = stateHalfClosedLocal // won't last long, but necessary for closeStream via resetStream
 			errCancel := streamError(st.id, ErrCodeCancel)
 			sc.resetStream(errCancel)
@@ -1184,18 +1183,6 @@
 	}
 	st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc
 	sc.writeSched.forgetStream(st.id)
-	if st.reqBuf != nil {
-		// Stash this request body buffer (64k) away for reuse
-		// by a future POST/PUT/etc.
-		//
-		// TODO(bradfitz): share on the server? sync.Pool?
-		// Server requires locks and might hurt contention.
-		// sync.Pool might work, or might be worse, depending
-		// on goroutine CPU migrations. (get and put on
-		// separate CPUs).  Maybe a mix of strategies. But
-		// this is an easy win for now.
-		sc.freeRequestBodyBuf = st.reqBuf
-	}
 }
 
 func (sc *serverConn) processSettings(f *SettingsFrame) error {
@@ -1672,13 +1659,9 @@
 	}
 	req = requestWithContext(req, st.ctx)
 	if bodyOpen {
-		// Disabled, per golang.org/issue/14960:
-		// st.reqBuf = sc.getRequestBodyBuf()
-		// TODO: remove this 64k of garbage per request (again, but without a data race):
-		buf := make([]byte, initialWindowSize)
-
+		st.reqBuf = getRequestBodyBuf()
 		body.pipe = &pipe{
-			b: &fixedBuffer{buf: buf},
+			b: &fixedBuffer{buf: st.reqBuf},
 		}
 
 		if vv, ok := header["Content-Length"]; ok {
@@ -1702,13 +1685,22 @@
 	return rw, req, nil
 }
 
-func (sc *serverConn) getRequestBodyBuf() []byte {
-	sc.serveG.check()
-	if buf := sc.freeRequestBodyBuf; buf != nil {
-		sc.freeRequestBodyBuf = nil
-		return buf
+var reqBodyCache = make(chan []byte, 8)
+
+func getRequestBodyBuf() []byte {
+	select {
+	case b := <-reqBodyCache:
+		return b
+	default:
+		return make([]byte, initialWindowSize)
 	}
-	return make([]byte, initialWindowSize)
+}
+
+func putRequestBodyBuf(b []byte) {
+	select {
+	case reqBodyCache <- b:
+	default:
+	}
 }
 
 // Run on its own goroutine.
@@ -1796,11 +1788,19 @@
 // called from handler goroutines.
 // Notes that the handler for the given stream ID read n bytes of its body
 // and schedules flow control tokens to be sent.
-func (sc *serverConn) noteBodyReadFromHandler(st *stream, n int) {
+func (sc *serverConn) noteBodyReadFromHandler(st *stream, n int, err error) {
 	sc.serveG.checkNotOn() // NOT on
-	select {
-	case sc.bodyReadCh <- bodyReadMsg{st, n}:
-	case <-sc.doneServing:
+	if n > 0 {
+		select {
+		case sc.bodyReadCh <- bodyReadMsg{st, n}:
+		case <-sc.doneServing:
+		}
+	}
+	if err == io.EOF {
+		if buf := st.reqBuf; buf != nil {
+			st.reqBuf = nil // shouldn't matter; field is unused by others after EOF
+			putRequestBodyBuf(buf)
+		}
 	}
 }
 
@@ -1883,9 +1883,10 @@
 		return 0, io.EOF
 	}
 	n, err = b.pipe.Read(p)
-	if n > 0 {
-		b.conn.noteBodyReadFromHandler(b.stream, n)
+	if err == io.EOF {
+		b.pipe = nil
 	}
+	b.conn.noteBodyReadFromHandler(b.stream, n, err)
 	return
 }