http2: add configurable knobs for the server's receive window

Upload performance is poor when BDP is higher than the flow-control window.
Previously, the server's receive window was fixed at 64KB, which resulted in
very poor performance for high-BDP links. The receive window now defaults to
1MB and is configurable. The per-connection and per-stream windows are
configurable separately (both default to 1MB as suggested in golang/go#16512).

Previously, the server created a "fixedBuffer" for each request body. This is no
longer a good idea because a fixedBuffer has fixed size, which means individual
streams cannot use varying amounts of the available connection window. To
overcome this limitation, I replaced fixedBuffer with "dataBuffer", which grows
and shrinks based on current usage. The worst-case fragmentation of dataBuffer
is 32KB wasted memory per stream, but I expect that worst-case will be rare.

A slightly modified version of adg@'s grpcbench program shows a dramatic
improvement when increasing from a 64KB window to a 1MB window, especially at
higher latencies (i.e., higher BDPs). Network latency was simulated with netem,
e.g., `tc qdisc add dev lo root netem delay 16ms`.

Duration        Latency Proto           H2 Window

11ms±4.05ms     0s      HTTP/1.1        -
17ms±1.95ms     0s      HTTP/2.0        65535
8ms±1.75ms      0s      HTTP/2.0        1048576

10ms±1.49ms     1ms     HTTP/1.1        -
47ms±2.91ms     1ms     HTTP/2.0        65535
10ms±1.77ms     1ms     HTTP/2.0        1048576

15ms±1.69ms     2ms     HTTP/1.1        -
88ms±11.29ms    2ms     HTTP/2.0        65535
15ms±1.18ms     2ms     HTTP/2.0        1048576

23ms±1.42ms     4ms     HTTP/1.1        -
152ms±0.77ms    4ms     HTTP/2.0        65535
23ms±0.94ms     4ms     HTTP/2.0        1048576

40ms±1.54ms     8ms     HTTP/1.1        -
288ms±1.67ms    8ms     HTTP/2.0        65535
39ms±1.29ms     8ms     HTTP/2.0        1048576

72ms±1.13ms     16ms    HTTP/1.1        -
559ms±0.68ms    16ms    HTTP/2.0        65535
71ms±1.12ms     16ms    HTTP/2.0        1048576

136ms±1.15ms    32ms    HTTP/1.1        -
1104ms±1.62ms   32ms    HTTP/2.0        65535
135ms±0.96ms    32ms    HTTP/2.0        1048576

264ms±0.95ms    64ms    HTTP/1.1        -
2191ms±2.08ms   64ms    HTTP/2.0        65535
263ms±1.57ms    64ms    HTTP/2.0        1048576

Fixes golang/go#16512
Updates golang/go#17985
Updates golang/go#18404

Change-Id: Ied385aa94588337e98dad9475cf2ece2f39ba346
Reviewed-on: https://go-review.googlesource.com/37226
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
diff --git a/http2/server.go b/http2/server.go
index 7e523f8..550427d 100644
--- a/http2/server.go
+++ b/http2/server.go
@@ -110,11 +110,38 @@
 	// activity for the purposes of IdleTimeout.
 	IdleTimeout time.Duration
 
+	// MaxUploadBufferPerConnection is the size of the initial flow
+	// control window for each connections. The HTTP/2 spec does not
+	// allow this to be smaller than 65535 or larger than 2^32-1.
+	// If the value is outside this range, a default value will be
+	// used instead.
+	MaxUploadBufferPerConnection int32
+
+	// MaxUploadBufferPerStream is the size of the initial flow control
+	// window for each stream. The HTTP/2 spec does not allow this to
+	// be larger than 2^32-1. If the value is zero or larger than the
+	// maximum, a default value will be used instead.
+	MaxUploadBufferPerStream int32
+
 	// NewWriteScheduler constructs a write scheduler for a connection.
 	// If nil, a default scheduler is chosen.
 	NewWriteScheduler func() WriteScheduler
 }
 
