Interrupt Request.Body.Read on RSTStream or connection close.
So we don't leak goroutines blocked forever in Read.
diff --git a/server.go b/server.go
index a612487..d0bc1d9 100644
--- a/server.go
+++ b/server.go
@@ -42,12 +42,6 @@
// be in-flight and then the frame scheduler in the serve goroutine
// will be responsible for splitting things.
-// TODO: test/handle a client sending a POST with potential data, get
-// stuck in the handler in a Read, then client sends RST_STREAM, and
-// we should verify the Read then unblocks, rather than being stuck
-// forever and leaking a goroutine. and it should return an error from
-// the Read.
-
// Server is an HTTP/2 server.
type Server struct {
// MaxStreams optionally ...
@@ -175,7 +169,7 @@
type stream struct {
id uint32
- state streamState // owned by serverConn's processing loop
+ state streamState // owned by serverConn's serve loop
flow *flow // limits writing from Handler to client
body *pipe // non-nil if expecting DATA frames
@@ -330,10 +324,19 @@
}
}
+func (sc *serverConn) stopServing() {
+ sc.serveG.check()
+ close(sc.writeFrameCh) // stop the writeFrames loop
+ err := errors.New("client disconnected")
+ for id := range sc.streams {
+ sc.closeStream(id, err)
+ }
+}
+
func (sc *serverConn) serve() {
sc.serveG.check()
defer sc.conn.Close()
- defer close(sc.doneServing)
+ defer sc.stopServing()
sc.vlogf("HTTP/2 connection from %v on %p", sc.conn.RemoteAddr(), sc.hs)
@@ -344,9 +347,8 @@
return
}
- go sc.readFrames() // closed by defer sc.conn.Close above
- go sc.writeFrames()
- defer close(sc.writeFrameCh) // shuts down writeFrames loop
+ go sc.readFrames() // closed by defer sc.conn.Close above
+ go sc.writeFrames() // source closed in stopServing
settingsTimer := time.NewTimer(firstSettingsTimeout)
@@ -591,6 +593,8 @@
return sc.processPing(f)
case *DataFrame:
return sc.processData(f)
+ case *RSTStreamFrame:
+ return sc.processResetStream(f)
default:
log.Printf("Ignoring unknown frame %#v", f)
return nil
@@ -649,6 +653,26 @@
return nil
}
+func (sc *serverConn) processResetStream(f *RSTStreamFrame) error {
+ sc.serveG.check()
+ sc.closeStream(f.StreamID, StreamError{f.StreamID, f.ErrCode})
+ return nil
+}
+
+func (sc *serverConn) closeStream(streamID uint32, err error) {
+ sc.serveG.check()
+ st, ok := sc.streams[streamID]
+ if !ok {
+ return
+ }
+ st.state = stateClosed // kinda useless
+ delete(sc.streams, streamID)
+ st.flow.close()
+ if p := st.body; p != nil {
+ p.Close(err)
+ }
+}
+
func (sc *serverConn) processSettings(f *SettingsFrame) error {
sc.serveG.check()
if f.IsAck() {