http2: don't leaving hanging server goroutines after RST_STREAM from client
In general, clean up and simplify the handling of frame writing from
handler goroutines. Always select on streams closing, and don't try
to pass around and re-use channels. It was too confusing. Instead,
reuse channels in a very local manner that's easy to reason about.
Thanks to Github user @pabbott0 (who has signed the Google CLA) for
the initial bug report and test cases.
Fixes bradfitz/http2#45
Change-Id: Ib72a87cb6e33a4bb118ae23d765ba594e9182ade
Reviewed-on: https://go-review.googlesource.com/15820
Reviewed-by: Andrew Gerrand <adg@golang.org>
diff --git a/http2/server.go b/http2/server.go
index 01f8f7f..b2b3fd9 100644
--- a/http2/server.go
+++ b/http2/server.go
@@ -322,7 +322,7 @@
wantWriteFrameCh chan frameWriteMsg // from handlers -> serve
wroteFrameCh chan struct{} // from writeFrameAsync -> serve, tickles more frame writes
bodyReadCh chan bodyReadMsg // from handlers -> serve
- testHookCh chan func() // code to run on the serve loop
+ testHookCh chan func(int) // code to run on the serve loop
flow flow // conn-wide (not stream-specific) outbound flow control
inflow flow // conn-wide inbound flow control
tlsState *tls.ConnectionState // shared by all handlers, like net/http
@@ -636,7 +636,9 @@
go sc.readFrames() // closed by defer sc.conn.Close above
settingsTimer := time.NewTimer(firstSettingsTimeout)
+ loopNum := 0
for {
+ loopNum++
select {
case wm := <-sc.wantWriteFrameCh:
sc.writeFrame(wm)
@@ -664,7 +666,7 @@
sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr())
return
case fn := <-sc.testHookCh:
- fn()
+ fn(loopNum)
}
}
}
@@ -697,19 +699,20 @@
}
}
+var errChanPool = sync.Pool{
+ New: func() interface{} { return make(chan error, 1) },
+}
+
// writeDataFromHandler writes the data described in req to stream.id.
//
-// The provided ch is used to avoid allocating new channels for each
-// write operation. It's expected that the caller reuses writeData and ch
-// over time.
-//
// The flow control currently happens in the Handler where it waits
// for 1 or more bytes to be available to then write here. So at this
// point we know that we have flow control. But this might have to
// change when priority is implemented, so the serve goroutine knows
// the total amount of bytes waiting to be sent and can can have more
// scheduling decisions available.
-func (sc *serverConn) writeDataFromHandler(stream *stream, writeData *writeData, ch chan error) error {
+func (sc *serverConn) writeDataFromHandler(stream *stream, writeData *writeData) error {
+ ch := errChanPool.Get().(chan error)
sc.writeFrameFromHandler(frameWriteMsg{
write: writeData,
stream: stream,
@@ -717,6 +720,7 @@
})
select {
case err := <-ch:
+ errChanPool.Put(ch)
return err
case <-sc.doneServing:
return errClientDisconnected
@@ -734,10 +738,22 @@
// goroutine, call writeFrame instead.
func (sc *serverConn) writeFrameFromHandler(wm frameWriteMsg) {
sc.serveG.checkNotOn() // NOT
+ var scheduled bool
select {
case sc.wantWriteFrameCh <- wm:
+ scheduled = true
case <-sc.doneServing:
// Client has closed their connection to the server.
+ case <-wm.stream.cw:
+ // Stream closed.
+ }
+ // Don't block writers expecting a reply.
+ if !scheduled && wm.done != nil {
+ select {
+ case wm.done <- errStreamBroken:
+ default:
+ panic("expected buffered channel")
+ }
}
}
@@ -1435,7 +1451,6 @@
rws.stream = rp.stream
rws.req = req
rws.body = body
- rws.frameWriteCh = make(chan error, 1)
rw := &responseWriter{rws: rws}
return rw, req, nil
@@ -1460,7 +1475,7 @@
// called from handler goroutines.
// h may be nil.
-func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders, tempCh chan error) {
+func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders) {
sc.serveG.checkNotOn() // NOT on
var errc chan error
if headerData.h != nil {
@@ -1468,7 +1483,7 @@
// waiting for this frame to be written, so an http.Flush mid-handler
// writes out the correct value of keys, before a handler later potentially
// mutates it.
- errc = tempCh
+ errc = errChanPool.Get().(chan error)
}
sc.writeFrameFromHandler(frameWriteMsg{
write: headerData,
@@ -1480,8 +1495,11 @@
case <-errc:
// Ignore. Just for synchronization.
// Any error will be handled in the writing goroutine.
+ errChanPool.Put(errc)
case <-sc.doneServing:
// Client has closed the connection.
+ case <-st.cw:
+ // Client did RST_STREAM, etc. (but conn still alive)
}
}
}
@@ -1629,7 +1647,6 @@
sentHeader bool // have we sent the header frame?
handlerDone bool // handler has finished
curWrite writeData
- frameWriteCh chan error // re-used whenever we need to block on a frame being written
closeNotifierMu sync.Mutex // guards closeNotifierCh
closeNotifierCh chan bool // nil until first used
@@ -1666,7 +1683,7 @@
endStream: endStream,
contentType: ctype,
contentLength: clen,
- }, rws.frameWriteCh)
+ })
if endStream {
return 0, nil
}
@@ -1678,7 +1695,7 @@
curWrite.streamID = rws.stream.id
curWrite.p = p
curWrite.endStream = rws.handlerDone
- if err := rws.conn.writeDataFromHandler(rws.stream, curWrite, rws.frameWriteCh); err != nil {
+ if err := rws.conn.writeDataFromHandler(rws.stream, curWrite); err != nil {
return 0, err
}
return len(p), nil