Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 1 | // Copyright 2014 The Go Authors. All rights reserved. |
| 2 | // Use of this source code is governed by a BSD-style |
| 3 | // license that can be found in the LICENSE file. |
| 4 | // See https://code.google.com/p/go/source/browse/CONTRIBUTORS |
| 5 | // Licensed under the same terms as Go itself: |
| 6 | // https://code.google.com/p/go/source/browse/LICENSE |
| 7 | |
| 8 | package http2 |
| 9 | |
| 10 | import ( |
Brad Fitzpatrick | 390047e | 2014-11-14 20:37:08 -0800 | [diff] [blame] | 11 | "bufio" |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 12 | "bytes" |
| 13 | "crypto/tls" |
| 14 | "errors" |
| 15 | "fmt" |
| 16 | "io" |
| 17 | "log" |
| 18 | "net" |
| 19 | "net/http" |
| 20 | "net/url" |
| 21 | "strconv" |
| 22 | "strings" |
Brad Fitzpatrick | 729bd72 | 2014-11-13 14:09:36 -0800 | [diff] [blame] | 23 | "sync" |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 24 | |
| 25 | "github.com/bradfitz/http2/hpack" |
| 26 | ) |
| 27 | |
| 28 | // TODO: finish GOAWAY support. Consider each incoming frame type and whether |
| 29 | // it should be ignored during a shutdown race. |
| 30 | |
| 31 | // Server is an HTTP/2 server. |
| 32 | type Server struct { |
| 33 | // MaxStreams optionally ... |
| 34 | MaxStreams int |
| 35 | } |
| 36 | |
Brad Fitzpatrick | b5469d2 | 2014-11-13 12:17:52 -0800 | [diff] [blame] | 37 | var testHookOnConn func() // for testing |
| 38 | |
| 39 | // ConfigureServer adds HTTP/2 support to a net/http Server. |
| 40 | // |
| 41 | // The configuration conf may be nil. |
| 42 | // |
| 43 | // ConfigureServer must be called before s begins serving. |
| 44 | func ConfigureServer(s *http.Server, conf *Server) { |
| 45 | if conf == nil { |
| 46 | conf = new(Server) |
| 47 | } |
| 48 | if s.TLSConfig == nil { |
| 49 | s.TLSConfig = new(tls.Config) |
| 50 | } |
| 51 | haveNPN := false |
| 52 | for _, p := range s.TLSConfig.NextProtos { |
| 53 | if p == npnProto { |
| 54 | haveNPN = true |
| 55 | break |
| 56 | } |
| 57 | } |
| 58 | if !haveNPN { |
| 59 | s.TLSConfig.NextProtos = append(s.TLSConfig.NextProtos, npnProto) |
| 60 | } |
| 61 | |
| 62 | if s.TLSNextProto == nil { |
| 63 | s.TLSNextProto = map[string]func(*http.Server, *tls.Conn, http.Handler){} |
| 64 | } |
| 65 | s.TLSNextProto[npnProto] = func(hs *http.Server, c *tls.Conn, h http.Handler) { |
| 66 | if testHookOnConn != nil { |
| 67 | testHookOnConn() |
| 68 | } |
| 69 | conf.handleConn(hs, c, h) |
| 70 | } |
| 71 | } |
| 72 | |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 73 | func (srv *Server) handleConn(hs *http.Server, c net.Conn, h http.Handler) { |
| 74 | sc := &serverConn{ |
| 75 | hs: hs, |
| 76 | conn: c, |
| 77 | handler: h, |
| 78 | framer: NewFramer(c, c), // TODO: write to a (custom?) buffered writer that can alternate when it's in buffered mode. |
| 79 | streams: make(map[uint32]*stream), |
| 80 | canonHeader: make(map[string]string), |
| 81 | readFrameCh: make(chan frameAndProcessed), |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 82 | readFrameErrCh: make(chan error, 1), // must be buffered for 1 |
| 83 | wantWriteFrameCh: make(chan frameWriteMsg, 8), |
| 84 | writeFrameCh: make(chan frameWriteMsg, 1), // may be 0 or 1, but more is useless. (max 1 in flight) |
| 85 | wroteFrameCh: make(chan struct{}, 1), |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 86 | flow: newFlow(initialWindowSize), |
| 87 | doneServing: make(chan struct{}), |
| 88 | maxWriteFrameSize: initialMaxFrameSize, |
| 89 | initialWindowSize: initialWindowSize, |
| 90 | serveG: newGoroutineLock(), |
| 91 | } |
| 92 | sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf) |
| 93 | sc.hpackDecoder = hpack.NewDecoder(initialHeaderTableSize, sc.onNewHeaderField) |
| 94 | sc.serve() |
| 95 | } |
| 96 | |
| 97 | // frameAndProcessed coordinates the readFrames and serve goroutines, since |
| 98 | // the Framer interface only permits the most recently-read Frame from being |
| 99 | // accessed. The serve goroutine sends on processed to signal to the readFrames |
| 100 | // goroutine that another frame may be read. |
| 101 | type frameAndProcessed struct { |
| 102 | f Frame |
| 103 | processed chan struct{} |
| 104 | } |
| 105 | |
| 106 | type serverConn struct { |
| 107 | // Immutable: |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 108 | hs *http.Server |
| 109 | conn net.Conn |
| 110 | handler http.Handler |
| 111 | framer *Framer |
| 112 | hpackDecoder *hpack.Decoder |
| 113 | doneServing chan struct{} // closed when serverConn.serve ends |
| 114 | readFrameCh chan frameAndProcessed // written by serverConn.readFrames |
| 115 | readFrameErrCh chan error |
| 116 | wantWriteFrameCh chan frameWriteMsg // from handlers -> serve |
| 117 | writeFrameCh chan frameWriteMsg // from serve -> writeFrames |
| 118 | wroteFrameCh chan struct{} // from writeFrames -> serve, tickles more sends on writeFrameCh |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 119 | |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 120 | serveG goroutineLock // used to verify funcs are on serve() |
| 121 | writeG goroutineLock // used to verify things running on writeLoop |
| 122 | flow *flow // connection-wide (not stream-specific) flow control |
| 123 | |
| 124 | // Everything following is owned by the serve loop; use serveG.check(): |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 125 | maxStreamID uint32 // max ever seen |
| 126 | streams map[uint32]*stream |
| 127 | maxWriteFrameSize uint32 // TODO: update this when settings come in |
| 128 | initialWindowSize int32 |
| 129 | canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case |
| 130 | sentGoAway bool |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 131 | req requestParam // non-zero while reading request headers |
| 132 | writingFrame bool // sent on writeFrameCh but haven't heard back on wroteFrameCh yet |
| 133 | writeQueue []frameWriteMsg // TODO: proper scheduler, not a queue |
| 134 | |
| 135 | // Owned by the writeFrames goroutine; use writeG.check(): |
| 136 | headerWriteBuf bytes.Buffer |
| 137 | hpackEncoder *hpack.Encoder |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 138 | } |
| 139 | |
| 140 | // requestParam is the state of the next request, initialized over |
| 141 | // potentially several frames HEADERS + zero or more CONTINUATION |
| 142 | // frames. |
| 143 | type requestParam struct { |
| 144 | // stream is non-nil if we're reading (HEADER or CONTINUATION) |
| 145 | // frames for a request (but not DATA). |
| 146 | stream *stream |
| 147 | header http.Header |
| 148 | method, path string |
| 149 | scheme, authority string |
| 150 | sawRegularHeader bool // saw a non-pseudo header already |
| 151 | invalidHeader bool // an invalid header was seen |
| 152 | } |
| 153 | |
| 154 | type stream struct { |
| 155 | id uint32 |
| 156 | state streamState // owned by serverConn's processing loop |
| 157 | flow *flow // limits writing from Handler to client |
| 158 | body *pipe // non-nil if expecting DATA frames |
| 159 | |
| 160 | bodyBytes int64 // body bytes seen so far |
| 161 | declBodyBytes int64 // or -1 if undeclared |
| 162 | } |
| 163 | |
| 164 | func (sc *serverConn) state(streamID uint32) streamState { |
| 165 | sc.serveG.check() |
| 166 | // http://http2.github.io/http2-spec/#rfc.section.5.1 |
| 167 | if st, ok := sc.streams[streamID]; ok { |
| 168 | return st.state |
| 169 | } |
| 170 | // "The first use of a new stream identifier implicitly closes all |
| 171 | // streams in the "idle" state that might have been initiated by |
| 172 | // that peer with a lower-valued stream identifier. For example, if |
| 173 | // a client sends a HEADERS frame on stream 7 without ever sending a |
| 174 | // frame on stream 5, then stream 5 transitions to the "closed" |
| 175 | // state when the first frame for stream 7 is sent or received." |
| 176 | if streamID <= sc.maxStreamID { |
| 177 | return stateClosed |
| 178 | } |
| 179 | return stateIdle |
| 180 | } |
| 181 | |
| 182 | func (sc *serverConn) vlogf(format string, args ...interface{}) { |
| 183 | if VerboseLogs { |
| 184 | sc.logf(format, args...) |
| 185 | } |
| 186 | } |
| 187 | |
| 188 | func (sc *serverConn) logf(format string, args ...interface{}) { |
| 189 | if lg := sc.hs.ErrorLog; lg != nil { |
| 190 | lg.Printf(format, args...) |
| 191 | } else { |
| 192 | log.Printf(format, args...) |
| 193 | } |
| 194 | } |
| 195 | |
| 196 | func (sc *serverConn) condlogf(err error, format string, args ...interface{}) { |
| 197 | if err == nil { |
| 198 | return |
| 199 | } |
| 200 | str := err.Error() |
| 201 | if strings.Contains(str, "use of closed network connection") { |
| 202 | // Boring, expected errors. |
| 203 | sc.vlogf(format, args...) |
| 204 | } else { |
| 205 | sc.logf(format, args...) |
| 206 | } |
| 207 | } |
| 208 | |
| 209 | func (sc *serverConn) onNewHeaderField(f hpack.HeaderField) { |
| 210 | sc.serveG.check() |
| 211 | switch { |
| 212 | case !validHeader(f.Name): |
| 213 | sc.req.invalidHeader = true |
| 214 | case strings.HasPrefix(f.Name, ":"): |
| 215 | if sc.req.sawRegularHeader { |
| 216 | sc.logf("pseudo-header after regular header") |
| 217 | sc.req.invalidHeader = true |
| 218 | return |
| 219 | } |
| 220 | var dst *string |
| 221 | switch f.Name { |
| 222 | case ":method": |
| 223 | dst = &sc.req.method |
| 224 | case ":path": |
| 225 | dst = &sc.req.path |
| 226 | case ":scheme": |
| 227 | dst = &sc.req.scheme |
| 228 | case ":authority": |
| 229 | dst = &sc.req.authority |
| 230 | default: |
| 231 | // 8.1.2.1 Pseudo-Header Fields |
| 232 | // "Endpoints MUST treat a request or response |
| 233 | // that contains undefined or invalid |
| 234 | // pseudo-header fields as malformed (Section |
| 235 | // 8.1.2.6)." |
| 236 | sc.logf("invalid pseudo-header %q", f.Name) |
| 237 | sc.req.invalidHeader = true |
| 238 | return |
| 239 | } |
| 240 | if *dst != "" { |
| 241 | sc.logf("duplicate pseudo-header %q sent", f.Name) |
| 242 | sc.req.invalidHeader = true |
| 243 | return |
| 244 | } |
| 245 | *dst = f.Value |
| 246 | case f.Name == "cookie": |
| 247 | sc.req.sawRegularHeader = true |
| 248 | if s, ok := sc.req.header["Cookie"]; ok && len(s) == 1 { |
| 249 | s[0] = s[0] + "; " + f.Value |
| 250 | } else { |
| 251 | sc.req.header.Add("Cookie", f.Value) |
| 252 | } |
| 253 | default: |
| 254 | sc.req.sawRegularHeader = true |
| 255 | sc.req.header.Add(sc.canonicalHeader(f.Name), f.Value) |
| 256 | } |
| 257 | } |
| 258 | |
| 259 | func (sc *serverConn) canonicalHeader(v string) string { |
| 260 | sc.serveG.check() |
| 261 | // TODO: use a sync.Pool instead of putting the cache on *serverConn? |
| 262 | cv, ok := sc.canonHeader[v] |
| 263 | if !ok { |
| 264 | cv = http.CanonicalHeaderKey(v) |
| 265 | sc.canonHeader[v] = cv |
| 266 | } |
| 267 | return cv |
| 268 | } |
| 269 | |
| 270 | // readFrames is the loop that reads incoming frames. |
| 271 | // It's run on its own goroutine. |
| 272 | func (sc *serverConn) readFrames() { |
| 273 | processed := make(chan struct{}, 1) |
| 274 | for { |
| 275 | f, err := sc.framer.ReadFrame() |
| 276 | if err != nil { |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 277 | sc.readFrameErrCh <- err // BEFORE the close |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 278 | close(sc.readFrameCh) |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 279 | return |
| 280 | } |
| 281 | sc.readFrameCh <- frameAndProcessed{f, processed} |
| 282 | <-processed |
| 283 | } |
| 284 | } |
| 285 | |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 286 | // writeFrames is the loop that writes frames to the peer |
| 287 | // and is responsible for prioritization and buffering. |
| 288 | // It's run on its own goroutine. |
| 289 | func (sc *serverConn) writeFrames() { |
| 290 | sc.writeG = newGoroutineLock() |
| 291 | for wm := range sc.writeFrameCh { |
| 292 | err := wm.write(sc, wm.v) |
| 293 | if ch := wm.done; ch != nil { |
Brad Fitzpatrick | be34115 | 2014-11-14 16:05:41 -0800 | [diff] [blame] | 294 | select { |
| 295 | case ch <- err: |
| 296 | default: |
| 297 | panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wm.v)) |
| 298 | } |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 299 | } |
| 300 | sc.wroteFrameCh <- struct{}{} // tickle frame selection scheduler |
| 301 | } |
| 302 | } |
| 303 | |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 304 | func (sc *serverConn) serve() { |
| 305 | sc.serveG.check() |
| 306 | defer sc.conn.Close() |
| 307 | defer close(sc.doneServing) |
| 308 | |
| 309 | sc.vlogf("HTTP/2 connection from %v on %p", sc.conn.RemoteAddr(), sc.hs) |
| 310 | |
| 311 | // Read the client preface |
| 312 | buf := make([]byte, len(ClientPreface)) |
| 313 | // TODO: timeout reading from the client |
| 314 | if _, err := io.ReadFull(sc.conn, buf); err != nil { |
| 315 | sc.logf("error reading client preface: %v", err) |
| 316 | return |
| 317 | } |
| 318 | if !bytes.Equal(buf, clientPreface) { |
| 319 | sc.logf("bogus greeting from client: %q", buf) |
| 320 | return |
| 321 | } |
| 322 | sc.vlogf("client %v said hello", sc.conn.RemoteAddr()) |
| 323 | |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 324 | f, err := sc.framer.ReadFrame() // TODO: timeout |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 325 | if err != nil { |
| 326 | sc.logf("error reading initial frame from client: %v", err) |
| 327 | return |
| 328 | } |
| 329 | sf, ok := f.(*SettingsFrame) |
| 330 | if !ok { |
| 331 | sc.logf("invalid initial frame type %T received from client", f) |
| 332 | return |
| 333 | } |
| 334 | if err := sf.ForeachSetting(sc.processSetting); err != nil { |
| 335 | sc.logf("initial settings error: %v", err) |
| 336 | return |
| 337 | } |
| 338 | |
| 339 | // TODO: don't send two network packets for our SETTINGS + our |
| 340 | // ACK of their settings. But if we make framer write to a |
| 341 | // *bufio.Writer, that increases the per-connection memory |
| 342 | // overhead, and there could be many idle conns. So maybe some |
| 343 | // liveswitchWriter-like thing where we only switch to a |
| 344 | // *bufio Writer when we really need one temporarily, else go |
| 345 | // back to an unbuffered writes by default. |
| 346 | if err := sc.framer.WriteSettings( /* TODO: actual settings */ ); err != nil { |
| 347 | sc.logf("error writing server's initial settings: %v", err) |
| 348 | return |
| 349 | } |
| 350 | if err := sc.framer.WriteSettingsAck(); err != nil { |
| 351 | sc.logf("error writing server's ack of client's settings: %v", err) |
| 352 | return |
| 353 | } |
| 354 | |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 355 | go sc.readFrames() // closed by defer sc.conn.Close above |
| 356 | go sc.writeFrames() |
| 357 | defer close(sc.writeFrameCh) // shuts down writeFrames loop |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 358 | |
| 359 | for { |
| 360 | select { |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 361 | case wm := <-sc.wantWriteFrameCh: |
| 362 | sc.enqueueFrameWrite(wm) |
| 363 | case <-sc.wroteFrameCh: |
| 364 | sc.writingFrame = false |
| 365 | sc.scheduleFrameWrite() |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 366 | case fp, ok := <-sc.readFrameCh: |
| 367 | if !ok { |
| 368 | err := <-sc.readFrameErrCh |
| 369 | if err != io.EOF { |
| 370 | errstr := err.Error() |
| 371 | if !strings.Contains(errstr, "use of closed network connection") { |
| 372 | sc.logf("client %s stopped sending frames: %v", sc.conn.RemoteAddr(), errstr) |
| 373 | } |
| 374 | } |
| 375 | return |
| 376 | } |
| 377 | f := fp.f |
| 378 | sc.vlogf("got %v: %#v", f.Header(), f) |
| 379 | err := sc.processFrame(f) |
| 380 | fp.processed <- struct{}{} // let readFrames proceed |
| 381 | switch ev := err.(type) { |
| 382 | case nil: |
| 383 | // nothing. |
| 384 | case StreamError: |
| 385 | if err := sc.resetStreamInLoop(ev); err != nil { |
| 386 | sc.logf("Error writing RSTSTream: %v", err) |
| 387 | return |
| 388 | } |
| 389 | case ConnectionError: |
| 390 | sc.logf("Disconnecting; %v", ev) |
| 391 | return |
| 392 | case goAwayFlowError: |
| 393 | if err := sc.goAway(ErrCodeFlowControl); err != nil { |
| 394 | sc.condlogf(err, "failed to GOAWAY: %v", err) |
| 395 | return |
| 396 | } |
| 397 | default: |
| 398 | sc.logf("Disconnection due to other error: %v", err) |
| 399 | return |
| 400 | } |
| 401 | } |
| 402 | } |
| 403 | } |
| 404 | |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 405 | func (sc *serverConn) enqueueFrameWrite(wm frameWriteMsg) { |
| 406 | sc.serveG.check() |
| 407 | // Fast path for common case: |
| 408 | if !sc.writingFrame { |
| 409 | sc.writingFrame = true |
| 410 | sc.writeFrameCh <- wm |
| 411 | return |
| 412 | } |
| 413 | sc.writeQueue = append(sc.writeQueue, wm) // TODO: proper scheduler |
| 414 | } |
| 415 | |
| 416 | func (sc *serverConn) scheduleFrameWrite() { |
| 417 | sc.serveG.check() |
| 418 | if len(sc.writeQueue) == 0 { |
| 419 | // TODO: flush Framer's underlying buffered writer, once that's added |
| 420 | return |
| 421 | } |
Brad Fitzpatrick | 390047e | 2014-11-14 20:37:08 -0800 | [diff] [blame] | 422 | |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 423 | // TODO: proper scheduler |
| 424 | wm := sc.writeQueue[0] |
Brad Fitzpatrick | 390047e | 2014-11-14 20:37:08 -0800 | [diff] [blame] | 425 | // shift it all down. kinda lame. will be removed later anyway. |
| 426 | copy(sc.writeQueue, sc.writeQueue[1:]) |
| 427 | sc.writeQueue = sc.writeQueue[:len(sc.writeQueue)-1] |
| 428 | |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 429 | sc.writingFrame = true |
| 430 | sc.writeFrameCh <- wm |
| 431 | } |
| 432 | |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 433 | func (sc *serverConn) goAway(code ErrCode) error { |
| 434 | sc.serveG.check() |
| 435 | sc.sentGoAway = true |
| 436 | return sc.framer.WriteGoAway(sc.maxStreamID, code, nil) |
| 437 | } |
| 438 | |
| 439 | func (sc *serverConn) resetStreamInLoop(se StreamError) error { |
| 440 | sc.serveG.check() |
| 441 | if err := sc.framer.WriteRSTStream(se.streamID, uint32(se.code)); err != nil { |
| 442 | return err |
| 443 | } |
| 444 | delete(sc.streams, se.streamID) |
| 445 | return nil |
| 446 | } |
| 447 | |
| 448 | func (sc *serverConn) curHeaderStreamID() uint32 { |
| 449 | sc.serveG.check() |
| 450 | st := sc.req.stream |
| 451 | if st == nil { |
| 452 | return 0 |
| 453 | } |
| 454 | return st.id |
| 455 | } |
| 456 | |
| 457 | func (sc *serverConn) processFrame(f Frame) error { |
| 458 | sc.serveG.check() |
| 459 | |
| 460 | if s := sc.curHeaderStreamID(); s != 0 { |
| 461 | if cf, ok := f.(*ContinuationFrame); !ok { |
| 462 | return ConnectionError(ErrCodeProtocol) |
| 463 | } else if cf.Header().StreamID != s { |
| 464 | return ConnectionError(ErrCodeProtocol) |
| 465 | } |
| 466 | } |
| 467 | |
| 468 | switch f := f.(type) { |
| 469 | case *SettingsFrame: |
| 470 | return sc.processSettings(f) |
| 471 | case *HeadersFrame: |
| 472 | return sc.processHeaders(f) |
| 473 | case *ContinuationFrame: |
| 474 | return sc.processContinuation(f) |
| 475 | case *WindowUpdateFrame: |
| 476 | return sc.processWindowUpdate(f) |
| 477 | case *PingFrame: |
| 478 | return sc.processPing(f) |
| 479 | case *DataFrame: |
| 480 | return sc.processData(f) |
| 481 | default: |
| 482 | log.Printf("Ignoring unknown frame %#v", f) |
| 483 | return nil |
| 484 | } |
| 485 | } |
| 486 | |
| 487 | func (sc *serverConn) processPing(f *PingFrame) error { |
| 488 | sc.serveG.check() |
| 489 | if f.Flags.Has(FlagSettingsAck) { |
| 490 | // 6.7 PING: " An endpoint MUST NOT respond to PING frames |
| 491 | // containing this flag." |
| 492 | return nil |
| 493 | } |
| 494 | if f.StreamID != 0 { |
| 495 | // "PING frames are not associated with any individual |
| 496 | // stream. If a PING frame is received with a stream |
| 497 | // identifier field value other than 0x0, the recipient MUST |
| 498 | // respond with a connection error (Section 5.4.1) of type |
| 499 | // PROTOCOL_ERROR." |
| 500 | return ConnectionError(ErrCodeProtocol) |
| 501 | } |
| 502 | return sc.framer.WritePing(true, f.Data) |
| 503 | } |
| 504 | |
| 505 | func (sc *serverConn) processWindowUpdate(f *WindowUpdateFrame) error { |
| 506 | sc.serveG.check() |
| 507 | switch { |
| 508 | case f.StreamID != 0: // stream-level flow control |
| 509 | st := sc.streams[f.StreamID] |
| 510 | if st == nil { |
| 511 | // "WINDOW_UPDATE can be sent by a peer that has sent a |
| 512 | // frame bearing the END_STREAM flag. This means that a |
| 513 | // receiver could receive a WINDOW_UPDATE frame on a "half |
| 514 | // closed (remote)" or "closed" stream. A receiver MUST |
| 515 | // NOT treat this as an error, see Section 5.1." |
| 516 | return nil |
| 517 | } |
| 518 | if !st.flow.add(int32(f.Increment)) { |
| 519 | return StreamError{f.StreamID, ErrCodeFlowControl} |
| 520 | } |
| 521 | default: // connection-level flow control |
| 522 | if !sc.flow.add(int32(f.Increment)) { |
| 523 | return goAwayFlowError{} |
| 524 | } |
| 525 | } |
| 526 | return nil |
| 527 | } |
| 528 | |
| 529 | func (sc *serverConn) processSettings(f *SettingsFrame) error { |
| 530 | sc.serveG.check() |
| 531 | return f.ForeachSetting(sc.processSetting) |
| 532 | } |
| 533 | |
| 534 | func (sc *serverConn) processSetting(s Setting) error { |
| 535 | sc.serveG.check() |
| 536 | sc.vlogf("processing setting %v", s) |
| 537 | switch s.ID { |
| 538 | case SettingInitialWindowSize: |
| 539 | return sc.processSettingInitialWindowSize(s.Val) |
| 540 | } |
| 541 | log.Printf("TODO: handle %v", s) |
| 542 | return nil |
| 543 | } |
| 544 | |
| 545 | func (sc *serverConn) processSettingInitialWindowSize(val uint32) error { |
| 546 | sc.serveG.check() |
| 547 | if val > (1<<31 - 1) { |
| 548 | // 6.5.2 Defined SETTINGS Parameters |
| 549 | // "Values above the maximum flow control window size of |
| 550 | // 231-1 MUST be treated as a connection error (Section |
| 551 | // 5.4.1) of type FLOW_CONTROL_ERROR." |
| 552 | return ConnectionError(ErrCodeFlowControl) |
| 553 | } |
| 554 | |
| 555 | // "A SETTINGS frame can alter the initial flow control window |
| 556 | // size for all current streams. When the value of |
| 557 | // SETTINGS_INITIAL_WINDOW_SIZE changes, a receiver MUST |
| 558 | // adjust the size of all stream flow control windows that it |
| 559 | // maintains by the difference between the new value and the |
| 560 | // old value." |
| 561 | old := sc.initialWindowSize |
| 562 | sc.initialWindowSize = int32(val) |
| 563 | growth := sc.initialWindowSize - old // may be negative |
| 564 | for _, st := range sc.streams { |
| 565 | if !st.flow.add(growth) { |
| 566 | // 6.9.2 Initial Flow Control Window Size |
| 567 | // "An endpoint MUST treat a change to |
| 568 | // SETTINGS_INITIAL_WINDOW_SIZE that causes any flow |
| 569 | // control window to exceed the maximum size as a |
| 570 | // connection error (Section 5.4.1) of type |
| 571 | // FLOW_CONTROL_ERROR." |
| 572 | return ConnectionError(ErrCodeFlowControl) |
| 573 | } |
| 574 | } |
| 575 | return nil |
| 576 | } |
| 577 | |
| 578 | func (sc *serverConn) processData(f *DataFrame) error { |
| 579 | sc.serveG.check() |
| 580 | // "If a DATA frame is received whose stream is not in "open" |
| 581 | // or "half closed (local)" state, the recipient MUST respond |
| 582 | // with a stream error (Section 5.4.2) of type STREAM_CLOSED." |
| 583 | id := f.Header().StreamID |
| 584 | st, ok := sc.streams[id] |
| 585 | if !ok || (st.state != stateOpen && st.state != stateHalfClosedLocal) { |
| 586 | return StreamError{id, ErrCodeStreamClosed} |
| 587 | } |
| 588 | if st.body == nil { |
| 589 | // Not expecting data. |
| 590 | // TODO: which error code? |
| 591 | return StreamError{id, ErrCodeStreamClosed} |
| 592 | } |
| 593 | data := f.Data() |
| 594 | |
| 595 | // Sender sending more than they'd declared? |
| 596 | if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes { |
| 597 | st.body.Close(fmt.Errorf("Sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes)) |
| 598 | return StreamError{id, ErrCodeStreamClosed} |
| 599 | } |
| 600 | if len(data) > 0 { |
| 601 | // TODO: verify they're allowed to write with the flow control |
| 602 | // window we'd advertised to them. |
| 603 | // TODO: verify n from Write |
| 604 | if _, err := st.body.Write(data); err != nil { |
| 605 | return StreamError{id, ErrCodeStreamClosed} |
| 606 | } |
| 607 | st.bodyBytes += int64(len(data)) |
| 608 | } |
| 609 | if f.Header().Flags.Has(FlagDataEndStream) { |
| 610 | if st.declBodyBytes != -1 && st.declBodyBytes != st.bodyBytes { |
| 611 | st.body.Close(fmt.Errorf("Request declared a Content-Length of %d but only wrote %d bytes", |
| 612 | st.declBodyBytes, st.bodyBytes)) |
| 613 | } else { |
| 614 | st.body.Close(io.EOF) |
| 615 | } |
| 616 | } |
| 617 | return nil |
| 618 | } |
| 619 | |
| 620 | func (sc *serverConn) processHeaders(f *HeadersFrame) error { |
| 621 | sc.serveG.check() |
| 622 | id := f.Header().StreamID |
| 623 | if sc.sentGoAway { |
| 624 | // Ignore. |
| 625 | return nil |
| 626 | } |
| 627 | // http://http2.github.io/http2-spec/#rfc.section.5.1.1 |
| 628 | if id%2 != 1 || id <= sc.maxStreamID || sc.req.stream != nil { |
| 629 | // Streams initiated by a client MUST use odd-numbered |
| 630 | // stream identifiers. [...] The identifier of a newly |
| 631 | // established stream MUST be numerically greater than all |
| 632 | // streams that the initiating endpoint has opened or |
| 633 | // reserved. [...] An endpoint that receives an unexpected |
| 634 | // stream identifier MUST respond with a connection error |
| 635 | // (Section 5.4.1) of type PROTOCOL_ERROR. |
| 636 | return ConnectionError(ErrCodeProtocol) |
| 637 | } |
| 638 | if id > sc.maxStreamID { |
| 639 | sc.maxStreamID = id |
| 640 | } |
| 641 | st := &stream{ |
| 642 | id: id, |
| 643 | state: stateOpen, |
| 644 | flow: newFlow(sc.initialWindowSize), |
| 645 | } |
| 646 | if f.Header().Flags.Has(FlagHeadersEndStream) { |
| 647 | st.state = stateHalfClosedRemote |
| 648 | } |
| 649 | sc.streams[id] = st |
| 650 | sc.req = requestParam{ |
| 651 | stream: st, |
| 652 | header: make(http.Header), |
| 653 | } |
| 654 | return sc.processHeaderBlockFragment(st, f.HeaderBlockFragment(), f.HeadersEnded()) |
| 655 | } |
| 656 | |
| 657 | func (sc *serverConn) processContinuation(f *ContinuationFrame) error { |
| 658 | sc.serveG.check() |
| 659 | st := sc.streams[f.Header().StreamID] |
| 660 | if st == nil || sc.curHeaderStreamID() != st.id { |
| 661 | return ConnectionError(ErrCodeProtocol) |
| 662 | } |
| 663 | return sc.processHeaderBlockFragment(st, f.HeaderBlockFragment(), f.HeadersEnded()) |
| 664 | } |
| 665 | |
| 666 | func (sc *serverConn) processHeaderBlockFragment(st *stream, frag []byte, end bool) error { |
| 667 | sc.serveG.check() |
| 668 | if _, err := sc.hpackDecoder.Write(frag); err != nil { |
| 669 | // TODO: convert to stream error I assume? |
| 670 | return err |
| 671 | } |
| 672 | if !end { |
| 673 | return nil |
| 674 | } |
| 675 | if err := sc.hpackDecoder.Close(); err != nil { |
| 676 | // TODO: convert to stream error I assume? |
| 677 | return err |
| 678 | } |
| 679 | rw, req, err := sc.newWriterAndRequest() |
| 680 | sc.req = requestParam{} |
| 681 | if err != nil { |
| 682 | return err |
| 683 | } |
| 684 | st.body = req.Body.(*requestBody).pipe // may be nil |
| 685 | st.declBodyBytes = req.ContentLength |
| 686 | go sc.runHandler(rw, req) |
| 687 | return nil |
| 688 | } |
| 689 | |
| 690 | func (sc *serverConn) newWriterAndRequest() (*responseWriter, *http.Request, error) { |
| 691 | sc.serveG.check() |
| 692 | rp := &sc.req |
| 693 | if rp.invalidHeader || rp.method == "" || rp.path == "" || |
| 694 | (rp.scheme != "https" && rp.scheme != "http") { |
| 695 | // See 8.1.2.6 Malformed Requests and Responses: |
| 696 | // |
| 697 | // Malformed requests or responses that are detected |
| 698 | // MUST be treated as a stream error (Section 5.4.2) |
| 699 | // of type PROTOCOL_ERROR." |
| 700 | // |
| 701 | // 8.1.2.3 Request Pseudo-Header Fields |
| 702 | // "All HTTP/2 requests MUST include exactly one valid |
| 703 | // value for the :method, :scheme, and :path |
| 704 | // pseudo-header fields" |
| 705 | return nil, nil, StreamError{rp.stream.id, ErrCodeProtocol} |
| 706 | } |
| 707 | var tlsState *tls.ConnectionState // make this non-nil if https |
| 708 | if rp.scheme == "https" { |
| 709 | // TODO: get from sc's ConnectionState |
| 710 | tlsState = &tls.ConnectionState{} |
| 711 | } |
| 712 | authority := rp.authority |
| 713 | if authority == "" { |
| 714 | authority = rp.header.Get("Host") |
| 715 | } |
| 716 | bodyOpen := rp.stream.state == stateOpen |
| 717 | body := &requestBody{ |
| 718 | sc: sc, |
| 719 | streamID: rp.stream.id, |
| 720 | } |
Brad Fitzpatrick | ff0471b | 2014-11-14 21:55:14 -0800 | [diff] [blame^] | 721 | url, err := url.ParseRequestURI(rp.path) |
| 722 | if err != nil { |
| 723 | // TODO: find the right error code? |
| 724 | return nil, nil, StreamError{rp.stream.id, ErrCodeProtocol} |
| 725 | } |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 726 | req := &http.Request{ |
| 727 | Method: rp.method, |
Brad Fitzpatrick | ff0471b | 2014-11-14 21:55:14 -0800 | [diff] [blame^] | 728 | URL: url, |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 729 | RemoteAddr: sc.conn.RemoteAddr().String(), |
| 730 | Header: rp.header, |
| 731 | RequestURI: rp.path, |
| 732 | Proto: "HTTP/2.0", |
| 733 | ProtoMajor: 2, |
| 734 | ProtoMinor: 0, |
| 735 | TLS: tlsState, |
| 736 | Host: authority, |
| 737 | Body: body, |
| 738 | } |
| 739 | if bodyOpen { |
| 740 | body.pipe = &pipe{ |
| 741 | b: buffer{buf: make([]byte, 65536)}, // TODO: share/remove |
| 742 | } |
| 743 | body.pipe.c.L = &body.pipe.m |
| 744 | |
| 745 | if vv, ok := rp.header["Content-Length"]; ok { |
| 746 | req.ContentLength, _ = strconv.ParseInt(vv[0], 10, 64) |
| 747 | } else { |
| 748 | req.ContentLength = -1 |
| 749 | } |
| 750 | } |
Brad Fitzpatrick | 729bd72 | 2014-11-13 14:09:36 -0800 | [diff] [blame] | 751 | |
| 752 | rws := responseWriterStatePool.Get().(*responseWriterState) |
Brad Fitzpatrick | 390047e | 2014-11-14 20:37:08 -0800 | [diff] [blame] | 753 | bwSave := rws.bw |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 754 | *rws = responseWriterState{} // zero all the fields |
Brad Fitzpatrick | 390047e | 2014-11-14 20:37:08 -0800 | [diff] [blame] | 755 | rws.bw = bwSave |
| 756 | rws.bw.Reset(chunkWriter{rws}) |
Brad Fitzpatrick | 729bd72 | 2014-11-13 14:09:36 -0800 | [diff] [blame] | 757 | rws.sc = sc |
| 758 | rws.streamID = rp.stream.id |
| 759 | rws.req = req |
| 760 | rws.body = body |
Brad Fitzpatrick | 390047e | 2014-11-14 20:37:08 -0800 | [diff] [blame] | 761 | rws.chunkWrittenCh = make(chan error, 1) |
Brad Fitzpatrick | 729bd72 | 2014-11-13 14:09:36 -0800 | [diff] [blame] | 762 | |
| 763 | rw := &responseWriter{rws: rws} |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 764 | return rw, req, nil |
| 765 | } |
| 766 | |
Brad Fitzpatrick | 390047e | 2014-11-14 20:37:08 -0800 | [diff] [blame] | 767 | const handlerChunkWriteSize = 4 << 10 |
| 768 | |
Brad Fitzpatrick | 729bd72 | 2014-11-13 14:09:36 -0800 | [diff] [blame] | 769 | var responseWriterStatePool = sync.Pool{ |
| 770 | New: func() interface{} { |
Brad Fitzpatrick | 390047e | 2014-11-14 20:37:08 -0800 | [diff] [blame] | 771 | rws := &responseWriterState{} |
| 772 | rws.bw = bufio.NewWriterSize(chunkWriter{rws}, handlerChunkWriteSize) |
| 773 | return rws |
Brad Fitzpatrick | 729bd72 | 2014-11-13 14:09:36 -0800 | [diff] [blame] | 774 | }, |
| 775 | } |
| 776 | |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 777 | // Run on its own goroutine. |
| 778 | func (sc *serverConn) runHandler(rw *responseWriter, req *http.Request) { |
| 779 | defer rw.handlerDone() |
| 780 | // TODO: catch panics like net/http.Server |
| 781 | sc.handler.ServeHTTP(rw, req) |
| 782 | } |
| 783 | |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 784 | type frameWriteMsg struct { |
| 785 | // write runs on the writeFrames goroutine. |
| 786 | write func(sc *serverConn, v interface{}) error |
| 787 | |
| 788 | v interface{} // passed to write |
| 789 | cost uint32 // number of flow control bytes required |
| 790 | streamID uint32 // used for prioritization |
| 791 | |
| 792 | // done, if non-nil, must be a buffered channel with space for |
| 793 | // 1 message and is sent the return value from write (or an |
| 794 | // earlier error) when the frame has been written. |
Brad Fitzpatrick | 390047e | 2014-11-14 20:37:08 -0800 | [diff] [blame] | 795 | done chan error |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 796 | } |
| 797 | |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 798 | // headerWriteReq is a request to write an HTTP response header from a server Handler. |
| 799 | type headerWriteReq struct { |
| 800 | streamID uint32 |
| 801 | httpResCode int |
| 802 | h http.Header // may be nil |
| 803 | endStream bool |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 804 | |
| 805 | contentType string |
| 806 | contentLength string |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 807 | } |
| 808 | |
| 809 | // called from handler goroutines. |
| 810 | // h may be nil. |
| 811 | func (sc *serverConn) writeHeader(req headerWriteReq) { |
Brad Fitzpatrick | 390047e | 2014-11-14 20:37:08 -0800 | [diff] [blame] | 812 | var errc chan error |
| 813 | if req.h != nil { |
| 814 | // If there's a header map (which we don't own), so we have to block on |
| 815 | // waiting for this frame to be written, so an http.Flush mid-handler |
| 816 | // writes out the correct value of keys, before a handler later potentially |
| 817 | // mutates it. |
| 818 | errc = make(chan error, 1) |
| 819 | } |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 820 | sc.wantWriteFrameCh <- frameWriteMsg{ |
| 821 | write: (*serverConn).writeHeaderInLoop, |
| 822 | v: req, |
| 823 | streamID: req.streamID, |
Brad Fitzpatrick | 390047e | 2014-11-14 20:37:08 -0800 | [diff] [blame] | 824 | done: errc, |
| 825 | } |
| 826 | if errc != nil { |
| 827 | <-errc |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 828 | } |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 829 | } |
| 830 | |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 831 | func (sc *serverConn) writeHeaderInLoop(v interface{}) error { |
| 832 | sc.writeG.check() |
| 833 | req := v.(headerWriteReq) |
| 834 | |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 835 | sc.headerWriteBuf.Reset() |
| 836 | sc.hpackEncoder.WriteField(hpack.HeaderField{Name: ":status", Value: httpCodeString(req.httpResCode)}) |
| 837 | for k, vv := range req.h { |
| 838 | for _, v := range vv { |
Brad Fitzpatrick | 390047e | 2014-11-14 20:37:08 -0800 | [diff] [blame] | 839 | // TODO: more of "8.1.2.2 Connection-Specific Header Fields" |
| 840 | if k == "Transfer-Encoding" && v != "trailers" { |
| 841 | continue |
| 842 | } |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 843 | // TODO: for gargage, cache lowercase copies of headers at |
| 844 | // least for common ones and/or popular recent ones for |
| 845 | // this serverConn. LRU? |
| 846 | sc.hpackEncoder.WriteField(hpack.HeaderField{Name: strings.ToLower(k), Value: v}) |
| 847 | } |
| 848 | } |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 849 | if req.contentType != "" { |
| 850 | sc.hpackEncoder.WriteField(hpack.HeaderField{Name: "content-type", Value: req.contentType}) |
| 851 | } |
| 852 | if req.contentLength != "" { |
| 853 | sc.hpackEncoder.WriteField(hpack.HeaderField{Name: "content-length", Value: req.contentLength}) |
| 854 | } |
| 855 | |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 856 | headerBlock := sc.headerWriteBuf.Bytes() |
| 857 | if len(headerBlock) > int(sc.maxWriteFrameSize) { |
| 858 | // we'll need continuation ones. |
| 859 | panic("TODO") |
| 860 | } |
| 861 | return sc.framer.WriteHeaders(HeadersFrameParam{ |
| 862 | StreamID: req.streamID, |
| 863 | BlockFragment: headerBlock, |
| 864 | EndStream: req.endStream, |
| 865 | EndHeaders: true, // no continuation yet |
| 866 | }) |
| 867 | } |
| 868 | |
Brad Fitzpatrick | 390047e | 2014-11-14 20:37:08 -0800 | [diff] [blame] | 869 | func (sc *serverConn) writeDataInLoop(v interface{}) error { |
| 870 | sc.writeG.check() |
| 871 | rws := v.(*responseWriterState) |
| 872 | return sc.framer.WriteData(rws.streamID, rws.curChunkIsFinal, rws.curChunk) |
| 873 | } |
| 874 | |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 875 | type windowUpdateReq struct { |
| 876 | streamID uint32 |
| 877 | n uint32 |
| 878 | } |
| 879 | |
| 880 | // called from handler goroutines |
| 881 | func (sc *serverConn) sendWindowUpdate(streamID uint32, n int) { |
| 882 | const maxUint32 = 2147483647 |
| 883 | for n >= maxUint32 { |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 884 | sc.wantWriteFrameCh <- frameWriteMsg{ |
| 885 | write: (*serverConn).sendWindowUpdateInLoop, |
| 886 | v: windowUpdateReq{streamID, maxUint32}, |
| 887 | streamID: streamID, |
| 888 | } |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 889 | n -= maxUint32 |
| 890 | } |
| 891 | if n > 0 { |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 892 | sc.wantWriteFrameCh <- frameWriteMsg{ |
| 893 | write: (*serverConn).sendWindowUpdateInLoop, |
| 894 | v: windowUpdateReq{streamID, uint32(n)}, |
| 895 | streamID: streamID, |
| 896 | } |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 897 | } |
| 898 | } |
| 899 | |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 900 | func (sc *serverConn) sendWindowUpdateInLoop(v interface{}) error { |
| 901 | sc.writeG.check() |
| 902 | wu := v.(windowUpdateReq) |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 903 | if err := sc.framer.WriteWindowUpdate(0, wu.n); err != nil { |
| 904 | return err |
| 905 | } |
| 906 | if err := sc.framer.WriteWindowUpdate(wu.streamID, wu.n); err != nil { |
| 907 | return err |
| 908 | } |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 909 | return nil |
| 910 | } |
| 911 | |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 912 | type requestBody struct { |
| 913 | sc *serverConn |
| 914 | streamID uint32 |
| 915 | closed bool |
| 916 | pipe *pipe // non-nil if we have a HTTP entity message body |
| 917 | } |
| 918 | |
| 919 | var errClosedBody = errors.New("body closed by handler") |
| 920 | |
| 921 | func (b *requestBody) Close() error { |
| 922 | if b.pipe != nil { |
| 923 | b.pipe.Close(errClosedBody) |
| 924 | } |
| 925 | b.closed = true |
| 926 | return nil |
| 927 | } |
| 928 | |
| 929 | func (b *requestBody) Read(p []byte) (n int, err error) { |
| 930 | if b.pipe == nil { |
| 931 | return 0, io.EOF |
| 932 | } |
| 933 | n, err = b.pipe.Read(p) |
| 934 | if n > 0 { |
| 935 | b.sc.sendWindowUpdate(b.streamID, n) |
| 936 | // TODO: tell b.sc to send back 'n' flow control quota credits to the sender |
| 937 | } |
| 938 | return |
| 939 | } |
| 940 | |
Brad Fitzpatrick | 729bd72 | 2014-11-13 14:09:36 -0800 | [diff] [blame] | 941 | // responseWriter is the http.ResponseWriter implementation. It's |
| 942 | // intentionally small (1 pointer wide) to minimize garbage. The |
| 943 | // responseWriterState pointer inside is zeroed at the end of a |
| 944 | // request (in handlerDone) and calls on the responseWriter thereafter |
| 945 | // simply crash (caller's mistake), but the much larger responseWriterState |
| 946 | // and buffers are reused between multiple requests. |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 947 | type responseWriter struct { |
Brad Fitzpatrick | 729bd72 | 2014-11-13 14:09:36 -0800 | [diff] [blame] | 948 | rws *responseWriterState |
| 949 | } |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 950 | |
Brad Fitzpatrick | 390047e | 2014-11-14 20:37:08 -0800 | [diff] [blame] | 951 | // Optional http.ResponseWriter interfaces implemented. |
| 952 | var ( |
| 953 | _ http.Flusher = (*responseWriter)(nil) |
| 954 | _ stringWriter = (*responseWriter)(nil) |
| 955 | // TODO: hijacker for websockets? |
| 956 | ) |
| 957 | |
Brad Fitzpatrick | 729bd72 | 2014-11-13 14:09:36 -0800 | [diff] [blame] | 958 | type responseWriterState struct { |
| 959 | // immutable within a request: |
| 960 | sc *serverConn |
| 961 | streamID uint32 |
| 962 | req *http.Request |
| 963 | body *requestBody // to close at end of request, if DATA frames didn't |
| 964 | |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 965 | // TODO: adjust buffer writing sizes based on server config, frame size updates from peer, etc |
Brad Fitzpatrick | 390047e | 2014-11-14 20:37:08 -0800 | [diff] [blame] | 966 | bw *bufio.Writer // writing to a chunkWriter{this *responseWriterState} |
Brad Fitzpatrick | 729bd72 | 2014-11-13 14:09:36 -0800 | [diff] [blame] | 967 | |
| 968 | // mutated by http.Handler goroutine: |
Brad Fitzpatrick | 390047e | 2014-11-14 20:37:08 -0800 | [diff] [blame] | 969 | handlerHeader http.Header // nil until called |
| 970 | snapHeader http.Header // snapshot of handlerHeader at WriteHeader time |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 971 | wroteHeader bool // WriteHeader called (explicitly or implicitly). Not necessarily sent to user yet. |
| 972 | status int // status code passed to WriteHeader |
| 973 | wroteContinue bool // 100 Continue response was written |
Brad Fitzpatrick | 390047e | 2014-11-14 20:37:08 -0800 | [diff] [blame] | 974 | sentHeader bool // have we sent the header frame? |
| 975 | handlerDone bool // handler has finished |
| 976 | |
| 977 | curChunk []byte // current chunk we're writing |
| 978 | curChunkIsFinal bool |
| 979 | chunkWrittenCh chan error |
Brad Fitzpatrick | 729bd72 | 2014-11-13 14:09:36 -0800 | [diff] [blame] | 980 | } |
| 981 | |
Brad Fitzpatrick | 390047e | 2014-11-14 20:37:08 -0800 | [diff] [blame] | 982 | type chunkWriter struct{ rws *responseWriterState } |
| 983 | |
| 984 | // chunkWriter.Write is called from bufio.Writer. Because bufio.Writer passes through large |
| 985 | // writes, we break them up here if they're too big. |
| 986 | func (cw chunkWriter) Write(p []byte) (n int, err error) { |
| 987 | for len(p) > 0 { |
| 988 | chunk := p |
| 989 | if len(chunk) > handlerChunkWriteSize { |
| 990 | chunk = chunk[:handlerChunkWriteSize] |
| 991 | } |
| 992 | _, err = cw.rws.writeChunk(chunk) |
| 993 | if err != nil { |
| 994 | return |
| 995 | } |
| 996 | n += len(chunk) |
| 997 | p = p[len(chunk):] |
| 998 | } |
| 999 | return n, nil |
| 1000 | } |
| 1001 | |
| 1002 | // writeChunk writes small (max 4k, or handlerChunkWriteSize) chunks. |
| 1003 | // It's also responsible for sending the HEADER response. |
| 1004 | func (rws *responseWriterState) writeChunk(p []byte) (n int, err error) { |
| 1005 | if !rws.wroteHeader { |
| 1006 | rws.writeHeader(200) |
| 1007 | } |
| 1008 | if !rws.sentHeader { |
| 1009 | rws.sentHeader = true |
| 1010 | var ctype, clen string // implicit ones, if we can calculate it |
| 1011 | if rws.handlerDone && rws.snapHeader.Get("Content-Length") == "" { |
| 1012 | clen = strconv.Itoa(len(p)) |
| 1013 | } |
| 1014 | if rws.snapHeader.Get("Content-Type") == "" { |
| 1015 | ctype = http.DetectContentType(p) |
| 1016 | } |
| 1017 | rws.sc.writeHeader(headerWriteReq{ |
| 1018 | streamID: rws.streamID, |
| 1019 | httpResCode: rws.status, |
| 1020 | h: rws.snapHeader, |
| 1021 | endStream: rws.handlerDone && len(p) == 0, |
| 1022 | contentType: ctype, |
| 1023 | contentLength: clen, |
| 1024 | }) |
| 1025 | } |
| 1026 | if len(p) == 0 && !rws.handlerDone { |
| 1027 | return |
| 1028 | } |
| 1029 | rws.curChunk = p |
| 1030 | rws.curChunkIsFinal = rws.handlerDone |
| 1031 | |
| 1032 | // TODO: await flow control tokens for both stream and conn |
| 1033 | rws.sc.wantWriteFrameCh <- frameWriteMsg{ |
| 1034 | cost: uint32(len(p)), |
| 1035 | streamID: rws.streamID, |
| 1036 | write: (*serverConn).writeDataInLoop, |
| 1037 | done: rws.chunkWrittenCh, |
| 1038 | v: rws, // writeDataInLoop uses only rws.curChunk and rws.curChunkIsFinal |
| 1039 | } |
| 1040 | err = <-rws.chunkWrittenCh // block until it's written |
| 1041 | return len(p), err |
| 1042 | } |
Brad Fitzpatrick | 729bd72 | 2014-11-13 14:09:36 -0800 | [diff] [blame] | 1043 | |
| 1044 | func (w *responseWriter) Flush() { |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 1045 | rws := w.rws |
| 1046 | if rws == nil { |
| 1047 | panic("Header called after Handler finished") |
| 1048 | } |
Brad Fitzpatrick | 390047e | 2014-11-14 20:37:08 -0800 | [diff] [blame] | 1049 | if rws.bw.Buffered() > 0 { |
| 1050 | if err := rws.bw.Flush(); err != nil { |
| 1051 | // Ignore the error. The frame writer already knows. |
| 1052 | return |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 1053 | } |
Brad Fitzpatrick | 390047e | 2014-11-14 20:37:08 -0800 | [diff] [blame] | 1054 | } else { |
| 1055 | // The bufio.Writer won't call chunkWriter.Write |
| 1056 | // (writeChunk with zero bytes, so we have to do it |
| 1057 | // ourselves to force the HTTP response header and/or |
| 1058 | // final DATA frame (with END_STREAM) to be sent. |
| 1059 | rws.writeChunk(nil) |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 1060 | } |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 1061 | } |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 1062 | |
| 1063 | func (w *responseWriter) Header() http.Header { |
Brad Fitzpatrick | 729bd72 | 2014-11-13 14:09:36 -0800 | [diff] [blame] | 1064 | rws := w.rws |
| 1065 | if rws == nil { |
| 1066 | panic("Header called after Handler finished") |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 1067 | } |
Brad Fitzpatrick | 390047e | 2014-11-14 20:37:08 -0800 | [diff] [blame] | 1068 | if rws.handlerHeader == nil { |
| 1069 | rws.handlerHeader = make(http.Header) |
Brad Fitzpatrick | 729bd72 | 2014-11-13 14:09:36 -0800 | [diff] [blame] | 1070 | } |
Brad Fitzpatrick | 390047e | 2014-11-14 20:37:08 -0800 | [diff] [blame] | 1071 | return rws.handlerHeader |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 1072 | } |
| 1073 | |
| 1074 | func (w *responseWriter) WriteHeader(code int) { |
Brad Fitzpatrick | 729bd72 | 2014-11-13 14:09:36 -0800 | [diff] [blame] | 1075 | rws := w.rws |
| 1076 | if rws == nil { |
| 1077 | panic("WriteHeader called after Handler finished") |
| 1078 | } |
Brad Fitzpatrick | 390047e | 2014-11-14 20:37:08 -0800 | [diff] [blame] | 1079 | rws.writeHeader(code) |
| 1080 | } |
| 1081 | |
| 1082 | func (rws *responseWriterState) writeHeader(code int) { |
| 1083 | if !rws.wroteHeader { |
| 1084 | rws.wroteHeader = true |
| 1085 | rws.status = code |
| 1086 | if len(rws.handlerHeader) > 0 { |
| 1087 | rws.snapHeader = cloneHeader(rws.handlerHeader) |
| 1088 | } |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 1089 | } |
Brad Fitzpatrick | 390047e | 2014-11-14 20:37:08 -0800 | [diff] [blame] | 1090 | } |
| 1091 | |
| 1092 | func cloneHeader(h http.Header) http.Header { |
| 1093 | h2 := make(http.Header, len(h)) |
| 1094 | for k, vv := range h { |
| 1095 | vv2 := make([]string, len(vv)) |
| 1096 | copy(vv2, vv) |
| 1097 | h2[k] = vv2 |
| 1098 | } |
| 1099 | return h2 |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 1100 | } |
| 1101 | |
Brad Fitzpatrick | 15a4bf3 | 2014-11-14 10:56:12 -0800 | [diff] [blame] | 1102 | // The Life Of A Write is like this: |
| 1103 | // |
| 1104 | // TODO: copy/adapt the similar comment from Go's http server.go |
| 1105 | func (w *responseWriter) Write(p []byte) (n int, err error) { |
| 1106 | return w.write(len(p), p, "") |
Brad Fitzpatrick | 729bd72 | 2014-11-13 14:09:36 -0800 | [diff] [blame] | 1107 | } |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 1108 | |
Brad Fitzpatrick | 15a4bf3 | 2014-11-14 10:56:12 -0800 | [diff] [blame] | 1109 | func (w *responseWriter) WriteString(s string) (n int, err error) { |
| 1110 | return w.write(len(s), nil, s) |
| 1111 | } |
| 1112 | |
| 1113 | // either dataB or dataS is non-zero. |
| 1114 | func (w *responseWriter) write(lenData int, dataB []byte, dataS string) (n int, err error) { |
Brad Fitzpatrick | 729bd72 | 2014-11-13 14:09:36 -0800 | [diff] [blame] | 1115 | rws := w.rws |
| 1116 | if rws == nil { |
| 1117 | panic("Write called after Handler finished") |
| 1118 | } |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 1119 | if !rws.wroteHeader { |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 1120 | w.WriteHeader(200) |
| 1121 | } |
Brad Fitzpatrick | 15a4bf3 | 2014-11-14 10:56:12 -0800 | [diff] [blame] | 1122 | if dataB != nil { |
Brad Fitzpatrick | 390047e | 2014-11-14 20:37:08 -0800 | [diff] [blame] | 1123 | return rws.bw.Write(dataB) |
Brad Fitzpatrick | 15a4bf3 | 2014-11-14 10:56:12 -0800 | [diff] [blame] | 1124 | } else { |
Brad Fitzpatrick | 390047e | 2014-11-14 20:37:08 -0800 | [diff] [blame] | 1125 | return rws.bw.WriteString(dataS) |
Brad Fitzpatrick | 15a4bf3 | 2014-11-14 10:56:12 -0800 | [diff] [blame] | 1126 | } |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 1127 | } |
| 1128 | |
| 1129 | func (w *responseWriter) handlerDone() { |
Brad Fitzpatrick | 729bd72 | 2014-11-13 14:09:36 -0800 | [diff] [blame] | 1130 | rws := w.rws |
| 1131 | if rws == nil { |
| 1132 | panic("handlerDone called twice") |
| 1133 | } |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 1134 | rws.handlerDone = true |
| 1135 | w.Flush() |
| 1136 | |
Brad Fitzpatrick | 729bd72 | 2014-11-13 14:09:36 -0800 | [diff] [blame] | 1137 | w.rws = nil |
Brad Fitzpatrick | 520123b | 2014-11-14 15:57:37 -0800 | [diff] [blame] | 1138 | responseWriterStatePool.Put(rws) |
Brad Fitzpatrick | b331b81 | 2014-11-13 11:51:54 -0800 | [diff] [blame] | 1139 | } |