blob: 8ae0bab71dff8e97158f3dca1805df5066f6a6ed [file] [log] [blame]
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001// Copyright 2014 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4// See https://code.google.com/p/go/source/browse/CONTRIBUTORS
5// Licensed under the same terms as Go itself:
6// https://code.google.com/p/go/source/browse/LICENSE
7
8package http2
9
10import (
Brad Fitzpatrick390047e2014-11-14 20:37:08 -080011 "bufio"
Brad Fitzpatrickb331b812014-11-13 11:51:54 -080012 "bytes"
13 "crypto/tls"
14 "errors"
15 "fmt"
16 "io"
17 "log"
18 "net"
19 "net/http"
20 "net/url"
21 "strconv"
22 "strings"
Brad Fitzpatrick729bd722014-11-13 14:09:36 -080023 "sync"
Brad Fitzpatrickb331b812014-11-13 11:51:54 -080024
25 "github.com/bradfitz/http2/hpack"
26)
27
28// TODO: finish GOAWAY support. Consider each incoming frame type and whether
29// it should be ignored during a shutdown race.
30
// Server is an HTTP/2 server.
//
// A Server holds HTTP/2-specific configuration. It is attached to a
// net/http Server with ConfigureServer.
type Server struct {
	// MaxStreams optionally ...
	// NOTE(review): nothing in this part of the file reads MaxStreams;
	// presumably it is meant to cap concurrent streams per connection — confirm.
	MaxStreams int
}
36
// testHookOnConn, if non-nil, is invoked by the TLSNextProto callback that
// ConfigureServer installs, just before a new HTTP/2 connection is served.
var testHookOnConn func() // for testing
38
39// ConfigureServer adds HTTP/2 support to a net/http Server.
40//
41// The configuration conf may be nil.
42//
43// ConfigureServer must be called before s begins serving.
44func ConfigureServer(s *http.Server, conf *Server) {
45 if conf == nil {
46 conf = new(Server)
47 }
48 if s.TLSConfig == nil {
49 s.TLSConfig = new(tls.Config)
50 }
51 haveNPN := false
52 for _, p := range s.TLSConfig.NextProtos {
53 if p == npnProto {
54 haveNPN = true
55 break
56 }
57 }
58 if !haveNPN {
59 s.TLSConfig.NextProtos = append(s.TLSConfig.NextProtos, npnProto)
60 }
61
62 if s.TLSNextProto == nil {
63 s.TLSNextProto = map[string]func(*http.Server, *tls.Conn, http.Handler){}
64 }
65 s.TLSNextProto[npnProto] = func(hs *http.Server, c *tls.Conn, h http.Handler) {
66 if testHookOnConn != nil {
67 testHookOnConn()
68 }
69 conf.handleConn(hs, c, h)
70 }
71}
72
Brad Fitzpatrickb331b812014-11-13 11:51:54 -080073func (srv *Server) handleConn(hs *http.Server, c net.Conn, h http.Handler) {
74 sc := &serverConn{
75 hs: hs,
76 conn: c,
77 handler: h,
78 framer: NewFramer(c, c), // TODO: write to a (custom?) buffered writer that can alternate when it's in buffered mode.
79 streams: make(map[uint32]*stream),
80 canonHeader: make(map[string]string),
81 readFrameCh: make(chan frameAndProcessed),
Brad Fitzpatrick520123b2014-11-14 15:57:37 -080082 readFrameErrCh: make(chan error, 1), // must be buffered for 1
83 wantWriteFrameCh: make(chan frameWriteMsg, 8),
84 writeFrameCh: make(chan frameWriteMsg, 1), // may be 0 or 1, but more is useless. (max 1 in flight)
85 wroteFrameCh: make(chan struct{}, 1),
Brad Fitzpatrickb331b812014-11-13 11:51:54 -080086 flow: newFlow(initialWindowSize),
87 doneServing: make(chan struct{}),
88 maxWriteFrameSize: initialMaxFrameSize,
89 initialWindowSize: initialWindowSize,
90 serveG: newGoroutineLock(),
91 }
92 sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)
93 sc.hpackDecoder = hpack.NewDecoder(initialHeaderTableSize, sc.onNewHeaderField)
94 sc.serve()
95}
96
// frameAndProcessed coordinates the readFrames and serve goroutines, since
// the Framer interface only permits the most recently-read Frame from being
// accessed. The serve goroutine sends on processed to signal to the readFrames
// goroutine that another frame may be read.
type frameAndProcessed struct {
	f         Frame         // the frame just read by readFrames
	processed chan struct{} // serve sends here once it is done with f
}
105
// serverConn is the state of a single server-side HTTP/2 connection.
//
// Its fields are grouped by owner: fields set once in handleConn, fields
// owned by the serve goroutine, and fields owned by the writeFrames
// goroutine.
type serverConn struct {
	// Immutable:
	hs               *http.Server
	conn             net.Conn
	handler          http.Handler
	framer           *Framer
	hpackDecoder     *hpack.Decoder
	doneServing      chan struct{}          // closed when serverConn.serve ends
	readFrameCh      chan frameAndProcessed // written by serverConn.readFrames
	readFrameErrCh   chan error             // receives the read error just before readFrameCh is closed
	wantWriteFrameCh chan frameWriteMsg     // from handlers -> serve
	writeFrameCh     chan frameWriteMsg     // from serve -> writeFrames
	wroteFrameCh     chan struct{}          // from writeFrames -> serve, tickles more sends on writeFrameCh

	serveG goroutineLock // used to verify funcs are on serve()
	writeG goroutineLock // used to verify things running on writeLoop; assigned when writeFrames starts
	flow   *flow         // connection-wide (not stream-specific) flow control

	// Everything following is owned by the serve loop; use serveG.check():
	maxStreamID       uint32 // max ever seen
	streams           map[uint32]*stream
	maxWriteFrameSize uint32 // TODO: update this when settings come in
	initialWindowSize int32
	canonHeader       map[string]string // http2-lower-case -> Go-Canonical-Case
	sentGoAway        bool
	req               requestParam    // non-zero while reading request headers
	writingFrame      bool            // sent on writeFrameCh but haven't heard back on wroteFrameCh yet
	writeQueue        []frameWriteMsg // TODO: proper scheduler, not a queue

	// Owned by the writeFrames goroutine; use writeG.check():
	headerWriteBuf bytes.Buffer // scratch buffer that hpackEncoder writes into
	hpackEncoder   *hpack.Encoder
}
139
// requestParam is the state of the next request, initialized over
// potentially several frames: HEADERS + zero or more CONTINUATION
// frames. It is filled in by onNewHeaderField as header fields are decoded.
type requestParam struct {
	// stream is non-nil if we're reading (HEADER or CONTINUATION)
	// frames for a request (but not DATA).
	stream            *stream
	header            http.Header // regular (non-pseudo) header fields seen so far
	method, path      string      // values of the :method and :path pseudo-headers
	scheme, authority string      // values of the :scheme and :authority pseudo-headers
	sawRegularHeader  bool        // saw a non-pseudo header already
	invalidHeader     bool        // an invalid header was seen
}
153
// stream is the server's per-stream state for one HTTP/2 stream.
type stream struct {
	id    uint32
	state streamState // owned by serverConn's processing loop
	flow  *flow       // limits writing from Handler to client
	body  *pipe       // non-nil if expecting DATA frames

	bodyBytes     int64 // body bytes seen so far
	declBodyBytes int64 // or -1 if undeclared
}
163
164func (sc *serverConn) state(streamID uint32) streamState {
165 sc.serveG.check()
166 // http://http2.github.io/http2-spec/#rfc.section.5.1
167 if st, ok := sc.streams[streamID]; ok {
168 return st.state
169 }
170 // "The first use of a new stream identifier implicitly closes all
171 // streams in the "idle" state that might have been initiated by
172 // that peer with a lower-valued stream identifier. For example, if
173 // a client sends a HEADERS frame on stream 7 without ever sending a
174 // frame on stream 5, then stream 5 transitions to the "closed"
175 // state when the first frame for stream 7 is sent or received."
176 if streamID <= sc.maxStreamID {
177 return stateClosed
178 }
179 return stateIdle
180}
181
182func (sc *serverConn) vlogf(format string, args ...interface{}) {
183 if VerboseLogs {
184 sc.logf(format, args...)
185 }
186}
187
188func (sc *serverConn) logf(format string, args ...interface{}) {
189 if lg := sc.hs.ErrorLog; lg != nil {
190 lg.Printf(format, args...)
191 } else {
192 log.Printf(format, args...)
193 }
194}
195
196func (sc *serverConn) condlogf(err error, format string, args ...interface{}) {
197 if err == nil {
198 return
199 }
200 str := err.Error()
201 if strings.Contains(str, "use of closed network connection") {
202 // Boring, expected errors.
203 sc.vlogf(format, args...)
204 } else {
205 sc.logf(format, args...)
206 }
207}
208
209func (sc *serverConn) onNewHeaderField(f hpack.HeaderField) {
210 sc.serveG.check()
211 switch {
212 case !validHeader(f.Name):
213 sc.req.invalidHeader = true
214 case strings.HasPrefix(f.Name, ":"):
215 if sc.req.sawRegularHeader {
216 sc.logf("pseudo-header after regular header")
217 sc.req.invalidHeader = true
218 return
219 }
220 var dst *string
221 switch f.Name {
222 case ":method":
223 dst = &sc.req.method
224 case ":path":
225 dst = &sc.req.path
226 case ":scheme":
227 dst = &sc.req.scheme
228 case ":authority":
229 dst = &sc.req.authority
230 default:
231 // 8.1.2.1 Pseudo-Header Fields
232 // "Endpoints MUST treat a request or response
233 // that contains undefined or invalid
234 // pseudo-header fields as malformed (Section
235 // 8.1.2.6)."
236 sc.logf("invalid pseudo-header %q", f.Name)
237 sc.req.invalidHeader = true
238 return
239 }
240 if *dst != "" {
241 sc.logf("duplicate pseudo-header %q sent", f.Name)
242 sc.req.invalidHeader = true
243 return
244 }
245 *dst = f.Value
246 case f.Name == "cookie":
247 sc.req.sawRegularHeader = true
248 if s, ok := sc.req.header["Cookie"]; ok && len(s) == 1 {
249 s[0] = s[0] + "; " + f.Value
250 } else {
251 sc.req.header.Add("Cookie", f.Value)
252 }
253 default:
254 sc.req.sawRegularHeader = true
255 sc.req.header.Add(sc.canonicalHeader(f.Name), f.Value)
256 }
257}
258
259func (sc *serverConn) canonicalHeader(v string) string {
260 sc.serveG.check()
261 // TODO: use a sync.Pool instead of putting the cache on *serverConn?
262 cv, ok := sc.canonHeader[v]
263 if !ok {
264 cv = http.CanonicalHeaderKey(v)
265 sc.canonHeader[v] = cv
266 }
267 return cv
268}
269
270// readFrames is the loop that reads incoming frames.
271// It's run on its own goroutine.
272func (sc *serverConn) readFrames() {
273 processed := make(chan struct{}, 1)
274 for {
275 f, err := sc.framer.ReadFrame()
276 if err != nil {
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800277 sc.readFrameErrCh <- err // BEFORE the close
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800278 close(sc.readFrameCh)
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800279 return
280 }
281 sc.readFrameCh <- frameAndProcessed{f, processed}
282 <-processed
283 }
284}
285
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800286// writeFrames is the loop that writes frames to the peer
287// and is responsible for prioritization and buffering.
288// It's run on its own goroutine.
289func (sc *serverConn) writeFrames() {
290 sc.writeG = newGoroutineLock()
291 for wm := range sc.writeFrameCh {
292 err := wm.write(sc, wm.v)
293 if ch := wm.done; ch != nil {
Brad Fitzpatrickbe341152014-11-14 16:05:41 -0800294 select {
295 case ch <- err:
296 default:
297 panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wm.v))
298 }
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800299 }
300 sc.wroteFrameCh <- struct{}{} // tickle frame selection scheduler
301 }
302}
303
// serve runs the connection: it performs the HTTP/2 handshake (client
// preface, initial SETTINGS exchange), starts the readFrames and writeFrames
// goroutines, and then loops, multiplexing write requests from handlers,
// write completions, and incoming frames.
//
// It must run on the goroutine that created sc (checked via serveG). When it
// returns, its deferred calls close writeFrameCh, close doneServing and close
// the network connection, in that order.
func (sc *serverConn) serve() {
	sc.serveG.check()
	defer sc.conn.Close()
	defer close(sc.doneServing)

	sc.vlogf("HTTP/2 connection from %v on %p", sc.conn.RemoteAddr(), sc.hs)

	// Read the client preface
	buf := make([]byte, len(ClientPreface))
	// TODO: timeout reading from the client
	if _, err := io.ReadFull(sc.conn, buf); err != nil {
		sc.logf("error reading client preface: %v", err)
		return
	}
	if !bytes.Equal(buf, clientPreface) {
		sc.logf("bogus greeting from client: %q", buf)
		return
	}
	sc.vlogf("client %v said hello", sc.conn.RemoteAddr())

	// The first frame after the preface must be SETTINGS; it is read
	// synchronously here, before the reader goroutine is started.
	f, err := sc.framer.ReadFrame() // TODO: timeout
	if err != nil {
		sc.logf("error reading initial frame from client: %v", err)
		return
	}
	sf, ok := f.(*SettingsFrame)
	if !ok {
		sc.logf("invalid initial frame type %T received from client", f)
		return
	}
	if err := sf.ForeachSetting(sc.processSetting); err != nil {
		sc.logf("initial settings error: %v", err)
		return
	}

	// TODO: don't send two network packets for our SETTINGS + our
	// ACK of their settings. But if we make framer write to a
	// *bufio.Writer, that increases the per-connection memory
	// overhead, and there could be many idle conns. So maybe some
	// liveswitchWriter-like thing where we only switch to a
	// *bufio Writer when we really need one temporarily, else go
	// back to an unbuffered writes by default.
	if err := sc.framer.WriteSettings( /* TODO: actual settings */ ); err != nil {
		sc.logf("error writing server's initial settings: %v", err)
		return
	}
	if err := sc.framer.WriteSettingsAck(); err != nil {
		sc.logf("error writing server's ack of client's settings: %v", err)
		return
	}

	go sc.readFrames() // closed by defer sc.conn.Close above
	go sc.writeFrames()
	defer close(sc.writeFrameCh) // shuts down writeFrames loop

	for {
		select {
		case wm := <-sc.wantWriteFrameCh:
			// A handler wants a frame written.
			sc.enqueueFrameWrite(wm)
		case <-sc.wroteFrameCh:
			// writeFrames finished the in-flight frame; send the next one.
			sc.writingFrame = false
			sc.scheduleFrameWrite()
		case fp, ok := <-sc.readFrameCh:
			if !ok {
				// readFrames queues its error before closing readFrameCh,
				// so this receive does not block.
				err := <-sc.readFrameErrCh
				if err != io.EOF {
					errstr := err.Error()
					if !strings.Contains(errstr, "use of closed network connection") {
						sc.logf("client %s stopped sending frames: %v", sc.conn.RemoteAddr(), errstr)
					}
				}
				return
			}
			f := fp.f
			sc.vlogf("got %v: %#v", f.Header(), f)
			err := sc.processFrame(f)
			fp.processed <- struct{}{} // let readFrames proceed
			// The error's type decides how much gets torn down: one
			// stream, the whole connection, or a GOAWAY.
			switch ev := err.(type) {
			case nil:
				// nothing.
			case StreamError:
				if err := sc.resetStreamInLoop(ev); err != nil {
					sc.logf("Error writing RSTSTream: %v", err)
					return
				}
			case ConnectionError:
				sc.logf("Disconnecting; %v", ev)
				return
			case goAwayFlowError:
				if err := sc.goAway(ErrCodeFlowControl); err != nil {
					sc.condlogf(err, "failed to GOAWAY: %v", err)
					return
				}
			default:
				sc.logf("Disconnection due to other error: %v", err)
				return
			}
		}
	}
}
404
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800405func (sc *serverConn) enqueueFrameWrite(wm frameWriteMsg) {
406 sc.serveG.check()
407 // Fast path for common case:
408 if !sc.writingFrame {
409 sc.writingFrame = true
410 sc.writeFrameCh <- wm
411 return
412 }
413 sc.writeQueue = append(sc.writeQueue, wm) // TODO: proper scheduler
414}
415
416func (sc *serverConn) scheduleFrameWrite() {
417 sc.serveG.check()
418 if len(sc.writeQueue) == 0 {
419 // TODO: flush Framer's underlying buffered writer, once that's added
420 return
421 }
Brad Fitzpatrick390047e2014-11-14 20:37:08 -0800422
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800423 // TODO: proper scheduler
424 wm := sc.writeQueue[0]
Brad Fitzpatrick390047e2014-11-14 20:37:08 -0800425 // shift it all down. kinda lame. will be removed later anyway.
426 copy(sc.writeQueue, sc.writeQueue[1:])
427 sc.writeQueue = sc.writeQueue[:len(sc.writeQueue)-1]
428
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800429 sc.writingFrame = true
430 sc.writeFrameCh <- wm
431}
432
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800433func (sc *serverConn) goAway(code ErrCode) error {
434 sc.serveG.check()
435 sc.sentGoAway = true
436 return sc.framer.WriteGoAway(sc.maxStreamID, code, nil)
437}
438
439func (sc *serverConn) resetStreamInLoop(se StreamError) error {
440 sc.serveG.check()
441 if err := sc.framer.WriteRSTStream(se.streamID, uint32(se.code)); err != nil {
442 return err
443 }
444 delete(sc.streams, se.streamID)
445 return nil
446}
447
448func (sc *serverConn) curHeaderStreamID() uint32 {
449 sc.serveG.check()
450 st := sc.req.stream
451 if st == nil {
452 return 0
453 }
454 return st.id
455}
456
457func (sc *serverConn) processFrame(f Frame) error {
458 sc.serveG.check()
459
460 if s := sc.curHeaderStreamID(); s != 0 {
461 if cf, ok := f.(*ContinuationFrame); !ok {
462 return ConnectionError(ErrCodeProtocol)
463 } else if cf.Header().StreamID != s {
464 return ConnectionError(ErrCodeProtocol)
465 }
466 }
467
468 switch f := f.(type) {
469 case *SettingsFrame:
470 return sc.processSettings(f)
471 case *HeadersFrame:
472 return sc.processHeaders(f)
473 case *ContinuationFrame:
474 return sc.processContinuation(f)
475 case *WindowUpdateFrame:
476 return sc.processWindowUpdate(f)
477 case *PingFrame:
478 return sc.processPing(f)
479 case *DataFrame:
480 return sc.processData(f)
481 default:
482 log.Printf("Ignoring unknown frame %#v", f)
483 return nil
484 }
485}
486
487func (sc *serverConn) processPing(f *PingFrame) error {
488 sc.serveG.check()
489 if f.Flags.Has(FlagSettingsAck) {
490 // 6.7 PING: " An endpoint MUST NOT respond to PING frames
491 // containing this flag."
492 return nil
493 }
494 if f.StreamID != 0 {
495 // "PING frames are not associated with any individual
496 // stream. If a PING frame is received with a stream
497 // identifier field value other than 0x0, the recipient MUST
498 // respond with a connection error (Section 5.4.1) of type
499 // PROTOCOL_ERROR."
500 return ConnectionError(ErrCodeProtocol)
501 }
502 return sc.framer.WritePing(true, f.Data)
503}
504
505func (sc *serverConn) processWindowUpdate(f *WindowUpdateFrame) error {
506 sc.serveG.check()
507 switch {
508 case f.StreamID != 0: // stream-level flow control
509 st := sc.streams[f.StreamID]
510 if st == nil {
511 // "WINDOW_UPDATE can be sent by a peer that has sent a
512 // frame bearing the END_STREAM flag. This means that a
513 // receiver could receive a WINDOW_UPDATE frame on a "half
514 // closed (remote)" or "closed" stream. A receiver MUST
515 // NOT treat this as an error, see Section 5.1."
516 return nil
517 }
518 if !st.flow.add(int32(f.Increment)) {
519 return StreamError{f.StreamID, ErrCodeFlowControl}
520 }
521 default: // connection-level flow control
522 if !sc.flow.add(int32(f.Increment)) {
523 return goAwayFlowError{}
524 }
525 }
526 return nil
527}
528
529func (sc *serverConn) processSettings(f *SettingsFrame) error {
530 sc.serveG.check()
531 return f.ForeachSetting(sc.processSetting)
532}
533
534func (sc *serverConn) processSetting(s Setting) error {
535 sc.serveG.check()
536 sc.vlogf("processing setting %v", s)
537 switch s.ID {
538 case SettingInitialWindowSize:
539 return sc.processSettingInitialWindowSize(s.Val)
540 }
541 log.Printf("TODO: handle %v", s)
542 return nil
543}
544
545func (sc *serverConn) processSettingInitialWindowSize(val uint32) error {
546 sc.serveG.check()
547 if val > (1<<31 - 1) {
548 // 6.5.2 Defined SETTINGS Parameters
549 // "Values above the maximum flow control window size of
550 // 231-1 MUST be treated as a connection error (Section
551 // 5.4.1) of type FLOW_CONTROL_ERROR."
552 return ConnectionError(ErrCodeFlowControl)
553 }
554
555 // "A SETTINGS frame can alter the initial flow control window
556 // size for all current streams. When the value of
557 // SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST
558 // adjust the size of all stream flow control windows that it
559 // maintains by the difference between the new value and the
560 // old value."
561 old := sc.initialWindowSize
562 sc.initialWindowSize = int32(val)
563 growth := sc.initialWindowSize - old // may be negative
564 for _, st := range sc.streams {
565 if !st.flow.add(growth) {
566 // 6.9.2 Initial Flow Control Window Size
567 // "An endpoint MUST treat a change to
568 // SETTINGS_INITIAL_WINDOW_SIZE that causes any flow
569 // control window to exceed the maximum size as a
570 // connection error (Section 5.4.1) of type
571 // FLOW_CONTROL_ERROR."
572 return ConnectionError(ErrCodeFlowControl)
573 }
574 }
575 return nil
576}
577
578func (sc *serverConn) processData(f *DataFrame) error {
579 sc.serveG.check()
580 // "If a DATA frame is received whose stream is not in "open"
581 // or "half closed (local)" state, the recipient MUST respond
582 // with a stream error (Section 5.4.2) of type STREAM_CLOSED."
583 id := f.Header().StreamID
584 st, ok := sc.streams[id]
585 if !ok || (st.state != stateOpen && st.state != stateHalfClosedLocal) {
586 return StreamError{id, ErrCodeStreamClosed}
587 }
588 if st.body == nil {
589 // Not expecting data.
590 // TODO: which error code?
591 return StreamError{id, ErrCodeStreamClosed}
592 }
593 data := f.Data()
594
595 // Sender sending more than they'd declared?
596 if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes {
597 st.body.Close(fmt.Errorf("Sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
598 return StreamError{id, ErrCodeStreamClosed}
599 }
600 if len(data) > 0 {
601 // TODO: verify they're allowed to write with the flow control
602 // window we'd advertised to them.
603 // TODO: verify n from Write
604 if _, err := st.body.Write(data); err != nil {
605 return StreamError{id, ErrCodeStreamClosed}
606 }
607 st.bodyBytes += int64(len(data))
608 }
609 if f.Header().Flags.Has(FlagDataEndStream) {
610 if st.declBodyBytes != -1 && st.declBodyBytes != st.bodyBytes {
611 st.body.Close(fmt.Errorf("Request declared a Content-Length of %d but only wrote %d bytes",
612 st.declBodyBytes, st.bodyBytes))
613 } else {
614 st.body.Close(io.EOF)
615 }
616 }
617 return nil
618}
619
620func (sc *serverConn) processHeaders(f *HeadersFrame) error {
621 sc.serveG.check()
622 id := f.Header().StreamID
623 if sc.sentGoAway {
624 // Ignore.
625 return nil
626 }
627 // http://http2.github.io/http2-spec/#rfc.section.5.1.1
628 if id%2 != 1 || id <= sc.maxStreamID || sc.req.stream != nil {
629 // Streams initiated by a client MUST use odd-numbered
630 // stream identifiers. [...] The identifier of a newly
631 // established stream MUST be numerically greater than all
632 // streams that the initiating endpoint has opened or
633 // reserved. [...] An endpoint that receives an unexpected
634 // stream identifier MUST respond with a connection error
635 // (Section 5.4.1) of type PROTOCOL_ERROR.
636 return ConnectionError(ErrCodeProtocol)
637 }
638 if id > sc.maxStreamID {
639 sc.maxStreamID = id
640 }
641 st := &stream{
642 id: id,
643 state: stateOpen,
644 flow: newFlow(sc.initialWindowSize),
645 }
646 if f.Header().Flags.Has(FlagHeadersEndStream) {
647 st.state = stateHalfClosedRemote
648 }
649 sc.streams[id] = st
650 sc.req = requestParam{
651 stream: st,
652 header: make(http.Header),
653 }
654 return sc.processHeaderBlockFragment(st, f.HeaderBlockFragment(), f.HeadersEnded())
655}
656
657func (sc *serverConn) processContinuation(f *ContinuationFrame) error {
658 sc.serveG.check()
659 st := sc.streams[f.Header().StreamID]
660 if st == nil || sc.curHeaderStreamID() != st.id {
661 return ConnectionError(ErrCodeProtocol)
662 }
663 return sc.processHeaderBlockFragment(st, f.HeaderBlockFragment(), f.HeadersEnded())
664}
665
666func (sc *serverConn) processHeaderBlockFragment(st *stream, frag []byte, end bool) error {
667 sc.serveG.check()
668 if _, err := sc.hpackDecoder.Write(frag); err != nil {
669 // TODO: convert to stream error I assume?
670 return err
671 }
672 if !end {
673 return nil
674 }
675 if err := sc.hpackDecoder.Close(); err != nil {
676 // TODO: convert to stream error I assume?
677 return err
678 }
679 rw, req, err := sc.newWriterAndRequest()
680 sc.req = requestParam{}
681 if err != nil {
682 return err
683 }
684 st.body = req.Body.(*requestBody).pipe // may be nil
685 st.declBodyBytes = req.ContentLength
686 go sc.runHandler(rw, req)
687 return nil
688}
689
690func (sc *serverConn) newWriterAndRequest() (*responseWriter, *http.Request, error) {
691 sc.serveG.check()
692 rp := &sc.req
693 if rp.invalidHeader || rp.method == "" || rp.path == "" ||
694 (rp.scheme != "https" && rp.scheme != "http") {
695 // See 8.1.2.6 Malformed Requests and Responses:
696 //
697 // Malformed requests or responses that are detected
698 // MUST be treated as a stream error (Section 5.4.2)
699 // of type PROTOCOL_ERROR."
700 //
701 // 8.1.2.3 Request Pseudo-Header Fields
702 // "All HTTP/2 requests MUST include exactly one valid
703 // value for the :method, :scheme, and :path
704 // pseudo-header fields"
705 return nil, nil, StreamError{rp.stream.id, ErrCodeProtocol}
706 }
707 var tlsState *tls.ConnectionState // make this non-nil if https
708 if rp.scheme == "https" {
709 // TODO: get from sc's ConnectionState
710 tlsState = &tls.ConnectionState{}
711 }
712 authority := rp.authority
713 if authority == "" {
714 authority = rp.header.Get("Host")
715 }
716 bodyOpen := rp.stream.state == stateOpen
717 body := &requestBody{
718 sc: sc,
719 streamID: rp.stream.id,
720 }
Brad Fitzpatrickff0471b2014-11-14 21:55:14 -0800721 url, err := url.ParseRequestURI(rp.path)
722 if err != nil {
723 // TODO: find the right error code?
724 return nil, nil, StreamError{rp.stream.id, ErrCodeProtocol}
725 }
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800726 req := &http.Request{
727 Method: rp.method,
Brad Fitzpatrickff0471b2014-11-14 21:55:14 -0800728 URL: url,
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800729 RemoteAddr: sc.conn.RemoteAddr().String(),
730 Header: rp.header,
731 RequestURI: rp.path,
732 Proto: "HTTP/2.0",
733 ProtoMajor: 2,
734 ProtoMinor: 0,
735 TLS: tlsState,
736 Host: authority,
737 Body: body,
738 }
739 if bodyOpen {
740 body.pipe = &pipe{
741 b: buffer{buf: make([]byte, 65536)}, // TODO: share/remove
742 }
743 body.pipe.c.L = &body.pipe.m
744
745 if vv, ok := rp.header["Content-Length"]; ok {
746 req.ContentLength, _ = strconv.ParseInt(vv[0], 10, 64)
747 } else {
748 req.ContentLength = -1
749 }
750 }
Brad Fitzpatrick729bd722014-11-13 14:09:36 -0800751
752 rws := responseWriterStatePool.Get().(*responseWriterState)
Brad Fitzpatrick390047e2014-11-14 20:37:08 -0800753 bwSave := rws.bw
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800754 *rws = responseWriterState{} // zero all the fields
Brad Fitzpatrick390047e2014-11-14 20:37:08 -0800755 rws.bw = bwSave
756 rws.bw.Reset(chunkWriter{rws})
Brad Fitzpatrick729bd722014-11-13 14:09:36 -0800757 rws.sc = sc
758 rws.streamID = rp.stream.id
759 rws.req = req
760 rws.body = body
Brad Fitzpatrick390047e2014-11-14 20:37:08 -0800761 rws.chunkWrittenCh = make(chan error, 1)
Brad Fitzpatrick729bd722014-11-13 14:09:36 -0800762
763 rw := &responseWriter{rws: rws}
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800764 return rw, req, nil
765}
766
// handlerChunkWriteSize is the buffer size, in bytes, of the bufio.Writer
// that responseWriterStatePool attaches to each responseWriterState.
const handlerChunkWriteSize = 4 << 10
768
// responseWriterStatePool recycles responseWriterState values between
// requests. A freshly allocated state gets a bufio.Writer of
// handlerChunkWriteSize bytes that writes through a chunkWriter bound to that
// state.
var responseWriterStatePool = sync.Pool{
	New: func() interface{} {
		rws := &responseWriterState{}
		rws.bw = bufio.NewWriterSize(chunkWriter{rws}, handlerChunkWriteSize)
		return rws
	},
}
776
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800777// Run on its own goroutine.
778func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request) {
779 defer rw.handlerDone()
780 // TODO: catch panics like net/http.Server
781 sc.handler.ServeHTTP(rw, req)
782}
783
// frameWriteMsg is a request to write a frame. Handlers send it to the serve
// loop, which forwards it to the writeFrames goroutine.
type frameWriteMsg struct {
	// write runs on the writeFrames goroutine.
	write func(sc *serverConn, v interface{}) error

	v        interface{} // passed to write
	cost     uint32      // number of flow control bytes required
	streamID uint32      // used for prioritization

	// done, if non-nil, must be a buffered channel with space for
	// 1 message and is sent the return value from write (or an
	// earlier error) when the frame has been written.
	done chan error
}
797
// headerWriteReq is a request to write an HTTP response header from a server Handler.
type headerWriteReq struct {
	streamID    uint32
	httpResCode int         // HTTP status code, sent as the :status pseudo-header
	h           http.Header // may be nil
	endStream   bool        // whether the HEADERS frame also ends the stream

	// contentType and contentLength, when non-empty, are written as
	// content-type and content-length fields after the fields from h.
	contentType   string
	contentLength string
}
808
809// called from handler goroutines.
810// h may be nil.
811func (sc *serverConn) writeHeader(req headerWriteReq) {
Brad Fitzpatrick390047e2014-11-14 20:37:08 -0800812 var errc chan error
813 if req.h != nil {
814 // If there's a header map (which we don't own), so we have to block on
815 // waiting for this frame to be written, so an http.Flush mid-handler
816 // writes out the correct value of keys, before a handler later potentially
817 // mutates it.
818 errc = make(chan error, 1)
819 }
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800820 sc.wantWriteFrameCh <- frameWriteMsg{
821 write: (*serverConn).writeHeaderInLoop,
822 v: req,
823 streamID: req.streamID,
Brad Fitzpatrick390047e2014-11-14 20:37:08 -0800824 done: errc,
825 }
826 if errc != nil {
827 <-errc
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800828 }
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800829}
830
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800831func (sc *serverConn) writeHeaderInLoop(v interface{}) error {
832 sc.writeG.check()
833 req := v.(headerWriteReq)
834
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800835 sc.headerWriteBuf.Reset()
836 sc.hpackEncoder.WriteField(hpack.HeaderField{Name: ":status", Value: httpCodeString(req.httpResCode)})
837 for k, vv := range req.h {
838 for _, v := range vv {
Brad Fitzpatrick390047e2014-11-14 20:37:08 -0800839 // TODO: more of "8.1.2.2 Connection-Specific Header Fields"
840 if k == "Transfer-Encoding" && v != "trailers" {
841 continue
842 }
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800843 // TODO: for gargage, cache lowercase copies of headers at
844 // least for common ones and/or popular recent ones for
845 // this serverConn. LRU?
846 sc.hpackEncoder.WriteField(hpack.HeaderField{Name: strings.ToLower(k), Value: v})
847 }
848 }
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800849 if req.contentType != "" {
850 sc.hpackEncoder.WriteField(hpack.HeaderField{Name: "content-type", Value: req.contentType})
851 }
852 if req.contentLength != "" {
853 sc.hpackEncoder.WriteField(hpack.HeaderField{Name: "content-length", Value: req.contentLength})
854 }
855
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800856 headerBlock := sc.headerWriteBuf.Bytes()
857 if len(headerBlock) > int(sc.maxWriteFrameSize) {
858 // we'll need continuation ones.
859 panic("TODO")
860 }
861 return sc.framer.WriteHeaders(HeadersFrameParam{
862 StreamID: req.streamID,
863 BlockFragment: headerBlock,
864 EndStream: req.endStream,
865 EndHeaders: true, // no continuation yet
866 })
867}
868
Brad Fitzpatrick390047e2014-11-14 20:37:08 -0800869func (sc *serverConn) writeDataInLoop(v interface{}) error {
870 sc.writeG.check()
871 rws := v.(*responseWriterState)
872 return sc.framer.WriteData(rws.streamID, rws.curChunkIsFinal, rws.curChunk)
873}
874
// windowUpdateReq is the payload of a frame write request that
// sendWindowUpdateInLoop turns into WINDOW_UPDATE frames.
type windowUpdateReq struct {
	streamID uint32 // stream to credit; the connection (stream 0) is credited as well
	n        uint32 // window size increment
}
879
880// called from handler goroutines
881func (sc *serverConn) sendWindowUpdate(streamID uint32, n int) {
882 const maxUint32 = 2147483647
883 for n >= maxUint32 {
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800884 sc.wantWriteFrameCh <- frameWriteMsg{
885 write: (*serverConn).sendWindowUpdateInLoop,
886 v: windowUpdateReq{streamID, maxUint32},
887 streamID: streamID,
888 }
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800889 n -= maxUint32
890 }
891 if n > 0 {
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800892 sc.wantWriteFrameCh <- frameWriteMsg{
893 write: (*serverConn).sendWindowUpdateInLoop,
894 v: windowUpdateReq{streamID, uint32(n)},
895 streamID: streamID,
896 }
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800897 }
898}
899
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800900func (sc *serverConn) sendWindowUpdateInLoop(v interface{}) error {
901 sc.writeG.check()
902 wu := v.(windowUpdateReq)
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800903 if err := sc.framer.WriteWindowUpdate(0, wu.n); err != nil {
904 return err
905 }
906 if err := sc.framer.WriteWindowUpdate(wu.streamID, wu.n); err != nil {
907 return err
908 }
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800909 return nil
910}
911
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800912type requestBody struct {
913 sc *serverConn
914 streamID uint32
915 closed bool
916 pipe *pipe // non-nil if we have a HTTP entity message body
917}
918
// errClosedBody is the error a requestBody's pipe is closed with when
// Close is called on the body.
var errClosedBody = errors.New("body closed by handler")
920
921func (b *requestBody) Close() error {
922 if b.pipe != nil {
923 b.pipe.Close(errClosedBody)
924 }
925 b.closed = true
926 return nil
927}
928
929func (b *requestBody) Read(p []byte) (n int, err error) {
930 if b.pipe == nil {
931 return 0, io.EOF
932 }
933 n, err = b.pipe.Read(p)
934 if n > 0 {
935 b.sc.sendWindowUpdate(b.streamID, n)
936 // TODO: tell b.sc to send back 'n' flow control quota credits to the sender
937 }
938 return
939}
940
Brad Fitzpatrick729bd722014-11-13 14:09:36 -0800941// responseWriter is the http.ResponseWriter implementation. It's
942// intentionally small (1 pointer wide) to minimize garbage. The
943// responseWriterState pointer inside is zeroed at the end of a
944// request (in handlerDone) and calls on the responseWriter thereafter
945// simply crash (caller's mistake), but the much larger responseWriterState
946// and buffers are reused between multiple requests.
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800947type responseWriter struct {
Brad Fitzpatrick729bd722014-11-13 14:09:36 -0800948 rws *responseWriterState
949}
Brad Fitzpatrickb331b812014-11-13 11:51:54 -0800950
Brad Fitzpatrick390047e2014-11-14 20:37:08 -0800951// Optional http.ResponseWriter interfaces implemented.
952var (
953 _ http.Flusher = (*responseWriter)(nil)
954 _ stringWriter = (*responseWriter)(nil)
955 // TODO: hijacker for websockets?
956)
957
Brad Fitzpatrick729bd722014-11-13 14:09:36 -0800958type responseWriterState struct {
959 // immutable within a request:
960 sc *serverConn
961 streamID uint32
962 req *http.Request
963 body *requestBody // to close at end of request, if DATA frames didn't
964
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800965 // TODO: adjust buffer writing sizes based on server config, frame size updates from peer, etc
Brad Fitzpatrick390047e2014-11-14 20:37:08 -0800966 bw *bufio.Writer // writing to a chunkWriter{this *responseWriterState}
Brad Fitzpatrick729bd722014-11-13 14:09:36 -0800967
968 // mutated by http.Handler goroutine:
Brad Fitzpatrick390047e2014-11-14 20:37:08 -0800969 handlerHeader http.Header // nil until called
970 snapHeader http.Header // snapshot of handlerHeader at WriteHeader time
Brad Fitzpatrick520123b2014-11-14 15:57:37 -0800971 wroteHeader bool // WriteHeader called (explicitly or implicitly). Not necessarily sent to user yet.
972 status int // status code passed to WriteHeader
973 wroteContinue bool // 100 Continue response was written
Brad Fitzpatrick390047e2014-11-14 20:37:08 -0800974 sentHeader bool // have we sent the header frame?
975 handlerDone bool // handler has finished
976
977 curChunk []byte // current chunk we're writing
978 curChunkIsFinal bool
979 chunkWrittenCh chan error
Brad Fitzpatrick729bd722014-11-13 14:09:36 -0800980}
981
Brad Fitzpatrick390047e2014-11-14 20:37:08 -0800982type chunkWriter struct{ rws *responseWriterState }
983
984// chunkWriter.Write is called from bufio.Writer. Because bufio.Writer passes through large
985// writes, we break them up here if they're too big.
986func (cw chunkWriter) Write(p []byte) (n int, err error) {
987 for len(p) > 0 {
988 chunk := p
989 if len(chunk) > handlerChunkWriteSize {
990 chunk = chunk[:handlerChunkWriteSize]
991 }
992 _, err = cw.rws.writeChunk(chunk)
993 if err != nil {
994 return
995 }
996 n += len(chunk)
997 p = p[len(chunk):]
998 }
999 return n, nil
1000}
1001
1002// writeChunk writes small (max 4k, or handlerChunkWriteSize) chunks.
1003// It's also responsible for sending the HEADER response.
1004func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) {
1005 if !rws.wroteHeader {
1006 rws.writeHeader(200)
1007 }
1008 if !rws.sentHeader {
1009 rws.sentHeader = true
1010 var ctype, clen string // implicit ones, if we can calculate it
1011 if rws.handlerDone && rws.snapHeader.Get("Content-Length") == "" {
1012 clen = strconv.Itoa(len(p))
1013 }
1014 if rws.snapHeader.Get("Content-Type") == "" {
1015 ctype = http.DetectContentType(p)
1016 }
1017 rws.sc.writeHeader(headerWriteReq{
1018 streamID: rws.streamID,
1019 httpResCode: rws.status,
1020 h: rws.snapHeader,
1021 endStream: rws.handlerDone && len(p) == 0,
1022 contentType: ctype,
1023 contentLength: clen,
1024 })
1025 }
1026 if len(p) == 0 && !rws.handlerDone {
1027 return
1028 }
1029 rws.curChunk = p
1030 rws.curChunkIsFinal = rws.handlerDone
1031
1032 // TODO: await flow control tokens for both stream and conn
1033 rws.sc.wantWriteFrameCh <- frameWriteMsg{
1034 cost: uint32(len(p)),
1035 streamID: rws.streamID,
1036 write: (*serverConn).writeDataInLoop,
1037 done: rws.chunkWrittenCh,
1038 v: rws, // writeDataInLoop uses only rws.curChunk and rws.curChunkIsFinal
1039 }
1040 err = <-rws.chunkWrittenCh // block until it's written
1041 return len(p), err
1042}
Brad Fitzpatrick729bd722014-11-13 14:09:36 -08001043
1044func (w *responseWriter) Flush() {
Brad Fitzpatrick520123b2014-11-14 15:57:37 -08001045 rws := w.rws
1046 if rws == nil {
1047 panic("Header called after Handler finished")
1048 }
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001049 if rws.bw.Buffered() > 0 {
1050 if err := rws.bw.Flush(); err != nil {
1051 // Ignore the error. The frame writer already knows.
1052 return
Brad Fitzpatrick520123b2014-11-14 15:57:37 -08001053 }
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001054 } else {
1055 // The bufio.Writer won't call chunkWriter.Write
1056 // (writeChunk with zero bytes, so we have to do it
1057 // ourselves to force the HTTP response header and/or
1058 // final DATA frame (with END_STREAM) to be sent.
1059 rws.writeChunk(nil)
Brad Fitzpatrick520123b2014-11-14 15:57:37 -08001060 }
Brad Fitzpatrick520123b2014-11-14 15:57:37 -08001061}
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001062
1063func (w *responseWriter) Header() http.Header {
Brad Fitzpatrick729bd722014-11-13 14:09:36 -08001064 rws := w.rws
1065 if rws == nil {
1066 panic("Header called after Handler finished")
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001067 }
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001068 if rws.handlerHeader == nil {
1069 rws.handlerHeader = make(http.Header)
Brad Fitzpatrick729bd722014-11-13 14:09:36 -08001070 }
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001071 return rws.handlerHeader
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001072}
1073
1074func (w *responseWriter) WriteHeader(code int) {
Brad Fitzpatrick729bd722014-11-13 14:09:36 -08001075 rws := w.rws
1076 if rws == nil {
1077 panic("WriteHeader called after Handler finished")
1078 }
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001079 rws.writeHeader(code)
1080}
1081
1082func (rws *responseWriterState) writeHeader(code int) {
1083 if !rws.wroteHeader {
1084 rws.wroteHeader = true
1085 rws.status = code
1086 if len(rws.handlerHeader) > 0 {
1087 rws.snapHeader = cloneHeader(rws.handlerHeader)
1088 }
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001089 }
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001090}
1091
// cloneHeader returns a deep copy of h: a new map whose value slices are
// freshly allocated, so changes to the copy never show through in h and
// vice versa. The result is always non-nil, even for a nil h.
func cloneHeader(h http.Header) http.Header {
	out := make(http.Header, len(h))
	for name, values := range h {
		dup := make([]string, len(values))
		copy(dup, values)
		out[name] = dup
	}
	return out
}
1101
Brad Fitzpatrick15a4bf32014-11-14 10:56:12 -08001102// The Life Of A Write is like this:
1103//
1104// TODO: copy/adapt the similar comment from Go's http server.go
1105func (w *responseWriter) Write(p []byte) (n int, err error) {
1106 return w.write(len(p), p, "")
Brad Fitzpatrick729bd722014-11-13 14:09:36 -08001107}
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001108
Brad Fitzpatrick15a4bf32014-11-14 10:56:12 -08001109func (w *responseWriter) WriteString(s string) (n int, err error) {
1110 return w.write(len(s), nil, s)
1111}
1112
1113// either dataB or dataS is non-zero.
1114func (w *responseWriter) write(lenData int, dataB []byte, dataS string) (n int, err error) {
Brad Fitzpatrick729bd722014-11-13 14:09:36 -08001115 rws := w.rws
1116 if rws == nil {
1117 panic("Write called after Handler finished")
1118 }
Brad Fitzpatrick520123b2014-11-14 15:57:37 -08001119 if !rws.wroteHeader {
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001120 w.WriteHeader(200)
1121 }
Brad Fitzpatrick15a4bf32014-11-14 10:56:12 -08001122 if dataB != nil {
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001123 return rws.bw.Write(dataB)
Brad Fitzpatrick15a4bf32014-11-14 10:56:12 -08001124 } else {
Brad Fitzpatrick390047e2014-11-14 20:37:08 -08001125 return rws.bw.WriteString(dataS)
Brad Fitzpatrick15a4bf32014-11-14 10:56:12 -08001126 }
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001127}
1128
1129func (w *responseWriter) handlerDone() {
Brad Fitzpatrick729bd722014-11-13 14:09:36 -08001130 rws := w.rws
1131 if rws == nil {
1132 panic("handlerDone called twice")
1133 }
Brad Fitzpatrick520123b2014-11-14 15:57:37 -08001134 rws.handlerDone = true
1135 w.Flush()
1136
Brad Fitzpatrick729bd722014-11-13 14:09:36 -08001137 w.rws = nil
Brad Fitzpatrick520123b2014-11-14 15:57:37 -08001138 responseWriterStatePool.Put(rws)
Brad Fitzpatrickb331b812014-11-13 11:51:54 -08001139}