Interrupt Request.Body.Read on RSTStream or connection close.

This ensures that handler goroutines blocked in Read are unblocked with an error instead of leaking forever.
diff --git a/server.go b/server.go
index a612487..d0bc1d9 100644
--- a/server.go
+++ b/server.go
@@ -42,12 +42,6 @@
 // be in-flight and then the frame scheduler in the serve goroutine
 // will be responsible for splitting things.
 
-// TODO: test/handle a client sending a POST with potential data, get
-// stuck in the handler in a Read, then client sends RST_STREAM, and
-// we should verify the Read then unblocks, rather than being stuck
-// forever and leaking a goroutine. and it should return an error from
-// the Read.
-
 // Server is an HTTP/2 server.
 type Server struct {
 	// MaxStreams optionally ...
@@ -175,7 +169,7 @@
 
 type stream struct {
 	id    uint32
-	state streamState // owned by serverConn's processing loop
+	state streamState // owned by serverConn's serve loop
 	flow  *flow       // limits writing from Handler to client
 	body  *pipe       // non-nil if expecting DATA frames
 
@@ -330,10 +324,19 @@
 	}
 }
 
+func (sc *serverConn) stopServing() { // connection teardown, deferred by serve: stops the write loop, then closes every stream still registered
+	sc.serveG.check()
+	close(sc.writeFrameCh) // stop the writeFrames loop
+	err := errors.New("client disconnected")
+	for id := range sc.streams {
+		sc.closeStream(id, err) // every stream gets the same err; closeStream deletes id from sc.streams, which Go permits during a range
+	}
+}
+
 func (sc *serverConn) serve() {
 	sc.serveG.check()
 	defer sc.conn.Close()
-	defer close(sc.doneServing)
+	defer sc.stopServing()
 
 	sc.vlogf("HTTP/2 connection from %v on %p", sc.conn.RemoteAddr(), sc.hs)
 
@@ -344,9 +347,8 @@
 		return
 	}
 
-	go sc.readFrames() // closed by defer sc.conn.Close above
-	go sc.writeFrames()
-	defer close(sc.writeFrameCh) // shuts down writeFrames loop
+	go sc.readFrames()  // closed by defer sc.conn.Close above
+	go sc.writeFrames() // source closed in stopServing
 
 	settingsTimer := time.NewTimer(firstSettingsTimeout)
 
@@ -591,6 +593,8 @@
 		return sc.processPing(f)
 	case *DataFrame:
 		return sc.processData(f)
+	case *RSTStreamFrame:
+		return sc.processResetStream(f)
 	default:
 		log.Printf("Ignoring unknown frame %#v", f)
 		return nil
@@ -649,6 +653,26 @@
 	return nil
 }
 
+func (sc *serverConn) processResetStream(f *RSTStreamFrame) error { // handles an RST_STREAM frame from the peer by closing the named stream; always returns nil
+	sc.serveG.check()
+	sc.closeStream(f.StreamID, StreamError{f.StreamID, f.ErrCode}) // the frame's error code becomes the stream's close error; a no-op if the ID is not in sc.streams
+	return nil
+}
+
+func (sc *serverConn) closeStream(streamID uint32, err error) { // removes streamID from sc.streams, closes its flow, and closes its body pipe (if any) with err
+	sc.serveG.check()
+	st, ok := sc.streams[streamID]
+	if !ok {
+		return // unknown or already-closed stream: nothing to do
+	}
+	st.state = stateClosed // kinda useless: st is removed from sc.streams on the next line
+	delete(sc.streams, streamID)
+	st.flow.close() // NOTE(review): presumably releases anything waiting on this stream's flow control; confirm in flow.close
+	if p := st.body; p != nil {
+		p.Close(err) // NOTE(review): intended to make a blocked Request.Body.Read return err, per the commit message; confirm in pipe.Close
+	}
+}
+
 func (sc *serverConn) processSettings(f *SettingsFrame) error {
 	sc.serveG.check()
 	if f.IsAck() {