Simplify, break up serve loop, handle SettingsFrames better, ACK them, etc.
diff --git a/server.go b/server.go
index b56c64d..d8ea1eb 100644
--- a/server.go
+++ b/server.go
@@ -26,8 +26,21 @@
"github.com/bradfitz/http2/hpack"
)
-// TODO: finish GOAWAY support. Consider each incoming frame type and whether
-// it should be ignored during a shutdown race.
+const (
+ prefaceTimeout = 5 * time.Second
+ firstSettingsTimeout = 2 * time.Second // should be in-flight with preface anyway
+)
+
+// TODO: finish GOAWAY support. Consider each incoming frame type and
+// whether it should be ignored during a shutdown race.
+
+// TODO: (edge case?) if peer sends a SETTINGS frame with e.g. a
+// SETTINGS_MAX_FRAME_SIZE that's lower than what we had before,
+// before we ACK it we have to make sure all currently-active streams
+// know about that and don't have existing too-large frames in flight?
+// Perhaps the settings processing should just wait for new frame to
+// be in-flight and then the frame scheduler in the serve goroutine
+// will be responsible for splitting things.
// Server is an HTTP/2 server.
type Server struct {
@@ -123,15 +136,17 @@
flow *flow // connection-wide (not stream-specific) flow control
// Everything following is owned by the serve loop; use serveG.check():
- maxStreamID uint32 // max ever seen
- streams map[uint32]*stream
- maxWriteFrameSize uint32 // TODO: update this when settings come in
- initialWindowSize int32
- canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case
- sentGoAway bool
- req requestParam // non-zero while reading request headers
- writingFrame bool // sent on writeFrameCh but haven't heard back on wroteFrameCh yet
- writeQueue []frameWriteMsg // TODO: proper scheduler, not a queue
+ sawFirstSettings bool // got the initial SETTINGS frame after the preface
+ needToSendSettingsAck bool
+ maxStreamID uint32 // max ever seen
+ streams map[uint32]*stream
+ maxWriteFrameSize uint32 // TODO: update this when settings come in
+ initialWindowSize int32
+ canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case
+ sentGoAway bool
+ req requestParam // non-zero while reading request headers
+ writingFrame bool // sent on writeFrameCh but haven't heard back on wroteFrameCh yet
+ writeQueue []frameWriteMsg // TODO: proper scheduler, not a queue
// Owned by the writeFrames goroutine; use writeG.check():
headerWriteBuf bytes.Buffer
@@ -316,39 +331,13 @@
sc.vlogf("HTTP/2 connection from %v on %p", sc.conn.RemoteAddr(), sc.hs)
- if err := sc.readPreface(); err != nil {
- sc.condlogf(err, "Error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
- return
- }
-
- f, err := sc.framer.ReadFrame() // TODO: timeout
- if err != nil {
- sc.logf("error reading initial frame from client: %v", err)
- return
- }
- sf, ok := f.(*SettingsFrame)
- if !ok {
- sc.logf("invalid initial frame type %T received from client", f)
- return
- }
- if err := sf.ForeachSetting(sc.processSetting); err != nil {
- sc.logf("initial settings error: %v", err)
- return
- }
-
- // TODO: don't send two network packets for our SETTINGS + our
- // ACK of their settings. But if we make framer write to a
- // *bufio.Writer, that increases the per-connection memory
- // overhead, and there could be many idle conns. So maybe some
- // liveswitchWriter-like thing where we only switch to a
- // *bufio Writer when we really need one temporarily, else go
- // back to an unbuffered writes by default.
if err := sc.framer.WriteSettings( /* TODO: actual settings */ ); err != nil {
sc.logf("error writing server's initial settings: %v", err)
return
}
- if err := sc.framer.WriteSettingsAck(); err != nil {
- sc.logf("error writing server's ack of client's settings: %v", err)
+
+ if err := sc.readPreface(); err != nil {
+ sc.condlogf(err, "Error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
return
}
@@ -356,6 +345,8 @@
go sc.writeFrames()
defer close(sc.writeFrameCh) // shuts down writeFrames loop
+ settingsTimer := time.NewTimer(firstSettingsTimeout)
+
for {
select {
case wm := <-sc.wantWriteFrameCh:
@@ -364,40 +355,16 @@
sc.writingFrame = false
sc.scheduleFrameWrite()
case fg, ok := <-sc.readFrameCh:
- if !ok {
- err := <-sc.readFrameErrCh
- if err != io.EOF {
- errstr := err.Error()
- if !strings.Contains(errstr, "use of closed network connection") {
- sc.logf("client %s stopped sending frames: %v", sc.conn.RemoteAddr(), errstr)
- }
- }
+ if !sc.processFrameFromReader(fg, ok) {
return
}
- f := fg.f
- sc.vlogf("got %v: %#v", f.Header(), f)
- err := sc.processFrame(f)
- fg.g.Done() // unblock the readFrames goroutine
- switch ev := err.(type) {
- case nil:
- // nothing.
- case StreamError:
- if err := sc.resetStreamInLoop(ev); err != nil {
- sc.logf("Error writing RSTSTream: %v", err)
- return
- }
- case ConnectionError:
- sc.logf("Disconnecting; %v", ev)
- return
- case goAwayFlowError:
- if err := sc.goAway(ErrCodeFlowControl); err != nil {
- sc.condlogf(err, "failed to GOAWAY: %v", err)
- return
- }
- default:
- sc.logf("Disconnection due to other error: %v", err)
- return
+ if settingsTimer.C != nil {
+ settingsTimer.Stop()
+ settingsTimer.C = nil
}
+ case <-settingsTimer.C:
+ sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr())
+ return
}
}
}
@@ -442,8 +409,26 @@
sc.writeQueue = append(sc.writeQueue, wm) // TODO: proper scheduler
}
+func (sc *serverConn) enqueueSettingsAck() {
+ sc.serveG.check()
+ // Fast path for common case:
+ if !sc.writingFrame {
+ sc.wantWriteFrameCh <- frameWriteMsg{write: (*serverConn).writeSettingsAck}
+ return
+ }
+ sc.needToSendSettingsAck = true
+}
+
func (sc *serverConn) scheduleFrameWrite() {
sc.serveG.check()
+ if sc.writingFrame {
+ panic("invariant")
+ }
+ if sc.needToSendSettingsAck {
+ sc.needToSendSettingsAck = false
+ sc.enqueueSettingsAck()
+ return
+ }
if len(sc.writeQueue) == 0 {
// TODO: flush Framer's underlying buffered writer, once that's added
return
@@ -455,6 +440,10 @@
copy(sc.writeQueue, sc.writeQueue[1:])
sc.writeQueue = sc.writeQueue[:len(sc.writeQueue)-1]
+ // TODO: if wm is a data frame, make sure it's not too big
+ // (because a SETTINGS frame changed our max frame size while
+ // a stream was open and writing) and cut it up into smaller
+ // bits.
sc.writingFrame = true
sc.writeFrameCh <- wm
}
@@ -483,9 +472,64 @@
return st.id
}
+// processFrameFromReader processes the serve loop's read from readFrameCh from the
+// frame-reading goroutine.
+// processFrameFromReader returns whether the connection should be kept open.
+func (sc *serverConn) processFrameFromReader(fg frameAndGate, fgValid bool) bool {
+ sc.serveG.check()
+ if !fgValid {
+ err := <-sc.readFrameErrCh
+ if err != io.EOF {
+ errstr := err.Error()
+ if !strings.Contains(errstr, "use of closed network connection") {
+ sc.logf("client %s stopped sending frames: %v", sc.conn.RemoteAddr(), errstr)
+ }
+ }
+ // TODO: could we also get into this state if the peer does a half close (e.g. CloseWrite)
+ // because they're done sending frames but they're still wanting our open replies?
+ // Investigate.
+ return false
+ }
+ f := fg.f
+ sc.vlogf("got %v: %#v", f.Header(), f)
+ err := sc.processFrame(f)
+ fg.g.Done() // unblock the readFrames goroutine
+ if err == nil {
+ return true
+ }
+
+ switch ev := err.(type) {
+ case StreamError:
+ if err := sc.resetStreamInLoop(ev); err != nil {
+ sc.logf("Error writing RSTSTream: %v", err)
+ return false
+ }
+ return true
+ case goAwayFlowError:
+ if err := sc.goAway(ErrCodeFlowControl); err != nil {
+ sc.condlogf(err, "failed to GOAWAY: %v", err)
+ return false
+ }
+ return true
+ case ConnectionError:
+ sc.logf("disconnecting; %v", ev)
+ default:
+ sc.logf("Disconnection due to other error: %v", err)
+ }
+ return false
+}
+
func (sc *serverConn) processFrame(f Frame) error {
sc.serveG.check()
+ // First frame received must be SETTINGS.
+ if !sc.sawFirstSettings {
+ if _, ok := f.(*SettingsFrame); !ok {
+ return ConnectionError(ErrCodeProtocol)
+ }
+ sc.sawFirstSettings = true
+ }
+
if s := sc.curHeaderStreamID(); s != 0 {
if cf, ok := f.(*ContinuationFrame); !ok {
return ConnectionError(ErrCodeProtocol)
@@ -567,7 +611,19 @@
func (sc *serverConn) processSettings(f *SettingsFrame) error {
sc.serveG.check()
- return f.ForeachSetting(sc.processSetting)
+ if f.IsAck() {
+ // TODO: do we need to do anything?
+ return nil
+ }
+ if err := f.ForeachSetting(sc.processSetting); err != nil {
+ return err
+ }
+ sc.enqueueSettingsAck()
+ return nil
+}
+
+func (sc *serverConn) writeSettingsAck(_ interface{}) error {
+ return sc.framer.WriteSettingsAck()
}
func (sc *serverConn) processSetting(s Setting) error {