Track the flow control of the peer explicitly.

Previously we enforced flow control only implicitly: we gave the peer a
64k buffer to write into and blew up if they went over. But in
preparation for larger, non-default, and configurable initial window
sizes, we need to start counting explicitly.
diff --git a/server.go b/server.go
index ebfe400..7779d32 100644
--- a/server.go
+++ b/server.go
@@ -34,7 +34,7 @@
 	prefaceTimeout        = 10 * time.Second
 	firstSettingsTimeout  = 2 * time.Second // should be in-flight with preface anyway
 	handlerChunkWriteSize = 4 << 10
-	defaultMaxStreams     = 250
+	defaultMaxStreams     = 250 // TODO: make this 100 as the GFE seems to?
 )
 
 var (
@@ -207,6 +207,7 @@
 		pushEnabled:       true,
 	}
 	sc.flow.add(initialWindowSize)
+	sc.inflow.add(initialWindowSize)
 	sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
 	sc.hpackDecoder = hpack.NewDecoder(initialHeaderTableSize, sc.onNewHeaderField)
 
@@ -317,7 +318,8 @@
 	wroteFrameCh     chan struct{}        // from writeFrameAsync -> serve, tickles more frame writes
 	bodyReadCh       chan bodyReadMsg     // from handlers -> serve
 	testHookCh       chan func()          // code to run on the serve loop
-	flow             flow                 // connection-wide (not stream-specific) flow control
+	flow             flow                 // conn-wide (not stream-specific) outbound flow control
+	inflow           flow                 // conn-wide inbound flow control
 	tlsState         *tls.ConnectionState // shared by all handlers, like net/http
 
 	// Everything following is owned by the serve loop; use serveG.check():
@@ -374,18 +376,19 @@
 type stream struct {
 	// immutable:
 	id   uint32
-	flow flow        // limits writing from Handler to client
 	body *pipe       // non-nil if expecting DATA frames
 	cw   closeWaiter // closed wait stream transitions to closed state
 
 	// owned by serverConn's serve loop:
+	bodyBytes     int64   // body bytes seen so far
+	declBodyBytes int64   // or -1 if undeclared
+	flow          flow    // limits writing from Handler to client
+	inflow        flow    // what the client is allowed to POST/etc to us
 	parent        *stream // or nil
 	weight        uint8
 	state         streamState
-	bodyBytes     int64 // body bytes seen so far
-	declBodyBytes int64 // or -1 if undeclared
-	sentReset     bool  // only true once detached from streams map
-	gotReset      bool  // only true once detacted from streams map
+	sentReset     bool // only true once detached from streams map
+	gotReset      bool // only true once detached from streams map
 }
 
 func (sc *serverConn) Framer() *Framer  { return sc.framer }
@@ -651,7 +654,7 @@
 			errc <- nil
 		}
 	}()
-	timer := time.NewTimer(5 * time.Second) // TODO: configurable on *Server?
+	timer := time.NewTimer(prefaceTimeout) // TODO: configurable on *Server?
 	defer timer.Stop()
 	select {
 	case <-timer.C:
@@ -1145,13 +1148,11 @@
 		return StreamError{id, ErrCodeStreamClosed}
 	}
 	if len(data) > 0 {
-		// TODO: verify they're allowed to write with the flow
-		// control window we'd advertised to them. (currently
-		// this is fails elsewhere, in that the body buffer is
-		// always 65k, the default initial window size, but
-		// once that's fixed to grow and shrink on demand,
-		// we'll need to be stricter before that, or in the
-		// buffer code)
+		// Check whether the client has flow control quota.
+		if int(st.inflow.available()) < len(data) {
+			return StreamError{id, ErrCodeFlowControl}
+		}
+		st.inflow.take(int32(len(data)))
 		wrote, err := st.body.Write(data)
 		if err != nil {
 			return StreamError{id, ErrCodeStreamClosed}
@@ -1198,13 +1199,16 @@
 		id:    id,
 		state: stateOpen,
 	}
-	// connection-level flow control is shared by all streams.
-	st.flow.conn = &sc.flow
-	st.flow.add(sc.initialWindowSize)
-	st.cw.Init() // make Cond use its Mutex, without heap-promoting them separately
 	if f.StreamEnded() {
 		st.state = stateHalfClosedRemote
 	}
+	st.cw.Init()
+
+	st.flow.conn = &sc.flow // link to conn-level counter
+	st.flow.add(sc.initialWindowSize)
+	st.inflow.conn = &sc.inflow      // link to conn-level counter
+	st.inflow.add(initialWindowSize) // TODO: update this when we send a higher initial window size in the initial settings
+
 	sc.streams[id] = st
 	if f.HasPriority() {
 		sc.adjustStreamPriority(st.id, f.Priority)
@@ -1457,23 +1461,42 @@
 	sc.serveG.check()
 	// "The legal range for the increment to the flow control
 	// window is 1 to 2^31-1 (2,147,483,647) octets."
+	// A Go Read call on 64-bit machines could in theory return
+	// more bytes than this. Very unlikely, but we handle it here
+	// rather than elsewhere for now.
+	const maxUint31 = 1<<31 - 1
+	for n >= maxUint31 {
+		sc.sendWindowUpdate32(st, maxUint31)
+		n -= maxUint31
+	}
+	sc.sendWindowUpdate32(st, int32(n))
+}
+
+// st may be nil for conn-level
+func (sc *serverConn) sendWindowUpdate32(st *stream, n int32) {
+	sc.serveG.check()
+	if n == 0 {
+		return
+	}
+	if n < 0 {
+		panic("negative update")
+	}
 	var streamID uint32
 	if st != nil {
 		streamID = st.id
 	}
-	const maxUint31 = 1<<31 - 1
-	for n >= maxUint31 {
-		sc.writeFrame(frameWriteMsg{
-			write:  writeWindowUpdate{streamID: streamID, n: maxUint31},
-			stream: st,
-		})
-		n -= maxUint31
+	sc.writeFrame(frameWriteMsg{
+		write:  writeWindowUpdate{streamID: streamID, n: uint32(n)},
+		stream: st,
+	})
+	var ok bool
+	if st == nil {
+		ok = sc.inflow.add(n)
+	} else {
+		ok = st.inflow.add(n)
 	}
-	if n > 0 {
-		sc.writeFrame(frameWriteMsg{
-			write:  writeWindowUpdate{streamID: streamID, n: uint32(n)},
-			stream: st,
-		})
+	if !ok {
+		panic("internal error; sent too many window updates without decrements?")
 	}
 }