http2: don't leave hanging server goroutines after RST_STREAM from client

In general, clean up and simplify the handling of frame writing from
handler goroutines.  Always select on streams closing, and don't try
to pass around and re-use channels. It was too confusing. Instead,
reuse channels in a very local manner that's easy to reason about.

Thanks to GitHub user @pabbott0 (who has signed the Google CLA) for
the initial bug report and test cases.

Fixes bradfitz/http2#45

Change-Id: Ib72a87cb6e33a4bb118ae23d765ba594e9182ade
Reviewed-on: https://go-review.googlesource.com/15820
Reviewed-by: Andrew Gerrand <adg@golang.org>
diff --git a/http2/server.go b/http2/server.go
index 01f8f7f..b2b3fd9 100644
--- a/http2/server.go
+++ b/http2/server.go
@@ -322,7 +322,7 @@
 	wantWriteFrameCh chan frameWriteMsg   // from handlers -> serve
 	wroteFrameCh     chan struct{}        // from writeFrameAsync -> serve, tickles more frame writes
 	bodyReadCh       chan bodyReadMsg     // from handlers -> serve
-	testHookCh       chan func()          // code to run on the serve loop
+	testHookCh       chan func(int)       // code to run on the serve loop
 	flow             flow                 // conn-wide (not stream-specific) outbound flow control
 	inflow           flow                 // conn-wide inbound flow control
 	tlsState         *tls.ConnectionState // shared by all handlers, like net/http
@@ -636,7 +636,9 @@
 	go sc.readFrames() // closed by defer sc.conn.Close above
 
 	settingsTimer := time.NewTimer(firstSettingsTimeout)
+	loopNum := 0
 	for {
+		loopNum++
 		select {
 		case wm := <-sc.wantWriteFrameCh:
 			sc.writeFrame(wm)
@@ -664,7 +666,7 @@
 			sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr())
 			return
 		case fn := <-sc.testHookCh:
-			fn()
+			fn(loopNum)
 		}
 	}
 }
@@ -697,19 +699,20 @@
 	}
 }
 
+var errChanPool = sync.Pool{
+	New: func() interface{} { return make(chan error, 1) },
+}
+
 // writeDataFromHandler writes the data described in req to stream.id.
 //
-// The provided ch is used to avoid allocating new channels for each
-// write operation. It's expected that the caller reuses writeData and ch
-// over time.
-//
 // The flow control currently happens in the Handler where it waits
 // for 1 or more bytes to be available to then write here.  So at this
 // point we know that we have flow control. But this might have to
 // change when priority is implemented, so the serve goroutine knows
 // the total amount of bytes waiting to be sent and can can have more
 // scheduling decisions available.
-func (sc *serverConn) writeDataFromHandler(stream *stream, writeData *writeData, ch chan error) error {
+func (sc *serverConn) writeDataFromHandler(stream *stream, writeData *writeData) error {
+	ch := errChanPool.Get().(chan error)
 	sc.writeFrameFromHandler(frameWriteMsg{
 		write:  writeData,
 		stream: stream,
@@ -717,6 +720,7 @@
 	})
 	select {
 	case err := <-ch:
+		errChanPool.Put(ch)
 		return err
 	case <-sc.doneServing:
 		return errClientDisconnected
@@ -734,10 +738,22 @@
 // goroutine, call writeFrame instead.
 func (sc *serverConn) writeFrameFromHandler(wm frameWriteMsg) {
 	sc.serveG.checkNotOn() // NOT
+	var scheduled bool
 	select {
 	case sc.wantWriteFrameCh <- wm:
+		scheduled = true
 	case <-sc.doneServing:
 		// Client has closed their connection to the server.
+	case <-wm.stream.cw:
+		// Stream closed.
+	}
+	// Don't block writers expecting a reply.
+	if !scheduled && wm.done != nil {
+		select {
+		case wm.done <- errStreamBroken:
+		default:
+			panic("expected buffered channel")
+		}
 	}
 }
 
@@ -1435,7 +1451,6 @@
 	rws.stream = rp.stream
 	rws.req = req
 	rws.body = body
-	rws.frameWriteCh = make(chan error, 1)
 
 	rw := &responseWriter{rws: rws}
 	return rw, req, nil
@@ -1460,7 +1475,7 @@
 
 // called from handler goroutines.
 // h may be nil.
-func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders, tempCh chan error) {
+func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders) {
 	sc.serveG.checkNotOn() // NOT on
 	var errc chan error
 	if headerData.h != nil {
@@ -1468,7 +1483,7 @@
 		// waiting for this frame to be written, so an http.Flush mid-handler
 		// writes out the correct value of keys, before a handler later potentially
 		// mutates it.
-		errc = tempCh
+		errc = errChanPool.Get().(chan error)
 	}
 	sc.writeFrameFromHandler(frameWriteMsg{
 		write:  headerData,
@@ -1480,8 +1495,11 @@
 		case <-errc:
 			// Ignore. Just for synchronization.
 			// Any error will be handled in the writing goroutine.
+			errChanPool.Put(errc)
 		case <-sc.doneServing:
 			// Client has closed the connection.
+		case <-st.cw:
+			// Client did RST_STREAM, etc. (but conn still alive)
 		}
 	}
 }
@@ -1629,7 +1647,6 @@
 	sentHeader    bool        // have we sent the header frame?
 	handlerDone   bool        // handler has finished
 	curWrite      writeData
-	frameWriteCh  chan error // re-used whenever we need to block on a frame being written
 
 	closeNotifierMu sync.Mutex // guards closeNotifierCh
 	closeNotifierCh chan bool  // nil until first used
@@ -1666,7 +1683,7 @@
 			endStream:     endStream,
 			contentType:   ctype,
 			contentLength: clen,
-		}, rws.frameWriteCh)
+		})
 		if endStream {
 			return 0, nil
 		}
@@ -1678,7 +1695,7 @@
 	curWrite.streamID = rws.stream.id
 	curWrite.p = p
 	curWrite.endStream = rws.handlerDone
-	if err := rws.conn.writeDataFromHandler(rws.stream, curWrite, rws.frameWriteCh); err != nil {
+	if err := rws.conn.writeDataFromHandler(rws.stream, curWrite); err != nil {
 		return 0, err
 	}
 	return len(p), nil