blob: 921eb6cd2cb87d6d7f60878d090defdc74fff53e [file] [log] [blame]
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001// Copyright 2014 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4// See https://code.google.com/p/go/source/browse/CONTRIBUTORS
5// Licensed under the same terms as Go itself:
6// https://code.google.com/p/go/source/browse/LICENSE
7
8package http2
9
10import (
Brad Fitzpatrick390047e2014-11-14 20:37:08 -080011 "bufio"
Brad Fitzpatrickb331b812014-11-13 11:51:54 -080012 "bytes"
13 "crypto/tls"
14 "errors"
15 "fmt"
16 "io"
17 "log"
18 "net"
19 "net/http"
20 "net/url"
21 "strconv"
22 "strings"
Brad Fitzpatrick729bd722014-11-13 14:09:36 -080023 "sync"
Brad Fitzpatrick95842032014-11-15 09:47:42 -080024 "time"
Brad Fitzpatrickb331b812014-11-13 11:51:54 -080025
26 "github.com/bradfitz/http2/hpack"
27)
28
Brad Fitzpatrickd07a0e42014-11-15 10:47:12 -080029const (
Brad Fitzpatrick4e3c9222014-11-22 17:35:09 -080030 prefaceTimeout = 5 * time.Second
31 firstSettingsTimeout = 2 * time.Second // should be in-flight with preface anyway
32 handlerChunkWriteSize = 4 << 10
Brad Fitzpatrick6ec17312014-11-23 10:07:07 -080033 defaultMaxStreams = 250
Brad Fitzpatrick4e3c9222014-11-22 17:35:09 -080034)
35
36var (
37 errClientDisconnected = errors.New("client disconnected")
38 errClosedBody = errors.New("body closed by handler")
Brad Fitzpatrick4c687c62014-11-23 00:07:43 -080039 errStreamBroken = errors.New("http2: stream broken")
Brad Fitzpatrick4e3c9222014-11-22 17:35:09 -080040)
41
42var responseWriterStatePool = sync.Pool{
43 New: func() interface{} {
44 rws := &responseWriterState{}
45 rws.bw = bufio.NewWriterSize(chunkWriter{rws}, handlerChunkWriteSize)
46 return rws
47 },
48}
49
50// Test hooks.
51var (
52 testHookOnConn func()
53 testHookGetServerConn func(*serverConn)
Brad Fitzpatrickd07a0e42014-11-15 10:47:12 -080054)
55
56// TODO: finish GOAWAY support. Consider each incoming frame type and
57// whether it should be ignored during a shutdown race.
58
59// TODO: (edge case?) if peer sends a SETTINGS frame with e.g. a
60// SETTINGS_MAX_FRAME_SIZE that's lower than what we had before,
61// before we ACK it we have to make sure all currently-active streams
62// know about that and don't have existing too-large frames in flight?
63// Perhaps the settings processing should just wait for new frame to
64// be in-flight and then the frame scheduler in the serve goroutine
65// will be responsible for splitting things.
Brad Fitzpatrickb331b812014-11-13 11:51:54 -080066
Brad Fitzpatrick9b41faf2014-11-19 10:45:13 -080067// TODO: send PING frames to idle clients and disconnect them if no
68// reply
69
Brad Fitzpatrick165c0982014-11-26 08:53:01 -080070// TODO: for bonus points: turn off the serve goroutine when idle, so
71// an idle conn only has the readFrames goroutine active. (which could
72// also be optimized probably to pin less memory in crypto/tls). This
73// would involve tracking when the serve goroutine is active (atomic
74// int32 read/CAS probably?) and starting it up when frames arrive,
75// and shutting it down when all handlers exit. the occasional PING
76// packets could use time.AfterFunc to call sc.wakeStartServeLoop()
77// (which is a no-op if already running) and then queue the PING write
78// as normal. The serve loop would then exit in most cases (if no
79// Handlers running) and not be woken up again until the PING packet
80// returns.
Brad Fitzpatrickfef99e62014-11-20 18:14:02 -080081
Brad Fitzpatrickb331b812014-11-13 11:51:54 -080082// Server is an HTTP/2 server.
83type Server struct {
Brad Fitzpatrick6ec17312014-11-23 10:07:07 -080084 // MaxHandlers limits the number of http.Handler ServeHTTP goroutines
85 // which may run at a time over all connections.
86 // Negative or zero no limit.
87 // TODO: implement
88 MaxHandlers int
89
90 // MaxConcurrentStreams optionally specifies the number of
91 // concurrent streams that each client may have open at a
92 // time. This is unrelated to the number of http.Handler goroutines
93 // which may be active globally, which is MaxHandlers.
94 // If zero, MaxConcurrentStreams defaults to at least 100, per
95 // the HTTP/2 spec's recommendations.
96 MaxConcurrentStreams uint32
Brad Fitzpatrick21896bb2014-11-19 16:05:21 -080097
98 // MaxReadFrameSize optionally specifies the largest frame
99 // this server is willing to read. A valid value is between
Brad Fitzpatrick6ec17312014-11-23 10:07:07 -0800100 // 16k and 16M, inclusive. If zero or otherwise invalid, a
101 // default value is used.
Brad Fitzpatrick21896bb2014-11-19 16:05:21 -0800102 MaxReadFrameSize uint32
103}
104
105func (s *Server) maxReadFrameSize() uint32 {
106 if v := s.MaxReadFrameSize; v >= minMaxFrameSize && v <= maxFrameSize {
107 return v
108 }
109 return defaultMaxReadFrameSize
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800110}
111
Brad Fitzpatrick6ec17312014-11-23 10:07:07 -0800112func (s *Server) maxConcurrentStreams() uint32 {
113 if v := s.MaxConcurrentStreams; v > 0 {
114 return v
115 }
116 return defaultMaxStreams
117}
118
Brad Fitzpatrickb5469d22014-11-13 12:17:52 -0800119// ConfigureServer adds HTTP/2 support to a net/http Server.
120//
121// The configuration conf may be nil.
122//
123// ConfigureServer must be called before s begins serving.
124func ConfigureServer(s *http.Server, conf *Server) {
125 if conf == nil {
126 conf = new(Server)
127 }
128 if s.TLSConfig == nil {
129 s.TLSConfig = new(tls.Config)
130 }
131 haveNPN := false
132 for _, p := range s.TLSConfig.NextProtos {
Brad Fitzpatrick36d9a672014-11-26 07:40:15 -0800133 if p == NextProtoTLS {
Brad Fitzpatrickb5469d22014-11-13 12:17:52 -0800134 haveNPN = true
135 break
136 }
137 }
138 if !haveNPN {
Brad Fitzpatrick36d9a672014-11-26 07:40:15 -0800139 s.TLSConfig.NextProtos = append(s.TLSConfig.NextProtos, NextProtoTLS)
Brad Fitzpatrickb5469d22014-11-13 12:17:52 -0800140 }
141
142 if s.TLSNextProto == nil {
143 s.TLSNextProto = map[string]func(*http.Server, *tls.Conn, http.Handler){}
144 }
Brad Fitzpatrick36d9a672014-11-26 07:40:15 -0800145 s.TLSNextProto[NextProtoTLS] = func(hs *http.Server, c *tls.Conn, h http.Handler) {
Brad Fitzpatrickb5469d22014-11-13 12:17:52 -0800146 if testHookOnConn != nil {
147 testHookOnConn()
148 }
149 conf.handleConn(hs, c, h)
150 }
151}
152
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800153func (srv *Server) handleConn(hs *http.Server, c net.Conn, h http.Handler) {
154 sc := &serverConn{
Brad Fitzpatrick6ec17312014-11-23 10:07:07 -0800155 srv: srv,
156 hs: hs,
157 conn: c,
158 bw: newBufferedWriter(c),
159 handler: h,
160 streams: make(map[uint32]*stream),
161 readFrameCh: make(chan frameAndGate),
162 readFrameErrCh: make(chan error, 1), // must be buffered for 1
163 wantWriteFrameCh: make(chan frameWriteMsg, 8),
Brad Fitzpatrickb0a06c82014-11-26 09:21:28 -0800164 wroteFrameCh: make(chan struct{}, 1), // buffered; one send in reading goroutine
Brad Fitzpatrick6ec17312014-11-23 10:07:07 -0800165 flow: newFlow(initialWindowSize),
166 doneServing: make(chan struct{}),
167 advMaxStreams: srv.maxConcurrentStreams(),
168 maxWriteFrameSize: initialMaxFrameSize,
169 initialWindowSize: initialWindowSize,
170 headerTableSize: initialHeaderTableSize,
171 serveG: newGoroutineLock(),
172 pushEnabled: true,
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800173 }
174 sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
175 sc.hpackDecoder = hpack.NewDecoder(initialHeaderTableSize, sc.onNewHeaderField)
Brad Fitzpatrick5e4e2dc2014-11-19 16:49:43 -0800176
177 fr := NewFramer(sc.bw, c)
178 fr.SetMaxReadFrameSize(srv.maxReadFrameSize())
179 sc.framer = fr
180
Brad Fitzpatrick0db6d652014-11-15 15:49:19 -0800181 if hook := testHookGetServerConn; hook != nil {
182 hook(sc)
183 }
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800184 sc.serve()
185}
186
Brad Fitzpatrickda4be5d2014-11-15 09:57:03 -0800187// frameAndGates coordinates the readFrames and serve
188// goroutines. Because the Framer interface only permits the most
189// recently-read Frame from being accessed, the readFrames goroutine
190// blocks until it has a frame, passes it to serve, and then waits for
191// serve to be done with it before reading the next one.
192type frameAndGate struct {
193 f Frame
194 g gate
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800195}
196
197type serverConn struct {
198 // Immutable:
Brad Fitzpatrick21896bb2014-11-19 16:05:21 -0800199 srv *Server
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800200 hs *http.Server
201 conn net.Conn
Brad Fitzpatrick5e4e2dc2014-11-19 16:49:43 -0800202 bw *bufferedWriter // writing to conn
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800203 handler http.Handler
204 framer *Framer
205 hpackDecoder *hpack.Decoder
Brad Fitzpatrickda4be5d2014-11-15 09:57:03 -0800206 doneServing chan struct{} // closed when serverConn.serve ends
207 readFrameCh chan frameAndGate // written by serverConn.readFrames
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800208 readFrameErrCh chan error
209 wantWriteFrameCh chan frameWriteMsg // from handlers -> serve
Brad Fitzpatrick165c0982014-11-26 08:53:01 -0800210 wroteFrameCh chan struct{} // from writeFrameAsync -> serve, tickles more frame writes
Brad Fitzpatrick0db6d652014-11-15 15:49:19 -0800211 testHookCh chan func() // code to run on the serve loop
Brad Fitzpatrick165c0982014-11-26 08:53:01 -0800212 flow *flow // connection-wide (not stream-specific) flow control
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800213
214 // Everything following is owned by the serve loop; use serveG.check():
Brad Fitzpatrick165c0982014-11-26 08:53:01 -0800215 serveG goroutineLock // used to verify funcs are on serve()
Brad Fitzpatrickbc00c572014-11-17 12:28:01 -0800216 pushEnabled bool
Brad Fitzpatrickd07a0e42014-11-15 10:47:12 -0800217 sawFirstSettings bool // got the initial SETTINGS frame after the preface
218 needToSendSettingsAck bool
Brad Fitzpatrick6ec17312014-11-23 10:07:07 -0800219 clientMaxStreams uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit)
220 advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client
221 curOpenStreams uint32 // client's number of open streams
Brad Fitzpatrickd07a0e42014-11-15 10:47:12 -0800222 maxStreamID uint32 // max ever seen
223 streams map[uint32]*stream
Brad Fitzpatrickbc00c572014-11-17 12:28:01 -0800224 maxWriteFrameSize uint32
Brad Fitzpatrickd07a0e42014-11-15 10:47:12 -0800225 initialWindowSize int32
Brad Fitzpatrickbc00c572014-11-17 12:28:01 -0800226 headerTableSize uint32
227 maxHeaderListSize uint32 // zero means unknown (default)
Brad Fitzpatrickd07a0e42014-11-15 10:47:12 -0800228 canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case
Brad Fitzpatrick21896bb2014-11-19 16:05:21 -0800229 req requestParam // non-zero while reading request headers
Brad Fitzpatrickaa524f62014-11-25 10:31:37 -0800230 writingFrame bool // started write goroutine but haven't heard back on wroteFrameCh
231 needsFrameFlush bool // last frame write wasn't a flush
Brad Fitzpatricka13c4a42014-11-27 17:47:29 -0800232 writeSched writeScheduler
233 inGoAway bool // we've started to or sent GOAWAY
234 needToSendGoAway bool // we need to schedule a GOAWAY frame write
Brad Fitzpatrick21896bb2014-11-19 16:05:21 -0800235 goAwayCode ErrCode
Brad Fitzpatrick4e3c9222014-11-22 17:35:09 -0800236 shutdownTimerCh <-chan time.Time // nil until used
237 shutdownTimer *time.Timer // nil until used
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800238
Brad Fitzpatrick23564bf2014-11-27 19:40:04 -0800239 // Owned by the writeFrameAsync goroutine:
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800240 headerWriteBuf bytes.Buffer
241 hpackEncoder *hpack.Encoder
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800242}
243
244// requestParam is the state of the next request, initialized over
245// potentially several frames HEADERS + zero or more CONTINUATION
246// frames.
247type requestParam struct {
248 // stream is non-nil if we're reading (HEADER or CONTINUATION)
249 // frames for a request (but not DATA).
250 stream *stream
251 header http.Header
252 method, path string
253 scheme, authority string
254 sawRegularHeader bool // saw a non-pseudo header already
255 invalidHeader bool // an invalid header was seen
256}
257
Gabriel Aszalos1aa5b312014-11-19 16:56:22 +0000258// stream represents a stream. This is the minimal metadata needed by
Brad Fitzpatrickbd391962014-11-17 18:58:58 -0800259// the serve goroutine. Most of the actual stream state is owned by
260// the http.Handler's goroutine in the responseWriter. Because the
261// responseWriter's responseWriterState is recycled at the end of a
262// handler, this struct intentionally has no pointer to the
263// *responseWriter{,State} itself, as the Handler ending nils out the
264// responseWriter's state field.
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800265type stream struct {
Brad Fitzpatrickbd391962014-11-17 18:58:58 -0800266 // immutable:
267 id uint32
268 conn *serverConn
269 flow *flow // limits writing from Handler to client
270 body *pipe // non-nil if expecting DATA frames
271 cw closeWaiter // closed wait stream transitions to closed state
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800272
Brad Fitzpatrickbd391962014-11-17 18:58:58 -0800273 // owned by serverConn's serve loop:
274 state streamState
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800275 bodyBytes int64 // body bytes seen so far
276 declBodyBytes int64 // or -1 if undeclared
Brad Fitzpatrickbd391962014-11-17 18:58:58 -0800277 sentReset bool // only true once detached from streams map
278 gotReset bool // only true once detacted from streams map
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800279}
280
Brad Fitzpatrick23564bf2014-11-27 19:40:04 -0800281func (sc *serverConn) Framer() *Framer { return sc.framer }
282func (sc *serverConn) CloseConn() error { return sc.conn.Close() }
283func (sc *serverConn) Flush() error { return sc.bw.Flush() }
284func (sc *serverConn) HeaderEncoder() (*hpack.Encoder, *bytes.Buffer) {
285 return sc.hpackEncoder, &sc.headerWriteBuf
286}
287
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800288func (sc *serverConn) state(streamID uint32) streamState {
289 sc.serveG.check()
290 // http://http2.github.io/http2-spec/#rfc.section.5.1
291 if st, ok := sc.streams[streamID]; ok {
292 return st.state
293 }
294 // "The first use of a new stream identifier implicitly closes all
295 // streams in the "idle" state that might have been initiated by
296 // that peer with a lower-valued stream identifier. For example, if
297 // a client sends a HEADERS frame on stream 7 without ever sending a
298 // frame on stream 5, then stream 5 transitions to the "closed"
299 // state when the first frame for stream 7 is sent or received."
300 if streamID <= sc.maxStreamID {
301 return stateClosed
302 }
303 return stateIdle
304}
305
306func (sc *serverConn) vlogf(format string, args ...interface{}) {
307 if VerboseLogs {
308 sc.logf(format, args...)
309 }
310}
311
312func (sc *serverConn) logf(format string, args ...interface{}) {
313 if lg := sc.hs.ErrorLog; lg != nil {
314 lg.Printf(format, args...)
315 } else {
316 log.Printf(format, args...)
317 }
318}
319
320func (sc *serverConn) condlogf(err error, format string, args ...interface{}) {
321 if err == nil {
322 return
323 }
324 str := err.Error()
Brad Fitzpatrick95842032014-11-15 09:47:42 -0800325 if err == io.EOF || strings.Contains(str, "use of closed network connection") {
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800326 // Boring, expected errors.
327 sc.vlogf(format, args...)
328 } else {
329 sc.logf(format, args...)
330 }
331}
332
333func (sc *serverConn) onNewHeaderField(f hpack.HeaderField) {
334 sc.serveG.check()
335 switch {
336 case !validHeader(f.Name):
337 sc.req.invalidHeader = true
338 case strings.HasPrefix(f.Name, ":"):
339 if sc.req.sawRegularHeader {
340 sc.logf("pseudo-header after regular header")
341 sc.req.invalidHeader = true
342 return
343 }
344 var dst *string
345 switch f.Name {
346 case ":method":
347 dst = &sc.req.method
348 case ":path":
349 dst = &sc.req.path
350 case ":scheme":
351 dst = &sc.req.scheme
352 case ":authority":
353 dst = &sc.req.authority
354 default:
355 // 8.1.2.1 Pseudo-Header Fields
356 // "Endpoints MUST treat a request or response
357 // that contains undefined or invalid
358 // pseudo-header fields as malformed (Section
359 // 8.1.2.6)."
360 sc.logf("invalid pseudo-header %q", f.Name)
361 sc.req.invalidHeader = true
362 return
363 }
364 if *dst != "" {
365 sc.logf("duplicate pseudo-header %q sent", f.Name)
366 sc.req.invalidHeader = true
367 return
368 }
369 *dst = f.Value
370 case f.Name == "cookie":
371 sc.req.sawRegularHeader = true
372 if s, ok := sc.req.header["Cookie"]; ok && len(s) == 1 {
373 s[0] = s[0] + "; " + f.Value
374 } else {
375 sc.req.header.Add("Cookie", f.Value)
376 }
377 default:
378 sc.req.sawRegularHeader = true
379 sc.req.header.Add(sc.canonicalHeader(f.Name), f.Value)
380 }
381}
382
383func (sc *serverConn) canonicalHeader(v string) string {
384 sc.serveG.check()
Brad Fitzpatrick6520e262014-11-15 09:36:47 -0800385 cv, ok := commonCanonHeader[v]
386 if ok {
387 return cv
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800388 }
Brad Fitzpatrick6520e262014-11-15 09:36:47 -0800389 cv, ok = sc.canonHeader[v]
390 if ok {
391 return cv
392 }
393 if sc.canonHeader == nil {
394 sc.canonHeader = make(map[string]string)
395 }
396 cv = http.CanonicalHeaderKey(v)
397 sc.canonHeader[v] = cv
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800398 return cv
399}
400
401// readFrames is the loop that reads incoming frames.
402// It's run on its own goroutine.
403func (sc *serverConn) readFrames() {
Brad Fitzpatrickda4be5d2014-11-15 09:57:03 -0800404 g := make(gate, 1)
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800405 for {
406 f, err := sc.framer.ReadFrame()
407 if err != nil {
Brad Fitzpatrick8ec321e2014-11-25 14:08:16 -0800408 sc.readFrameErrCh <- err
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800409 close(sc.readFrameCh)
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800410 return
411 }
Brad Fitzpatrickda4be5d2014-11-15 09:57:03 -0800412 sc.readFrameCh <- frameAndGate{f, g}
Brad Fitzpatrickb0a06c82014-11-26 09:21:28 -0800413 // We can't read another frame until this one is
414 // processed, as the ReadFrame interface doesn't copy
415 // memory. The Frame accessor methods access the last
416 // frame's (shared) buffer. So we wait for the
417 // serve goroutine to tell us it's done:
Brad Fitzpatrickda4be5d2014-11-15 09:57:03 -0800418 g.Wait()
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800419 }
420}
421
Brad Fitzpatrickaa524f62014-11-25 10:31:37 -0800422// writeFrameAsync runs in its own goroutine and writes a single frame
423// and then reports when it's done.
424// At most one goroutine can be running writeFrameAsync at a time per
425// serverConn.
426func (sc *serverConn) writeFrameAsync(wm frameWriteMsg) {
Brad Fitzpatrick23564bf2014-11-27 19:40:04 -0800427 err := wm.write(sc, wm.v)
Brad Fitzpatrickaa524f62014-11-25 10:31:37 -0800428 if ch := wm.done; ch != nil {
429 select {
430 case ch <- err:
431 default:
432 panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wm.v))
433 }
434 }
435 sc.wroteFrameCh <- struct{}{} // tickle frame selection scheduler
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800436}
437
Brad Fitzpatrick9b41faf2014-11-19 10:45:13 -0800438func (sc *serverConn) closeAllStreamsOnConnClose() {
Brad Fitzpatrick6d3aa4f2014-11-15 14:55:57 -0800439 sc.serveG.check()
Brad Fitzpatrickbd391962014-11-17 18:58:58 -0800440 for _, st := range sc.streams {
441 sc.closeStream(st, errClientDisconnected)
Brad Fitzpatrick6d3aa4f2014-11-15 14:55:57 -0800442 }
443}
444
Brad Fitzpatrick21896bb2014-11-19 16:05:21 -0800445func (sc *serverConn) stopShutdownTimer() {
446 sc.serveG.check()
447 if t := sc.shutdownTimer; t != nil {
448 t.Stop()
449 }
450}
451
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800452func (sc *serverConn) serve() {
453 sc.serveG.check()
454 defer sc.conn.Close()
Brad Fitzpatrick9b41faf2014-11-19 10:45:13 -0800455 defer sc.closeAllStreamsOnConnClose()
Brad Fitzpatrick21896bb2014-11-19 16:05:21 -0800456 defer sc.stopShutdownTimer()
Brad Fitzpatrickaa524f62014-11-25 10:31:37 -0800457 defer close(sc.doneServing) // unblocks handlers trying to send
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800458
459 sc.vlogf("HTTP/2 connection from %v on %p", sc.conn.RemoteAddr(), sc.hs)
460
Brad Fitzpatrick23564bf2014-11-27 19:40:04 -0800461 sc.writeFrame(frameWriteMsg{
462 write: writeSettings,
463 v: []Setting{
464 {SettingMaxFrameSize, sc.srv.maxReadFrameSize()},
465 {SettingMaxConcurrentStreams, sc.advMaxStreams},
466 /* TODO: more actual settings */
467 },
468 })
Brad Fitzpatrickd07a0e42014-11-15 10:47:12 -0800469
470 if err := sc.readPreface(); err != nil {
gbbrd90e0982014-11-20 22:19:20 +0000471 sc.condlogf(err, "error reading preface from client %v: %v", sc.conn.RemoteAddr(), err)
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800472 return
473 }
474
Brad Fitzpatrickaa524f62014-11-25 10:31:37 -0800475 go sc.readFrames() // closed by defer sc.conn.Close above
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800476
Brad Fitzpatrickd07a0e42014-11-15 10:47:12 -0800477 settingsTimer := time.NewTimer(firstSettingsTimeout)
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800478 for {
479 select {
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800480 case wm := <-sc.wantWriteFrameCh:
Brad Fitzpatrick4e3c9222014-11-22 17:35:09 -0800481 sc.writeFrame(wm)
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800482 case <-sc.wroteFrameCh:
483 sc.writingFrame = false
484 sc.scheduleFrameWrite()
Brad Fitzpatrickda4be5d2014-11-15 09:57:03 -0800485 case fg, ok := <-sc.readFrameCh:
Brad Fitzpatrick21896bb2014-11-19 16:05:21 -0800486 if !ok {
487 sc.readFrameCh = nil
488 }
Brad Fitzpatrickd07a0e42014-11-15 10:47:12 -0800489 if !sc.processFrameFromReader(fg, ok) {
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800490 return
491 }
Brad Fitzpatrickd07a0e42014-11-15 10:47:12 -0800492 if settingsTimer.C != nil {
493 settingsTimer.Stop()
494 settingsTimer.C = nil
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800495 }
Brad Fitzpatrickd07a0e42014-11-15 10:47:12 -0800496 case <-settingsTimer.C:
497 sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr())
498 return
Brad Fitzpatrick21896bb2014-11-19 16:05:21 -0800499 case <-sc.shutdownTimerCh:
500 sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr())
501 return
Brad Fitzpatrick0db6d652014-11-15 15:49:19 -0800502 case fn := <-sc.testHookCh:
503 fn()
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800504 }
505 }
506}
507
Brad Fitzpatrick95842032014-11-15 09:47:42 -0800508// readPreface reads the ClientPreface greeting from the peer
509// or returns an error on timeout or an invalid greeting.
510func (sc *serverConn) readPreface() error {
511 errc := make(chan error, 1)
512 go func() {
513 // Read the client preface
514 buf := make([]byte, len(ClientPreface))
Brad Fitzpatrick95842032014-11-15 09:47:42 -0800515 if _, err := io.ReadFull(sc.conn, buf); err != nil {
516 errc <- err
517 } else if !bytes.Equal(buf, clientPreface) {
518 errc <- fmt.Errorf("bogus greeting %q", buf)
519 } else {
520 errc <- nil
521 }
522 }()
523 timer := time.NewTimer(5 * time.Second) // TODO: configurable on *Server?
524 defer timer.Stop()
525 select {
526 case <-timer.C:
527 return errors.New("timeout waiting for client preface")
528 case err := <-errc:
529 if err == nil {
530 sc.vlogf("client %v said hello", sc.conn.RemoteAddr())
531 }
532 return err
533 }
534}
535
Brad Fitzpatrickdc0c5c02014-11-20 14:15:26 -0800536// writeData writes the data described in req to stream.id.
537//
538// The provided ch is used to avoid allocating new channels for each
539// write operation. It's expected that the caller reuses req and ch
540// over time.
Brad Fitzpatrick1797e702014-11-26 10:43:20 -0800541//
542// The flow control currently happens in the Handler where it waits
543// for 1 or more bytes to be available to then write here. So at this
544// point we know that we have flow control. But this might have to
545// change when priority is implemented, so the serve goroutine knows
546// the total amount of bytes waiting to be sent and can can have more
547// scheduling decisions available.
Brad Fitzpatrick3c8c6132014-11-20 18:34:52 -0800548func (sc *serverConn) writeData(stream *stream, data *dataWriteParams, ch chan error) error {
Brad Fitzpatrick1797e702014-11-26 10:43:20 -0800549 sc.serveG.checkNotOn() // NOT on; otherwise could deadlock in sc.writeFrame
Brad Fitzpatrick4e3c9222014-11-22 17:35:09 -0800550 sc.writeFrameFromHandler(frameWriteMsg{
Brad Fitzpatrick23564bf2014-11-27 19:40:04 -0800551 write: writeDataFrame,
Brad Fitzpatrick3c8c6132014-11-20 18:34:52 -0800552 cost: uint32(len(data.p)),
Brad Fitzpatrickdc0c5c02014-11-20 14:15:26 -0800553 stream: stream,
Brad Fitzpatrick3c8c6132014-11-20 18:34:52 -0800554 endStream: data.end,
555 v: data,
Brad Fitzpatrickdc0c5c02014-11-20 14:15:26 -0800556 done: ch,
557 })
558 select {
559 case err := <-ch:
560 return err
561 case <-sc.doneServing:
562 return errClientDisconnected
563 }
564}
565
Brad Fitzpatrick4e3c9222014-11-22 17:35:09 -0800566// writeFrameFromHandler sends wm to sc.wantWriteFrameCh, but aborts
567// if the connection has gone away.
Brad Fitzpatrickdc0c5c02014-11-20 14:15:26 -0800568//
569// This must not be run from the serve goroutine itself, else it might
570// deadlock writing to sc.wantWriteFrameCh (which is only mildly
571// buffered and is read by serve itself). If you're on the serve
Brad Fitzpatrick4e3c9222014-11-22 17:35:09 -0800572// goroutine, call writeFrame instead.
573func (sc *serverConn) writeFrameFromHandler(wm frameWriteMsg) {
Brad Fitzpatrick9b41faf2014-11-19 10:45:13 -0800574 sc.serveG.checkNotOn() // NOT
575 select {
576 case sc.wantWriteFrameCh <- wm:
577 case <-sc.doneServing:
578 // Client has closed their connection to the server.
579 }
Brad Fitzpatricka29a3232014-11-15 11:18:25 -0800580}
581
Brad Fitzpatrick4e3c9222014-11-22 17:35:09 -0800582// writeFrame either sends wm to the writeFrames goroutine, or
Brad Fitzpatrickdc0c5c02014-11-20 14:15:26 -0800583// enqueues it for the future (with no pushback; the serve goroutine
584// never blocks!), for sending when the currently-being-written frame
585// is done writing.
586//
587// If you're not on the serve goroutine, use writeFrame instead.
Brad Fitzpatrick4e3c9222014-11-22 17:35:09 -0800588func (sc *serverConn) writeFrame(wm frameWriteMsg) {
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800589 sc.serveG.check()
590 // Fast path for common case:
591 if !sc.writingFrame {
Brad Fitzpatrick165c0982014-11-26 08:53:01 -0800592 sc.startFrameWrite(wm)
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800593 return
594 }
Brad Fitzpatricka13c4a42014-11-27 17:47:29 -0800595 sc.writeSched.add(wm)
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800596}
597
Brad Fitzpatrick165c0982014-11-26 08:53:01 -0800598// startFrameWrite starts a goroutine to write wm (in a separate
599// goroutine since that might block on the network), and updates the
600// serve goroutine's state about the world, updated from info in wm.
601func (sc *serverConn) startFrameWrite(wm frameWriteMsg) {
Brad Fitzpatrick3d7a3ad2014-11-15 21:38:23 -0800602 sc.serveG.check()
603 if sc.writingFrame {
Brad Fitzpatrick165c0982014-11-26 08:53:01 -0800604 panic("internal error: can only be writing one frame at a time")
Brad Fitzpatrick3d7a3ad2014-11-15 21:38:23 -0800605 }
Brad Fitzpatrickbd391962014-11-17 18:58:58 -0800606
607 st := wm.stream
608 if st != nil {
609 switch st.state {
610 case stateHalfClosedLocal:
611 panic("internal error: attempt to send frame on half-closed-local stream")
612 case stateClosed:
613 if st.sentReset || st.gotReset {
614 // Skip this frame. But fake the frame write to reschedule:
615 sc.wroteFrameCh <- struct{}{}
616 return
617 }
618 panic("internal error: attempt to send a frame on a closed stream")
619 }
620 }
621
Brad Fitzpatrick3d7a3ad2014-11-15 21:38:23 -0800622 sc.writingFrame = true
Brad Fitzpatrick5e4e2dc2014-11-19 16:49:43 -0800623 sc.needsFrameFlush = true
Brad Fitzpatrick3d7a3ad2014-11-15 21:38:23 -0800624 if wm.endStream {
Brad Fitzpatrickbd391962014-11-17 18:58:58 -0800625 if st == nil {
626 panic("nil stream with endStream set")
627 }
628 switch st.state {
629 case stateOpen:
630 st.state = stateHalfClosedLocal
631 case stateHalfClosedRemote:
632 sc.closeStream(st, nil)
Brad Fitzpatrick3d7a3ad2014-11-15 21:38:23 -0800633 }
634 }
Brad Fitzpatrickaa524f62014-11-25 10:31:37 -0800635 go sc.writeFrameAsync(wm)
Brad Fitzpatrick3d7a3ad2014-11-15 21:38:23 -0800636}
637
Brad Fitzpatrick4e3c9222014-11-22 17:35:09 -0800638// scheduleFrameWrite tickles the frame writing scheduler.
639//
640// If a frame is already being written, nothing happens. This will be called again
641// when the frame is done being written.
642//
643// If a frame isn't being written we need to send one, the best frame
644// to send is selected, preferring first things that aren't
645// stream-specific (e.g. ACKing settings), and then finding the
646// highest priority stream.
647//
648// If a frame isn't being written and there's nothing else to send, we
649// flush the write buffer.
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800650func (sc *serverConn) scheduleFrameWrite() {
651 sc.serveG.check()
Brad Fitzpatrickd07a0e42014-11-15 10:47:12 -0800652 if sc.writingFrame {
Brad Fitzpatrick21896bb2014-11-19 16:05:21 -0800653 return
654 }
655 if sc.needToSendGoAway {
656 sc.needToSendGoAway = false
Brad Fitzpatrick165c0982014-11-26 08:53:01 -0800657 sc.startFrameWrite(frameWriteMsg{
Brad Fitzpatrick23564bf2014-11-27 19:40:04 -0800658 write: writeGoAwayFrame,
Brad Fitzpatrick21896bb2014-11-19 16:05:21 -0800659 v: &goAwayParams{
660 maxStreamID: sc.maxStreamID,
661 code: sc.goAwayCode,
662 },
663 })
664 return
665 }
Brad Fitzpatricka13c4a42014-11-27 17:47:29 -0800666 if sc.writeSched.empty() && sc.needsFrameFlush {
Brad Fitzpatrick23564bf2014-11-27 19:40:04 -0800667 sc.startFrameWrite(frameWriteMsg{write: flushFrameWriter})
Brad Fitzpatrick165c0982014-11-26 08:53:01 -0800668 sc.needsFrameFlush = false // after startFrameWrite, since it sets this true
Brad Fitzpatrick5e4e2dc2014-11-19 16:49:43 -0800669 return
670 }
Brad Fitzpatrick21896bb2014-11-19 16:05:21 -0800671 if sc.inGoAway {
672 // No more frames after we've sent GOAWAY.
673 return
Brad Fitzpatrickd07a0e42014-11-15 10:47:12 -0800674 }
675 if sc.needToSendSettingsAck {
Brad Fitzpatrick4e3c9222014-11-22 17:35:09 -0800676 sc.needToSendSettingsAck = false
Brad Fitzpatrick23564bf2014-11-27 19:40:04 -0800677 sc.startFrameWrite(frameWriteMsg{write: writeSettingsAck})
Brad Fitzpatrickd07a0e42014-11-15 10:47:12 -0800678 return
679 }
Brad Fitzpatricka13c4a42014-11-27 17:47:29 -0800680 if sc.writeSched.empty() {
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800681 return
682 }
Brad Fitzpatrickd07a0e42014-11-15 10:47:12 -0800683 // TODO: if wm is a data frame, make sure it's not too big
684 // (because a SETTINGS frame changed our max frame size while
685 // a stream was open and writing) and cut it up into smaller
686 // bits.
Brad Fitzpatricka13c4a42014-11-27 17:47:29 -0800687 sc.startFrameWrite(sc.writeSched.take())
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800688}
689
Brad Fitzpatrick55815ec2014-11-15 13:29:57 -0800690func (sc *serverConn) goAway(code ErrCode) {
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800691 sc.serveG.check()
Brad Fitzpatrick21896bb2014-11-19 16:05:21 -0800692 if sc.inGoAway {
Brad Fitzpatrick55815ec2014-11-15 13:29:57 -0800693 return
694 }
Brad Fitzpatrick21896bb2014-11-19 16:05:21 -0800695 if code != ErrCodeNo {
696 sc.shutDownIn(250 * time.Millisecond)
697 } else {
698 // TODO: configurable
699 sc.shutDownIn(1 * time.Second)
700 }
701 sc.inGoAway = true
702 sc.needToSendGoAway = true
703 sc.goAwayCode = code
704 sc.scheduleFrameWrite()
705}
706
707func (sc *serverConn) shutDownIn(d time.Duration) {
708 sc.serveG.check()
709 sc.shutdownTimer = time.NewTimer(d)
710 sc.shutdownTimerCh = sc.shutdownTimer.C
Brad Fitzpatrick55815ec2014-11-15 13:29:57 -0800711}
712
Brad Fitzpatrick6ec17312014-11-23 10:07:07 -0800713func (sc *serverConn) resetStream(se StreamError) {
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800714 sc.serveG.check()
Brad Fitzpatrickb3e0a872014-11-23 21:15:59 -0800715 st, ok := sc.streams[se.StreamID]
716 if !ok {
717 panic("internal package error; resetStream called on non-existent stream")
718 }
Brad Fitzpatrick4e3c9222014-11-22 17:35:09 -0800719 sc.writeFrame(frameWriteMsg{
Brad Fitzpatrick23564bf2014-11-27 19:40:04 -0800720 write: writeRSTStreamFrame,
Brad Fitzpatrick55815ec2014-11-15 13:29:57 -0800721 v: &se,
722 })
Brad Fitzpatrickb3e0a872014-11-23 21:15:59 -0800723 st.sentReset = true
724 sc.closeStream(st, se)
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800725}
726
Brad Fitzpatrickb0a06c82014-11-26 09:21:28 -0800727// curHeaderStreamID returns the stream ID of the header block we're
728// currently in the middle of reading. If this returns non-zero, the
729// next frame must be a CONTINUATION with this stream id.
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800730func (sc *serverConn) curHeaderStreamID() uint32 {
731 sc.serveG.check()
732 st := sc.req.stream
733 if st == nil {
734 return 0
735 }
736 return st.id
737}
738
Brad Fitzpatrickd07a0e42014-11-15 10:47:12 -0800739// processFrameFromReader processes the serve loop's read from readFrameCh from the
740// frame-reading goroutine.
741// processFrameFromReader returns whether the connection should be kept open.
742func (sc *serverConn) processFrameFromReader(fg frameAndGate, fgValid bool) bool {
743 sc.serveG.check()
Brad Fitzpatrick8ec321e2014-11-25 14:08:16 -0800744 var clientGone bool
745 var err error
Brad Fitzpatrickd07a0e42014-11-15 10:47:12 -0800746 if !fgValid {
Brad Fitzpatrick8ec321e2014-11-25 14:08:16 -0800747 err = <-sc.readFrameErrCh
Brad Fitzpatrick21896bb2014-11-19 16:05:21 -0800748 if err == ErrFrameTooLarge {
749 sc.goAway(ErrCodeFrameSize)
750 return true // goAway will close the loop
751 }
Brad Fitzpatrick8ec321e2014-11-25 14:08:16 -0800752 clientGone = err == io.EOF || strings.Contains(err.Error(), "use of closed network connection")
753 if clientGone {
754 // TODO: could we also get into this state if
755 // the peer does a half close
756 // (e.g. CloseWrite) because they're done
757 // sending frames but they're still wanting
758 // our open replies? Investigate.
759 return false
Brad Fitzpatrickd07a0e42014-11-15 10:47:12 -0800760 }
Brad Fitzpatrickd07a0e42014-11-15 10:47:12 -0800761 }
Brad Fitzpatrick8ec321e2014-11-25 14:08:16 -0800762
763 if fgValid {
764 f := fg.f
765 sc.vlogf("got %v: %#v", f.Header(), f)
766 err = sc.processFrame(f)
767 fg.g.Done() // unblock the readFrames goroutine
768 if err == nil {
769 return true
770 }
Brad Fitzpatrickd07a0e42014-11-15 10:47:12 -0800771 }
772
773 switch ev := err.(type) {
774 case StreamError:
Brad Fitzpatrick6ec17312014-11-23 10:07:07 -0800775 sc.resetStream(ev)
Brad Fitzpatrickd07a0e42014-11-15 10:47:12 -0800776 return true
777 case goAwayFlowError:
Brad Fitzpatrick55815ec2014-11-15 13:29:57 -0800778 sc.goAway(ErrCodeFlowControl)
Brad Fitzpatrickd07a0e42014-11-15 10:47:12 -0800779 return true
780 case ConnectionError:
Brad Fitzpatrick21896bb2014-11-19 16:05:21 -0800781 sc.logf("%v: %v", sc.conn.RemoteAddr(), ev)
782 sc.goAway(ErrCode(ev))
783 return true // goAway will handle shutdown
Brad Fitzpatrickd07a0e42014-11-15 10:47:12 -0800784 default:
Brad Fitzpatrick8ec321e2014-11-25 14:08:16 -0800785 if !fgValid {
786 sc.logf("disconnecting; error reading frame from client %s: %v", sc.conn.RemoteAddr(), err)
787 } else {
788 sc.logf("disconnection due to other error: %v", err)
789 }
Brad Fitzpatrickd07a0e42014-11-15 10:47:12 -0800790 }
791 return false
792}
793
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800794func (sc *serverConn) processFrame(f Frame) error {
795 sc.serveG.check()
796
Brad Fitzpatrickd07a0e42014-11-15 10:47:12 -0800797 // First frame received must be SETTINGS.
798 if !sc.sawFirstSettings {
799 if _, ok := f.(*SettingsFrame); !ok {
800 return ConnectionError(ErrCodeProtocol)
801 }
802 sc.sawFirstSettings = true
803 }
804
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800805 if s := sc.curHeaderStreamID(); s != 0 {
806 if cf, ok := f.(*ContinuationFrame); !ok {
807 return ConnectionError(ErrCodeProtocol)
808 } else if cf.Header().StreamID != s {
809 return ConnectionError(ErrCodeProtocol)
810 }
811 }
812
813 switch f := f.(type) {
814 case *SettingsFrame:
815 return sc.processSettings(f)
816 case *HeadersFrame:
817 return sc.processHeaders(f)
818 case *ContinuationFrame:
819 return sc.processContinuation(f)
820 case *WindowUpdateFrame:
821 return sc.processWindowUpdate(f)
822 case *PingFrame:
823 return sc.processPing(f)
824 case *DataFrame:
825 return sc.processData(f)
Brad Fitzpatrick6d3aa4f2014-11-15 14:55:57 -0800826 case *RSTStreamFrame:
827 return sc.processResetStream(f)
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800828 default:
Brad Fitzpatrick21896bb2014-11-19 16:05:21 -0800829 log.Printf("Ignoring frame: %v", f.Header())
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800830 return nil
831 }
832}
833
834func (sc *serverConn) processPing(f *PingFrame) error {
835 sc.serveG.check()
836 if f.Flags.Has(FlagSettingsAck) {
837 // 6.7 PING: " An endpoint MUST NOT respond to PING frames
838 // containing this flag."
839 return nil
840 }
841 if f.StreamID != 0 {
842 // "PING frames are not associated with any individual
843 // stream. If a PING frame is received with a stream
844 // identifier field value other than 0x0, the recipient MUST
845 // respond with a connection error (Section 5.4.1) of type
846 // PROTOCOL_ERROR."
847 return ConnectionError(ErrCodeProtocol)
848 }
Brad Fitzpatrick4e3c9222014-11-22 17:35:09 -0800849 sc.writeFrame(frameWriteMsg{
Brad Fitzpatrick23564bf2014-11-27 19:40:04 -0800850 write: writePingAck,
Brad Fitzpatrick9e0eccc2014-11-15 09:14:49 -0800851 v: f,
Brad Fitzpatricka29a3232014-11-15 11:18:25 -0800852 })
Brad Fitzpatrick9e0eccc2014-11-15 09:14:49 -0800853 return nil
854}
855
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800856func (sc *serverConn) processWindowUpdate(f *WindowUpdateFrame) error {
857 sc.serveG.check()
858 switch {
859 case f.StreamID != 0: // stream-level flow control
860 st := sc.streams[f.StreamID]
861 if st == nil {
862 // "WINDOW_UPDATE can be sent by a peer that has sent a
863 // frame bearing the END_STREAM flag. This means that a
864 // receiver could receive a WINDOW_UPDATE frame on a "half
865 // closed (remote)" or "closed" stream. A receiver MUST
866 // NOT treat this as an error, see Section 5.1."
867 return nil
868 }
869 if !st.flow.add(int32(f.Increment)) {
870 return StreamError{f.StreamID, ErrCodeFlowControl}
871 }
872 default: // connection-level flow control
873 if !sc.flow.add(int32(f.Increment)) {
874 return goAwayFlowError{}
875 }
876 }
877 return nil
878}
879
Brad Fitzpatrick6d3aa4f2014-11-15 14:55:57 -0800880func (sc *serverConn) processResetStream(f *RSTStreamFrame) error {
881 sc.serveG.check()
Brad Fitzpatrickbd391962014-11-17 18:58:58 -0800882 if sc.state(f.StreamID) == stateIdle {
883 // 6.4 "RST_STREAM frames MUST NOT be sent for a
884 // stream in the "idle" state. If a RST_STREAM frame
885 // identifying an idle stream is received, the
886 // recipient MUST treat this as a connection error
887 // (Section 5.4.1) of type PROTOCOL_ERROR.
888 return ConnectionError(ErrCodeProtocol)
889 }
890 st, ok := sc.streams[f.StreamID]
891 if ok {
892 st.gotReset = true
893 sc.closeStream(st, StreamError{f.StreamID, f.ErrCode})
894 }
Brad Fitzpatrick6d3aa4f2014-11-15 14:55:57 -0800895 return nil
896}
897
Brad Fitzpatrickbd391962014-11-17 18:58:58 -0800898func (sc *serverConn) closeStream(st *stream, err error) {
Brad Fitzpatrick6d3aa4f2014-11-15 14:55:57 -0800899 sc.serveG.check()
Brad Fitzpatrickbd391962014-11-17 18:58:58 -0800900 if st.state == stateIdle || st.state == stateClosed {
901 panic("invariant")
Brad Fitzpatrick6d3aa4f2014-11-15 14:55:57 -0800902 }
Brad Fitzpatrickbd391962014-11-17 18:58:58 -0800903 st.state = stateClosed
Brad Fitzpatrick6ec17312014-11-23 10:07:07 -0800904 sc.curOpenStreams--
Brad Fitzpatrickbd391962014-11-17 18:58:58 -0800905 delete(sc.streams, st.id)
Brad Fitzpatrick6d3aa4f2014-11-15 14:55:57 -0800906 st.flow.close()
907 if p := st.body; p != nil {
908 p.Close(err)
909 }
Brad Fitzpatrickbd391962014-11-17 18:58:58 -0800910 st.cw.Close() // signals Handler's CloseNotifier goroutine (if any) to send
Brad Fitzpatrick6d3aa4f2014-11-15 14:55:57 -0800911}
912
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800913func (sc *serverConn) processSettings(f *SettingsFrame) error {
914 sc.serveG.check()
Brad Fitzpatrickd07a0e42014-11-15 10:47:12 -0800915 if f.IsAck() {
916 // TODO: do we need to do anything?
Brad Fitzpatrickb0a06c82014-11-26 09:21:28 -0800917 // We might want to keep track of which settings we've sent
918 // vs which settings the client has ACK'd, so we know when to be
919 // strict. Or at least keep track of the count of
920 // our SETTINGS send count vs their ACK count. If they're equal,
921 // then we both have the same view of the world and we can be
922 // stricter in some cases. But currently we don't send SETTINGS
923 // at runtime other than the initial SETTINGS.
Brad Fitzpatrickd07a0e42014-11-15 10:47:12 -0800924 return nil
925 }
926 if err := f.ForeachSetting(sc.processSetting); err != nil {
927 return err
928 }
Brad Fitzpatrick4e3c9222014-11-22 17:35:09 -0800929 sc.needToSendSettingsAck = true
930 sc.scheduleFrameWrite()
Brad Fitzpatrickd07a0e42014-11-15 10:47:12 -0800931 return nil
932}
933
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800934func (sc *serverConn) processSetting(s Setting) error {
935 sc.serveG.check()
Brad Fitzpatrickbc00c572014-11-17 12:28:01 -0800936 if err := s.Valid(); err != nil {
937 return err
938 }
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800939 sc.vlogf("processing setting %v", s)
940 switch s.ID {
Brad Fitzpatrickbc00c572014-11-17 12:28:01 -0800941 case SettingHeaderTableSize:
942 sc.headerTableSize = s.Val
Tatsuhiro Tsujikawac7d67a52014-11-20 01:01:39 +0900943 sc.hpackEncoder.SetMaxDynamicTableSize(s.Val)
Brad Fitzpatrickbc00c572014-11-17 12:28:01 -0800944 case SettingEnablePush:
945 sc.pushEnabled = s.Val != 0
Brad Fitzpatrickbc00c572014-11-17 12:28:01 -0800946 case SettingMaxConcurrentStreams:
Brad Fitzpatrick6ec17312014-11-23 10:07:07 -0800947 sc.clientMaxStreams = s.Val
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800948 case SettingInitialWindowSize:
949 return sc.processSettingInitialWindowSize(s.Val)
Brad Fitzpatrickbc00c572014-11-17 12:28:01 -0800950 case SettingMaxFrameSize:
951 sc.maxWriteFrameSize = s.Val
Brad Fitzpatrickbc00c572014-11-17 12:28:01 -0800952 case SettingMaxHeaderListSize:
953 sc.maxHeaderListSize = s.Val
gbbr0b3b5742014-11-23 00:44:48 +0000954 default:
955 // Unknown setting: "An endpoint that receives a SETTINGS
956 // frame with any unknown or unsupported identifier MUST
957 // ignore that setting."
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800958 }
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800959 return nil
960}
961
962func (sc *serverConn) processSettingInitialWindowSize(val uint32) error {
963 sc.serveG.check()
Brad Fitzpatrickbc00c572014-11-17 12:28:01 -0800964 // Note: val already validated to be within range by
965 // processSetting's Valid call.
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800966
967 // "A SETTINGS frame can alter the initial flow control window
968 // size for all current streams. When the value of
969 // SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST
970 // adjust the size of all stream flow control windows that it
971 // maintains by the difference between the new value and the
972 // old value."
973 old := sc.initialWindowSize
974 sc.initialWindowSize = int32(val)
975 growth := sc.initialWindowSize - old // may be negative
976 for _, st := range sc.streams {
977 if !st.flow.add(growth) {
978 // 6.9.2 Initial Flow Control Window Size
979 // "An endpoint MUST treat a change to
980 // SETTINGS_INITIAL_WINDOW_SIZE that causes any flow
981 // control window to exceed the maximum size as a
982 // connection error (Section 5.4.1) of type
983 // FLOW_CONTROL_ERROR."
984 return ConnectionError(ErrCodeFlowControl)
985 }
986 }
987 return nil
988}
989
990func (sc *serverConn) processData(f *DataFrame) error {
991 sc.serveG.check()
992 // "If a DATA frame is received whose stream is not in "open"
993 // or "half closed (local)" state, the recipient MUST respond
994 // with a stream error (Section 5.4.2) of type STREAM_CLOSED."
995 id := f.Header().StreamID
996 st, ok := sc.streams[id]
997 if !ok || (st.state != stateOpen && st.state != stateHalfClosedLocal) {
998 return StreamError{id, ErrCodeStreamClosed}
999 }
1000 if st.body == nil {
Brad Fitzpatrickb0a06c82014-11-26 09:21:28 -08001001 panic("internal error: should have a body in this state")
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001002 }
1003 data := f.Data()
1004
1005 // Sender sending more than they'd declared?
1006 if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes {
gbbrd90e0982014-11-20 22:19:20 +00001007 st.body.Close(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001008 return StreamError{id, ErrCodeStreamClosed}
1009 }
1010 if len(data) > 0 {
1011 // TODO: verify they're allowed to write with the flow control
1012 // window we'd advertised to them.
Brad Fitzpatrick0218ba62014-11-26 09:36:05 -08001013 wrote, err := st.body.Write(data)
1014 if err != nil {
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001015 return StreamError{id, ErrCodeStreamClosed}
1016 }
Brad Fitzpatrick0218ba62014-11-26 09:36:05 -08001017 if wrote != len(data) {
1018 panic("internal error: bad Writer")
1019 }
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001020 st.bodyBytes += int64(len(data))
1021 }
Brad Fitzpatrickbd391962014-11-17 18:58:58 -08001022 if f.StreamEnded() {
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001023 if st.declBodyBytes != -1 && st.declBodyBytes != st.bodyBytes {
gbbrd90e0982014-11-20 22:19:20 +00001024 st.body.Close(fmt.Errorf("request declared a Content-Length of %d but only wrote %d bytes",
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001025 st.declBodyBytes, st.bodyBytes))
1026 } else {
1027 st.body.Close(io.EOF)
1028 }
Brad Fitzpatrick3d7a3ad2014-11-15 21:38:23 -08001029 switch st.state {
1030 case stateOpen:
1031 st.state = stateHalfClosedRemote
1032 case stateHalfClosedLocal:
1033 st.state = stateClosed
1034 }
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001035 }
1036 return nil
1037}
1038
1039func (sc *serverConn) processHeaders(f *HeadersFrame) error {
1040 sc.serveG.check()
1041 id := f.Header().StreamID
Brad Fitzpatrick21896bb2014-11-19 16:05:21 -08001042 if sc.inGoAway {
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001043 // Ignore.
1044 return nil
1045 }
1046 // http://http2.github.io/http2-spec/#rfc.section.5.1.1
1047 if id%2 != 1 || id <= sc.maxStreamID || sc.req.stream != nil {
1048 // Streams initiated by a client MUST use odd-numbered
1049 // stream identifiers. [...] The identifier of a newly
1050 // established stream MUST be numerically greater than all
1051 // streams that the initiating endpoint has opened or
1052 // reserved. [...] An endpoint that receives an unexpected
1053 // stream identifier MUST respond with a connection error
1054 // (Section 5.4.1) of type PROTOCOL_ERROR.
1055 return ConnectionError(ErrCodeProtocol)
1056 }
1057 if id > sc.maxStreamID {
1058 sc.maxStreamID = id
1059 }
1060 st := &stream{
Brad Fitzpatrickbd391962014-11-17 18:58:58 -08001061 conn: sc,
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001062 id: id,
1063 state: stateOpen,
1064 flow: newFlow(sc.initialWindowSize),
1065 }
Brad Fitzpatrickbd391962014-11-17 18:58:58 -08001066 st.cw.Init() // make Cond use its Mutex, without heap-promoting them separately
1067 if f.StreamEnded() {
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001068 st.state = stateHalfClosedRemote
1069 }
1070 sc.streams[id] = st
Brad Fitzpatrickb3e0a872014-11-23 21:15:59 -08001071 sc.curOpenStreams++
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001072 sc.req = requestParam{
1073 stream: st,
1074 header: make(http.Header),
1075 }
1076 return sc.processHeaderBlockFragment(st, f.HeaderBlockFragment(), f.HeadersEnded())
1077}
1078
1079func (sc *serverConn) processContinuation(f *ContinuationFrame) error {
1080 sc.serveG.check()
1081 st := sc.streams[f.Header().StreamID]
1082 if st == nil || sc.curHeaderStreamID() != st.id {
1083 return ConnectionError(ErrCodeProtocol)
1084 }
1085 return sc.processHeaderBlockFragment(st, f.HeaderBlockFragment(), f.HeadersEnded())
1086}
1087
1088func (sc *serverConn) processHeaderBlockFragment(st *stream, frag []byte, end bool) error {
1089 sc.serveG.check()
1090 if _, err := sc.hpackDecoder.Write(frag); err != nil {
1091 // TODO: convert to stream error I assume?
1092 return err
1093 }
1094 if !end {
1095 return nil
1096 }
1097 if err := sc.hpackDecoder.Close(); err != nil {
1098 // TODO: convert to stream error I assume?
1099 return err
1100 }
Brad Fitzpatrickb3e0a872014-11-23 21:15:59 -08001101 defer sc.resetPendingRequest()
1102 if sc.curOpenStreams > sc.advMaxStreams {
1103 // Too many open streams.
1104 // TODO: which error code here? Using ErrCodeProtocol for now.
1105 // https://github.com/http2/http2-spec/issues/649
1106 return StreamError{st.id, ErrCodeProtocol}
1107 }
1108
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001109 rw, req, err := sc.newWriterAndRequest()
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001110 if err != nil {
1111 return err
1112 }
1113 st.body = req.Body.(*requestBody).pipe // may be nil
1114 st.declBodyBytes = req.ContentLength
1115 go sc.runHandler(rw, req)
1116 return nil
1117}
1118
Brad Fitzpatrickb3e0a872014-11-23 21:15:59 -08001119// resetPendingRequest zeros out all state related to a HEADERS frame
1120// and its zero or more CONTINUATION frames sent to start a new
1121// request.
1122func (sc *serverConn) resetPendingRequest() {
1123 sc.serveG.check()
1124 sc.req = requestParam{}
1125}
1126
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001127func (sc *serverConn) newWriterAndRequest() (*responseWriter, *http.Request, error) {
1128 sc.serveG.check()
1129 rp := &sc.req
1130 if rp.invalidHeader || rp.method == "" || rp.path == "" ||
1131 (rp.scheme != "https" && rp.scheme != "http") {
1132 // See 8.1.2.6 Malformed Requests and Responses:
1133 //
1134 // Malformed requests or responses that are detected
1135 // MUST be treated as a stream error (Section 5.4.2)
1136 // of type PROTOCOL_ERROR."
1137 //
1138 // 8.1.2.3 Request Pseudo-Header Fields
1139 // "All HTTP/2 requests MUST include exactly one valid
1140 // value for the :method, :scheme, and :path
1141 // pseudo-header fields"
1142 return nil, nil, StreamError{rp.stream.id, ErrCodeProtocol}
1143 }
1144 var tlsState *tls.ConnectionState // make this non-nil if https
1145 if rp.scheme == "https" {
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001146 tlsState = &tls.ConnectionState{}
Brad Fitzpatrick7482ed02014-11-18 15:45:54 -08001147 if tc, ok := sc.conn.(*tls.Conn); ok {
1148 *tlsState = tc.ConnectionState()
1149 if tlsState.Version < tls.VersionTLS12 {
1150 // 9.2 Use of TLS Features
1151 // An implementation of HTTP/2 over TLS MUST use TLS
1152 // 1.2 or higher with the restrictions on feature set
1153 // and cipher suite described in this section. Due to
1154 // implementation limitations, it might not be
1155 // possible to fail TLS negotiation. An endpoint MUST
1156 // immediately terminate an HTTP/2 connection that
1157 // does not meet the TLS requirements described in
1158 // this section with a connection error (Section
1159 // 5.4.1) of type INADEQUATE_SECURITY.
1160 return nil, nil, ConnectionError(ErrCodeInadequateSecurity)
1161 }
1162 // TODO: verify cipher suites. (9.2.1, 9.2.2)
1163 }
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001164 }
1165 authority := rp.authority
1166 if authority == "" {
1167 authority = rp.header.Get("Host")
1168 }
Brad Fitzpatrick9d63ade2014-11-15 12:02:43 -08001169 needsContinue := rp.header.Get("Expect") == "100-continue"
1170 if needsContinue {
1171 rp.header.Del("Expect")
1172 }
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001173 bodyOpen := rp.stream.state == stateOpen
1174 body := &requestBody{
Brad Fitzpatrickbd391962014-11-17 18:58:58 -08001175 stream: rp.stream,
Brad Fitzpatrick9d63ade2014-11-15 12:02:43 -08001176 needsContinue: needsContinue,
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001177 }
Brad Fitzpatrickad4757f2014-11-26 09:37:40 -08001178 // TODO: handle asterisk '*' requests + test
Brad Fitzpatrickff0471b2014-11-14 21:55:14 -08001179 url, err := url.ParseRequestURI(rp.path)
1180 if err != nil {
1181 // TODO: find the right error code?
1182 return nil, nil, StreamError{rp.stream.id, ErrCodeProtocol}
1183 }
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001184 req := &http.Request{
1185 Method: rp.method,
Brad Fitzpatrickff0471b2014-11-14 21:55:14 -08001186 URL: url,
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001187 RemoteAddr: sc.conn.RemoteAddr().String(),
1188 Header: rp.header,
1189 RequestURI: rp.path,
1190 Proto: "HTTP/2.0",
1191 ProtoMajor: 2,
1192 ProtoMinor: 0,
1193 TLS: tlsState,
1194 Host: authority,
1195 Body: body,
1196 }
1197 if bodyOpen {
1198 body.pipe = &pipe{
1199 b: buffer{buf: make([]byte, 65536)}, // TODO: share/remove
1200 }
1201 body.pipe.c.L = &body.pipe.m
1202
1203 if vv, ok := rp.header["Content-Length"]; ok {
1204 req.ContentLength, _ = strconv.ParseInt(vv[0], 10, 64)
1205 } else {
1206 req.ContentLength = -1
1207 }
1208 }
Brad Fitzpatrick729bd722014-11-13 14:09:36 -08001209
1210 rws := responseWriterStatePool.Get().(*responseWriterState)
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001211 bwSave := rws.bw
Brad Fitzpatrick520123b2014-11-14 15:57:37 -08001212 *rws = responseWriterState{} // zero all the fields
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001213 rws.bw = bwSave
1214 rws.bw.Reset(chunkWriter{rws})
Brad Fitzpatrickbd391962014-11-17 18:58:58 -08001215 rws.stream = rp.stream
Brad Fitzpatrick729bd722014-11-13 14:09:36 -08001216 rws.req = req
1217 rws.body = body
Brad Fitzpatrickdc0c5c02014-11-20 14:15:26 -08001218 rws.frameWriteCh = make(chan error, 1)
Brad Fitzpatrick729bd722014-11-13 14:09:36 -08001219
1220 rw := &responseWriter{rws: rws}
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001221 return rw, req, nil
1222}
1223
1224// Run on its own goroutine.
1225func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request) {
1226 defer rw.handlerDone()
1227 // TODO: catch panics like net/http.Server
1228 sc.handler.ServeHTTP(rw, req)
1229}
1230
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001231// headerWriteReq is a request to write an HTTP response header from a server Handler.
1232type headerWriteReq struct {
Brad Fitzpatrickbd391962014-11-17 18:58:58 -08001233 stream *stream
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001234 httpResCode int
1235 h http.Header // may be nil
1236 endStream bool
Brad Fitzpatrick520123b2014-11-14 15:57:37 -08001237
1238 contentType string
1239 contentLength string
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001240}
1241
1242// called from handler goroutines.
1243// h may be nil.
Brad Fitzpatrickdc0c5c02014-11-20 14:15:26 -08001244func (sc *serverConn) writeHeaders(req headerWriteReq, tempCh chan error) {
Brad Fitzpatrickbd391962014-11-17 18:58:58 -08001245 sc.serveG.checkNotOn() // NOT on
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001246 var errc chan error
1247 if req.h != nil {
1248 // If there's a header map (which we don't own), so we have to block on
1249 // waiting for this frame to be written, so an http.Flush mid-handler
1250 // writes out the correct value of keys, before a handler later potentially
1251 // mutates it.
Brad Fitzpatrickdc0c5c02014-11-20 14:15:26 -08001252 errc = tempCh
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001253 }
Brad Fitzpatrick4e3c9222014-11-22 17:35:09 -08001254 sc.writeFrameFromHandler(frameWriteMsg{
Brad Fitzpatrick23564bf2014-11-27 19:40:04 -08001255 write: writeHeadersFrame,
Brad Fitzpatrick3d7a3ad2014-11-15 21:38:23 -08001256 v: req,
Brad Fitzpatrickbd391962014-11-17 18:58:58 -08001257 stream: req.stream,
Brad Fitzpatrick3d7a3ad2014-11-15 21:38:23 -08001258 done: errc,
1259 endStream: req.endStream,
Brad Fitzpatricka29a3232014-11-15 11:18:25 -08001260 })
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001261 if errc != nil {
Brad Fitzpatrick9b41faf2014-11-19 10:45:13 -08001262 select {
1263 case <-errc:
1264 // Ignore. Just for synchronization.
1265 // Any error will be handled in the writing goroutine.
1266 case <-sc.doneServing:
1267 // Client has closed the connection.
1268 }
Brad Fitzpatrick520123b2014-11-14 15:57:37 -08001269 }
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001270}
1271
Brad Fitzpatrick9d63ade2014-11-15 12:02:43 -08001272// called from handler goroutines.
Brad Fitzpatrickbd391962014-11-17 18:58:58 -08001273func (sc *serverConn) write100ContinueHeaders(st *stream) {
1274 sc.serveG.checkNotOn() // NOT
Brad Fitzpatrick4e3c9222014-11-22 17:35:09 -08001275 sc.writeFrameFromHandler(frameWriteMsg{
Brad Fitzpatrick23564bf2014-11-27 19:40:04 -08001276 write: write100ContinueHeadersFrame,
1277 v: st,
Brad Fitzpatrickbd391962014-11-17 18:58:58 -08001278 stream: st,
Brad Fitzpatrick9d63ade2014-11-15 12:02:43 -08001279 })
1280}
1281
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001282// called from handler goroutines
Brad Fitzpatrickbd391962014-11-17 18:58:58 -08001283func (sc *serverConn) sendWindowUpdate(st *stream, n int) {
Brad Fitzpatrickc68dd6d2014-11-20 18:27:29 -08001284 sc.serveG.checkNotOn() // NOT
Brad Fitzpatrickbd391962014-11-17 18:58:58 -08001285 if st == nil {
1286 panic("no stream")
1287 }
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001288 const maxUint32 = 2147483647
1289 for n >= maxUint32 {
Brad Fitzpatrick4e3c9222014-11-22 17:35:09 -08001290 sc.writeFrameFromHandler(frameWriteMsg{
Brad Fitzpatrick23564bf2014-11-27 19:40:04 -08001291 write: writeWindowUpdate,
1292 v: windowUpdateReq{streamID: st.id, n: maxUint32},
Brad Fitzpatrickbd391962014-11-17 18:58:58 -08001293 stream: st,
Brad Fitzpatricka29a3232014-11-15 11:18:25 -08001294 })
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001295 n -= maxUint32
1296 }
1297 if n > 0 {
Brad Fitzpatrick4e3c9222014-11-22 17:35:09 -08001298 sc.writeFrameFromHandler(frameWriteMsg{
Brad Fitzpatrick23564bf2014-11-27 19:40:04 -08001299 write: writeWindowUpdate,
1300 v: windowUpdateReq{streamID: st.id, n: uint32(n)},
Brad Fitzpatrickbd391962014-11-17 18:58:58 -08001301 stream: st,
Brad Fitzpatricka29a3232014-11-15 11:18:25 -08001302 })
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001303 }
1304}
1305
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001306type requestBody struct {
Brad Fitzpatrickbd391962014-11-17 18:58:58 -08001307 stream *stream
Brad Fitzpatrick9d63ade2014-11-15 12:02:43 -08001308 closed bool
1309 pipe *pipe // non-nil if we have a HTTP entity message body
1310 needsContinue bool // need to send a 100-continue
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001311}
1312
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001313func (b *requestBody) Close() error {
1314 if b.pipe != nil {
1315 b.pipe.Close(errClosedBody)
1316 }
1317 b.closed = true
1318 return nil
1319}
1320
1321func (b *requestBody) Read(p []byte) (n int, err error) {
Brad Fitzpatrick9d63ade2014-11-15 12:02:43 -08001322 if b.needsContinue {
1323 b.needsContinue = false
Brad Fitzpatrickc68dd6d2014-11-20 18:27:29 -08001324 b.stream.conn.write100ContinueHeaders(b.stream)
Brad Fitzpatrick9d63ade2014-11-15 12:02:43 -08001325 }
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001326 if b.pipe == nil {
1327 return 0, io.EOF
1328 }
1329 n, err = b.pipe.Read(p)
1330 if n > 0 {
Brad Fitzpatrickc68dd6d2014-11-20 18:27:29 -08001331 b.stream.conn.sendWindowUpdate(b.stream, n)
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001332 }
1333 return
1334}
1335
Brad Fitzpatrick729bd722014-11-13 14:09:36 -08001336// responseWriter is the http.ResponseWriter implementation. It's
1337// intentionally small (1 pointer wide) to minimize garbage. The
1338// responseWriterState pointer inside is zeroed at the end of a
1339// request (in handlerDone) and calls on the responseWriter thereafter
1340// simply crash (caller's mistake), but the much larger responseWriterState
1341// and buffers are reused between multiple requests.
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001342type responseWriter struct {
Brad Fitzpatrick729bd722014-11-13 14:09:36 -08001343 rws *responseWriterState
1344}
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001345
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001346// Optional http.ResponseWriter interfaces implemented.
1347var (
Brad Fitzpatrickbd391962014-11-17 18:58:58 -08001348 _ http.CloseNotifier = (*responseWriter)(nil)
1349 _ http.Flusher = (*responseWriter)(nil)
1350 _ stringWriter = (*responseWriter)(nil)
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001351)
1352
Brad Fitzpatrick729bd722014-11-13 14:09:36 -08001353type responseWriterState struct {
1354 // immutable within a request:
Brad Fitzpatrickbd391962014-11-17 18:58:58 -08001355 stream *stream
1356 req *http.Request
1357 body *requestBody // to close at end of request, if DATA frames didn't
Brad Fitzpatrick729bd722014-11-13 14:09:36 -08001358
Brad Fitzpatrick520123b2014-11-14 15:57:37 -08001359 // TODO: adjust buffer writing sizes based on server config, frame size updates from peer, etc
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001360 bw *bufio.Writer // writing to a chunkWriter{this *responseWriterState}
Brad Fitzpatrick729bd722014-11-13 14:09:36 -08001361
1362 // mutated by http.Handler goroutine:
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001363 handlerHeader http.Header // nil until called
1364 snapHeader http.Header // snapshot of handlerHeader at WriteHeader time
Brad Fitzpatrick520123b2014-11-14 15:57:37 -08001365 status int // status code passed to WriteHeader
Brad Fitzpatrick3c8c6132014-11-20 18:34:52 -08001366 wroteHeader bool // WriteHeader called (explicitly or implicitly). Not necessarily sent to user yet.
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001367 sentHeader bool // have we sent the header frame?
1368 handlerDone bool // handler has finished
Brad Fitzpatrick3c8c6132014-11-20 18:34:52 -08001369 curWrite dataWriteParams
Brad Fitzpatrickdc0c5c02014-11-20 14:15:26 -08001370 frameWriteCh chan error // re-used whenever we need to block on a frame being written
Brad Fitzpatrickbd391962014-11-17 18:58:58 -08001371
1372 closeNotifierMu sync.Mutex // guards closeNotifierCh
1373 closeNotifierCh chan bool // nil until first used
Brad Fitzpatrick729bd722014-11-13 14:09:36 -08001374}
1375
Brad Fitzpatrickdc0c5c02014-11-20 14:15:26 -08001376func (rws *responseWriterState) writeData(p []byte, end bool) error {
Brad Fitzpatrick23564bf2014-11-27 19:40:04 -08001377 rws.curWrite.streamID = rws.stream.id
Brad Fitzpatrickdc0c5c02014-11-20 14:15:26 -08001378 rws.curWrite.p = p
1379 rws.curWrite.end = end
1380 return rws.stream.conn.writeData(rws.stream, &rws.curWrite, rws.frameWriteCh)
1381}
1382
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001383type chunkWriter struct{ rws *responseWriterState }
1384
Brad Fitzpatrick3d7a3ad2014-11-15 21:38:23 -08001385func (cw chunkWriter) Write(p []byte) (n int, err error) { return cw.rws.writeChunk(p) }
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001386
Brad Fitzpatrick3d7a3ad2014-11-15 21:38:23 -08001387// writeChunk writes chunks from the bufio.Writer. But because
1388// bufio.Writer may bypass its chunking, sometimes p may be
1389// arbitrarily large.
1390//
1391// writeChunk is also responsible (on the first chunk) for sending the
1392// HEADER response.
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001393func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) {
1394 if !rws.wroteHeader {
1395 rws.writeHeader(200)
1396 }
1397 if !rws.sentHeader {
1398 rws.sentHeader = true
1399 var ctype, clen string // implicit ones, if we can calculate it
1400 if rws.handlerDone && rws.snapHeader.Get("Content-Length") == "" {
1401 clen = strconv.Itoa(len(p))
1402 }
1403 if rws.snapHeader.Get("Content-Type") == "" {
1404 ctype = http.DetectContentType(p)
1405 }
Brad Fitzpatrick3d7a3ad2014-11-15 21:38:23 -08001406 endStream := rws.handlerDone && len(p) == 0
Brad Fitzpatrickbd391962014-11-17 18:58:58 -08001407 rws.stream.conn.writeHeaders(headerWriteReq{
1408 stream: rws.stream,
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001409 httpResCode: rws.status,
1410 h: rws.snapHeader,
Brad Fitzpatrick3d7a3ad2014-11-15 21:38:23 -08001411 endStream: endStream,
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001412 contentType: ctype,
1413 contentLength: clen,
Brad Fitzpatrickdc0c5c02014-11-20 14:15:26 -08001414 }, rws.frameWriteCh)
Brad Fitzpatrick3d7a3ad2014-11-15 21:38:23 -08001415 if endStream {
1416 return
1417 }
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001418 }
Brad Fitzpatrick3d7a3ad2014-11-15 21:38:23 -08001419 if len(p) == 0 {
1420 if rws.handlerDone {
Brad Fitzpatrickdc0c5c02014-11-20 14:15:26 -08001421 err = rws.writeData(nil, true)
Brad Fitzpatrick3d7a3ad2014-11-15 21:38:23 -08001422 }
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001423 return
1424 }
Brad Fitzpatrick3d7a3ad2014-11-15 21:38:23 -08001425 for len(p) > 0 {
1426 chunk := p
1427 if len(chunk) > handlerChunkWriteSize {
1428 chunk = chunk[:handlerChunkWriteSize]
1429 }
Brad Fitzpatrick4c687c62014-11-23 00:07:43 -08001430 allowedSize := rws.stream.flow.wait(int32(len(chunk)))
1431 if allowedSize == 0 {
1432 return n, errStreamBroken
1433 }
1434 chunk = chunk[:allowedSize]
Brad Fitzpatrick3d7a3ad2014-11-15 21:38:23 -08001435 p = p[len(chunk):]
Brad Fitzpatrickdc0c5c02014-11-20 14:15:26 -08001436 isFinal := rws.handlerDone && len(p) == 0
1437 err = rws.writeData(chunk, isFinal)
Brad Fitzpatrick3d7a3ad2014-11-15 21:38:23 -08001438 if err != nil {
1439 break
1440 }
1441 n += len(chunk)
1442 }
1443 return
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001444}
Brad Fitzpatrick729bd722014-11-13 14:09:36 -08001445
1446func (w *responseWriter) Flush() {
Brad Fitzpatrick520123b2014-11-14 15:57:37 -08001447 rws := w.rws
1448 if rws == nil {
1449 panic("Header called after Handler finished")
1450 }
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001451 if rws.bw.Buffered() > 0 {
1452 if err := rws.bw.Flush(); err != nil {
1453 // Ignore the error. The frame writer already knows.
1454 return
Brad Fitzpatrick520123b2014-11-14 15:57:37 -08001455 }
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001456 } else {
1457 // The bufio.Writer won't call chunkWriter.Write
1458 // (writeChunk with zero bytes, so we have to do it
1459 // ourselves to force the HTTP response header and/or
1460 // final DATA frame (with END_STREAM) to be sent.
1461 rws.writeChunk(nil)
Brad Fitzpatrick520123b2014-11-14 15:57:37 -08001462 }
Brad Fitzpatrick520123b2014-11-14 15:57:37 -08001463}
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001464
Brad Fitzpatrickbd391962014-11-17 18:58:58 -08001465func (w *responseWriter) CloseNotify() <-chan bool {
1466 rws := w.rws
1467 if rws == nil {
1468 panic("CloseNotify called after Handler finished")
1469 }
1470 rws.closeNotifierMu.Lock()
1471 ch := rws.closeNotifierCh
1472 if ch == nil {
1473 ch = make(chan bool, 1)
1474 rws.closeNotifierCh = ch
1475 go func() {
1476 rws.stream.cw.Wait() // wait for close
1477 ch <- true
1478 }()
1479 }
1480 rws.closeNotifierMu.Unlock()
1481 return ch
1482}
1483
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001484func (w *responseWriter) Header() http.Header {
Brad Fitzpatrick729bd722014-11-13 14:09:36 -08001485 rws := w.rws
1486 if rws == nil {
1487 panic("Header called after Handler finished")
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001488 }
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001489 if rws.handlerHeader == nil {
1490 rws.handlerHeader = make(http.Header)
Brad Fitzpatrick729bd722014-11-13 14:09:36 -08001491 }
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001492 return rws.handlerHeader
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001493}
1494
1495func (w *responseWriter) WriteHeader(code int) {
Brad Fitzpatrick729bd722014-11-13 14:09:36 -08001496 rws := w.rws
1497 if rws == nil {
1498 panic("WriteHeader called after Handler finished")
1499 }
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001500 rws.writeHeader(code)
1501}
1502
1503func (rws *responseWriterState) writeHeader(code int) {
1504 if !rws.wroteHeader {
1505 rws.wroteHeader = true
1506 rws.status = code
1507 if len(rws.handlerHeader) > 0 {
1508 rws.snapHeader = cloneHeader(rws.handlerHeader)
1509 }
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001510 }
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001511}
1512
1513func cloneHeader(h http.Header) http.Header {
1514 h2 := make(http.Header, len(h))
1515 for k, vv := range h {
1516 vv2 := make([]string, len(vv))
1517 copy(vv2, vv)
1518 h2[k] = vv2
1519 }
1520 return h2
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001521}
1522
Brad Fitzpatrick15a4bf32014-11-14 10:56:12 -08001523// The Life Of A Write is like this:
1524//
Brad Fitzpatricked26b482014-11-26 09:16:43 -08001525// * Handler calls w.Write or w.WriteString ->
1526// * -> rws.bw (*bufio.Writer) ->
1527// * (Handler migth call Flush)
1528// * -> chunkWriter{rws}
1529// * -> responseWriterState.writeChunk(p []byte)
1530// * -> responseWriterState.writeChunk (most of the magic; see comment there)
Brad Fitzpatrick15a4bf32014-11-14 10:56:12 -08001531func (w *responseWriter) Write(p []byte) (n int, err error) {
1532 return w.write(len(p), p, "")
Brad Fitzpatrick729bd722014-11-13 14:09:36 -08001533}
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001534
Brad Fitzpatrick15a4bf32014-11-14 10:56:12 -08001535func (w *responseWriter) WriteString(s string) (n int, err error) {
1536 return w.write(len(s), nil, s)
1537}
1538
1539// either dataB or dataS is non-zero.
1540func (w *responseWriter) write(lenData int, dataB []byte, dataS string) (n int, err error) {
Brad Fitzpatrick729bd722014-11-13 14:09:36 -08001541 rws := w.rws
1542 if rws == nil {
1543 panic("Write called after Handler finished")
1544 }
Brad Fitzpatrick520123b2014-11-14 15:57:37 -08001545 if !rws.wroteHeader {
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001546 w.WriteHeader(200)
1547 }
Brad Fitzpatrick15a4bf32014-11-14 10:56:12 -08001548 if dataB != nil {
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001549 return rws.bw.Write(dataB)
Brad Fitzpatrick15a4bf32014-11-14 10:56:12 -08001550 } else {
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001551 return rws.bw.WriteString(dataS)
Brad Fitzpatrick15a4bf32014-11-14 10:56:12 -08001552 }
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001553}
1554
1555func (w *responseWriter) handlerDone() {
Brad Fitzpatrick729bd722014-11-13 14:09:36 -08001556 rws := w.rws
1557 if rws == nil {
1558 panic("handlerDone called twice")
1559 }
Brad Fitzpatrick520123b2014-11-14 15:57:37 -08001560 rws.handlerDone = true
1561 w.Flush()
Brad Fitzpatrick729bd722014-11-13 14:09:36 -08001562 w.rws = nil
Brad Fitzpatrick520123b2014-11-14 15:57:37 -08001563 responseWriterStatePool.Put(rws)
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001564}