Buffer the writing of frames.
Previously each written frame went out in its own network packet.
Now we buffer writes until the frame writer has nothing else to write,
at which point it flushes and then discards its write buffers, to
minimize the memory usage of idle connections.
diff --git a/server.go b/server.go
index 9cd504a..405593f 100644
--- a/server.go
+++ b/server.go
@@ -45,6 +45,9 @@
// TODO: send PING frames to idle clients and disconnect them if no
// reply
+// TODO: don't keep the writeFrames goroutine active; turn it off when no
+// frames are enqueued.
+
// Server is an HTTP/2 server.
type Server struct {
// MaxStreams optionally ...
@@ -102,17 +105,12 @@
var testHookGetServerConn func(*serverConn)
func (srv *Server) handleConn(hs *http.Server, c net.Conn, h http.Handler) {
- // TODO: write to a (custom?) buffered writer that can
- // alternate when it's in buffered mode.
- fr := NewFramer(c, c)
- fr.SetMaxReadFrameSize(srv.maxReadFrameSize())
-
sc := &serverConn{
srv: srv,
hs: hs,
conn: c,
+ bw: newBufferedWriter(c),
handler: h,
- framer: fr,
streams: make(map[uint32]*stream),
readFrameCh: make(chan frameAndGate),
readFrameErrCh: make(chan error, 1), // must be buffered for 1
@@ -130,6 +128,11 @@
}
sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
sc.hpackDecoder = hpack.NewDecoder(initialHeaderTableSize, sc.onNewHeaderField)
+
+ fr := NewFramer(sc.bw, c)
+ fr.SetMaxReadFrameSize(srv.maxReadFrameSize())
+ sc.framer = fr
+
if hook := testHookGetServerConn; hook != nil {
hook(sc)
}
@@ -151,6 +154,7 @@
srv *Server
hs *http.Server
conn net.Conn
+ bw *bufferedWriter // writing to conn
handler http.Handler
framer *Framer
hpackDecoder *hpack.Decoder
@@ -180,6 +184,7 @@
canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case
req requestParam // non-zero while reading request headers
writingFrame bool // sent on writeFrameCh but haven't heard back on wroteFrameCh yet
+ needsFrameFlush bool // last message sent on writeFrameCh was a frame write, not a flush
writeQueue []frameWriteMsg // TODO: proper scheduler, not a queue
inGoAway bool // we've started to or sent GOAWAY
needToSendGoAway bool // we need to schedule a GOAWAY frame write
@@ -376,6 +381,11 @@
}
}
+func (sc *serverConn) flushFrameWriter(_ interface{}) error {
+ sc.writeG.check()
+ return sc.bw.Flush() // may block on the network
+}
+
var errClientDisconnected = errors.New("client disconnected")
func (sc *serverConn) closeAllStreamsOnConnClose() {
@@ -528,6 +538,7 @@
}
sc.writingFrame = true
+ sc.needsFrameFlush = true
if wm.endStream {
if st == nil {
panic("nil stream with endStream set")
@@ -542,11 +553,21 @@
sc.writeFrameCh <- wm
}
+func (sc *serverConn) sendFrameWriteFlush() {
+ sc.serveG.check()
+ if sc.writingFrame {
+ panic("invariant")
+ }
+ sc.writingFrame = true
+ sc.needsFrameFlush = false
+ sc.writeFrameCh <- frameWriteMsg{write: (*serverConn).flushFrameWriter}
+}
+
func (sc *serverConn) enqueueSettingsAck() {
sc.serveG.check()
if !sc.writingFrame {
sc.needToSendSettingsAck = false
- sc.writeFrameCh <- frameWriteMsg{write: (*serverConn).writeSettingsAck}
+ sc.sendFrameWrite(frameWriteMsg{write: (*serverConn).writeSettingsAck})
return
}
sc.needToSendSettingsAck = true
@@ -568,6 +589,10 @@
})
return
}
+ if len(sc.writeQueue) == 0 && sc.needsFrameFlush {
+ sc.sendFrameWriteFlush()
+ return
+ }
if sc.inGoAway {
// No more frames after we've sent GOAWAY.
return
@@ -577,7 +602,6 @@
return
}
if len(sc.writeQueue) == 0 {
- // TODO: flush Framer's underlying buffered writer, once that's added
return
}
@@ -627,9 +651,9 @@
p := v.(*goAwayParams)
err := sc.framer.WriteGoAway(p.maxStreamID, p.code, nil)
if p.code != 0 {
- // TODO: flush any buffer, if we add a buffering writing
+ sc.bw.Flush() // ignore error: we're hanging up on them anyway
// Sleep a bit to give the peer a bit of time to read the
// GOAWAY before potentially getting a TCP RST packet:
time.Sleep(50 * time.Millisecond)
sc.conn.Close()
}