Track the peer's flow control explicitly.

Previously we enforced flow control only implicitly: we gave the peer
a 64k buffer to write into and blew up if they went over. But in
preparation for larger, non-default, and configurable initial window
sizes, we need to start counting explicitly.
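
For context, the code below calls add, take, and available on flow
values and links each stream's flows to the connection-level ones via
a conn field. The flow type itself isn't part of this diff; a minimal
sketch of the semantics those calls assume (the real type may differ
in detail):

    // flow is a send or receive flow-control window for one stream
    // or for the whole connection.
    type flow struct {
        // n is the number of DATA bytes we're allowed to send or
        // willing to receive before the window must be refilled.
        n int32

        // conn points to the shared connection-level flow that is
        // also debited by each take; nil for the conn-level flow.
        conn *flow
    }

    // available is the number of bytes that may flow right now: the
    // stream's own window, capped by the conn window when linked.
    func (f *flow) available() int32 {
        n := f.n
        if f.conn != nil && f.conn.n < n {
            n = f.conn.n
        }
        return n
    }

    // take consumes n bytes from this window and, when linked, from
    // the connection-level window too.
    func (f *flow) take(n int32) {
        if n > f.available() {
            panic("internal error: took too much")
        }
        f.n -= n
        if f.conn != nil {
            f.conn.n -= n
        }
    }

    // add refills the window, reporting false if the result would
    // exceed 2^31-1, the protocol's maximum window size.
    func (f *flow) add(n int32) bool {
        remain := int32(1<<31-1) - f.n
        if n > remain {
            return false
        }
        f.n += n
        return true
    }
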
diff --git a/server.go b/server.go
index ebfe400..7779d32 100644
--- a/server.go
+++ b/server.go
@@ -34,7 +34,7 @@
prefaceTimeout = 10 * time.Second
firstSettingsTimeout = 2 * time.Second // should be in-flight with preface anyway
handlerChunkWriteSize = 4 << 10
- defaultMaxStreams = 250
+ defaultMaxStreams = 250 // TODO: make this 100 as the GFE seems to?
)
var (
@@ -207,6 +207,7 @@
pushEnabled: true,
}
sc.flow.add(initialWindowSize)
+ sc.inflow.add(initialWindowSize)
sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
sc.hpackDecoder = hpack.NewDecoder(initialHeaderTableSize, sc.onNewHeaderField)
@@ -317,7 +318,8 @@
wroteFrameCh chan struct{} // from writeFrameAsync -> serve, tickles more frame writes
bodyReadCh chan bodyReadMsg // from handlers -> serve
testHookCh chan func() // code to run on the serve loop
- flow flow // connection-wide (not stream-specific) flow control
+ flow flow // conn-wide (not stream-specific) outbound flow control
+ inflow flow // conn-wide inbound flow control
tlsState *tls.ConnectionState // shared by all handlers, like net/http
// Everything following is owned by the serve loop; use serveG.check():
@@ -374,18 +376,19 @@
type stream struct {
// immutable:
id uint32
- flow flow // limits writing from Handler to client
body *pipe // non-nil if expecting DATA frames
cw closeWaiter // closed wait stream transitions to closed state
// owned by serverConn's serve loop:
+ bodyBytes int64 // body bytes seen so far
+ declBodyBytes int64 // or -1 if undeclared
+ flow flow // limits writing from Handler to client
+ inflow flow // what the client is allowed to POST/etc to us
parent *stream // or nil
weight uint8
state streamState
- bodyBytes int64 // body bytes seen so far
- declBodyBytes int64 // or -1 if undeclared
- sentReset bool // only true once detached from streams map
- gotReset bool // only true once detacted from streams map
+ sentReset bool // only true once detached from streams map
+ gotReset bool // only true once detached from streams map
}
func (sc *serverConn) Framer() *Framer { return sc.framer }
@@ -651,7 +654,7 @@
errc <- nil
}
}()
- timer := time.NewTimer(5 * time.Second) // TODO: configurable on *Server?
+ timer := time.NewTimer(prefaceTimeout) // TODO: configurable on *Server?
defer timer.Stop()
select {
case <-timer.C:
@@ -1145,13 +1148,11 @@
return StreamError{id, ErrCodeStreamClosed}
}
if len(data) > 0 {
- // TODO: verify they're allowed to write with the flow
- // control window we'd advertised to them. (currently
- // this is fails elsewhere, in that the body buffer is
- // always 65k, the default initial window size, but
- // once that's fixed to grow and shrink on demand,
- // we'll need to be stricter before that, or in the
- // buffer code)
+ // Check whether the client has flow control quota.
+ if int(st.inflow.available()) < len(data) {
+ return StreamError{id, ErrCodeFlowControl}
+ }
+ st.inflow.take(int32(len(data)))
wrote, err := st.body.Write(data)
if err != nil {
return StreamError{id, ErrCodeStreamClosed}
@@ -1198,13 +1199,16 @@
id: id,
state: stateOpen,
}
- // connection-level flow control is shared by all streams.
- st.flow.conn = &sc.flow
- st.flow.add(sc.initialWindowSize)
- st.cw.Init() // make Cond use its Mutex, without heap-promoting them separately
if f.StreamEnded() {
st.state = stateHalfClosedRemote
}
+ st.cw.Init()
+
+ st.flow.conn = &sc.flow // link to conn-level counter
+ st.flow.add(sc.initialWindowSize)
+ st.inflow.conn = &sc.inflow // link to conn-level counter
+ st.inflow.add(initialWindowSize) // TODO: update this when we send a higher initial window size in the initial settings
+
sc.streams[id] = st
if f.HasPriority() {
sc.adjustStreamPriority(st.id, f.Priority)
@@ -1457,23 +1461,42 @@
sc.serveG.check()
// "The legal range for the increment to the flow control
// window is 1 to 2^31-1 (2,147,483,647) octets."
+ // A Go Read call on 64-bit machines could in theory request
+ // more than this. Very unlikely, but we handle it here rather
+ // than elsewhere for now.
+ const maxUint31 = 1<<31 - 1
+ for n >= maxUint31 {
+ sc.sendWindowUpdate32(st, maxUint31)
+ n -= maxUint31
+ }
+ sc.sendWindowUpdate32(st, int32(n))
+}
+
+// st may be nil for a conn-level window update
+func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) {
+ sc.serveG.check()
+ if n == 0 {
+ return
+ }
+ if n < 0 {
+ panic("negative update")
+ }
var streamID uint32
if st != nil {
streamID = st.id
}
- const maxUint31 = 1<<31 - 1
- for n >= maxUint31 {
- sc.writeFrame(frameWriteMsg{
- write: writeWindowUpdate{streamID: streamID, n: maxUint31},
- stream: st,
- })
- n -= maxUint31
+ sc.writeFrame(frameWriteMsg{
+ write: writeWindowUpdate{streamID: streamID, n: uint32(n)},
+ stream: st,
+ })
+ var ok bool
+ if st == nil {
+ ok = sc.inflow.add(n)
+ } else {
+ ok = st.inflow.add(n)
}
- if n > 0 {
- sc.writeFrame(frameWriteMsg{
- write: writeWindowUpdate{streamID: streamID, n: uint32(n)},
- stream: st,
- })
+ if !ok {
+ panic("internal error; sent too many window updates without decrements?")
}
}
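
Taken together, the inbound path now works roughly like this (a
simplified sketch; onData and onBodyRead are illustrative stand-ins,
not the real method names in this package):

    // onData: the peer spends the receive window we advertised.
    func (sc *serverConn) onData(st *stream, data []byte) error {
        if int(st.inflow.available()) < len(data) {
            // Peer wrote more than its quota: flow-control error.
            return StreamError{st.id, ErrCodeFlowControl}
        }
        st.inflow.take(int32(len(data))) // debits st.inflow and sc.inflow
        _, err := st.body.Write(data)
        return err
    }

    // onBodyRead: once the handler consumes n body bytes, refund the
    // windows. Each sendWindowUpdate call queues WINDOW_UPDATE frames
    // for the peer and re-credits the matching inflow counter via add,
    // keeping our bookkeeping in sync with what we've advertised.
    func (sc *serverConn) onBodyRead(st *stream, n int) {
        sc.sendWindowUpdate(st, n)  // stream-level refund
        sc.sendWindowUpdate(nil, n) // conn-level refund
    }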