http2: Add opt-in option to Framer to allow DataFrame struct reuse

The existing Framer in net/http2 allocates a new DataFrame struct
for each DataFrame read on calls to ReadFrame. The SetReuseFrame
option introduced here, if set on a Framer, allows the
Framer to reuse Frame objects and changes the ReadFrame API
so that returned Frame objects are only valid until the next call
to ReadFrame. This opt-in API now only implements reuse of DataFrames,
but it allows the Framer to reuse of any type of Frame.

The footprint caused by creation of new DataFrame structs per data
frame was noticed in micro benchmarks of "gRPC" server "streaming
throuhgput", which uses the Framer in this package. This benchmark
happened to use long lived http2 streams that do client-server "ping-pong"
requests with small data frames, and DataFrames were seen to be a
significant source of allocations.

Running local benchmarks with: (from x/net/http2 directory)

$ go test -run=^$ -bench=BenchmarkServerToClientStream

example output:
* expect an alloc reduction of at least 1 and a small memory reduction between
"BenchmarkServerToClientStreamDefaultOptions" and
"BenchmarkServerToClientStreamReuseFrames"

BenchmarkServerToClientStreamDefaultOptions-12    	   30000
46216 ns/op	     971 B/op	      17 allocs/op
BenchmarkServerToClientStreamReuseFrames-12       	   30000
44952 ns/op	     924 B/op	      16 allocs/op

Fixes golang/go#18502

Change-Id: Iad93420ef6c3918f54249d867098f1dadfa324d8
Reviewed-on: https://go-review.googlesource.com/34812
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
diff --git a/http2/frame.go b/http2/frame.go
index 9573588..3b14890 100644
--- a/http2/frame.go
+++ b/http2/frame.go
@@ -122,7 +122,7 @@
 // a frameParser parses a frame given its FrameHeader and payload
 // bytes. The length of payload will always equal fh.Length (which
 // might be 0).
-type frameParser func(fh FrameHeader, payload []byte) (Frame, error)
+type frameParser func(fc *frameCache, fh FrameHeader, payload []byte) (Frame, error)
 
 var frameParsers = map[FrameType]frameParser{
 	FrameData:         parseDataFrame,
@@ -323,6 +323,8 @@
 	debugFramerBuf    *bytes.Buffer
 	debugReadLoggerf  func(string, ...interface{})
 	debugWriteLoggerf func(string, ...interface{})
+
+	frameCache *frameCache // nil if frames aren't reused (default)
 }
 
 func (fr *Framer) maxHeaderListSize() uint32 {
@@ -398,6 +400,27 @@
 	maxFrameSize    = 1<<24 - 1
 )
 