+func (s *Server) initialConnRecvWindowSize() int32 {
+	if s.MaxUploadBufferPerConnection > initialWindowSize {
+		return s.MaxUploadBufferPerConnection
+	}
+	return 1 << 20
+}
+
+func (s *Server) initialStreamRecvWindowSize() int32 {
+	if s.MaxUploadBufferPerStream > 0 {
+		return s.MaxUploadBufferPerStream
+	}
+	return 1 << 20
+}
+
 func (s *Server) maxReadFrameSize() uint32 {
 	if v := s.MaxReadFrameSize; v >= minMaxFrameSize && v <= maxFrameSize {
 		return v
@@ -255,27 +282,27 @@
 	defer cancel()
 
 	sc := &serverConn{
-		srv:               s,
-		hs:                opts.baseConfig(),
-		conn:              c,
-		baseCtx:           baseCtx,
-		remoteAddrStr:     c.RemoteAddr().String(),
-		bw:                newBufferedWriter(c),
-		handler:           opts.handler(),
-		streams:           make(map[uint32]*stream),
-		readFrameCh:       make(chan readFrameResult),
-		wantWriteFrameCh:  make(chan FrameWriteRequest, 8),
-		wantStartPushCh:   make(chan startPushRequest, 8),
-		wroteFrameCh:      make(chan frameWriteResult, 1), // buffered; one send in writeFrameAsync
-		bodyReadCh:        make(chan bodyReadMsg),         // buffering doesn't matter either way
-		doneServing:       make(chan struct{}),
-		clientMaxStreams:  math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value"
-		advMaxStreams:     s.maxConcurrentStreams(),
-		initialWindowSize: initialWindowSize,
-		maxFrameSize:      initialMaxFrameSize,
-		headerTableSize:   initialHeaderTableSize,
-		serveG:            newGoroutineLock(),
-		pushEnabled:       true,
+		srv:                         s,
+		hs:                          opts.baseConfig(),
+		conn:                        c,
+		baseCtx:                     baseCtx,
+		remoteAddrStr:               c.RemoteAddr().String(),
+		bw:                          newBufferedWriter(c),
+		handler:                     opts.handler(),
+		streams:                     make(map[uint32]*stream),
+		readFrameCh:                 make(chan readFrameResult),
+		wantWriteFrameCh:            make(chan FrameWriteRequest, 8),
+		wantStartPushCh:             make(chan startPushRequest, 8),
+		wroteFrameCh:                make(chan frameWriteResult, 1), // buffered; one send in writeFrameAsync
+		bodyReadCh:                  make(chan bodyReadMsg),         // buffering doesn't matter either way
+		doneServing:                 make(chan struct{}),
+		clientMaxStreams:            math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value"
+		advMaxStreams:               s.maxConcurrentStreams(),
+		initialStreamSendWindowSize: initialWindowSize,
+		maxFrameSize:                initialMaxFrameSize,
+		headerTableSize:             initialHeaderTableSize,
+		serveG:                      newGoroutineLock(),
+		pushEnabled:                 true,
 	}
 
 	// The net/http package sets the write deadline from the
@@ -294,6 +321,9 @@
 		sc.writeSched = NewRandomWriteScheduler()
 	}
 
+	// These start at the RFC-specified defaults. If there is a higher
+	// configured value for inflow, that will be updated when we send a
+	// WINDOW_UPDATE shortly after sending SETTINGS.
 	sc.flow.add(initialWindowSize)
 	sc.inflow.add(initialWindowSize)
 	sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
@@ -387,34 +417,34 @@
 	writeSched       WriteScheduler
 
 	// Everything following is owned by the serve loop; use serveG.check():
-	serveG                goroutineLock // used to verify funcs are on serve()
-	pushEnabled           bool
-	sawFirstSettings      bool // got the initial SETTINGS frame after the preface
-	needToSendSettingsAck bool
-	unackedSettings       int    // how many SETTINGS have we sent without ACKs?
-	clientMaxStreams      uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit)
-	advMaxStreams         uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client
-	curClientStreams      uint32 // number of open streams initiated by the client
-	curPushedStreams      uint32 // number of open streams initiated by server push
-	maxClientStreamID     uint32 // max ever seen from client (odd), or 0 if there have been no client requests
-	maxPushPromiseID      uint32 // ID of the last push promise (even), or 0 if there have been no pushes
-	streams               map[uint32]*stream
-	initialWindowSize     int32
-	maxFrameSize          int32
-	headerTableSize       uint32
-	peerMaxHeaderListSize uint32            // zero means unknown (default)
-	canonHeader           map[string]string // http2-lower-case -> Go-Canonical-Case
-	writingFrame          bool              // started writing a frame (on serve goroutine or separate)
-	writingFrameAsync     bool              // started a frame on its own goroutine but haven't heard back on wroteFrameCh
-	needsFrameFlush       bool              // last frame write wasn't a flush
-	inGoAway              bool              // we've started to or sent GOAWAY
-	inFrameScheduleLoop   bool              // whether we're in the scheduleFrameWrite loop
-	needToSendGoAway      bool              // we need to schedule a GOAWAY frame write
-	goAwayCode            ErrCode
-	shutdownTimerCh       <-chan time.Time // nil until used
-	shutdownTimer         *time.Timer      // nil until used
-	idleTimer             *time.Timer      // nil if unused
-	idleTimerCh           <-chan time.Time // nil if unused
+	serveG                      goroutineLock // used to verify funcs are on serve()
+	pushEnabled                 bool
+	sawFirstSettings            bool // got the initial SETTINGS frame after the preface
+	needToSendSettingsAck       bool
+	unackedSettings             int    // how many SETTINGS have we sent without ACKs?
+	clientMaxStreams            uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit)
+	advMaxStreams               uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client
+	curClientStreams            uint32 // number of open streams initiated by the client
+	curPushedStreams            uint32 // number of open streams initiated by server push
+	maxClientStreamID           uint32 // max ever seen from client (odd), or 0 if there have been no client requests
+	maxPushPromiseID            uint32 // ID of the last push promise (even), or 0 if there have been no pushes
+	streams                     map[uint32]*stream
+	initialStreamSendWindowSize int32
+	maxFrameSize                int32
+	headerTableSize             uint32
+	peerMaxHeaderListSize       uint32            // zero means unknown (default)
+	canonHeader                 map[string]string // http2-lower-case -> Go-Canonical-Case
+	writingFrame                bool              // started writing a frame (on serve goroutine or separate)
+	writingFrameAsync           bool              // started a frame on its own goroutine but haven't heard back on wroteFrameCh
+	needsFrameFlush             bool              // last frame write wasn't a flush
+	inGoAway                    bool              // we've started to or sent GOAWAY
+	inFrameScheduleLoop         bool              // whether we're in the scheduleFrameWrite loop
+	needToSendGoAway            bool              // we need to schedule a GOAWAY frame write
+	goAwayCode                  ErrCode
+	shutdownTimerCh             <-chan time.Time // nil until used
+	shutdownTimer               *time.Timer      // nil until used
+	idleTimer                   *time.Timer      // nil if unused
+	idleTimerCh                 <-chan time.Time // nil if unused
 
 	// Owned by the writeFrameAsync goroutine:
 	headerWriteBuf bytes.Buffer
