http2: add configurable knobs for the server's receive window
Upload performance is poor when BDP is higher than the flow-control window.
Previously, the server's receive window was fixed at 64KB, which resulted in
very poor performance for high-BDP links. The receive window now defaults to
1MB and is configurable. The per-connection and per-stream windows are
configurable separately (both default to 1MB as suggested in golang/go#16512).
Previously, the server created a "fixedBuffer" for each request body. This is no
longer a good idea because a fixedBuffer has fixed size, which means individual
streams cannot use varying amounts of the available connection window. To
overcome this limitation, I replaced fixedBuffer with "dataBuffer", which grows
and shrinks based on current usage. The worst-case fragmentation of dataBuffer
is 32KB wasted memory per stream, but I expect that worst-case will be rare.
A slightly modified version of adg@'s grpcbench program shows a dramatic
improvement when increasing from a 64KB window to a 1MB window, especially at
higher latencies (i.e., higher BDPs). Network latency was simulated with netem,
e.g., `tc qdisc add dev lo root netem delay 16ms`.
Duration Latency Proto H2 Window
11ms±4.05ms 0s HTTP/1.1 -
17ms±1.95ms 0s HTTP/2.0 65535
8ms±1.75ms 0s HTTP/2.0 1048576
10ms±1.49ms 1ms HTTP/1.1 -
47ms±2.91ms 1ms HTTP/2.0 65535
10ms±1.77ms 1ms HTTP/2.0 1048576
15ms±1.69ms 2ms HTTP/1.1 -
88ms±11.29ms 2ms HTTP/2.0 65535
15ms±1.18ms 2ms HTTP/2.0 1048576
23ms±1.42ms 4ms HTTP/1.1 -
152ms±0.77ms 4ms HTTP/2.0 65535
23ms±0.94ms 4ms HTTP/2.0 1048576
40ms±1.54ms 8ms HTTP/1.1 -
288ms±1.67ms 8ms HTTP/2.0 65535
39ms±1.29ms 8ms HTTP/2.0 1048576
72ms±1.13ms 16ms HTTP/1.1 -
559ms±0.68ms 16ms HTTP/2.0 65535
71ms±1.12ms 16ms HTTP/2.0 1048576
136ms±1.15ms 32ms HTTP/1.1 -
1104ms±1.62ms 32ms HTTP/2.0 65535
135ms±0.96ms 32ms HTTP/2.0 1048576
264ms±0.95ms 64ms HTTP/1.1 -
2191ms±2.08ms 64ms HTTP/2.0 65535
263ms±1.57ms 64ms HTTP/2.0 1048576
Fixes golang/go#16512
Updates golang/go#17985
Updates golang/go#18404
Change-Id: Ied385aa94588337e98dad9475cf2ece2f39ba346
Reviewed-on: https://go-review.googlesource.com/37226
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
diff --git a/http2/server.go b/http2/server.go
index 7e523f8..550427d 100644
--- a/http2/server.go
+++ b/http2/server.go
@@ -110,11 +110,38 @@
// activity for the purposes of IdleTimeout.
IdleTimeout time.Duration
+ // MaxUploadBufferPerConnection is the size of the initial flow
+ // control window for each connections. The HTTP/2 spec does not
+ // allow this to be smaller than 65535 or larger than 2^32-1.
+ // If the value is outside this range, a default value will be
+ // used instead.
+ MaxUploadBufferPerConnection int32
+
+ // MaxUploadBufferPerStream is the size of the initial flow control
+ // window for each stream. The HTTP/2 spec does not allow this to
+ // be larger than 2^32-1. If the value is zero or larger than the
+ // maximum, a default value will be used instead.
+ MaxUploadBufferPerStream int32
+
// NewWriteScheduler constructs a write scheduler for a connection.
// If nil, a default scheduler is chosen.
NewWriteScheduler func() WriteScheduler
}
+func (s *Server) initialConnRecvWindowSize() int32 {
+ if s.MaxUploadBufferPerConnection > initialWindowSize {
+ return s.MaxUploadBufferPerConnection
+ }
+ return 1 << 20
+}
+
+func (s *Server) initialStreamRecvWindowSize() int32 {
+ if s.MaxUploadBufferPerStream > 0 {
+ return s.MaxUploadBufferPerStream
+ }
+ return 1 << 20
+}
+
func (s *Server) maxReadFrameSize() uint32 {
if v := s.MaxReadFrameSize; v >= minMaxFrameSize && v <= maxFrameSize {
return v
@@ -255,27 +282,27 @@
defer cancel()
sc := &serverConn{
- srv: s,
- hs: opts.baseConfig(),
- conn: c,
- baseCtx: baseCtx,
- remoteAddrStr: c.RemoteAddr().String(),
- bw: newBufferedWriter(c),
- handler: opts.handler(),
- streams: make(map[uint32]*stream),
- readFrameCh: make(chan readFrameResult),
- wantWriteFrameCh: make(chan FrameWriteRequest, 8),
- wantStartPushCh: make(chan startPushRequest, 8),
- wroteFrameCh: make(chan frameWriteResult, 1), // buffered; one send in writeFrameAsync
- bodyReadCh: make(chan bodyReadMsg), // buffering doesn't matter either way
- doneServing: make(chan struct{}),
- clientMaxStreams: math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value"
- advMaxStreams: s.maxConcurrentStreams(),
- initialWindowSize: initialWindowSize,
- maxFrameSize: initialMaxFrameSize,
- headerTableSize: initialHeaderTableSize,
- serveG: newGoroutineLock(),
- pushEnabled: true,
+ srv: s,
+ hs: opts.baseConfig(),
+ conn: c,
+ baseCtx: baseCtx,
+ remoteAddrStr: c.RemoteAddr().String(),
+ bw: newBufferedWriter(c),
+ handler: opts.handler(),
+ streams: make(map[uint32]*stream),
+ readFrameCh: make(chan readFrameResult),
+ wantWriteFrameCh: make(chan FrameWriteRequest, 8),
+ wantStartPushCh: make(chan startPushRequest, 8),
+ wroteFrameCh: make(chan frameWriteResult, 1), // buffered; one send in writeFrameAsync
+ bodyReadCh: make(chan bodyReadMsg), // buffering doesn't matter either way
+ doneServing: make(chan struct{}),
+ clientMaxStreams: math.MaxUint32, // Section 6.5.2: "Initially, there is no limit to this value"
+ advMaxStreams: s.maxConcurrentStreams(),
+ initialStreamSendWindowSize: initialWindowSize,
+ maxFrameSize: initialMaxFrameSize,
+ headerTableSize: initialHeaderTableSize,
+ serveG: newGoroutineLock(),
+ pushEnabled: true,
}
// The net/http package sets the write deadline from the
@@ -294,6 +321,9 @@
sc.writeSched = NewRandomWriteScheduler()
}
+ // These start at the RFC-specified defaults. If there is a higher
+ // configured value for inflow, that will be updated when we send a
+ // WINDOW_UPDATE shortly after sending SETTINGS.
sc.flow.add(initialWindowSize)
sc.inflow.add(initialWindowSize)
sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
@@ -387,34 +417,34 @@
writeSched WriteScheduler
// Everything following is owned by the serve loop; use serveG.check():
- serveG goroutineLock // used to verify funcs are on serve()
- pushEnabled bool
- sawFirstSettings bool // got the initial SETTINGS frame after the preface
- needToSendSettingsAck bool
- unackedSettings int // how many SETTINGS have we sent without ACKs?
- clientMaxStreams uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit)
- advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client
- curClientStreams uint32 // number of open streams initiated by the client
- curPushedStreams uint32 // number of open streams initiated by server push
- maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests
- maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes
- streams map[uint32]*stream
- initialWindowSize int32
- maxFrameSize int32
- headerTableSize uint32
- peerMaxHeaderListSize uint32 // zero means unknown (default)
- canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case
- writingFrame bool // started writing a frame (on serve goroutine or separate)
- writingFrameAsync bool // started a frame on its own goroutine but haven't heard back on wroteFrameCh
- needsFrameFlush bool // last frame write wasn't a flush
- inGoAway bool // we've started to or sent GOAWAY
- inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop
- needToSendGoAway bool // we need to schedule a GOAWAY frame write
- goAwayCode ErrCode
- shutdownTimerCh <-chan time.Time // nil until used
- shutdownTimer *time.Timer // nil until used
- idleTimer *time.Timer // nil if unused
- idleTimerCh <-chan time.Time // nil if unused
+ serveG goroutineLock // used to verify funcs are on serve()
+ pushEnabled bool
+ sawFirstSettings bool // got the initial SETTINGS frame after the preface
+ needToSendSettingsAck bool
+ unackedSettings int // how many SETTINGS have we sent without ACKs?
+ clientMaxStreams uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit)
+ advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client
+ curClientStreams uint32 // number of open streams initiated by the client
+ curPushedStreams uint32 // number of open streams initiated by server push
+ maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests
+ maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes
+ streams map[uint32]*stream
+ initialStreamSendWindowSize int32
+ maxFrameSize int32
+ headerTableSize uint32
+ peerMaxHeaderListSize uint32 // zero means unknown (default)
+ canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case
+ writingFrame bool // started writing a frame (on serve goroutine or separate)
+ writingFrameAsync bool // started a frame on its own goroutine but haven't heard back on wroteFrameCh
+ needsFrameFlush bool // last frame write wasn't a flush
+ inGoAway bool // we've started to or sent GOAWAY
+ inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop
+ needToSendGoAway bool // we need to schedule a GOAWAY frame write
+ goAwayCode ErrCode
+ shutdownTimerCh <-chan time.Time // nil until used
+ shutdownTimer *time.Timer // nil until used
+ idleTimer *time.Timer // nil if unused
+ idleTimerCh <-chan time.Time // nil if unused
// Owned by the writeFrameAsync goroutine:
headerWriteBuf bytes.Buffer
@@ -695,15 +725,17 @@
{SettingMaxFrameSize, sc.srv.maxReadFrameSize()},
{SettingMaxConcurrentStreams, sc.advMaxStreams},
{SettingMaxHeaderListSize, sc.maxHeaderListSize()},
-
- // TODO: more actual settings, notably
- // SettingInitialWindowSize, but then we also
- // want to bump up the conn window size the
- // same amount here right after the settings
+ {SettingInitialWindowSize, uint32(sc.srv.initialStreamRecvWindowSize())},
},
})
sc.unackedSettings++
+ // Each connection starts with intialWindowSize inflow tokens.
+ // If a higher value is configured, we add more tokens.
+ if diff := sc.srv.initialConnRecvWindowSize() - initialWindowSize; diff > 0 {
+ sc.sendWindowUpdate(nil, int(diff))
+ }
+
if err := sc.readPreface(); err != nil {
sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
return
@@ -1394,9 +1426,9 @@
// adjust the size of all stream flow control windows that it
// maintains by the difference between the new value and the
// old value."
- old := sc.initialWindowSize
- sc.initialWindowSize = int32(val)
- growth := sc.initialWindowSize - old // may be negative
+ old := sc.initialStreamSendWindowSize
+ sc.initialStreamSendWindowSize = int32(val)
+ growth := int32(val) - old // may be negative
for _, st := range sc.streams {
if !st.flow.add(growth) {
// 6.9.2 Initial Flow Control Window Size
@@ -1718,9 +1750,9 @@
}
st.cw.Init()
st.flow.conn = &sc.flow // link to conn-level counter
- st.flow.add(sc.initialWindowSize)
- st.inflow.conn = &sc.inflow // link to conn-level counter
- st.inflow.add(initialWindowSize) // TODO: update this when we send a higher initial window size in the initial settings
+ st.flow.add(sc.initialStreamSendWindowSize)
+ st.inflow.conn = &sc.inflow // link to conn-level counter
+ st.inflow.add(sc.srv.initialStreamRecvWindowSize())
sc.streams[id] = st
sc.writeSched.OpenStream(st.id, OpenStreamOptions{PusherID: pusherID})
diff --git a/http2/server_test.go b/http2/server_test.go
index 7830d3f..407fafc 100644
--- a/http2/server_test.go
+++ b/http2/server_test.go
@@ -260,11 +260,52 @@
// greet initiates the client's HTTP/2 connection into a state where
// frames may be sent.
func (st *serverTester) greet() {
+ st.greetAndCheckSettings(func(Setting) error { return nil })
+}
+
+func (st *serverTester) greetAndCheckSettings(checkSetting func(s Setting) error) {
st.writePreface()
st.writeInitialSettings()
- st.wantSettings()
+ st.wantSettings().ForeachSetting(checkSetting)
st.writeSettingsAck()
- st.wantSettingsAck()
+
+ // The initial WINDOW_UPDATE and SETTINGS ACK can come in any order.
+ var gotSettingsAck bool
+ var gotWindowUpdate bool
+
+ for i := 0; i < 2; i++ {
+ f, err := st.readFrame()
+ if err != nil {
+ st.t.Fatal(err)
+ }
+ switch f := f.(type) {
+ case *SettingsFrame:
+ if !f.Header().Flags.Has(FlagSettingsAck) {
+ st.t.Fatal("Settings Frame didn't have ACK set")
+ }
+ gotSettingsAck = true
+
+ case *WindowUpdateFrame:
+ if f.FrameHeader.StreamID != 0 {
+ st.t.Fatalf("WindowUpdate StreamID = %d; want 0", f.FrameHeader.StreamID, 0)
+ }
+ incr := uint32((&Server{}).initialConnRecvWindowSize() - initialWindowSize)
+ if f.Increment != incr {
+ st.t.Fatalf("WindowUpdate increment = %d; want %d", f.Increment, incr)
+ }
+ gotWindowUpdate = true
+
+ default:
+ st.t.Fatalf("Wanting a settings ACK or window update, received a %T", f)
+ }
+ }
+
+ if !gotSettingsAck {
+ st.t.Fatalf("Didn't get a settings ACK")
+ }
+ if !gotWindowUpdate {
+ st.t.Fatalf("Didn't get a window update")
+ }
}
func (st *serverTester) writePreface() {
@@ -584,12 +625,7 @@
server sends in the HTTP/2 connection.
`)
- st.writePreface()
- st.writeInitialSettings()
- st.wantSettings()
- st.writeSettingsAck()
- st.wantSettingsAck()
-
+ st.greet()
st.writeHeaders(HeadersFrameParam{
StreamID: 1, // clients send odd numbers
BlockFragment: st.encodeHeader(),
@@ -2601,11 +2637,9 @@
defer st.Close()
// shake hands
- st.writePreface()
- st.writeInitialSettings()
frameSize := defaultMaxReadFrameSize
var advHeaderListSize *uint32
- st.wantSettings().ForeachSetting(func(s Setting) error {
+ st.greetAndCheckSettings(func(s Setting) error {
switch s.ID {
case SettingMaxFrameSize:
if s.Val < minMaxFrameSize {
@@ -2620,8 +2654,6 @@
}
return nil
})
- st.writeSettingsAck()
- st.wantSettingsAck()
if advHeaderListSize == nil {
t.Errorf("server didn't advertise a max header list size")