+// SetReuseFrames allows the Framer to reuse Frames.
+// If called on a Framer, Frames returned by calls to ReadFrame are only
+// valid until the next call to ReadFrame.
+func (fr *Framer) SetReuseFrames() {
+	if fr.frameCache != nil {
+		return
+	}
+	fr.frameCache = &frameCache{}
+}
+
+type frameCache struct {
+	dataFrame DataFrame
+}
+
+func (fc *frameCache) getDataFrame() *DataFrame {
+	if fc == nil {
+		return &DataFrame{}
+	}
+	return &fc.dataFrame
+}
+
 // NewFramer returns a Framer that writes frames to w and reads them from r.
 func NewFramer(w io.Writer, r io.Reader) *Framer {
 	fr := &Framer{
@@ -477,7 +500,7 @@
 	if _, err := io.ReadFull(fr.r, payload); err != nil {
 		return nil, err
 	}
-	f, err := typeFrameParser(fh.Type)(fh, payload)
+	f, err := typeFrameParser(fh.Type)(fr.frameCache, fh, payload)
 	if err != nil {
 		if ce, ok := err.(connError); ok {
 			return nil, fr.connError(ce.Code, ce.Reason)
@@ -565,7 +588,7 @@
 	return f.data
 }
 
-func parseDataFrame(fh FrameHeader, payload []byte) (Frame, error) {
+func parseDataFrame(fc *frameCache, fh FrameHeader, payload []byte) (Frame, error) {
 	if fh.StreamID == 0 {
 		// DATA frames MUST be associated with a stream. If a
 		// DATA frame is received whose stream identifier
@@ -574,9 +597,9 @@
 		// PROTOCOL_ERROR.
 		return nil, connError{ErrCodeProtocol, "DATA frame with stream ID 0"}
 	}
-	f := &DataFrame{
-		FrameHeader: fh,
-	}
+	f := fc.getDataFrame()
+	f.FrameHeader = fh
+
 	var padSize byte
 	if fh.Flags.Has(FlagDataPadded) {
 		var err error
@@ -672,7 +695,7 @@
 	p []byte
 }
 
-func parseSettingsFrame(fh FrameHeader, p []byte) (Frame, error) {
+func parseSettingsFrame(_ *frameCache, fh FrameHeader, p []byte) (Frame, error) {
 	if fh.Flags.Has(FlagSettingsAck) && fh.Length > 0 {
 		// When this (ACK 0x1) bit is set, the payload of the
 		// SETTINGS frame MUST be empty. Receipt of a
@@ -774,7 +797,7 @@
 
 func (f *PingFrame) IsAck() bool { return f.Flags.Has(FlagPingAck) }
 
-func parsePingFrame(fh FrameHeader, payload []byte) (Frame, error) {
+func parsePingFrame(_ *frameCache, fh FrameHeader, payload []byte) (Frame, error) {
 	if len(payload) != 8 {
 		return nil, ConnectionError(ErrCodeFrameSize)
 	}
@@ -814,7 +837,7 @@
 	return f.debugData
 }
 
-func parseGoAwayFrame(fh FrameHeader, p []byte) (Frame, error) {
+func parseGoAwayFrame(_ *frameCache, fh FrameHeader, p []byte) (Frame, error) {
 	if fh.StreamID != 0 {
 		return nil, ConnectionError(ErrCodeProtocol)
 	}
@@ -854,7 +877,7 @@
 	return f.p
 }
 
-func parseUnknownFrame(fh FrameHeader, p []byte) (Frame, error) {
+func parseUnknownFrame(_ *frameCache, fh FrameHeader, p []byte) (Frame, error) {
 	return &UnknownFrame{fh, p}, nil
 }
 
@@ -865,7 +888,7 @@
 	Increment uint32 // never read with high bit set
 }
 
-func parseWindowUpdateFrame(fh FrameHeader, p []byte) (Frame, error) {
+func parseWindowUpdateFrame(_ *frameCache, fh FrameHeader, p []byte) (Frame, error) {
 	if len(p) != 4 {
 		return nil, ConnectionError(ErrCodeFrameSize)
 	}
@@ -930,7 +953,7 @@
 	return f.FrameHeader.Flags.Has(FlagHeadersPriority)
 }
 
-func parseHeadersFrame(fh FrameHeader, p []byte) (_ Frame, err error) {
+func parseHeadersFrame(_ *frameCache, fh FrameHeader, p []byte) (_ Frame, err error) {
 	hf := &HeadersFrame{
 		FrameHeader: fh,
 	}
@@ -1067,7 +1090,7 @@
 	return p == PriorityParam{}
 }
 
-func parsePriorityFrame(fh FrameHeader, payload []byte) (Frame, error) {
+func parsePriorityFrame(_ *frameCache, fh FrameHeader, payload []byte) (Frame, error) {
 	if fh.StreamID == 0 {
 		return nil, connError{ErrCodeProtocol, "PRIORITY frame with stream ID 0"}
 	}
@@ -1114,7 +1137,7 @@
 	ErrCode ErrCode
 }
 
-func parseRSTStreamFrame(fh FrameHeader, p []byte) (Frame, error) {
+func parseRSTStreamFrame(_ *frameCache, fh FrameHeader, p []byte) (Frame, error) {
 	if len(p) != 4 {
 		return nil, ConnectionError(ErrCodeFrameSize)
 	}
@@ -1144,7 +1167,7 @@
 	headerFragBuf []byte
 }
 
-func parseContinuationFrame(fh FrameHeader, p []byte) (Frame, error) {
+func parseContinuationFrame(_ *frameCache, fh FrameHeader, p []byte) (Frame, error) {
 	if fh.StreamID == 0 {
 		return nil, connError{ErrCodeProtocol, "CONTINUATION frame with stream ID 0"}
 	}
@@ -1194,7 +1217,7 @@
 	return f.FrameHeader.Flags.Has(FlagPushPromiseEndHeaders)
 }
 
-func parsePushPromise(fh FrameHeader, p []byte) (_ Frame, err error) {
+func parsePushPromise(_ *frameCache, fh FrameHeader, p []byte) (_ Frame, err error) {
 	pp := &PushPromiseFrame{
 		FrameHeader: fh,
 	}
diff --git a/http2/frame_test.go b/http2/frame_test.go
index 311f86f..37266bc 100644
--- a/http2/frame_test.go
+++ b/http2/frame_test.go
@@ -1096,6 +1096,95 @@
 	}
 }
 
+func TestSetReuseFrames(t *testing.T) {
+	fr, buf := testFramer()
+	fr.SetReuseFrames()
+
+	// Check that DataFrames are reused. Note that
+	// SetReuseFrames only currently implements reuse of DataFrames.
+	firstDf := readAndVerifyDataFrame("ABC", 3, fr, buf, t)
+
+	for i := 0; i < 10; i++ {
+		df := readAndVerifyDataFrame("XYZ", 3, fr, buf, t)
+		if df != firstDf {
+			t.Errorf("Expected Framer to return references to the same DataFrame. Have %v and %v", &df, &firstDf)
+		}
+	}
+
+	for i := 0; i < 10; i++ {
+		df := readAndVerifyDataFrame("", 0, fr, buf, t)
+		if df != firstDf {
+			t.Errorf("Expected Framer to return references to the same DataFrame. Have %v and %v", &df, &firstDf)
+		}
+	}
+
+	for i := 0; i < 10; i++ {
+		df := readAndVerifyDataFrame("HHH", 3, fr, buf, t)
+		if df != firstDf {
+			t.Errorf("Expected Framer to return references to the same DataFrame. Have %v and %v", &df, &firstDf)
+		}
+	}
+}
+
+func TestSetReuseFramesMoreThanOnce(t *testing.T) {
+	fr, buf := testFramer()
+	fr.SetReuseFrames()
+
+	firstDf := readAndVerifyDataFrame("ABC", 3, fr, buf, t)
+	fr.SetReuseFrames()
+
+	for i := 0; i < 10; i++ {
+		df := readAndVerifyDataFrame("XYZ", 3, fr, buf, t)
+		// SetReuseFrames should be idempotent
+		fr.SetReuseFrames()
+		if df != firstDf {
+			t.Errorf("Expected Framer to return references to the same DataFrame. Have %v and %v", &df, &firstDf)
+		}
+	}
+}
+
+func TestNoSetReuseFrames(t *testing.T) {
+	fr, buf := testFramer()
+	const numNewDataFrames = 10
+	dfSoFar := make([]interface{}, numNewDataFrames)
+
+	// Check that DataFrames are not reused if SetReuseFrames wasn't called.
+	// SetReuseFrames only currently implements reuse of DataFrames.
+	for i := 0; i < numNewDataFrames; i++ {
+		df := readAndVerifyDataFrame("XYZ", 3, fr, buf, t)
+		for _, item := range dfSoFar {
+			if df == item {
+				t.Errorf("Expected Framer to return new DataFrames since SetNoReuseFrames not set.")
+			}
+		}
+		dfSoFar[i] = df
+	}
+}
+
+func readAndVerifyDataFrame(data string, length byte, fr *Framer, buf *bytes.Buffer, t *testing.T) *DataFrame {
+	var streamID uint32 = 1<<24 + 2<<16 + 3<<8 + 4
+	fr.WriteData(streamID, true, []byte(data))
+	wantEnc := "\x00\x00" + string(length) + "\x00\x01\x01\x02\x03\x04" + data
+	if buf.String() != wantEnc {
+		t.Errorf("encoded as %q; want %q", buf.Bytes(), wantEnc)
+	}
+	f, err := fr.ReadFrame()
+	if err != nil {
+		t.Fatal(err)
+	}
+	df, ok := f.(*DataFrame)
+	if !ok {
+		t.Fatalf("got %T; want *DataFrame", f)
+	}
+	if !bytes.Equal(df.Data(), []byte(data)) {
+		t.Errorf("got %q; want %q", df.Data(), []byte(data))
+	}
+	if f.Header().Flags&1 == 0 {
+		t.Errorf("didn't see END_STREAM flag")
+	}
+	return df
+}
+
 func encodeHeaderRaw(t *testing.T, pairs ...string) []byte {
 	var he hpackEncoder
 	return he.encodeHeaderRaw(t, pairs...)
diff --git a/http2/server_test.go b/http2/server_test.go
index c2e51e6..7830d3f 100644
--- a/http2/server_test.go
+++ b/http2/server_test.go
@@ -80,6 +80,7 @@
 
 var optOnlyServer = serverTesterOpt("only_server")
 var optQuiet = serverTesterOpt("quiet_logging")
+var optFramerReuseFrames = serverTesterOpt("frame_reuse_frames")
 
 func newServerTester(t testing.TB, handler http.HandlerFunc, opts ...interface{}) *serverTester {
 	resetHooks()
@@ -91,7 +92,7 @@
 		NextProtos:         []string{NextProtoTLS},
 	}
 
-	var onlyServer, quiet bool
+	var onlyServer, quiet, framerReuseFrames bool
 	h2server := new(Server)
 	for _, opt := range opts {
 		switch v := opt.(type) {
@@ -107,6 +108,8 @@
 				onlyServer = true
 			case optQuiet:
 				quiet = true
+			case optFramerReuseFrames:
+				framerReuseFrames = true
 			}
 		case func(net.Conn, http.ConnState):
 			ts.Config.ConnState = v
@@ -149,6 +152,9 @@
 		}
 		st.cc = cc
 		st.fr = NewFramer(cc, cc)
+		if framerReuseFrames {
+			st.fr.SetReuseFrames()
+		}
 		if !logFrameReads && !logFrameWrites {
 			st.fr.debugReadLoggerf = func(m string, v ...interface{}) {
 				m = time.Now().Format("2006-01-02 15:04:05.999999999 ") + strings.TrimPrefix(m, "http2: ") + "\n"
@@ -2994,6 +3000,89 @@
 	}
 }
 
+// Send a stream of messages from server to client in separate data frames.
+// Brings up performance issues seen in long streams.
+// Created to show problem in go issue #18502
+func BenchmarkServerToClientStreamDefaultOptions(b *testing.B) {
+	benchmarkServerToClientStream(b)
+}
+
+// Justification for Change-Id: Iad93420ef6c3918f54249d867098f1dadfa324d8
+// Expect to see memory/alloc reduction by opting in to Frame reuse with the Framer.
+func BenchmarkServerToClientStreamReuseFrames(b *testing.B) {
+	benchmarkServerToClientStream(b, optFramerReuseFrames)
+}
+
+func benchmarkServerToClientStream(b *testing.B, newServerOpts ...interface{}) {
+	defer disableGoroutineTracking()()
+	b.ReportAllocs()
+	const msgLen = 1
+	// default window size
+	const windowSize = 1<<16 - 1
+
+	// next message to send from the server and for the client to expect
+	nextMsg := func(i int) []byte {
+		msg := make([]byte, msgLen)
+		msg[0] = byte(i)
+		if len(msg) != msgLen {
+			panic("invalid test setup msg length")
+		}
+		return msg
+	}
+
+	st := newServerTester(b, func(w http.ResponseWriter, r *http.Request) {
+		// Consume the (empty) body from th peer before replying, otherwise
+		// the server will sometimes (depending on scheduling) send the peer a
+		// a RST_STREAM with the CANCEL error code.
+		if n, err := io.Copy(ioutil.Discard, r.Body); n != 0 || err != nil {
+			b.Errorf("Copy error; got %v, %v; want 0, nil", n, err)
+		}
+		for i := 0; i < b.N; i += 1 {
+			w.Write(nextMsg(i))
+			w.(http.Flusher).Flush()
+		}
+	}, newServerOpts...)
+	defer st.Close()
+	st.greet()
+
+	const id = uint32(1)
+
+	st.writeHeaders(HeadersFrameParam{
+		StreamID:      id,
+		BlockFragment: st.encodeHeader(":method", "POST"),
+		EndStream:     false,
+		EndHeaders:    true,
+	})
+
+	st.writeData(id, true, nil)
+	st.wantHeaders()
+
+	var pendingWindowUpdate = uint32(0)
+
+	for i := 0; i < b.N; i += 1 {
+		expected := nextMsg(i)
+		df := st.wantData()
+		if bytes.Compare(expected, df.data) != 0 {
+			b.Fatalf("Bad message received; want %v; got %v", expected, df.data)
+		}
+		// try to send infrequent but large window updates so they don't overwhelm the test
+		pendingWindowUpdate += uint32(len(df.data))
+		if pendingWindowUpdate >= windowSize/2 {
+			if err := st.fr.WriteWindowUpdate(0, pendingWindowUpdate); err != nil {
+				b.Fatal(err)
+			}
+			if err := st.fr.WriteWindowUpdate(id, pendingWindowUpdate); err != nil {
+				b.Fatal(err)
+			}
+			pendingWindowUpdate = 0
+		}
+	}
+	df := st.wantData()
+	if !df.StreamEnded() {
+		b.Fatalf("DATA didn't have END_STREAM; got %v", df)
+	}
+}
+
 // go-fuzz bug, originally reported at https://github.com/bradfitz/http2/issues/53
 // Verify we don't hang.
 func TestIssue53(t *testing.T) {