Rename, rearrange, document stuff.
diff --git a/server.go b/server.go
index 9366a62..a2de381 100644
--- a/server.go
+++ b/server.go
@@ -27,8 +27,28 @@
)
const (
- prefaceTimeout = 5 * time.Second
- firstSettingsTimeout = 2 * time.Second // should be in-flight with preface anyway
+ prefaceTimeout = 5 * time.Second
+ firstSettingsTimeout = 2 * time.Second // should be in-flight with preface anyway
+ handlerChunkWriteSize = 4 << 10
+)
+
+var (
+ errClientDisconnected = errors.New("client disconnected")
+ errClosedBody = errors.New("body closed by handler")
+)
+
+var responseWriterStatePool = sync.Pool{
+ New: func() interface{} {
+ rws := &responseWriterState{}
+ rws.bw = bufio.NewWriterSize(chunkWriter{rws}, handlerChunkWriteSize)
+ return rws
+ },
+}
+
+// Test hooks.
+var (
+ testHookOnConn func()
+ testHookGetServerConn func(*serverConn)
)
// TODO: finish GOAWAY support. Consider each incoming frame type and
@@ -78,8 +98,6 @@
return defaultMaxReadFrameSize
}
-var testHookOnConn func() // for testing
-
// ConfigureServer adds HTTP/2 support to a net/http Server.
//
// The configuration conf may be nil.
@@ -114,8 +132,6 @@
}
}
-var testHookGetServerConn func(*serverConn)
-
func (srv *Server) handleConn(hs *http.Server, c net.Conn, h http.Handler) {
sc := &serverConn{
srv: srv,
@@ -201,8 +217,8 @@
inGoAway bool // we've started to or sent GOAWAY
needToSendGoAway bool // we need to schedule a GOAWAY frame write
goAwayCode ErrCode
- shutdownTimerCh <-chan time.Time
- shutdownTimer *time.Timer
+ shutdownTimerCh <-chan time.Time // nil until used
+ shutdownTimer *time.Timer // nil until used
// Owned by the writeFrames goroutine; use writeG.check():
headerWriteBuf bytes.Buffer
@@ -402,8 +418,6 @@
return sc.bw.Flush() // may block on the network
}
-var errClientDisconnected = errors.New("client disconnected")
-
func (sc *serverConn) closeAllStreamsOnConnClose() {
sc.serveG.check()
for _, st := range sc.streams {
@@ -428,7 +442,7 @@
sc.vlogf("HTTP/2 connection from %v on %p", sc.conn.RemoteAddr(), sc.hs)
- sc.enqueueFrameWrite(frameWriteMsg{write: (*serverConn).sendInitialSettings})
+ sc.writeFrame(frameWriteMsg{write: (*serverConn).sendInitialSettings})
if err := sc.readPreface(); err != nil {
sc.condlogf(err, "error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
@@ -442,7 +456,7 @@
for {
select {
case wm := <-sc.wantWriteFrameCh:
- sc.enqueueFrameWrite(wm)
+ sc.writeFrame(wm)
case <-sc.wroteFrameCh:
sc.writingFrame = false
sc.scheduleFrameWrite()
@@ -520,7 +534,7 @@
// loop and modify the frame scheduler there to write chunks
// of req as tokens allow. Don't necessarily write it all at
// once in one frame.
- sc.writeFrame(frameWriteMsg{
+ sc.writeFrameFromHandler(frameWriteMsg{
write: (*serverConn).writeDataFrame,
cost: uint32(len(data.p)),
stream: stream,
@@ -536,14 +550,14 @@
}
}
-// writeFrame sends wm to sc.wantWriteFrameCh, but aborts if the
-// connection has gone away.
+// writeFrameFromHandler sends wm to sc.wantWriteFrameCh, but aborts
+// if the connection has gone away.
//
// This must not be run from the serve goroutine itself, else it might
// deadlock writing to sc.wantWriteFrameCh (which is only mildly
// buffered and is read by serve itself). If you're on the serve
-// goroutine, call enqueueFrameWrite instead.
-func (sc *serverConn) writeFrame(wm frameWriteMsg) {
+// goroutine, call writeFrame instead.
+func (sc *serverConn) writeFrameFromHandler(wm frameWriteMsg) {
sc.serveG.checkNotOn() // NOT
select {
case sc.wantWriteFrameCh <- wm:
@@ -552,13 +566,13 @@
}
}
-// enqueueFrameWrite either sends wm to the writeFrames goroutine, or
+// writeFrame either sends wm to the writeFrames goroutine, or
// enqueues it for the future (with no pushback; the serve goroutine
// never blocks!), for sending when the currently-being-written frame
// is done writing.
//
// If you're not on the serve goroutine, use writeFrame instead.
-func (sc *serverConn) enqueueFrameWrite(wm frameWriteMsg) {
+func (sc *serverConn) writeFrame(wm frameWriteMsg) {
sc.serveG.check()
// Fast path for common case:
if !sc.writingFrame {
@@ -609,26 +623,18 @@
sc.writeFrameCh <- wm
}
-func (sc *serverConn) sendFrameWriteFlush() {
- sc.serveG.check()
- if sc.writingFrame {
- panic("invariant")
- }
- sc.writingFrame = true
- sc.needsFrameFlush = false
- sc.writeFrameCh <- frameWriteMsg{write: (*serverConn).flushFrameWriter}
-}
-
-func (sc *serverConn) enqueueSettingsAck() {
- sc.serveG.check()
- if !sc.writingFrame {
- sc.needToSendSettingsAck = false
- sc.sendFrameWrite(frameWriteMsg{write: (*serverConn).writeSettingsAck})
- return
- }
- sc.needToSendSettingsAck = true
-}
-
+// scheduleFrameWrite tickles the frame writing scheduler.
+//
+// If a frame is already being written, nothing happens. This will be called again
+// when the frame is done being written.
+//
+// If a frame isn't being written we need to send one, the best frame
+// to send is selected, preferring first things that aren't
+// stream-specific (e.g. ACKing settings), and then finding the
+// highest priority stream.
+//
+// If a frame isn't being written and there's nothing else to send, we
+// flush the write buffer.
func (sc *serverConn) scheduleFrameWrite() {
sc.serveG.check()
if sc.writingFrame {
@@ -646,7 +652,8 @@
return
}
if len(sc.writeQueue) == 0 && sc.needsFrameFlush {
- sc.sendFrameWriteFlush()
+ sc.sendFrameWrite(frameWriteMsg{write: (*serverConn).flushFrameWriter})
+ sc.needsFrameFlush = false // after sendFrameWrite, since it sets this true
return
}
if sc.inGoAway {
@@ -654,7 +661,8 @@
return
}
if sc.needToSendSettingsAck {
- sc.enqueueSettingsAck()
+ sc.needToSendSettingsAck = false
+ sc.sendFrameWrite(frameWriteMsg{write: (*serverConn).writeSettingsAck})
return
}
if len(sc.writeQueue) == 0 {
@@ -734,7 +742,7 @@
if !ok {
panic(fmt.Sprintf("invariant. closing non-open stream %d", se.StreamID))
}
- sc.enqueueFrameWrite(frameWriteMsg{
+ sc.writeFrame(frameWriteMsg{
write: (*serverConn).writeRSTStreamFrame,
v: &se,
})
@@ -859,7 +867,7 @@
// PROTOCOL_ERROR."
return ConnectionError(ErrCodeProtocol)
}
- sc.enqueueFrameWrite(frameWriteMsg{
+ sc.writeFrame(frameWriteMsg{
write: (*serverConn).writePingAck,
v: f,
})
@@ -937,7 +945,8 @@
if err := f.ForeachSetting(sc.processSetting); err != nil {
return err
}
- sc.enqueueSettingsAck()
+ sc.needToSendSettingsAck = true
+ sc.scheduleFrameWrite()
return nil
}
@@ -1217,16 +1226,6 @@
return rw, req, nil
}
-const handlerChunkWriteSize = 4 << 10
-
-var responseWriterStatePool = sync.Pool{
- New: func() interface{} {
- rws := &responseWriterState{}
- rws.bw = bufio.NewWriterSize(chunkWriter{rws}, handlerChunkWriteSize)
- return rws
- },
-}
-
// Run on its own goroutine.
func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request) {
defer rw.handlerDone()
@@ -1272,7 +1271,7 @@
// mutates it.
errc = tempCh
}
- sc.writeFrame(frameWriteMsg{
+ sc.writeFrameFromHandler(frameWriteMsg{
write: (*serverConn).writeHeadersFrame,
v: req,
stream: req.stream,
@@ -1329,7 +1328,7 @@
// called from handler goroutines.
func (sc *serverConn) write100ContinueHeaders(st *stream) {
sc.serveG.checkNotOn() // NOT
- sc.writeFrame(frameWriteMsg{
+ sc.writeFrameFromHandler(frameWriteMsg{
write: (*serverConn).write100ContinueHeadersFrame,
stream: st,
})
@@ -1365,7 +1364,7 @@
}
const maxUint32 = 2147483647
for n >= maxUint32 {
- sc.writeFrame(frameWriteMsg{
+ sc.writeFrameFromHandler(frameWriteMsg{
write: (*serverConn).sendWindowUpdateInLoop,
v: windowUpdateReq{maxUint32},
stream: st,
@@ -1373,7 +1372,7 @@
n -= maxUint32
}
if n > 0 {
- sc.writeFrame(frameWriteMsg{
+ sc.writeFrameFromHandler(frameWriteMsg{
write: (*serverConn).sendWindowUpdateInLoop,
v: windowUpdateReq{uint32(n)},
stream: st,
@@ -1400,8 +1399,6 @@
needsContinue bool // need to send a 100-continue
}
-var errClosedBody = errors.New("body closed by handler")
-
func (b *requestBody) Close() error {
if b.pipe != nil {
b.pipe.Close(errClosedBody)