@@ -695,15 +725,17 @@
 			{SettingMaxFrameSize, sc.srv.maxReadFrameSize()},
 			{SettingMaxConcurrentStreams, sc.advMaxStreams},
 			{SettingMaxHeaderListSize, sc.maxHeaderListSize()},
-
-			// TODO: more actual settings, notably
-			// SettingInitialWindowSize, but then we also
-			// want to bump up the conn window size the
-			// same amount here right after the settings
+			{SettingInitialWindowSize, uint32(sc.srv.initialStreamRecvWindowSize())},
 		},
 	})
 	sc.unackedSettings++
 
+	// Each connection starts with intialWindowSize inflow tokens.
+	// If a higher value is configured, we add more tokens.
+	if diff := sc.srv.initialConnRecvWindowSize() - initialWindowSize; diff > 0 {
+		sc.sendWindowUpdate(nil, int(diff))
+	}
+
 	if err := sc.readPreface(); err != nil {
 		sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
 		return
@@ -1394,9 +1426,9 @@
 	// adjust the size of all stream flow control windows that it
 	// maintains by the difference between the new value and the
 	// old value."
-	old := sc.initialWindowSize
-	sc.initialWindowSize = int32(val)
-	growth := sc.initialWindowSize - old // may be negative
+	old := sc.initialStreamSendWindowSize
+	sc.initialStreamSendWindowSize = int32(val)
+	growth := int32(val) - old // may be negative
 	for _, st := range sc.streams {
 		if !st.flow.add(growth) {
 			// 6.9.2 Initial Flow Control Window Size
@@ -1718,9 +1750,9 @@
 	}
 	st.cw.Init()
 	st.flow.conn = &sc.flow // link to conn-level counter
-	st.flow.add(sc.initialWindowSize)
-	st.inflow.conn = &sc.inflow      // link to conn-level counter
-	st.inflow.add(initialWindowSize) // TODO: update this when we send a higher initial window size in the initial settings
+	st.flow.add(sc.initialStreamSendWindowSize)
+	st.inflow.conn = &sc.inflow // link to conn-level counter
+	st.inflow.add(sc.srv.initialStreamRecvWindowSize())
 
 	sc.streams[id] = st
 	sc.writeSched.OpenStream(st.id, OpenStreamOptions{PusherID: pusherID})
diff --git a/http2/server_test.go b/http2/server_test.go
index 7830d3f..407fafc 100644
--- a/http2/server_test.go
+++ b/http2/server_test.go
@@ -260,11 +260,52 @@
 // greet initiates the client's HTTP/2 connection into a state where
 // frames may be sent.
 func (st *serverTester) greet() {
+	st.greetAndCheckSettings(func(Setting) error { return nil })
+}
+
+func (st *serverTester) greetAndCheckSettings(checkSetting func(s Setting) error) {
 	st.writePreface()
 	st.writeInitialSettings()
-	st.wantSettings()
+	st.wantSettings().ForeachSetting(checkSetting)
 	st.writeSettingsAck()
-	st.wantSettingsAck()
+
+	// The initial WINDOW_UPDATE and SETTINGS ACK can come in any order.
+	var gotSettingsAck bool
+	var gotWindowUpdate bool
+
+	for i := 0; i < 2; i++ {
+		f, err := st.readFrame()
+		if err != nil {
+			st.t.Fatal(err)
+		}
+		switch f := f.(type) {
+		case *SettingsFrame:
+			if !f.Header().Flags.Has(FlagSettingsAck) {
+				st.t.Fatal("Settings Frame didn't have ACK set")
+			}
+			gotSettingsAck = true
+
+		case *WindowUpdateFrame:
+			if f.FrameHeader.StreamID != 0 {
+				st.t.Fatalf("WindowUpdate StreamID = %d; want 0", f.FrameHeader.StreamID, 0)
+			}
+			incr := uint32((&Server{}).initialConnRecvWindowSize() - initialWindowSize)
+			if f.Increment != incr {
+				st.t.Fatalf("WindowUpdate increment = %d; want %d", f.Increment, incr)
+			}
+			gotWindowUpdate = true
+
+		default:
+			st.t.Fatalf("Wanting a settings ACK or window update, received a %T", f)
+		}
+	}
+
+	if !gotSettingsAck {
+		st.t.Fatalf("Didn't get a settings ACK")
+	}
+	if !gotWindowUpdate {
+		st.t.Fatalf("Didn't get a window update")
+	}
 }
 
 func (st *serverTester) writePreface() {
@@ -584,12 +625,7 @@
 		server sends in the HTTP/2 connection.
 	`)
 
-	st.writePreface()
-	st.writeInitialSettings()
-	st.wantSettings()
-	st.writeSettingsAck()
-	st.wantSettingsAck()
-
+	st.greet()
 	st.writeHeaders(HeadersFrameParam{
 		StreamID:      1, // clients send odd numbers
 		BlockFragment: st.encodeHeader(),
@@ -2601,11 +2637,9 @@
 	defer st.Close()
 
 	// shake hands
-	st.writePreface()
-	st.writeInitialSettings()
 	frameSize := defaultMaxReadFrameSize
 	var advHeaderListSize *uint32
-	st.wantSettings().ForeachSetting(func(s Setting) error {
+	st.greetAndCheckSettings(func(s Setting) error {
 		switch s.ID {
 		case SettingMaxFrameSize:
 			if s.Val < minMaxFrameSize {
@@ -2620,8 +2654,6 @@
 		}
 		return nil
 	})
-	st.writeSettingsAck()
-	st.wantSettingsAck()
 
 	if advHeaderListSize == nil {
 		t.Errorf("server didn't advertise a max header list size")