Move server's WINDOW_UPDATE sending logic into the serve loop.
Previously the http.Handler (via requestBody) were just blinding
writing telling the serve loop to send the WINDOW_UPDATE frames, but
the serve loop didn't really know what was going on.
Now the requestBody.Read instead tells the serve loop that n bytes
were read on a given stream, so the server can be smarter.
This opens the door to coalescing updates, and suppressing pointless
ones (like the new TODO notes: no need to send a stream-specific
WINDOW_UPDATE when the peer has closed their side already)
No (intentional) changes in behavior in this change, though.
diff --git a/server.go b/server.go
index 705e62a..4bd2a3e 100644
--- a/server.go
+++ b/server.go
@@ -166,6 +166,7 @@
readFrameErrCh: make(chan error, 1), // must be buffered for 1
wantWriteFrameCh: make(chan frameWriteMsg, 8),
wroteFrameCh: make(chan struct{}, 1), // buffered; one send in reading goroutine
+ bodyReadCh: make(chan bodyReadMsg), // buffering doesn't matter either way
doneServing: make(chan struct{}),
advMaxStreams: srv.maxConcurrentStreams(),
writeSched: writeScheduler{
@@ -214,6 +215,7 @@
readFrameErrCh chan error
wantWriteFrameCh chan frameWriteMsg // from handlers -> serve
wroteFrameCh chan struct{} // from writeFrameAsync -> serve, tickles more frame writes
+ bodyReadCh chan bodyReadMsg // from handlers -> serve
testHookCh chan func() // code to run on the serve loop
flow flow // connection-wide (not stream-specific) flow control
@@ -504,6 +506,8 @@
settingsTimer.Stop()
settingsTimer.C = nil
}
+ case m := <-sc.bodyReadCh:
+ sc.noteBodyRead(m.st, m.n)
case <-settingsTimer.C:
sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr())
return
@@ -1307,22 +1311,50 @@
})
}
-// called from handler goroutines
+// A bodyReadMsg tells the server loop that the http.Handler read n
+// bytes of the DATA from the client on the given stream.
+type bodyReadMsg struct {
+ st *stream
+ n int
+}
+
+// called from handler goroutines.
+// Notes that the handler for the given stream ID read n bytes of its body
+// and schedules flow control tokens to be sent.
+func (sc *serverConn) noteBodyReadFromHandler(st *stream, n int) {
+ sc.serveG.checkNotOn() // NOT on
+ sc.bodyReadCh <- bodyReadMsg{st, n}
+}
+
+func (sc *serverConn) noteBodyRead(st *stream, n int) {
+ sc.serveG.check()
+ sc.sendWindowUpdate(nil, n) // conn-level
+ // TODO: don't send this WINDOW_UPDATE if the stream is in
+ // stateClosedRemote. No need to tell them they can send more
+ // if they've already said they're done.
+ sc.sendWindowUpdate(st, n)
+}
+
+// st may be nil for conn-level
func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
- if st == nil {
- panic("no stream")
+ sc.serveG.check()
+ // "The legal range for the increment to the flow control
+ // window is 1 to 2^31-1 (2,147,483,647) octets."
+ var streamID uint32
+ if st != nil {
+ streamID = st.id
}
- const maxUint32 = 2147483647
- for n >= maxUint32 {
- sc.writeFrameFromHandler(frameWriteMsg{
- write: writeWindowUpdate{streamID: st.id, n: maxUint32},
+ const maxUint31 = 1<<31 - 1
+ for n >= maxUint31 {
+ sc.writeFrame(frameWriteMsg{
+ write: writeWindowUpdate{streamID: streamID, n: maxUint31},
stream: st,
})
- n -= maxUint32
+ n -= maxUint31
}
if n > 0 {
- sc.writeFrameFromHandler(frameWriteMsg{
- write: writeWindowUpdate{streamID: st.id, n: uint32(n)},
+ sc.writeFrame(frameWriteMsg{
+ write: writeWindowUpdate{streamID: streamID, n: uint32(n)},
stream: st,
})
}
@@ -1354,7 +1386,7 @@
}
n, err = b.pipe.Read(p)
if n > 0 {
- b.conn.sendWindowUpdate(b.stream, n)
+ b.conn.noteBodyReadFromHandler(b.stream, n)
}
return
}