| // This file is autogenerated using x/tools/cmd/bundle from |
| // https://go-review.googlesource.com/#/c/15850/ |
| // Usage: |
| // $ bundle golang.org/x/net/http2 net/http http2 > /tmp/x.go; mv /tmp/x.go $GOROOT/src/net/http/h2_bundle.go |
| |
| // Copyright 2015 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package http |
| |
| import ( |
| "bufio" |
| "bytes" |
| "crypto/tls" |
| "encoding/binary" |
| "errors" |
| "fmt" |
| "io" |
| "log" |
| "net" |
| "net/url" |
| "os" |
| "runtime" |
| "strconv" |
| "strings" |
| "sync" |
| "time" |
| |
| "golang.org/x/net/http2/hpack" |
| ) |
| |
| // buffer is an io.ReadWriteCloser backed by a fixed size buffer. |
| // It never allocates, but moves old data as new data is written. |
| type http2buffer struct { |
| buf []byte |
| r, w int |
| closed bool |
| err error // err to return to reader |
| } |
| |
| var ( |
| http2errReadEmpty = errors.New("read from empty buffer") |
| http2errWriteClosed = errors.New("write on closed buffer") |
| http2errWriteFull = errors.New("write on full buffer") |
| ) |
| |
| // Read copies bytes from the buffer into p. |
| // It is an error to read when no data is available. |
| func (b *http2buffer) Read(p []byte) (n int, err error) { |
| n = copy(p, b.buf[b.r:b.w]) |
| b.r += n |
| if b.closed && b.r == b.w { |
| err = b.err |
| } else if b.r == b.w && n == 0 { |
| err = http2errReadEmpty |
| } |
| return n, err |
| } |
| |
| // Len returns the number of bytes of the unread portion of the buffer. |
| func (b *http2buffer) Len() int { |
| return b.w - b.r |
| } |
| |
| // Write copies bytes from p into the buffer. |
| // It is an error to write more data than the buffer can hold. |
| func (b *http2buffer) Write(p []byte) (n int, err error) { |
| if b.closed { |
| return 0, http2errWriteClosed |
| } |
| |
| if b.r > 0 && len(p) > len(b.buf)-b.w { |
| copy(b.buf, b.buf[b.r:b.w]) |
| b.w -= b.r |
| b.r = 0 |
| } |
| |
| n = copy(b.buf[b.w:], p) |
| b.w += n |
| if n < len(p) { |
| err = http2errWriteFull |
| } |
| return n, err |
| } |
| |
| // Close marks the buffer as closed. Future calls to Write will |
| // return an error. Future calls to Read, once the buffer is |
| // empty, will return err. |
| func (b *http2buffer) Close(err error) { |
| if !b.closed { |
| b.closed = true |
| b.err = err |
| } |
| } |
| |
| // An ErrCode is an unsigned 32-bit error code as defined in the HTTP/2 spec. |
| type http2ErrCode uint32 |
| |
| const ( |
| http2ErrCodeNo http2ErrCode = 0x0 |
| http2ErrCodeProtocol http2ErrCode = 0x1 |
| http2ErrCodeInternal http2ErrCode = 0x2 |
| http2ErrCodeFlowControl http2ErrCode = 0x3 |
| http2ErrCodeSettingsTimeout http2ErrCode = 0x4 |
| http2ErrCodeStreamClosed http2ErrCode = 0x5 |
| http2ErrCodeFrameSize http2ErrCode = 0x6 |
| http2ErrCodeRefusedStream http2ErrCode = 0x7 |
| http2ErrCodeCancel http2ErrCode = 0x8 |
| http2ErrCodeCompression http2ErrCode = 0x9 |
| http2ErrCodeConnect http2ErrCode = 0xa |
| http2ErrCodeEnhanceYourCalm http2ErrCode = 0xb |
| http2ErrCodeInadequateSecurity http2ErrCode = 0xc |
| http2ErrCodeHTTP11Required http2ErrCode = 0xd |
| ) |
| |
| var http2errCodeName = map[http2ErrCode]string{ |
| http2ErrCodeNo: "NO_ERROR", |
| http2ErrCodeProtocol: "PROTOCOL_ERROR", |
| http2ErrCodeInternal: "INTERNAL_ERROR", |
| http2ErrCodeFlowControl: "FLOW_CONTROL_ERROR", |
| http2ErrCodeSettingsTimeout: "SETTINGS_TIMEOUT", |
| http2ErrCodeStreamClosed: "STREAM_CLOSED", |
| http2ErrCodeFrameSize: "FRAME_SIZE_ERROR", |
| http2ErrCodeRefusedStream: "REFUSED_STREAM", |
| http2ErrCodeCancel: "CANCEL", |
| http2ErrCodeCompression: "COMPRESSION_ERROR", |
| http2ErrCodeConnect: "CONNECT_ERROR", |
| http2ErrCodeEnhanceYourCalm: "ENHANCE_YOUR_CALM", |
| http2ErrCodeInadequateSecurity: "INADEQUATE_SECURITY", |
| http2ErrCodeHTTP11Required: "HTTP_1_1_REQUIRED", |
| } |
| |
| func (e http2ErrCode) String() string { |
| if s, ok := http2errCodeName[e]; ok { |
| return s |
| } |
| return fmt.Sprintf("unknown error code 0x%x", uint32(e)) |
| } |
| |
| // ConnectionError is an error that results in the termination of the |
| // entire connection. |
| type http2ConnectionError http2ErrCode |
| |
| func (e http2ConnectionError) Error() string { |
| return fmt.Sprintf("connection error: %s", http2ErrCode(e)) |
| } |
| |
| // StreamError is an error that only affects one stream within an |
| // HTTP/2 connection. |
| type http2StreamError struct { |
| StreamID uint32 |
| Code http2ErrCode |
| } |
| |
| func (e http2StreamError) Error() string { |
| return fmt.Sprintf("stream error: stream ID %d; %v", e.StreamID, e.Code) |
| } |
| |
| // 6.9.1 The Flow Control Window |
| // "If a sender receives a WINDOW_UPDATE that causes a flow control |
| // window to exceed this maximum it MUST terminate either the stream |
| // or the connection, as appropriate. For streams, [...]; for the |
| // connection, a GOAWAY frame with a FLOW_CONTROL_ERROR code." |
| type http2goAwayFlowError struct{} |
| |
| func (http2goAwayFlowError) Error() string { return "connection exceeded flow control window size" } |
| |
| // flow is the flow control window's size. |
| type http2flow struct { |
| // n is the number of DATA bytes we're allowed to send. |
| // A flow is kept both on a conn and a per-stream. |
| n int32 |
| |
| // conn points to the shared connection-level flow that is |
| // shared by all streams on that conn. It is nil for the flow |
| // that's on the conn directly. |
| conn *http2flow |
| } |
| |
| func (f *http2flow) setConnFlow(cf *http2flow) { f.conn = cf } |
| |
| func (f *http2flow) available() int32 { |
| n := f.n |
| if f.conn != nil && f.conn.n < n { |
| n = f.conn.n |
| } |
| return n |
| } |
| |
| func (f *http2flow) take(n int32) { |
| if n > f.available() { |
| panic("internal error: took too much") |
| } |
| f.n -= n |
| if f.conn != nil { |
| f.conn.n -= n |
| } |
| } |
| |
| // add adds n bytes (positive or negative) to the flow control window. |
| // It returns false if the sum would exceed 2^31-1. |
| func (f *http2flow) add(n int32) bool { |
| remain := (1<<31 - 1) - f.n |
| if n > remain { |
| return false |
| } |
| f.n += n |
| return true |
| } |
| |
| const http2frameHeaderLen = 9 |
| |
| var http2padZeros = make([]byte, 255) // zeros for padding |
| |
| // A FrameType is a registered frame type as defined in |
| // http://http2.github.io/http2-spec/#rfc.section.11.2 |
| type http2FrameType uint8 |
| |
| const ( |
| http2FrameData http2FrameType = 0x0 |
| http2FrameHeaders http2FrameType = 0x1 |
| http2FramePriority http2FrameType = 0x2 |
| http2FrameRSTStream http2FrameType = 0x3 |
| http2FrameSettings http2FrameType = 0x4 |
| http2FramePushPromise http2FrameType = 0x5 |
| http2FramePing http2FrameType = 0x6 |
| http2FrameGoAway http2FrameType = 0x7 |
| http2FrameWindowUpdate http2FrameType = 0x8 |
| http2FrameContinuation http2FrameType = 0x9 |
| ) |
| |
| var http2frameName = map[http2FrameType]string{ |
| http2FrameData: "DATA", |
| http2FrameHeaders: "HEADERS", |
| http2FramePriority: "PRIORITY", |
| http2FrameRSTStream: "RST_STREAM", |
| http2FrameSettings: "SETTINGS", |
| http2FramePushPromise: "PUSH_PROMISE", |
| http2FramePing: "PING", |
| http2FrameGoAway: "GOAWAY", |
| http2FrameWindowUpdate: "WINDOW_UPDATE", |
| http2FrameContinuation: "CONTINUATION", |
| } |
| |
| func (t http2FrameType) String() string { |
| if s, ok := http2frameName[t]; ok { |
| return s |
| } |
| return fmt.Sprintf("UNKNOWN_FRAME_TYPE_%d", uint8(t)) |
| } |
| |
| // Flags is a bitmask of HTTP/2 flags. |
| // The meaning of flags varies depending on the frame type. |
| type http2Flags uint8 |
| |
| // Has reports whether f contains all (0 or more) flags in v. |
| func (f http2Flags) Has(v http2Flags) bool { |
| return (f & v) == v |
| } |
| |
| // Frame-specific FrameHeader flag bits. |
| const ( |
| // Data Frame |
| http2FlagDataEndStream http2Flags = 0x1 |
| http2FlagDataPadded http2Flags = 0x8 |
| |
| // Headers Frame |
| http2FlagHeadersEndStream http2Flags = 0x1 |
| http2FlagHeadersEndHeaders http2Flags = 0x4 |
| http2FlagHeadersPadded http2Flags = 0x8 |
| http2FlagHeadersPriority http2Flags = 0x20 |
| |
| // Settings Frame |
| http2FlagSettingsAck http2Flags = 0x1 |
| |
| // Ping Frame |
| http2FlagPingAck http2Flags = 0x1 |
| |
| // Continuation Frame |
| http2FlagContinuationEndHeaders http2Flags = 0x4 |
| |
| http2FlagPushPromiseEndHeaders http2Flags = 0x4 |
| http2FlagPushPromisePadded http2Flags = 0x8 |
| ) |
| |
| var http2flagName = map[http2FrameType]map[http2Flags]string{ |
| http2FrameData: { |
| http2FlagDataEndStream: "END_STREAM", |
| http2FlagDataPadded: "PADDED", |
| }, |
| http2FrameHeaders: { |
| http2FlagHeadersEndStream: "END_STREAM", |
| http2FlagHeadersEndHeaders: "END_HEADERS", |
| http2FlagHeadersPadded: "PADDED", |
| http2FlagHeadersPriority: "PRIORITY", |
| }, |
| http2FrameSettings: { |
| http2FlagSettingsAck: "ACK", |
| }, |
| http2FramePing: { |
| http2FlagPingAck: "ACK", |
| }, |
| http2FrameContinuation: { |
| http2FlagContinuationEndHeaders: "END_HEADERS", |
| }, |
| http2FramePushPromise: { |
| http2FlagPushPromiseEndHeaders: "END_HEADERS", |
| http2FlagPushPromisePadded: "PADDED", |
| }, |
| } |
| |
| // a frameParser parses a frame given its FrameHeader and payload |
| // bytes. The length of payload will always equal fh.Length (which |
| // might be 0). |
| type http2frameParser func(fh http2FrameHeader, payload []byte) (http2Frame, error) |
| |
| var http2frameParsers = map[http2FrameType]http2frameParser{ |
| http2FrameData: http2parseDataFrame, |
| http2FrameHeaders: http2parseHeadersFrame, |
| http2FramePriority: http2parsePriorityFrame, |
| http2FrameRSTStream: http2parseRSTStreamFrame, |
| http2FrameSettings: http2parseSettingsFrame, |
| http2FramePushPromise: http2parsePushPromise, |
| http2FramePing: http2parsePingFrame, |
| http2FrameGoAway: http2parseGoAwayFrame, |
| http2FrameWindowUpdate: http2parseWindowUpdateFrame, |
| http2FrameContinuation: http2parseContinuationFrame, |
| } |
| |
| func http2typeFrameParser(t http2FrameType) http2frameParser { |
| if f := http2frameParsers[t]; f != nil { |
| return f |
| } |
| return http2parseUnknownFrame |
| } |
| |
| // A FrameHeader is the 9 byte header of all HTTP/2 frames. |
| // |
| // See http://http2.github.io/http2-spec/#FrameHeader |
| type http2FrameHeader struct { |
| valid bool // caller can access []byte fields in the Frame |
| |
| // Type is the 1 byte frame type. There are ten standard frame |
| // types, but extension frame types may be written by WriteRawFrame |
| // and will be returned by ReadFrame (as UnknownFrame). |
| Type http2FrameType |
| |
| // Flags are the 1 byte of 8 potential bit flags per frame. |
| // They are specific to the frame type. |
| Flags http2Flags |
| |
| // Length is the length of the frame, not including the 9 byte header. |
| // The maximum size is one byte less than 16MB (uint24), but only |
| // frames up to 16KB are allowed without peer agreement. |
| Length uint32 |
| |
| // StreamID is which stream this frame is for. Certain frames |
| // are not stream-specific, in which case this field is 0. |
| StreamID uint32 |
| } |
| |
| // Header returns h. It exists so FrameHeaders can be embedded in other |
| // specific frame types and implement the Frame interface. |
| func (h http2FrameHeader) Header() http2FrameHeader { return h } |
| |
| func (h http2FrameHeader) String() string { |
| var buf bytes.Buffer |
| buf.WriteString("[FrameHeader ") |
| buf.WriteString(h.Type.String()) |
| if h.Flags != 0 { |
| buf.WriteString(" flags=") |
| set := 0 |
| for i := uint8(0); i < 8; i++ { |
| if h.Flags&(1<<i) == 0 { |
| continue |
| } |
| set++ |
| if set > 1 { |
| buf.WriteByte('|') |
| } |
| name := http2flagName[h.Type][http2Flags(1<<i)] |
| if name != "" { |
| buf.WriteString(name) |
| } else { |
| fmt.Fprintf(&buf, "0x%x", 1<<i) |
| } |
| } |
| } |
| if h.StreamID != 0 { |
| fmt.Fprintf(&buf, " stream=%d", h.StreamID) |
| } |
| fmt.Fprintf(&buf, " len=%d]", h.Length) |
| return buf.String() |
| } |
| |
| func (h *http2FrameHeader) checkValid() { |
| if !h.valid { |
| panic("Frame accessor called on non-owned Frame") |
| } |
| } |
| |
| func (h *http2FrameHeader) invalidate() { h.valid = false } |
| |
| // frame header bytes. |
| // Used only by ReadFrameHeader. |
| var http2fhBytes = sync.Pool{ |
| New: func() interface{} { |
| buf := make([]byte, http2frameHeaderLen) |
| return &buf |
| }, |
| } |
| |
| // ReadFrameHeader reads 9 bytes from r and returns a FrameHeader. |
| // Most users should use Framer.ReadFrame instead. |
| func http2ReadFrameHeader(r io.Reader) (http2FrameHeader, error) { |
| bufp := http2fhBytes.Get().(*[]byte) |
| defer http2fhBytes.Put(bufp) |
| return http2readFrameHeader(*bufp, r) |
| } |
| |
| func http2readFrameHeader(buf []byte, r io.Reader) (http2FrameHeader, error) { |
| _, err := io.ReadFull(r, buf[:http2frameHeaderLen]) |
| if err != nil { |
| return http2FrameHeader{}, err |
| } |
| return http2FrameHeader{ |
| Length: (uint32(buf[0])<<16 | uint32(buf[1])<<8 | uint32(buf[2])), |
| Type: http2FrameType(buf[3]), |
| Flags: http2Flags(buf[4]), |
| StreamID: binary.BigEndian.Uint32(buf[5:]) & (1<<31 - 1), |
| valid: true, |
| }, nil |
| } |
| |
| // A Frame is the base interface implemented by all frame types. |
| // Callers will generally type-assert the specific frame type: |
| // *HeadersFrame, *SettingsFrame, *WindowUpdateFrame, etc. |
| // |
| // Frames are only valid until the next call to Framer.ReadFrame. |
| type http2Frame interface { |
| Header() http2FrameHeader |
| |
| // invalidate is called by Framer.ReadFrame to make this |
| // frame's buffers as being invalid, since the subsequent |
| // frame will reuse them. |
| invalidate() |
| } |
| |
| // A Framer reads and writes Frames. |
| type http2Framer struct { |
| r io.Reader |
| lastFrame http2Frame |
| |
| maxReadSize uint32 |
| headerBuf [http2frameHeaderLen]byte |
| |
| // TODO: let getReadBuf be configurable, and use a less memory-pinning |
| // allocator in server.go to minimize memory pinned for many idle conns. |
| // Will probably also need to make frame invalidation have a hook too. |
| getReadBuf func(size uint32) []byte |
| readBuf []byte // cache for default getReadBuf |
| |
| maxWriteSize uint32 // zero means unlimited; TODO: implement |
| |
| w io.Writer |
| wbuf []byte |
| |
| // AllowIllegalWrites permits the Framer's Write methods to |
| // write frames that do not conform to the HTTP/2 spec. This |
| // permits using the Framer to test other HTTP/2 |
| // implementations' conformance to the spec. |
| // If false, the Write methods will prefer to return an error |
| // rather than comply. |
| AllowIllegalWrites bool |
| } |
| |
| func (f *http2Framer) startWrite(ftype http2FrameType, flags http2Flags, streamID uint32) { |
| |
| f.wbuf = append(f.wbuf[:0], |
| 0, |
| 0, |
| 0, |
| byte(ftype), |
| byte(flags), |
| byte(streamID>>24), |
| byte(streamID>>16), |
| byte(streamID>>8), |
| byte(streamID)) |
| } |
| |
| func (f *http2Framer) endWrite() error { |
| |
| length := len(f.wbuf) - http2frameHeaderLen |
| if length >= (1 << 24) { |
| return http2ErrFrameTooLarge |
| } |
| _ = append(f.wbuf[:0], |
| byte(length>>16), |
| byte(length>>8), |
| byte(length)) |
| n, err := f.w.Write(f.wbuf) |
| if err == nil && n != len(f.wbuf) { |
| err = io.ErrShortWrite |
| } |
| return err |
| } |
| |
| func (f *http2Framer) writeByte(v byte) { f.wbuf = append(f.wbuf, v) } |
| |
| func (f *http2Framer) writeBytes(v []byte) { f.wbuf = append(f.wbuf, v...) } |
| |
| func (f *http2Framer) writeUint16(v uint16) { f.wbuf = append(f.wbuf, byte(v>>8), byte(v)) } |
| |
| func (f *http2Framer) writeUint32(v uint32) { |
| f.wbuf = append(f.wbuf, byte(v>>24), byte(v>>16), byte(v>>8), byte(v)) |
| } |
| |
| const ( |
| http2minMaxFrameSize = 1 << 14 |
| http2maxFrameSize = 1<<24 - 1 |
| ) |
| |
| // NewFramer returns a Framer that writes frames to w and reads them from r. |
| func http2NewFramer(w io.Writer, r io.Reader) *http2Framer { |
| fr := &http2Framer{ |
| w: w, |
| r: r, |
| } |
| fr.getReadBuf = func(size uint32) []byte { |
| if cap(fr.readBuf) >= int(size) { |
| return fr.readBuf[:size] |
| } |
| fr.readBuf = make([]byte, size) |
| return fr.readBuf |
| } |
| fr.SetMaxReadFrameSize(http2maxFrameSize) |
| return fr |
| } |
| |
| // SetMaxReadFrameSize sets the maximum size of a frame |
| // that will be read by a subsequent call to ReadFrame. |
| // It is the caller's responsibility to advertise this |
| // limit with a SETTINGS frame. |
| func (fr *http2Framer) SetMaxReadFrameSize(v uint32) { |
| if v > http2maxFrameSize { |
| v = http2maxFrameSize |
| } |
| fr.maxReadSize = v |
| } |
| |
| // ErrFrameTooLarge is returned from Framer.ReadFrame when the peer |
| // sends a frame that is larger than declared with SetMaxReadFrameSize. |
| var http2ErrFrameTooLarge = errors.New("http2: frame too large") |
| |
| // ReadFrame reads a single frame. The returned Frame is only valid |
| // until the next call to ReadFrame. |
| // If the frame is larger than previously set with SetMaxReadFrameSize, |
| // the returned error is ErrFrameTooLarge. |
| func (fr *http2Framer) ReadFrame() (http2Frame, error) { |
| if fr.lastFrame != nil { |
| fr.lastFrame.invalidate() |
| } |
| fh, err := http2readFrameHeader(fr.headerBuf[:], fr.r) |
| if err != nil { |
| return nil, err |
| } |
| if fh.Length > fr.maxReadSize { |
| return nil, http2ErrFrameTooLarge |
| } |
| payload := fr.getReadBuf(fh.Length) |
| if _, err := io.ReadFull(fr.r, payload); err != nil { |
| return nil, err |
| } |
| f, err := http2typeFrameParser(fh.Type)(fh, payload) |
| if err != nil { |
| return nil, err |
| } |
| fr.lastFrame = f |
| return f, nil |
| } |
| |
| // A DataFrame conveys arbitrary, variable-length sequences of octets |
| // associated with a stream. |
| // See http://http2.github.io/http2-spec/#rfc.section.6.1 |
| type http2DataFrame struct { |
| http2FrameHeader |
| data []byte |
| } |
| |
| func (f *http2DataFrame) StreamEnded() bool { |
| return f.http2FrameHeader.Flags.Has(http2FlagDataEndStream) |
| } |
| |
| // Data returns the frame's data octets, not including any padding |
| // size byte or padding suffix bytes. |
| // The caller must not retain the returned memory past the next |
| // call to ReadFrame. |
| func (f *http2DataFrame) Data() []byte { |
| f.checkValid() |
| return f.data |
| } |
| |
| func http2parseDataFrame(fh http2FrameHeader, payload []byte) (http2Frame, error) { |
| if fh.StreamID == 0 { |
| |
| return nil, http2ConnectionError(http2ErrCodeProtocol) |
| } |
| f := &http2DataFrame{ |
| http2FrameHeader: fh, |
| } |
| var padSize byte |
| if fh.Flags.Has(http2FlagDataPadded) { |
| var err error |
| payload, padSize, err = http2readByte(payload) |
| if err != nil { |
| return nil, err |
| } |
| } |
| if int(padSize) > len(payload) { |
| |
| return nil, http2ConnectionError(http2ErrCodeProtocol) |
| } |
| f.data = payload[:len(payload)-int(padSize)] |
| return f, nil |
| } |
| |
| var http2errStreamID = errors.New("invalid streamid") |
| |
| func http2validStreamID(streamID uint32) bool { |
| return streamID != 0 && streamID&(1<<31) == 0 |
| } |
| |
| // WriteData writes a DATA frame. |
| // |
| // It will perform exactly one Write to the underlying Writer. |
| // It is the caller's responsibility to not call other Write methods concurrently. |
| func (f *http2Framer) WriteData(streamID uint32, endStream bool, data []byte) error { |
| |
| if !http2validStreamID(streamID) && !f.AllowIllegalWrites { |
| return http2errStreamID |
| } |
| var flags http2Flags |
| if endStream { |
| flags |= http2FlagDataEndStream |
| } |
| f.startWrite(http2FrameData, flags, streamID) |
| f.wbuf = append(f.wbuf, data...) |
| return f.endWrite() |
| } |
| |
| // A SettingsFrame conveys configuration parameters that affect how |
| // endpoints communicate, such as preferences and constraints on peer |
| // behavior. |
| // |
| // See http://http2.github.io/http2-spec/#SETTINGS |
| type http2SettingsFrame struct { |
| http2FrameHeader |
| p []byte |
| } |
| |
| func http2parseSettingsFrame(fh http2FrameHeader, p []byte) (http2Frame, error) { |
| if fh.Flags.Has(http2FlagSettingsAck) && fh.Length > 0 { |
| |
| return nil, http2ConnectionError(http2ErrCodeFrameSize) |
| } |
| if fh.StreamID != 0 { |
| |
| return nil, http2ConnectionError(http2ErrCodeProtocol) |
| } |
| if len(p)%6 != 0 { |
| |
| return nil, http2ConnectionError(http2ErrCodeFrameSize) |
| } |
| f := &http2SettingsFrame{http2FrameHeader: fh, p: p} |
| if v, ok := f.Value(http2SettingInitialWindowSize); ok && v > (1<<31)-1 { |
| |
| return nil, http2ConnectionError(http2ErrCodeFlowControl) |
| } |
| return f, nil |
| } |
| |
| func (f *http2SettingsFrame) IsAck() bool { |
| return f.http2FrameHeader.Flags.Has(http2FlagSettingsAck) |
| } |
| |
| func (f *http2SettingsFrame) Value(s http2SettingID) (v uint32, ok bool) { |
| f.checkValid() |
| buf := f.p |
| for len(buf) > 0 { |
| settingID := http2SettingID(binary.BigEndian.Uint16(buf[:2])) |
| if settingID == s { |
| return binary.BigEndian.Uint32(buf[2:6]), true |
| } |
| buf = buf[6:] |
| } |
| return 0, false |
| } |
| |
| // ForeachSetting runs fn for each setting. |
| // It stops and returns the first error. |
| func (f *http2SettingsFrame) ForeachSetting(fn func(http2Setting) error) error { |
| f.checkValid() |
| buf := f.p |
| for len(buf) > 0 { |
| if err := fn(http2Setting{ |
| http2SettingID(binary.BigEndian.Uint16(buf[:2])), |
| binary.BigEndian.Uint32(buf[2:6]), |
| }); err != nil { |
| return err |
| } |
| buf = buf[6:] |
| } |
| return nil |
| } |
| |
| // WriteSettings writes a SETTINGS frame with zero or more settings |
| // specified and the ACK bit not set. |
| // |
| // It will perform exactly one Write to the underlying Writer. |
| // It is the caller's responsibility to not call other Write methods concurrently. |
| func (f *http2Framer) WriteSettings(settings ...http2Setting) error { |
| f.startWrite(http2FrameSettings, 0, 0) |
| for _, s := range settings { |
| f.writeUint16(uint16(s.ID)) |
| f.writeUint32(s.Val) |
| } |
| return f.endWrite() |
| } |
| |
| // WriteSettings writes an empty SETTINGS frame with the ACK bit set. |
| // |
| // It will perform exactly one Write to the underlying Writer. |
| // It is the caller's responsibility to not call other Write methods concurrently. |
| func (f *http2Framer) WriteSettingsAck() error { |
| f.startWrite(http2FrameSettings, http2FlagSettingsAck, 0) |
| return f.endWrite() |
| } |
| |
| // A PingFrame is a mechanism for measuring a minimal round trip time |
| // from the sender, as well as determining whether an idle connection |
| // is still functional. |
| // See http://http2.github.io/http2-spec/#rfc.section.6.7 |
| type http2PingFrame struct { |
| http2FrameHeader |
| Data [8]byte |
| } |
| |
| func http2parsePingFrame(fh http2FrameHeader, payload []byte) (http2Frame, error) { |
| if len(payload) != 8 { |
| return nil, http2ConnectionError(http2ErrCodeFrameSize) |
| } |
| if fh.StreamID != 0 { |
| return nil, http2ConnectionError(http2ErrCodeProtocol) |
| } |
| f := &http2PingFrame{http2FrameHeader: fh} |
| copy(f.Data[:], payload) |
| return f, nil |
| } |
| |
| func (f *http2Framer) WritePing(ack bool, data [8]byte) error { |
| var flags http2Flags |
| if ack { |
| flags = http2FlagPingAck |
| } |
| f.startWrite(http2FramePing, flags, 0) |
| f.writeBytes(data[:]) |
| return f.endWrite() |
| } |
| |
| // A GoAwayFrame informs the remote peer to stop creating streams on this connection. |
| // See http://http2.github.io/http2-spec/#rfc.section.6.8 |
| type http2GoAwayFrame struct { |
| http2FrameHeader |
| LastStreamID uint32 |
| ErrCode http2ErrCode |
| debugData []byte |
| } |
| |
| // DebugData returns any debug data in the GOAWAY frame. Its contents |
| // are not defined. |
| // The caller must not retain the returned memory past the next |
| // call to ReadFrame. |
| func (f *http2GoAwayFrame) DebugData() []byte { |
| f.checkValid() |
| return f.debugData |
| } |
| |
| func http2parseGoAwayFrame(fh http2FrameHeader, p []byte) (http2Frame, error) { |
| if fh.StreamID != 0 { |
| return nil, http2ConnectionError(http2ErrCodeProtocol) |
| } |
| if len(p) < 8 { |
| return nil, http2ConnectionError(http2ErrCodeFrameSize) |
| } |
| return &http2GoAwayFrame{ |
| http2FrameHeader: fh, |
| LastStreamID: binary.BigEndian.Uint32(p[:4]) & (1<<31 - 1), |
| ErrCode: http2ErrCode(binary.BigEndian.Uint32(p[4:8])), |
| debugData: p[8:], |
| }, nil |
| } |
| |
| func (f *http2Framer) WriteGoAway(maxStreamID uint32, code http2ErrCode, debugData []byte) error { |
| f.startWrite(http2FrameGoAway, 0, 0) |
| f.writeUint32(maxStreamID & (1<<31 - 1)) |
| f.writeUint32(uint32(code)) |
| f.writeBytes(debugData) |
| return f.endWrite() |
| } |
| |
| // An UnknownFrame is the frame type returned when the frame type is unknown |
| // or no specific frame type parser exists. |
| type http2UnknownFrame struct { |
| http2FrameHeader |
| p []byte |
| } |
| |
| // Payload returns the frame's payload (after the header). It is not |
| // valid to call this method after a subsequent call to |
| // Framer.ReadFrame, nor is it valid to retain the returned slice. |
| // The memory is owned by the Framer and is invalidated when the next |
| // frame is read. |
| func (f *http2UnknownFrame) Payload() []byte { |
| f.checkValid() |
| return f.p |
| } |
| |
| func http2parseUnknownFrame(fh http2FrameHeader, p []byte) (http2Frame, error) { |
| return &http2UnknownFrame{fh, p}, nil |
| } |
| |
| // A WindowUpdateFrame is used to implement flow control. |
| // See http://http2.github.io/http2-spec/#rfc.section.6.9 |
| type http2WindowUpdateFrame struct { |
| http2FrameHeader |
| Increment uint32 |
| } |
| |
| func http2parseWindowUpdateFrame(fh http2FrameHeader, p []byte) (http2Frame, error) { |
| if len(p) != 4 { |
| return nil, http2ConnectionError(http2ErrCodeFrameSize) |
| } |
| inc := binary.BigEndian.Uint32(p[:4]) & 0x7fffffff |
| if inc == 0 { |
| |
| if fh.StreamID == 0 { |
| return nil, http2ConnectionError(http2ErrCodeProtocol) |
| } |
| return nil, http2StreamError{fh.StreamID, http2ErrCodeProtocol} |
| } |
| return &http2WindowUpdateFrame{ |
| http2FrameHeader: fh, |
| Increment: inc, |
| }, nil |
| } |
| |
| // WriteWindowUpdate writes a WINDOW_UPDATE frame. |
| // The increment value must be between 1 and 2,147,483,647, inclusive. |
| // If the Stream ID is zero, the window update applies to the |
| // connection as a whole. |
| func (f *http2Framer) WriteWindowUpdate(streamID, incr uint32) error { |
| |
| if (incr < 1 || incr > 2147483647) && !f.AllowIllegalWrites { |
| return errors.New("illegal window increment value") |
| } |
| f.startWrite(http2FrameWindowUpdate, 0, streamID) |
| f.writeUint32(incr) |
| return f.endWrite() |
| } |
| |
| // A HeadersFrame is used to open a stream and additionally carries a |
| // header block fragment. |
| type http2HeadersFrame struct { |
| http2FrameHeader |
| |
| // Priority is set if FlagHeadersPriority is set in the FrameHeader. |
| Priority http2PriorityParam |
| |
| headerFragBuf []byte // not owned |
| } |
| |
| func (f *http2HeadersFrame) HeaderBlockFragment() []byte { |
| f.checkValid() |
| return f.headerFragBuf |
| } |
| |
| func (f *http2HeadersFrame) HeadersEnded() bool { |
| return f.http2FrameHeader.Flags.Has(http2FlagHeadersEndHeaders) |
| } |
| |
| func (f *http2HeadersFrame) StreamEnded() bool { |
| return f.http2FrameHeader.Flags.Has(http2FlagHeadersEndStream) |
| } |
| |
| func (f *http2HeadersFrame) HasPriority() bool { |
| return f.http2FrameHeader.Flags.Has(http2FlagHeadersPriority) |
| } |
| |
| func http2parseHeadersFrame(fh http2FrameHeader, p []byte) (_ http2Frame, err error) { |
| hf := &http2HeadersFrame{ |
| http2FrameHeader: fh, |
| } |
| if fh.StreamID == 0 { |
| |
| return nil, http2ConnectionError(http2ErrCodeProtocol) |
| } |
| var padLength uint8 |
| if fh.Flags.Has(http2FlagHeadersPadded) { |
| if p, padLength, err = http2readByte(p); err != nil { |
| return |
| } |
| } |
| if fh.Flags.Has(http2FlagHeadersPriority) { |
| var v uint32 |
| p, v, err = http2readUint32(p) |
| if err != nil { |
| return nil, err |
| } |
| hf.Priority.StreamDep = v & 0x7fffffff |
| hf.Priority.Exclusive = (v != hf.Priority.StreamDep) |
| p, hf.Priority.Weight, err = http2readByte(p) |
| if err != nil { |
| return nil, err |
| } |
| } |
| if len(p)-int(padLength) <= 0 { |
| return nil, http2StreamError{fh.StreamID, http2ErrCodeProtocol} |
| } |
| hf.headerFragBuf = p[:len(p)-int(padLength)] |
| return hf, nil |
| } |
| |
| // HeadersFrameParam are the parameters for writing a HEADERS frame. |
| type http2HeadersFrameParam struct { |
| // StreamID is the required Stream ID to initiate. |
| StreamID uint32 |
| // BlockFragment is part (or all) of a Header Block. |
| BlockFragment []byte |
| |
| // EndStream indicates that the header block is the last that |
| // the endpoint will send for the identified stream. Setting |
| // this flag causes the stream to enter one of "half closed" |
| // states. |
| EndStream bool |
| |
| // EndHeaders indicates that this frame contains an entire |
| // header block and is not followed by any |
| // CONTINUATION frames. |
| EndHeaders bool |
| |
| // PadLength is the optional number of bytes of zeros to add |
| // to this frame. |
| PadLength uint8 |
| |
| // Priority, if non-zero, includes stream priority information |
| // in the HEADER frame. |
| Priority http2PriorityParam |
| } |
| |
| // WriteHeaders writes a single HEADERS frame. |
| // |
| // This is a low-level header writing method. Encoding headers and |
| // splitting them into any necessary CONTINUATION frames is handled |
| // elsewhere. |
| // |
| // It will perform exactly one Write to the underlying Writer. |
| // It is the caller's responsibility to not call other Write methods concurrently. |
| func (f *http2Framer) WriteHeaders(p http2HeadersFrameParam) error { |
| if !http2validStreamID(p.StreamID) && !f.AllowIllegalWrites { |
| return http2errStreamID |
| } |
| var flags http2Flags |
| if p.PadLength != 0 { |
| flags |= http2FlagHeadersPadded |
| } |
| if p.EndStream { |
| flags |= http2FlagHeadersEndStream |
| } |
| if p.EndHeaders { |
| flags |= http2FlagHeadersEndHeaders |
| } |
| if !p.Priority.IsZero() { |
| flags |= http2FlagHeadersPriority |
| } |
| f.startWrite(http2FrameHeaders, flags, p.StreamID) |
| if p.PadLength != 0 { |
| f.writeByte(p.PadLength) |
| } |
| if !p.Priority.IsZero() { |
| v := p.Priority.StreamDep |
| if !http2validStreamID(v) && !f.AllowIllegalWrites { |
| return errors.New("invalid dependent stream id") |
| } |
| if p.Priority.Exclusive { |
| v |= 1 << 31 |
| } |
| f.writeUint32(v) |
| f.writeByte(p.Priority.Weight) |
| } |
| f.wbuf = append(f.wbuf, p.BlockFragment...) |
| f.wbuf = append(f.wbuf, http2padZeros[:p.PadLength]...) |
| return f.endWrite() |
| } |
| |
| // A PriorityFrame specifies the sender-advised priority of a stream. |
| // See http://http2.github.io/http2-spec/#rfc.section.6.3 |
| type http2PriorityFrame struct { |
| http2FrameHeader |
| http2PriorityParam |
| } |
| |
| // PriorityParam are the stream prioritzation parameters. |
| type http2PriorityParam struct { |
| // StreamDep is a 31-bit stream identifier for the |
| // stream that this stream depends on. Zero means no |
| // dependency. |
| StreamDep uint32 |
| |
| // Exclusive is whether the dependency is exclusive. |
| Exclusive bool |
| |
| // Weight is the stream's zero-indexed weight. It should be |
| // set together with StreamDep, or neither should be set. Per |
| // the spec, "Add one to the value to obtain a weight between |
| // 1 and 256." |
| Weight uint8 |
| } |
| |
| func (p http2PriorityParam) IsZero() bool { |
| return p == http2PriorityParam{} |
| } |
| |
| func http2parsePriorityFrame(fh http2FrameHeader, payload []byte) (http2Frame, error) { |
| if fh.StreamID == 0 { |
| return nil, http2ConnectionError(http2ErrCodeProtocol) |
| } |
| if len(payload) != 5 { |
| return nil, http2ConnectionError(http2ErrCodeFrameSize) |
| } |
| v := binary.BigEndian.Uint32(payload[:4]) |
| streamID := v & 0x7fffffff |
| return &http2PriorityFrame{ |
| http2FrameHeader: fh, |
| http2PriorityParam: http2PriorityParam{ |
| Weight: payload[4], |
| StreamDep: streamID, |
| Exclusive: streamID != v, |
| }, |
| }, nil |
| } |
| |
| // WritePriority writes a PRIORITY frame. |
| // |
| // It will perform exactly one Write to the underlying Writer. |
| // It is the caller's responsibility to not call other Write methods concurrently. |
| func (f *http2Framer) WritePriority(streamID uint32, p http2PriorityParam) error { |
| if !http2validStreamID(streamID) && !f.AllowIllegalWrites { |
| return http2errStreamID |
| } |
| f.startWrite(http2FramePriority, 0, streamID) |
| v := p.StreamDep |
| if p.Exclusive { |
| v |= 1 << 31 |
| } |
| f.writeUint32(v) |
| f.writeByte(p.Weight) |
| return f.endWrite() |
| } |
| |
| // A RSTStreamFrame allows for abnormal termination of a stream. |
| // See http://http2.github.io/http2-spec/#rfc.section.6.4 |
| type http2RSTStreamFrame struct { |
| http2FrameHeader |
| ErrCode http2ErrCode |
| } |
| |
| func http2parseRSTStreamFrame(fh http2FrameHeader, p []byte) (http2Frame, error) { |
| if len(p) != 4 { |
| return nil, http2ConnectionError(http2ErrCodeFrameSize) |
| } |
| if fh.StreamID == 0 { |
| return nil, http2ConnectionError(http2ErrCodeProtocol) |
| } |
| return &http2RSTStreamFrame{fh, http2ErrCode(binary.BigEndian.Uint32(p[:4]))}, nil |
| } |
| |
| // WriteRSTStream writes a RST_STREAM frame. |
| // |
| // It will perform exactly one Write to the underlying Writer. |
| // It is the caller's responsibility to not call other Write methods concurrently. |
| func (f *http2Framer) WriteRSTStream(streamID uint32, code http2ErrCode) error { |
| if !http2validStreamID(streamID) && !f.AllowIllegalWrites { |
| return http2errStreamID |
| } |
| f.startWrite(http2FrameRSTStream, 0, streamID) |
| f.writeUint32(uint32(code)) |
| return f.endWrite() |
| } |
| |
| // A ContinuationFrame is used to continue a sequence of header block fragments. |
| // See http://http2.github.io/http2-spec/#rfc.section.6.10 |
| type http2ContinuationFrame struct { |
| http2FrameHeader |
| headerFragBuf []byte |
| } |
| |
| func http2parseContinuationFrame(fh http2FrameHeader, p []byte) (http2Frame, error) { |
| return &http2ContinuationFrame{fh, p}, nil |
| } |
| |
| func (f *http2ContinuationFrame) StreamEnded() bool { |
| return f.http2FrameHeader.Flags.Has(http2FlagDataEndStream) |
| } |
| |
| func (f *http2ContinuationFrame) HeaderBlockFragment() []byte { |
| f.checkValid() |
| return f.headerFragBuf |
| } |
| |
| func (f *http2ContinuationFrame) HeadersEnded() bool { |
| return f.http2FrameHeader.Flags.Has(http2FlagContinuationEndHeaders) |
| } |
| |
| // WriteContinuation writes a CONTINUATION frame. |
| // |
| // It will perform exactly one Write to the underlying Writer. |
| // It is the caller's responsibility to not call other Write methods concurrently. |
| func (f *http2Framer) WriteContinuation(streamID uint32, endHeaders bool, headerBlockFragment []byte) error { |
| if !http2validStreamID(streamID) && !f.AllowIllegalWrites { |
| return http2errStreamID |
| } |
| var flags http2Flags |
| if endHeaders { |
| flags |= http2FlagContinuationEndHeaders |
| } |
| f.startWrite(http2FrameContinuation, flags, streamID) |
| f.wbuf = append(f.wbuf, headerBlockFragment...) |
| return f.endWrite() |
| } |
| |
| // A PushPromiseFrame is used to initiate a server stream. |
| // See http://http2.github.io/http2-spec/#rfc.section.6.6 |
| type http2PushPromiseFrame struct { |
| http2FrameHeader |
| PromiseID uint32 |
| headerFragBuf []byte // not owned |
| } |
| |
| func (f *http2PushPromiseFrame) HeaderBlockFragment() []byte { |
| f.checkValid() |
| return f.headerFragBuf |
| } |
| |
| func (f *http2PushPromiseFrame) HeadersEnded() bool { |
| return f.http2FrameHeader.Flags.Has(http2FlagPushPromiseEndHeaders) |
| } |
| |
| func http2parsePushPromise(fh http2FrameHeader, p []byte) (_ http2Frame, err error) { |
| pp := &http2PushPromiseFrame{ |
| http2FrameHeader: fh, |
| } |
| if pp.StreamID == 0 { |
| |
| return nil, http2ConnectionError(http2ErrCodeProtocol) |
| } |
| // The PUSH_PROMISE frame includes optional padding. |
| // Padding fields and flags are identical to those defined for DATA frames |
| var padLength uint8 |
| if fh.Flags.Has(http2FlagPushPromisePadded) { |
| if p, padLength, err = http2readByte(p); err != nil { |
| return |
| } |
| } |
| |
| p, pp.PromiseID, err = http2readUint32(p) |
| if err != nil { |
| return |
| } |
| pp.PromiseID = pp.PromiseID & (1<<31 - 1) |
| |
| if int(padLength) > len(p) { |
| |
| return nil, http2ConnectionError(http2ErrCodeProtocol) |
| } |
| pp.headerFragBuf = p[:len(p)-int(padLength)] |
| return pp, nil |
| } |
| |
| // PushPromiseParam are the parameters for writing a PUSH_PROMISE frame. |
| type http2PushPromiseParam struct { |
| // StreamID is the required Stream ID to initiate. |
| StreamID uint32 |
| |
| // PromiseID is the required Stream ID which this |
| // Push Promises |
| PromiseID uint32 |
| |
| // BlockFragment is part (or all) of a Header Block. |
| BlockFragment []byte |
| |
| // EndHeaders indicates that this frame contains an entire |
| // header block and is not followed by any |
| // CONTINUATION frames. |
| EndHeaders bool |
| |
| // PadLength is the optional number of bytes of zeros to add |
| // to this frame. |
| PadLength uint8 |
| } |
| |
| // WritePushPromise writes a single PushPromise Frame. |
| // |
| // As with Header Frames, This is the low level call for writing |
| // individual frames. Continuation frames are handled elsewhere. |
| // |
| // It will perform exactly one Write to the underlying Writer. |
| // It is the caller's responsibility to not call other Write methods concurrently. |
| func (f *http2Framer) WritePushPromise(p http2PushPromiseParam) error { |
| if !http2validStreamID(p.StreamID) && !f.AllowIllegalWrites { |
| return http2errStreamID |
| } |
| var flags http2Flags |
| if p.PadLength != 0 { |
| flags |= http2FlagPushPromisePadded |
| } |
| if p.EndHeaders { |
| flags |= http2FlagPushPromiseEndHeaders |
| } |
| f.startWrite(http2FramePushPromise, flags, p.StreamID) |
| if p.PadLength != 0 { |
| f.writeByte(p.PadLength) |
| } |
| if !http2validStreamID(p.PromiseID) && !f.AllowIllegalWrites { |
| return http2errStreamID |
| } |
| f.writeUint32(p.PromiseID) |
| f.wbuf = append(f.wbuf, p.BlockFragment...) |
| f.wbuf = append(f.wbuf, http2padZeros[:p.PadLength]...) |
| return f.endWrite() |
| } |
| |
| // WriteRawFrame writes a raw frame. This can be used to write |
| // extension frames unknown to this package. |
| func (f *http2Framer) WriteRawFrame(t http2FrameType, flags http2Flags, streamID uint32, payload []byte) error { |
| f.startWrite(t, flags, streamID) |
| f.writeBytes(payload) |
| return f.endWrite() |
| } |
| |
| func http2readByte(p []byte) (remain []byte, b byte, err error) { |
| if len(p) == 0 { |
| return nil, 0, io.ErrUnexpectedEOF |
| } |
| return p[1:], p[0], nil |
| } |
| |
| func http2readUint32(p []byte) (remain []byte, v uint32, err error) { |
| if len(p) < 4 { |
| return nil, 0, io.ErrUnexpectedEOF |
| } |
| return p[4:], binary.BigEndian.Uint32(p[:4]), nil |
| } |
| |
| type http2streamEnder interface { |
| StreamEnded() bool |
| } |
| |
| type http2headersEnder interface { |
| HeadersEnded() bool |
| } |
| |
| var http2DebugGoroutines = os.Getenv("DEBUG_HTTP2_GOROUTINES") == "1" |
| |
| type http2goroutineLock uint64 |
| |
| func http2newGoroutineLock() http2goroutineLock { |
| if !http2DebugGoroutines { |
| return 0 |
| } |
| return http2goroutineLock(http2curGoroutineID()) |
| } |
| |
| func (g http2goroutineLock) check() { |
| if !http2DebugGoroutines { |
| return |
| } |
| if http2curGoroutineID() != uint64(g) { |
| panic("running on the wrong goroutine") |
| } |
| } |
| |
| func (g http2goroutineLock) checkNotOn() { |
| if !http2DebugGoroutines { |
| return |
| } |
| if http2curGoroutineID() == uint64(g) { |
| panic("running on the wrong goroutine") |
| } |
| } |
| |
| var http2goroutineSpace = []byte("goroutine ") |
| |
| func http2curGoroutineID() uint64 { |
| bp := http2littleBuf.Get().(*[]byte) |
| defer http2littleBuf.Put(bp) |
| b := *bp |
| b = b[:runtime.Stack(b, false)] |
| |
| b = bytes.TrimPrefix(b, http2goroutineSpace) |
| i := bytes.IndexByte(b, ' ') |
| if i < 0 { |
| panic(fmt.Sprintf("No space found in %q", b)) |
| } |
| b = b[:i] |
| n, err := http2parseUintBytes(b, 10, 64) |
| if err != nil { |
| panic(fmt.Sprintf("Failed to parse goroutine ID out of %q: %v", b, err)) |
| } |
| return n |
| } |
| |
| var http2littleBuf = sync.Pool{ |
| New: func() interface{} { |
| buf := make([]byte, 64) |
| return &buf |
| }, |
| } |
| |
| // parseUintBytes is like strconv.ParseUint, but using a []byte. |
| func http2parseUintBytes(s []byte, base int, bitSize int) (n uint64, err error) { |
| var cutoff, maxVal uint64 |
| |
| if bitSize == 0 { |
| bitSize = int(strconv.IntSize) |
| } |
| |
| s0 := s |
| switch { |
| case len(s) < 1: |
| err = strconv.ErrSyntax |
| goto Error |
| |
| case 2 <= base && base <= 36: |
| |
| case base == 0: |
| |
| switch { |
| case s[0] == '0' && len(s) > 1 && (s[1] == 'x' || s[1] == 'X'): |
| base = 16 |
| s = s[2:] |
| if len(s) < 1 { |
| err = strconv.ErrSyntax |
| goto Error |
| } |
| case s[0] == '0': |
| base = 8 |
| default: |
| base = 10 |
| } |
| |
| default: |
| err = errors.New("invalid base " + strconv.Itoa(base)) |
| goto Error |
| } |
| |
| n = 0 |
| cutoff = http2cutoff64(base) |
| maxVal = 1<<uint(bitSize) - 1 |
| |
| for i := 0; i < len(s); i++ { |
| var v byte |
| d := s[i] |
| switch { |
| case '0' <= d && d <= '9': |
| v = d - '0' |
| case 'a' <= d && d <= 'z': |
| v = d - 'a' + 10 |
| case 'A' <= d && d <= 'Z': |
| v = d - 'A' + 10 |
| default: |
| n = 0 |
| err = strconv.ErrSyntax |
| goto Error |
| } |
| if int(v) >= base { |
| n = 0 |
| err = strconv.ErrSyntax |
| goto Error |
| } |
| |
| if n >= cutoff { |
| |
| n = 1<<64 - 1 |
| err = strconv.ErrRange |
| goto Error |
| } |
| n *= uint64(base) |
| |
| n1 := n + uint64(v) |
| if n1 < n || n1 > maxVal { |
| |
| n = 1<<64 - 1 |
| err = strconv.ErrRange |
| goto Error |
| } |
| n = n1 |
| } |
| |
| return n, nil |
| |
| Error: |
| return n, &strconv.NumError{Func: "ParseUint", Num: string(s0), Err: err} |
| } |
| |
| // Return the first number n such that n*base >= 1<<64. |
| func http2cutoff64(base int) uint64 { |
| if base < 2 { |
| return 0 |
| } |
| return (1<<64-1)/uint64(base) + 1 |
| } |
| |
| var ( |
| http2commonLowerHeader = map[string]string{} // Go-Canonical-Case -> lower-case |
| http2commonCanonHeader = map[string]string{} // lower-case -> Go-Canonical-Case |
| ) |
| |
| func init() { |
| for _, v := range []string{ |
| "accept", |
| "accept-charset", |
| "accept-encoding", |
| "accept-language", |
| "accept-ranges", |
| "age", |
| "access-control-allow-origin", |
| "allow", |
| "authorization", |
| "cache-control", |
| "content-disposition", |
| "content-encoding", |
| "content-language", |
| "content-length", |
| "content-location", |
| "content-range", |
| "content-type", |
| "cookie", |
| "date", |
| "etag", |
| "expect", |
| "expires", |
| "from", |
| "host", |
| "if-match", |
| "if-modified-since", |
| "if-none-match", |
| "if-unmodified-since", |
| "last-modified", |
| "link", |
| "location", |
| "max-forwards", |
| "proxy-authenticate", |
| "proxy-authorization", |
| "range", |
| "referer", |
| "refresh", |
| "retry-after", |
| "server", |
| "set-cookie", |
| "strict-transport-security", |
| "transfer-encoding", |
| "user-agent", |
| "vary", |
| "via", |
| "www-authenticate", |
| } { |
| chk := CanonicalHeaderKey(v) |
| http2commonLowerHeader[chk] = v |
| http2commonCanonHeader[v] = chk |
| } |
| } |
| |
| func http2lowerHeader(v string) string { |
| if s, ok := http2commonLowerHeader[v]; ok { |
| return s |
| } |
| return strings.ToLower(v) |
| } |
| |
| var http2VerboseLogs = false |
| |
| const ( |
| // ClientPreface is the string that must be sent by new |
| // connections from clients. |
| http2ClientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n" |
| |
| // SETTINGS_MAX_FRAME_SIZE default |
| // http://http2.github.io/http2-spec/#rfc.section.6.5.2 |
| http2initialMaxFrameSize = 16384 |
| |
| // NextProtoTLS is the NPN/ALPN protocol negotiated during |
| // HTTP/2's TLS setup. |
| http2NextProtoTLS = "h2" |
| |
| // http://http2.github.io/http2-spec/#SettingValues |
| http2initialHeaderTableSize = 4096 |
| |
| http2initialWindowSize = 65535 // 6.9.2 Initial Flow Control Window Size |
| |
| http2defaultMaxReadFrameSize = 1 << 20 |
| ) |
| |
| var ( |
| http2clientPreface = []byte(http2ClientPreface) |
| ) |
| |
| type http2streamState int |
| |
| const ( |
| http2stateIdle http2streamState = iota |
| http2stateOpen |
| http2stateHalfClosedLocal |
| http2stateHalfClosedRemote |
| http2stateResvLocal |
| http2stateResvRemote |
| http2stateClosed |
| ) |
| |
| var http2stateName = [...]string{ |
| http2stateIdle: "Idle", |
| http2stateOpen: "Open", |
| http2stateHalfClosedLocal: "HalfClosedLocal", |
| http2stateHalfClosedRemote: "HalfClosedRemote", |
| http2stateResvLocal: "ResvLocal", |
| http2stateResvRemote: "ResvRemote", |
| http2stateClosed: "Closed", |
| } |
| |
| func (st http2streamState) String() string { |
| return http2stateName[st] |
| } |
| |
| // Setting is a setting parameter: which setting it is, and its value. |
| type http2Setting struct { |
| // ID is which setting is being set. |
| // See http://http2.github.io/http2-spec/#SettingValues |
| ID http2SettingID |
| |
| // Val is the value. |
| Val uint32 |
| } |
| |
| func (s http2Setting) String() string { |
| return fmt.Sprintf("[%v = %d]", s.ID, s.Val) |
| } |
| |
| // Valid reports whether the setting is valid. |
| func (s http2Setting) Valid() error { |
| |
| switch s.ID { |
| case http2SettingEnablePush: |
| if s.Val != 1 && s.Val != 0 { |
| return http2ConnectionError(http2ErrCodeProtocol) |
| } |
| case http2SettingInitialWindowSize: |
| if s.Val > 1<<31-1 { |
| return http2ConnectionError(http2ErrCodeFlowControl) |
| } |
| case http2SettingMaxFrameSize: |
| if s.Val < 16384 || s.Val > 1<<24-1 { |
| return http2ConnectionError(http2ErrCodeProtocol) |
| } |
| } |
| return nil |
| } |
| |
| // A SettingID is an HTTP/2 setting as defined in |
| // http://http2.github.io/http2-spec/#iana-settings |
| type http2SettingID uint16 |
| |
| const ( |
| http2SettingHeaderTableSize http2SettingID = 0x1 |
| http2SettingEnablePush http2SettingID = 0x2 |
| http2SettingMaxConcurrentStreams http2SettingID = 0x3 |
| http2SettingInitialWindowSize http2SettingID = 0x4 |
| http2SettingMaxFrameSize http2SettingID = 0x5 |
| http2SettingMaxHeaderListSize http2SettingID = 0x6 |
| ) |
| |
| var http2settingName = map[http2SettingID]string{ |
| http2SettingHeaderTableSize: "HEADER_TABLE_SIZE", |
| http2SettingEnablePush: "ENABLE_PUSH", |
| http2SettingMaxConcurrentStreams: "MAX_CONCURRENT_STREAMS", |
| http2SettingInitialWindowSize: "INITIAL_WINDOW_SIZE", |
| http2SettingMaxFrameSize: "MAX_FRAME_SIZE", |
| http2SettingMaxHeaderListSize: "MAX_HEADER_LIST_SIZE", |
| } |
| |
| func (s http2SettingID) String() string { |
| if v, ok := http2settingName[s]; ok { |
| return v |
| } |
| return fmt.Sprintf("UNKNOWN_SETTING_%d", uint16(s)) |
| } |
| |
| func http2validHeader(v string) bool { |
| if len(v) == 0 { |
| return false |
| } |
| for _, r := range v { |
| |
| if r >= 127 || ('A' <= r && r <= 'Z') { |
| return false |
| } |
| } |
| return true |
| } |
| |
| var http2httpCodeStringCommon = map[int]string{} // n -> strconv.Itoa(n) |
| |
| func init() { |
| for i := 100; i <= 999; i++ { |
| if v := StatusText(i); v != "" { |
| http2httpCodeStringCommon[i] = strconv.Itoa(i) |
| } |
| } |
| } |
| |
| func http2httpCodeString(code int) string { |
| if s, ok := http2httpCodeStringCommon[code]; ok { |
| return s |
| } |
| return strconv.Itoa(code) |
| } |
| |
| // from pkg io |
| type http2stringWriter interface { |
| WriteString(s string) (n int, err error) |
| } |
| |
| // A gate lets two goroutines coordinate their activities. |
| type http2gate chan struct{} |
| |
| func (g http2gate) Done() { g <- struct{}{} } |
| |
| func (g http2gate) Wait() { <-g } |
| |
| // A closeWaiter is like a sync.WaitGroup but only goes 1 to 0 (open to closed). |
| type http2closeWaiter chan struct{} |
| |
| // Init makes a closeWaiter usable. |
| // It exists because so a closeWaiter value can be placed inside a |
| // larger struct and have the Mutex and Cond's memory in the same |
| // allocation. |
| func (cw *http2closeWaiter) Init() { |
| *cw = make(chan struct{}) |
| } |
| |
| // Close marks the closeWaiter as closed and unblocks any waiters. |
| func (cw http2closeWaiter) Close() { |
| close(cw) |
| } |
| |
| // Wait waits for the closeWaiter to become closed. |
| func (cw http2closeWaiter) Wait() { |
| <-cw |
| } |
| |
| // bufferedWriter is a buffered writer that writes to w. |
| // Its buffered writer is lazily allocated as needed, to minimize |
| // idle memory usage with many connections. |
| type http2bufferedWriter struct { |
| w io.Writer // immutable |
| bw *bufio.Writer // non-nil when data is buffered |
| } |
| |
| func http2newBufferedWriter(w io.Writer) *http2bufferedWriter { |
| return &http2bufferedWriter{w: w} |
| } |
| |
| var http2bufWriterPool = sync.Pool{ |
| New: func() interface{} { |
| |
| return bufio.NewWriterSize(nil, 4<<10) |
| }, |
| } |
| |
| func (w *http2bufferedWriter) Write(p []byte) (n int, err error) { |
| if w.bw == nil { |
| bw := http2bufWriterPool.Get().(*bufio.Writer) |
| bw.Reset(w.w) |
| w.bw = bw |
| } |
| return w.bw.Write(p) |
| } |
| |
| func (w *http2bufferedWriter) Flush() error { |
| bw := w.bw |
| if bw == nil { |
| return nil |
| } |
| err := bw.Flush() |
| bw.Reset(nil) |
| http2bufWriterPool.Put(bw) |
| w.bw = nil |
| return err |
| } |
| |
| type http2pipe struct { |
| b http2buffer |
| c sync.Cond |
| m sync.Mutex |
| } |
| |
| // Read waits until data is available and copies bytes |
| // from the buffer into p. |
| func (r *http2pipe) Read(p []byte) (n int, err error) { |
| r.c.L.Lock() |
| defer r.c.L.Unlock() |
| for r.b.Len() == 0 && !r.b.closed { |
| r.c.Wait() |
| } |
| return r.b.Read(p) |
| } |
| |
| // Write copies bytes from p into the buffer and wakes a reader. |
| // It is an error to write more data than the buffer can hold. |
| func (w *http2pipe) Write(p []byte) (n int, err error) { |
| w.c.L.Lock() |
| defer w.c.L.Unlock() |
| defer w.c.Signal() |
| return w.b.Write(p) |
| } |
| |
| func (c *http2pipe) Close(err error) { |
| c.c.L.Lock() |
| defer c.c.L.Unlock() |
| defer c.c.Signal() |
| c.b.Close(err) |
| } |
| |
| const ( |
| http2prefaceTimeout = 10 * time.Second |
| http2firstSettingsTimeout = 2 * time.Second // should be in-flight with preface anyway |
| http2handlerChunkWriteSize = 4 << 10 |
| http2defaultMaxStreams = 250 // TODO: make this 100 as the GFE seems to? |
| ) |
| |
| var ( |
| http2errClientDisconnected = errors.New("client disconnected") |
| http2errClosedBody = errors.New("body closed by handler") |
| http2errStreamBroken = errors.New("http2: stream broken") |
| ) |
| |
| var http2responseWriterStatePool = sync.Pool{ |
| New: func() interface{} { |
| rws := &http2responseWriterState{} |
| rws.bw = bufio.NewWriterSize(http2chunkWriter{rws}, http2handlerChunkWriteSize) |
| return rws |
| }, |
| } |
| |
| // Test hooks. |
| var ( |
| http2testHookOnConn func() |
| http2testHookGetServerConn func(*http2serverConn) |
| http2testHookOnPanicMu *sync.Mutex // nil except in tests |
| http2testHookOnPanic func(sc *http2serverConn, panicVal interface{}) (rePanic bool) |
| ) |
| |
| // Server is an HTTP/2 server. |
| type http2Server struct { |
| // MaxHandlers limits the number of http.Handler ServeHTTP goroutines |
| // which may run at a time over all connections. |
| // Negative or zero no limit. |
| // TODO: implement |
| MaxHandlers int |
| |
| // MaxConcurrentStreams optionally specifies the number of |
| // concurrent streams that each client may have open at a |
| // time. This is unrelated to the number of http.Handler goroutines |
| // which may be active globally, which is MaxHandlers. |
| // If zero, MaxConcurrentStreams defaults to at least 100, per |
| // the HTTP/2 spec's recommendations. |
| MaxConcurrentStreams uint32 |
| |
| // MaxReadFrameSize optionally specifies the largest frame |
| // this server is willing to read. A valid value is between |
| // 16k and 16M, inclusive. If zero or otherwise invalid, a |
| // default value is used. |
| MaxReadFrameSize uint32 |
| |
| // PermitProhibitedCipherSuites, if true, permits the use of |
| // cipher suites prohibited by the HTTP/2 spec. |
| PermitProhibitedCipherSuites bool |
| } |
| |
| func (s *http2Server) maxReadFrameSize() uint32 { |
| if v := s.MaxReadFrameSize; v >= http2minMaxFrameSize && v <= http2maxFrameSize { |
| return v |
| } |
| return http2defaultMaxReadFrameSize |
| } |
| |
| func (s *http2Server) maxConcurrentStreams() uint32 { |
| if v := s.MaxConcurrentStreams; v > 0 { |
| return v |
| } |
| return http2defaultMaxStreams |
| } |
| |
| // ConfigureServer adds HTTP/2 support to a net/http Server. |
| // |
| // The configuration conf may be nil. |
| // |
| // ConfigureServer must be called before s begins serving. |
| func http2ConfigureServer(s *Server, conf *http2Server) { |
| if conf == nil { |
| conf = new(http2Server) |
| } |
| if s.TLSConfig == nil { |
| s.TLSConfig = new(tls.Config) |
| } |
| |
| if s.TLSConfig.CipherSuites != nil { |
| const requiredCipher = tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 |
| haveRequired := false |
| for _, v := range s.TLSConfig.CipherSuites { |
| if v == requiredCipher { |
| haveRequired = true |
| break |
| } |
| } |
| if !haveRequired { |
| s.TLSConfig.CipherSuites = append(s.TLSConfig.CipherSuites, requiredCipher) |
| } |
| } |
| |
| haveNPN := false |
| for _, p := range s.TLSConfig.NextProtos { |
| if p == http2NextProtoTLS { |
| haveNPN = true |
| break |
| } |
| } |
| if !haveNPN { |
| s.TLSConfig.NextProtos = append(s.TLSConfig.NextProtos, http2NextProtoTLS) |
| } |
| |
| s.TLSConfig.NextProtos = append(s.TLSConfig.NextProtos, "h2-14") |
| |
| if s.TLSNextProto == nil { |
| s.TLSNextProto = map[string]func(*Server, *tls.Conn, Handler){} |
| } |
| protoHandler := func(hs *Server, c *tls.Conn, h Handler) { |
| if http2testHookOnConn != nil { |
| http2testHookOnConn() |
| } |
| conf.handleConn(hs, c, h) |
| } |
| s.TLSNextProto[http2NextProtoTLS] = protoHandler |
| s.TLSNextProto["h2-14"] = protoHandler |
| } |
| |
| func (srv *http2Server) handleConn(hs *Server, c net.Conn, h Handler) { |
| sc := &http2serverConn{ |
| srv: srv, |
| hs: hs, |
| conn: c, |
| remoteAddrStr: c.RemoteAddr().String(), |
| bw: http2newBufferedWriter(c), |
| handler: h, |
| streams: make(map[uint32]*http2stream), |
| readFrameCh: make(chan http2readFrameResult), |
| wantWriteFrameCh: make(chan http2frameWriteMsg, 8), |
| wroteFrameCh: make(chan struct{}, 1), |
| bodyReadCh: make(chan http2bodyReadMsg), |
| doneServing: make(chan struct{}), |
| advMaxStreams: srv.maxConcurrentStreams(), |
| writeSched: http2writeScheduler{ |
| maxFrameSize: http2initialMaxFrameSize, |
| }, |
| initialWindowSize: http2initialWindowSize, |
| headerTableSize: http2initialHeaderTableSize, |
| serveG: http2newGoroutineLock(), |
| pushEnabled: true, |
| } |
| sc.flow.add(http2initialWindowSize) |
| sc.inflow.add(http2initialWindowSize) |
| sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf) |
| sc.hpackDecoder = hpack.NewDecoder(http2initialHeaderTableSize, sc.onNewHeaderField) |
| sc.hpackDecoder.SetMaxStringLength(sc.maxHeaderStringLen()) |
| |
| fr := http2NewFramer(sc.bw, c) |
| fr.SetMaxReadFrameSize(srv.maxReadFrameSize()) |
| sc.framer = fr |
| |
| if tc, ok := c.(*tls.Conn); ok { |
| sc.tlsState = new(tls.ConnectionState) |
| *sc.tlsState = tc.ConnectionState() |
| |
| if sc.tlsState.Version < tls.VersionTLS12 { |
| sc.rejectConn(http2ErrCodeInadequateSecurity, "TLS version too low") |
| return |
| } |
| |
| if sc.tlsState.ServerName == "" { |
| |
| } |
| |
| if !srv.PermitProhibitedCipherSuites && http2isBadCipher(sc.tlsState.CipherSuite) { |
| |
| sc.rejectConn(http2ErrCodeInadequateSecurity, fmt.Sprintf("Prohibited TLS 1.2 Cipher Suite: %x", sc.tlsState.CipherSuite)) |
| return |
| } |
| } |
| |
| if hook := http2testHookGetServerConn; hook != nil { |
| hook(sc) |
| } |
| sc.serve() |
| } |
| |
| // isBadCipher reports whether the cipher is blacklisted by the HTTP/2 spec. |
| func http2isBadCipher(cipher uint16) bool { |
| switch cipher { |
| case tls.TLS_RSA_WITH_RC4_128_SHA, |
| tls.TLS_RSA_WITH_3DES_EDE_CBC_SHA, |
| tls.TLS_RSA_WITH_AES_128_CBC_SHA, |
| tls.TLS_RSA_WITH_AES_256_CBC_SHA, |
| tls.TLS_ECDHE_ECDSA_WITH_RC4_128_SHA, |
| tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA, |
| tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA, |
| tls.TLS_ECDHE_RSA_WITH_RC4_128_SHA, |
| tls.TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA, |
| tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA, |
| tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA: |
| |
| return true |
| default: |
| return false |
| } |
| } |
| |
| func (sc *http2serverConn) rejectConn(err http2ErrCode, debug string) { |
| sc.vlogf("REJECTING conn: %v, %s", err, debug) |
| |
| sc.framer.WriteGoAway(0, err, []byte(debug)) |
| sc.bw.Flush() |
| sc.conn.Close() |
| } |
| |
| type http2serverConn struct { |
| // Immutable: |
| srv *http2Server |
| hs *Server |
| conn net.Conn |
| bw *http2bufferedWriter // writing to conn |
| handler Handler |
| framer *http2Framer |
| hpackDecoder *hpack.Decoder |
| doneServing chan struct{} // closed when serverConn.serve ends |
| readFrameCh chan http2readFrameResult // written by serverConn.readFrames |
| wantWriteFrameCh chan http2frameWriteMsg // from handlers -> serve |
| wroteFrameCh chan struct{} // from writeFrameAsync -> serve, tickles more frame writes |
| bodyReadCh chan http2bodyReadMsg // from handlers -> serve |
| testHookCh chan func(int) // code to run on the serve loop |
| flow http2flow // conn-wide (not stream-specific) outbound flow control |
| inflow http2flow // conn-wide inbound flow control |
| tlsState *tls.ConnectionState // shared by all handlers, like net/http |
| remoteAddrStr string |
| |
| // Everything following is owned by the serve loop; use serveG.check(): |
| serveG http2goroutineLock // used to verify funcs are on serve() |
| pushEnabled bool |
| sawFirstSettings bool // got the initial SETTINGS frame after the preface |
| needToSendSettingsAck bool |
| unackedSettings int // how many SETTINGS have we sent without ACKs? |
| clientMaxStreams uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit) |
| advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client |
| curOpenStreams uint32 // client's number of open streams |
| maxStreamID uint32 // max ever seen |
| streams map[uint32]*http2stream |
| initialWindowSize int32 |
| headerTableSize uint32 |
| peerMaxHeaderListSize uint32 // zero means unknown (default) |
| canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case |
| req http2requestParam // non-zero while reading request headers |
| writingFrame bool // started write goroutine but haven't heard back on wroteFrameCh |
| needsFrameFlush bool // last frame write wasn't a flush |
| writeSched http2writeScheduler |
| inGoAway bool // we've started to or sent GOAWAY |
| needToSendGoAway bool // we need to schedule a GOAWAY frame write |
| goAwayCode http2ErrCode |
| shutdownTimerCh <-chan time.Time // nil until used |
| shutdownTimer *time.Timer // nil until used |
| |
| // Owned by the writeFrameAsync goroutine: |
| headerWriteBuf bytes.Buffer |
| hpackEncoder *hpack.Encoder |
| } |
| |
| func (sc *http2serverConn) maxHeaderStringLen() int { |
| v := sc.maxHeaderListSize() |
| if uint32(int(v)) == v { |
| return int(v) |
| } |
| |
| return 0 |
| } |
| |
| func (sc *http2serverConn) maxHeaderListSize() uint32 { |
| n := sc.hs.MaxHeaderBytes |
| if n <= 0 { |
| n = DefaultMaxHeaderBytes |
| } |
| // http2's count is in a slightly different unit and includes 32 bytes per pair. |
| // So, take the net/http.Server value and pad it up a bit, assuming 10 headers. |
| const perFieldOverhead = 32 // per http2 spec |
| const typicalHeaders = 10 // conservative |
| return uint32(n + typicalHeaders*perFieldOverhead) |
| } |
| |
| // requestParam is the state of the next request, initialized over |
| // potentially several frames HEADERS + zero or more CONTINUATION |
| // frames. |
| type http2requestParam struct { |
| // stream is non-nil if we're reading (HEADER or CONTINUATION) |
| // frames for a request (but not DATA). |
| stream *http2stream |
| header Header |
| method, path string |
| scheme, authority string |
| sawRegularHeader bool // saw a non-pseudo header already |
| invalidHeader bool // an invalid header was seen |
| headerListSize int64 // actually uint32, but easier math this way |
| } |
| |
| // stream represents a stream. This is the minimal metadata needed by |
| // the serve goroutine. Most of the actual stream state is owned by |
| // the http.Handler's goroutine in the responseWriter. Because the |
| // responseWriter's responseWriterState is recycled at the end of a |
| // handler, this struct intentionally has no pointer to the |
| // *responseWriter{,State} itself, as the Handler ending nils out the |
| // responseWriter's state field. |
| type http2stream struct { |
| // immutable: |
| id uint32 |
| body *http2pipe // non-nil if expecting DATA frames |
| cw http2closeWaiter // closed wait stream transitions to closed state |
| |
| // owned by serverConn's serve loop: |
| bodyBytes int64 // body bytes seen so far |
| declBodyBytes int64 // or -1 if undeclared |
| flow http2flow // limits writing from Handler to client |
| inflow http2flow // what the client is allowed to POST/etc to us |
| parent *http2stream // or nil |
| weight uint8 |
| state http2streamState |
| sentReset bool // only true once detached from streams map |
| gotReset bool // only true once detacted from streams map |
| } |
| |
| func (sc *http2serverConn) Framer() *http2Framer { return sc.framer } |
| |
| func (sc *http2serverConn) CloseConn() error { return sc.conn.Close() } |
| |
| func (sc *http2serverConn) Flush() error { return sc.bw.Flush() } |
| |
| func (sc *http2serverConn) HeaderEncoder() (*hpack.Encoder, *bytes.Buffer) { |
| return sc.hpackEncoder, &sc.headerWriteBuf |
| } |
| |
| func (sc *http2serverConn) state(streamID uint32) (http2streamState, *http2stream) { |
| sc.serveG.check() |
| |
| if st, ok := sc.streams[streamID]; ok { |
| return st.state, st |
| } |
| |
| if streamID <= sc.maxStreamID { |
| return http2stateClosed, nil |
| } |
| return http2stateIdle, nil |
| } |
| |
| func (sc *http2serverConn) vlogf(format string, args ...interface{}) { |
| if http2VerboseLogs { |
| sc.logf(format, args...) |
| } |
| } |
| |
| func (sc *http2serverConn) logf(format string, args ...interface{}) { |
| if lg := sc.hs.ErrorLog; lg != nil { |
| lg.Printf(format, args...) |
| } else { |
| log.Printf(format, args...) |
| } |
| } |
| |
| func (sc *http2serverConn) condlogf(err error, format string, args ...interface{}) { |
| if err == nil { |
| return |
| } |
| str := err.Error() |
| if err == io.EOF || strings.Contains(str, "use of closed network connection") { |
| |
| sc.vlogf(format, args...) |
| } else { |
| sc.logf(format, args...) |
| } |
| } |
| |
| func (sc *http2serverConn) onNewHeaderField(f hpack.HeaderField) { |
| sc.serveG.check() |
| sc.vlogf("got header field %+v", f) |
| switch { |
| case !http2validHeader(f.Name): |
| sc.req.invalidHeader = true |
| case strings.HasPrefix(f.Name, ":"): |
| if sc.req.sawRegularHeader { |
| sc.logf("pseudo-header after regular header") |
| sc.req.invalidHeader = true |
| return |
| } |
| var dst *string |
| switch f.Name { |
| case ":method": |
| dst = &sc.req.method |
| case ":path": |
| dst = &sc.req.path |
| case ":scheme": |
| dst = &sc.req.scheme |
| case ":authority": |
| dst = &sc.req.authority |
| default: |
| |
| sc.logf("invalid pseudo-header %q", f.Name) |
| sc.req.invalidHeader = true |
| return |
| } |
| if *dst != "" { |
| sc.logf("duplicate pseudo-header %q sent", f.Name) |
| sc.req.invalidHeader = true |
| return |
| } |
| *dst = f.Value |
| default: |
| sc.req.sawRegularHeader = true |
| sc.req.header.Add(sc.canonicalHeader(f.Name), f.Value) |
| const headerFieldOverhead = 32 // per spec |
| sc.req.headerListSize += int64(len(f.Name)) + int64(len(f.Value)) + headerFieldOverhead |
| if sc.req.headerListSize > int64(sc.maxHeaderListSize()) { |
| sc.hpackDecoder.SetEmitEnabled(false) |
| } |
| } |
| } |
| |
| func (sc *http2serverConn) canonicalHeader(v string) string { |
| sc.serveG.check() |
| cv, ok := http2commonCanonHeader[v] |
| if ok { |
| return cv |
| } |
| cv, ok = sc.canonHeader[v] |
| if ok { |
| return cv |
| } |
| if sc.canonHeader == nil { |
| sc.canonHeader = make(map[string]string) |
| } |
| cv = CanonicalHeaderKey(v) |
| sc.canonHeader[v] = cv |
| return cv |
| } |
| |
| type http2readFrameResult struct { |
| f http2Frame // valid until readMore is called |
| err error |
| |
| // readMore should be called once the consumer no longer needs or |
| // retains f. After readMore, f is invalid and more frames can be |
| // read. |
| readMore func() |
| } |
| |
| // readFrames is the loop that reads incoming frames. |
| // It takes care to only read one frame at a time, blocking until the |
| // consumer is done with the frame. |
| // It's run on its own goroutine. |
| func (sc *http2serverConn) readFrames() { |
| gate := make(http2gate) |
| for { |
| f, err := sc.framer.ReadFrame() |
| select { |
| case sc.readFrameCh <- http2readFrameResult{f, err, gate.Done}: |
| case <-sc.doneServing: |
| return |
| } |
| select { |
| case <-gate: |
| case <-sc.doneServing: |
| return |
| } |
| } |
| } |
| |
| // writeFrameAsync runs in its own goroutine and writes a single frame |
| // and then reports when it's done. |
| // At most one goroutine can be running writeFrameAsync at a time per |
| // serverConn. |
| func (sc *http2serverConn) writeFrameAsync(wm http2frameWriteMsg) { |
| err := wm.write.writeFrame(sc) |
| if ch := wm.done; ch != nil { |
| select { |
| case ch <- err: |
| default: |
| panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wm.write)) |
| } |
| } |
| sc.wroteFrameCh <- struct{}{} |
| } |
| |
| func (sc *http2serverConn) closeAllStreamsOnConnClose() { |
| sc.serveG.check() |
| for _, st := range sc.streams { |
| sc.closeStream(st, http2errClientDisconnected) |
| } |
| } |
| |
| func (sc *http2serverConn) stopShutdownTimer() { |
| sc.serveG.check() |
| if t := sc.shutdownTimer; t != nil { |
| t.Stop() |
| } |
| } |
| |
| func (sc *http2serverConn) notePanic() { |
| if http2testHookOnPanicMu != nil { |
| http2testHookOnPanicMu.Lock() |
| defer http2testHookOnPanicMu.Unlock() |
| } |
| if http2testHookOnPanic != nil { |
| if e := recover(); e != nil { |
| if http2testHookOnPanic(sc, e) { |
| panic(e) |
| } |
| } |
| } |
| } |
| |
| func (sc *http2serverConn) serve() { |
| sc.serveG.check() |
| defer sc.notePanic() |
| defer sc.conn.Close() |
| defer sc.closeAllStreamsOnConnClose() |
| defer sc.stopShutdownTimer() |
| defer close(sc.doneServing) |
| |
| sc.vlogf("HTTP/2 connection from %v on %p", sc.conn.RemoteAddr(), sc.hs) |
| |
| sc.writeFrame(http2frameWriteMsg{ |
| write: http2writeSettings{ |
| {http2SettingMaxFrameSize, sc.srv.maxReadFrameSize()}, |
| {http2SettingMaxConcurrentStreams, sc.advMaxStreams}, |
| {http2SettingMaxHeaderListSize, sc.maxHeaderListSize()}, |
| }, |
| }) |
| sc.unackedSettings++ |
| |
| if err := sc.readPreface(); err != nil { |
| sc.condlogf(err, "error reading preface from client %v: %v", sc.conn.RemoteAddr(), err) |
| return |
| } |
| |
| go sc.readFrames() |
| |
| settingsTimer := time.NewTimer(http2firstSettingsTimeout) |
| loopNum := 0 |
| for { |
| loopNum++ |
| select { |
| case wm := <-sc.wantWriteFrameCh: |
| sc.writeFrame(wm) |
| case <-sc.wroteFrameCh: |
| if sc.writingFrame != true { |
| panic("internal error: expected to be already writing a frame") |
| } |
| sc.writingFrame = false |
| sc.scheduleFrameWrite() |
| case res := <-sc.readFrameCh: |
| if !sc.processFrameFromReader(res) { |
| return |
| } |
| res.readMore() |
| if settingsTimer.C != nil { |
| settingsTimer.Stop() |
| settingsTimer.C = nil |
| } |
| case m := <-sc.bodyReadCh: |
| sc.noteBodyRead(m.st, m.n) |
| case <-settingsTimer.C: |
| sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr()) |
| return |
| case <-sc.shutdownTimerCh: |
| sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr()) |
| return |
| case fn := <-sc.testHookCh: |
| fn(loopNum) |
| } |
| } |
| } |
| |
| // readPreface reads the ClientPreface greeting from the peer |
| // or returns an error on timeout or an invalid greeting. |
| func (sc *http2serverConn) readPreface() error { |
| errc := make(chan error, 1) |
| go func() { |
| |
| buf := make([]byte, len(http2ClientPreface)) |
| if _, err := io.ReadFull(sc.conn, buf); err != nil { |
| errc <- err |
| } else if !bytes.Equal(buf, http2clientPreface) { |
| errc <- fmt.Errorf("bogus greeting %q", buf) |
| } else { |
| errc <- nil |
| } |
| }() |
| timer := time.NewTimer(http2prefaceTimeout) |
| defer timer.Stop() |
| select { |
| case <-timer.C: |
| return errors.New("timeout waiting for client preface") |
| case err := <-errc: |
| if err == nil { |
| sc.vlogf("client %v said hello", sc.conn.RemoteAddr()) |
| } |
| return err |
| } |
| } |
| |
| var http2errChanPool = sync.Pool{ |
| New: func() interface{} { return make(chan error, 1) }, |
| } |
| |
| // writeDataFromHandler writes the data described in req to stream.id. |
| // |
| // The flow control currently happens in the Handler where it waits |
| // for 1 or more bytes to be available to then write here. So at this |
| // point we know that we have flow control. But this might have to |
| // change when priority is implemented, so the serve goroutine knows |
| // the total amount of bytes waiting to be sent and can can have more |
| // scheduling decisions available. |
| func (sc *http2serverConn) writeDataFromHandler(stream *http2stream, writeData *http2writeData) error { |
| ch := http2errChanPool.Get().(chan error) |
| sc.writeFrameFromHandler(http2frameWriteMsg{ |
| write: writeData, |
| stream: stream, |
| done: ch, |
| }) |
| select { |
| case err := <-ch: |
| http2errChanPool.Put(ch) |
| return err |
| case <-sc.doneServing: |
| return http2errClientDisconnected |
| case <-stream.cw: |
| return http2errStreamBroken |
| } |
| } |
| |
| // writeFrameFromHandler sends wm to sc.wantWriteFrameCh, but aborts |
| // if the connection has gone away. |
| // |
| // This must not be run from the serve goroutine itself, else it might |
| // deadlock writing to sc.wantWriteFrameCh (which is only mildly |
| // buffered and is read by serve itself). If you're on the serve |
| // goroutine, call writeFrame instead. |
| func (sc *http2serverConn) writeFrameFromHandler(wm http2frameWriteMsg) { |
| sc.serveG.checkNotOn() |
| var scheduled bool |
| select { |
| case sc.wantWriteFrameCh <- wm: |
| scheduled = true |
| case <-sc.doneServing: |
| |
| case <-wm.stream.cw: |
| |
| } |
| |
| if !scheduled && wm.done != nil { |
| select { |
| case wm.done <- http2errStreamBroken: |
| default: |
| panic("expected buffered channel") |
| } |
| } |
| } |
| |
| // writeFrame schedules a frame to write and sends it if there's nothing |
| // already being written. |
| // |
| // There is no pushback here (the serve goroutine never blocks). It's |
| // the http.Handlers that block, waiting for their previous frames to |
| // make it onto the wire |
| // |
| // If you're not on the serve goroutine, use writeFrameFromHandler instead. |
| func (sc *http2serverConn) writeFrame(wm http2frameWriteMsg) { |
| sc.serveG.check() |
| sc.writeSched.add(wm) |
| sc.scheduleFrameWrite() |
| } |
| |
| // startFrameWrite starts a goroutine to write wm (in a separate |
| // goroutine since that might block on the network), and updates the |
| // serve goroutine's state about the world, updated from info in wm. |
| func (sc *http2serverConn) startFrameWrite(wm http2frameWriteMsg) { |
| sc.serveG.check() |
| if sc.writingFrame { |
| panic("internal error: can only be writing one frame at a time") |
| } |
| sc.writingFrame = true |
| |
| st := wm.stream |
| if st != nil { |
| switch st.state { |
| case http2stateHalfClosedLocal: |
| panic("internal error: attempt to send frame on half-closed-local stream") |
| case http2stateClosed: |
| if st.sentReset || st.gotReset { |
| |
| sc.wroteFrameCh <- struct{}{} |
| return |
| } |
| panic(fmt.Sprintf("internal error: attempt to send a write %v on a closed stream", wm)) |
| } |
| } |
| |
| sc.needsFrameFlush = true |
| if http2endsStream(wm.write) { |
| if st == nil { |
| panic("internal error: expecting non-nil stream") |
| } |
| switch st.state { |
| case http2stateOpen: |
| |
| st.state = http2stateHalfClosedLocal |
| errCancel := http2StreamError{st.id, http2ErrCodeCancel} |
| sc.resetStream(errCancel) |
| case http2stateHalfClosedRemote: |
| sc.closeStream(st, nil) |
| } |
| } |
| go sc.writeFrameAsync(wm) |
| } |
| |
| // scheduleFrameWrite tickles the frame writing scheduler. |
| // |
| // If a frame is already being written, nothing happens. This will be called again |
| // when the frame is done being written. |
| // |
| // If a frame isn't being written we need to send one, the best frame |
| // to send is selected, preferring first things that aren't |
| // stream-specific (e.g. ACKing settings), and then finding the |
| // highest priority stream. |
| // |
| // If a frame isn't being written and there's nothing else to send, we |
| // flush the write buffer. |
| func (sc *http2serverConn) scheduleFrameWrite() { |
| sc.serveG.check() |
| if sc.writingFrame { |
| return |
| } |
| if sc.needToSendGoAway { |
| sc.needToSendGoAway = false |
| sc.startFrameWrite(http2frameWriteMsg{ |
| write: &http2writeGoAway{ |
| maxStreamID: sc.maxStreamID, |
| code: sc.goAwayCode, |
| }, |
| }) |
| return |
| } |
| if sc.needToSendSettingsAck { |
| sc.needToSendSettingsAck = false |
| sc.startFrameWrite(http2frameWriteMsg{write: http2writeSettingsAck{}}) |
| return |
| } |
| if !sc.inGoAway { |
| if wm, ok := sc.writeSched.take(); ok { |
| sc.startFrameWrite(wm) |
| return |
| } |
| } |
| if sc.needsFrameFlush { |
| sc.startFrameWrite(http2frameWriteMsg{write: http2flushFrameWriter{}}) |
| sc.needsFrameFlush = false |
| return |
| } |
| } |
| |
| func (sc *http2serverConn) goAway(code http2ErrCode) { |
| sc.serveG.check() |
| if sc.inGoAway { |
| return |
| } |
| if code != http2ErrCodeNo { |
| sc.shutDownIn(250 * time.Millisecond) |
| } else { |
| |
| sc.shutDownIn(1 * time.Second) |
| } |
| sc.inGoAway = true |
| sc.needToSendGoAway = true |
| sc.goAwayCode = code |
| sc.scheduleFrameWrite() |
| } |
| |
| func (sc *http2serverConn) shutDownIn(d time.Duration) { |
| sc.serveG.check() |
| sc.shutdownTimer = time.NewTimer(d) |
| sc.shutdownTimerCh = sc.shutdownTimer.C |
| } |
| |
| func (sc *http2serverConn) resetStream(se http2StreamError) { |
| sc.serveG.check() |
| sc.writeFrame(http2frameWriteMsg{write: se}) |
| if st, ok := sc.streams[se.StreamID]; ok { |
| st.sentReset = true |
| sc.closeStream(st, se) |
| } |
| } |
| |
| // curHeaderStreamID returns the stream ID of the header block we're |
| // currently in the middle of reading. If this returns non-zero, the |
| // next frame must be a CONTINUATION with this stream id. |
| func (sc *http2serverConn) curHeaderStreamID() uint32 { |
| sc.serveG.check() |
| st := sc.req.stream |
| if st == nil { |
| return 0 |
| } |
| return st.id |
| } |
| |
| // processFrameFromReader processes the serve loop's read from readFrameCh from the |
| // frame-reading goroutine. |
| // processFrameFromReader returns whether the connection should be kept open. |
| func (sc *http2serverConn) processFrameFromReader(res http2readFrameResult) bool { |
| sc.serveG.check() |
| err := res.err |
| if err != nil { |
| if err == http2ErrFrameTooLarge { |
| sc.goAway(http2ErrCodeFrameSize) |
| return true |
| } |
| clientGone := err == io.EOF || strings.Contains(err.Error(), "use of closed network connection") |
| if clientGone { |
| |
| return false |
| } |
| } else { |
| f := res.f |
| sc.vlogf("got %v: %#v", f.Header(), f) |
| err = sc.processFrame(f) |
| if err == nil { |
| return true |
| } |
| } |
| |
| switch ev := err.(type) { |
| case http2StreamError: |
| sc.resetStream(ev) |
| return true |
| case http2goAwayFlowError: |
| sc.goAway(http2ErrCodeFlowControl) |
| return true |
| case http2ConnectionError: |
| sc.logf("%v: %v", sc.conn.RemoteAddr(), ev) |
| sc.goAway(http2ErrCode(ev)) |
| return true |
| default: |
| if res.err != nil { |
| sc.logf("disconnecting; error reading frame from client %s: %v", sc.conn.RemoteAddr(), err) |
| } else { |
| sc.logf("disconnection due to other error: %v", err) |
| } |
| return false |
| } |
| } |
| |
| func (sc *http2serverConn) processFrame(f http2Frame) error { |
| sc.serveG.check() |
| |
| if !sc.sawFirstSettings { |
| if _, ok := f.(*http2SettingsFrame); !ok { |
| return http2ConnectionError(http2ErrCodeProtocol) |
| } |
| sc.sawFirstSettings = true |
| } |
| |
| if s := sc.curHeaderStreamID(); s != 0 { |
| if cf, ok := f.(*http2ContinuationFrame); !ok { |
| return http2ConnectionError(http2ErrCodeProtocol) |
| } else if cf.Header().StreamID != s { |
| return http2ConnectionError(http2ErrCodeProtocol) |
| } |
| } |
| |
| switch f := f.(type) { |
| case *http2SettingsFrame: |
| return sc.processSettings(f) |
| case *http2HeadersFrame: |
| return sc.processHeaders(f) |
| case *http2ContinuationFrame: |
| return sc.processContinuation(f) |
| case *http2WindowUpdateFrame: |
| return sc.processWindowUpdate(f) |
| case *http2PingFrame: |
| return sc.processPing(f) |
| case *http2DataFrame: |
| return sc.processData(f) |
| case *http2RSTStreamFrame: |
| return sc.processResetStream(f) |
| case *http2PriorityFrame: |
| return sc.processPriority(f) |
| case *http2PushPromiseFrame: |
| |
| return http2ConnectionError(http2ErrCodeProtocol) |
| default: |
| sc.vlogf("Ignoring frame: %v", f.Header()) |
| return nil |
| } |
| } |
| |
| func (sc *http2serverConn) processPing(f *http2PingFrame) error { |
| sc.serveG.check() |
| if f.Flags.Has(http2FlagSettingsAck) { |
| |
| return nil |
| } |
| if f.StreamID != 0 { |
| |
| return http2ConnectionError(http2ErrCodeProtocol) |
| } |
| sc.writeFrame(http2frameWriteMsg{write: http2writePingAck{f}}) |
| return nil |
| } |
| |
| func (sc *http2serverConn) processWindowUpdate(f *http2WindowUpdateFrame) error { |
| sc.serveG.check() |
| switch { |
| case f.StreamID != 0: |
| st := sc.streams[f.StreamID] |
| if st == nil { |
| |
| return nil |
| } |
| if !st.flow.add(int32(f.Increment)) { |
| return http2StreamError{f.StreamID, http2ErrCodeFlowControl} |
| } |
| default: |
| if !sc.flow.add(int32(f.Increment)) { |
| return http2goAwayFlowError{} |
| } |
| } |
| sc.scheduleFrameWrite() |
| return nil |
| } |
| |
| func (sc *http2serverConn) processResetStream(f *http2RSTStreamFrame) error { |
| sc.serveG.check() |
| |
| state, st := sc.state(f.StreamID) |
| if state == http2stateIdle { |
| |
| return http2ConnectionError(http2ErrCodeProtocol) |
| } |
| if st != nil { |
| st.gotReset = true |
| sc.closeStream(st, http2StreamError{f.StreamID, f.ErrCode}) |
| } |
| return nil |
| } |
| |
| func (sc *http2serverConn) closeStream(st *http2stream, err error) { |
| sc.serveG.check() |
| if st.state == http2stateIdle || st.state == http2stateClosed { |
| panic(fmt.Sprintf("invariant; can't close stream in state %v", st.state)) |
| } |
| st.state = http2stateClosed |
| sc.curOpenStreams-- |
| delete(sc.streams, st.id) |
| if p := st.body; p != nil { |
| p.Close(err) |
| } |
| st.cw.Close() |
| sc.writeSched.forgetStream(st.id) |
| } |
| |
| func (sc *http2serverConn) processSettings(f *http2SettingsFrame) error { |
| sc.serveG.check() |
| if f.IsAck() { |
| sc.unackedSettings-- |
| if sc.unackedSettings < 0 { |
| |
| return http2ConnectionError(http2ErrCodeProtocol) |
| } |
| return nil |
| } |
| if err := f.ForeachSetting(sc.processSetting); err != nil { |
| return err |
| } |
| sc.needToSendSettingsAck = true |
| sc.scheduleFrameWrite() |
| return nil |
| } |
| |
| func (sc *http2serverConn) processSetting(s http2Setting) error { |
| sc.serveG.check() |
| if err := s.Valid(); err != nil { |
| return err |
| } |
| sc.vlogf("processing setting %v", s) |
| switch s.ID { |
| case http2SettingHeaderTableSize: |
| sc.headerTableSize = s.Val |
| sc.hpackEncoder.SetMaxDynamicTableSize(s.Val) |
| case http2SettingEnablePush: |
| sc.pushEnabled = s.Val != 0 |
| case http2SettingMaxConcurrentStreams: |
| sc.clientMaxStreams = s.Val |
| case http2SettingInitialWindowSize: |
| return sc.processSettingInitialWindowSize(s.Val) |
| case http2SettingMaxFrameSize: |
| sc.writeSched.maxFrameSize = s.Val |
| case http2SettingMaxHeaderListSize: |
| sc.peerMaxHeaderListSize = s.Val |
| default: |
| |
| } |
| return nil |
| } |
| |
| func (sc *http2serverConn) processSettingInitialWindowSize(val uint32) error { |
| sc.serveG.check() |
| |
| old := sc.initialWindowSize |
| sc.initialWindowSize = int32(val) |
| growth := sc.initialWindowSize - old |
| for _, st := range sc.streams { |
| if !st.flow.add(growth) { |
| |
| return http2ConnectionError(http2ErrCodeFlowControl) |
| } |
| } |
| return nil |
| } |
| |
| func (sc *http2serverConn) processData(f *http2DataFrame) error { |
| sc.serveG.check() |
| |
| id := f.Header().StreamID |
| st, ok := sc.streams[id] |
| if !ok || st.state != http2stateOpen { |
| |
| return http2StreamError{id, http2ErrCodeStreamClosed} |
| } |
| if st.body == nil { |
| panic("internal error: should have a body in this state") |
| } |
| data := f.Data() |
| |
| if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes { |
| st.body.Close(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes)) |
| return http2StreamError{id, http2ErrCodeStreamClosed} |
| } |
| if len(data) > 0 { |
| |
| if int(st.inflow.available()) < len(data) { |
| return http2StreamError{id, http2ErrCodeFlowControl} |
| } |
| st.inflow.take(int32(len(data))) |
| wrote, err := st.body.Write(data) |
| if err != nil { |
| return http2StreamError{id, http2ErrCodeStreamClosed} |
| } |
| if wrote != len(data) { |
| panic("internal error: bad Writer") |
| } |
| st.bodyBytes += int64(len(data)) |
| } |
| if f.StreamEnded() { |
| if st.declBodyBytes != -1 && st.declBodyBytes != st.bodyBytes { |
| st.body.Close(fmt.Errorf("request declared a Content-Length of %d but only wrote %d bytes", |
| st.declBodyBytes, st.bodyBytes)) |
| } else { |
| st.body.Close(io.EOF) |
| } |
| st.state = http2stateHalfClosedRemote |
| } |
| return nil |
| } |
| |
| func (sc *http2serverConn) processHeaders(f *http2HeadersFrame) error { |
| sc.serveG.check() |
| id := f.Header().StreamID |
| if sc.inGoAway { |
| |
| return nil |
| } |
| |
| if id%2 != 1 || id <= sc.maxStreamID || sc.req.stream != nil { |
| |
| return http2ConnectionError(http2ErrCodeProtocol) |
| } |
| if id > sc.maxStreamID { |
| sc.maxStreamID = id |
| } |
| st := &http2stream{ |
| id: id, |
| state: http2stateOpen, |
| } |
| if f.StreamEnded() { |
| st.state = http2stateHalfClosedRemote |
| } |
| st.cw.Init() |
| |
| st.flow.conn = &sc.flow |
| st.flow.add(sc.initialWindowSize) |
| st.inflow.conn = &sc.inflow |
| st.inflow.add(http2initialWindowSize) |
| |
| sc.streams[id] = st |
| if f.HasPriority() { |
| http2adjustStreamPriority(sc.streams, st.id, f.Priority) |
| } |
| sc.curOpenStreams++ |
| sc.req = http2requestParam{ |
| stream: st, |
| header: make(Header), |
| } |
| sc.hpackDecoder.SetEmitEnabled(true) |
| return sc.processHeaderBlockFragment(st, f.HeaderBlockFragment(), f.HeadersEnded()) |
| } |
| |
| func (sc *http2serverConn) processContinuation(f *http2ContinuationFrame) error { |
| sc.serveG.check() |
| st := sc.streams[f.Header().StreamID] |
| if st == nil || sc.curHeaderStreamID() != st.id { |
| return http2ConnectionError(http2ErrCodeProtocol) |
| } |
| return sc.processHeaderBlockFragment(st, f.HeaderBlockFragment(), f.HeadersEnded()) |
| } |
| |
| func (sc *http2serverConn) processHeaderBlockFragment(st *http2stream, frag []byte, end bool) error { |
| sc.serveG.check() |
| if _, err := sc.hpackDecoder.Write(frag); err != nil { |
| return http2ConnectionError(http2ErrCodeCompression) |
| } |
| if !end { |
| return nil |
| } |
| if err := sc.hpackDecoder.Close(); err != nil { |
| return http2ConnectionError(http2ErrCodeCompression) |
| } |
| defer sc.resetPendingRequest() |
| if sc.curOpenStreams > sc.advMaxStreams { |
| |
| if sc.unackedSettings == 0 { |
| |
| return http2StreamError{st.id, http2ErrCodeProtocol} |
| } |
| |
| return http2StreamError{st.id, http2ErrCodeRefusedStream} |
| } |
| |
| rw, req, err := sc.newWriterAndRequest() |
| if err != nil { |
| return err |
| } |
| st.body = req.Body.(*http2requestBody).pipe |
| st.declBodyBytes = req.ContentLength |
| |
| handler := sc.handler.ServeHTTP |
| if !sc.hpackDecoder.EmitEnabled() { |
| |
| handler = http2handleHeaderListTooLong |
| } |
| |
| go sc.runHandler(rw, req, handler) |
| return nil |
| } |
| |
| func (sc *http2serverConn) processPriority(f *http2PriorityFrame) error { |
| http2adjustStreamPriority(sc.streams, f.StreamID, f.http2PriorityParam) |
| return nil |
| } |
| |
| func http2adjustStreamPriority(streams map[uint32]*http2stream, streamID uint32, priority http2PriorityParam) { |
| st, ok := streams[streamID] |
| if !ok { |
| |
| return |
| } |
| st.weight = priority.Weight |
| parent := streams[priority.StreamDep] |
| if parent == st { |
| |
| return |
| } |
| |
| for piter := parent; piter != nil; piter = piter.parent { |
| if piter == st { |
| parent.parent = st.parent |
| break |
| } |
| } |
| st.parent = parent |
| if priority.Exclusive && (st.parent != nil || priority.StreamDep == 0) { |
| for _, openStream := range streams { |
| if openStream != st && openStream.parent == st.parent { |
| openStream.parent = st |
| } |
| } |
| } |
| } |
| |
| // resetPendingRequest zeros out all state related to a HEADERS frame |
| // and its zero or more CONTINUATION frames sent to start a new |
| // request. |
| func (sc *http2serverConn) resetPendingRequest() { |
| sc.serveG.check() |
| sc.req = http2requestParam{} |
| } |
| |
| func (sc *http2serverConn) newWriterAndRequest() (*http2responseWriter, *Request, error) { |
| sc.serveG.check() |
| rp := &sc.req |
| if rp.invalidHeader || rp.method == "" || rp.path == "" || |
| (rp.scheme != "https" && rp.scheme != "http") { |
| |
| return nil, nil, http2StreamError{rp.stream.id, http2ErrCodeProtocol} |
| } |
| var tlsState *tls.ConnectionState // nil if not scheme https |
| if rp.scheme == "https" { |
| tlsState = sc.tlsState |
| } |
| authority := rp.authority |
| if authority == "" { |
| authority = rp.header.Get("Host") |
| } |
| needsContinue := rp.header.Get("Expect") == "100-continue" |
| if needsContinue { |
| rp.header.Del("Expect") |
| } |
| |
| if cookies := rp.header["Cookie"]; len(cookies) > 1 { |
| rp.header.Set("Cookie", strings.Join(cookies, "; ")) |
| } |
| bodyOpen := rp.stream.state == http2stateOpen |
| body := &http2requestBody{ |
| conn: sc, |
| stream: rp.stream, |
| needsContinue: needsContinue, |
| } |
| |
| url, err := url.ParseRequestURI(rp.path) |
| if err != nil { |
| |
| return nil, nil, http2StreamError{rp.stream.id, http2ErrCodeProtocol} |
| } |
| req := &Request{ |
| Method: rp.method, |
| URL: url, |
| RemoteAddr: sc.remoteAddrStr, |
| Header: rp.header, |
| RequestURI: rp.path, |
| Proto: "HTTP/2.0", |
| ProtoMajor: 2, |
| ProtoMinor: 0, |
| TLS: tlsState, |
| Host: authority, |
| Body: body, |
| } |
| if bodyOpen { |
| body.pipe = &http2pipe{ |
| b: http2buffer{buf: make([]byte, http2initialWindowSize)}, |
| } |
| body.pipe.c.L = &body.pipe.m |
| |
| if vv, ok := rp.header["Content-Length"]; ok { |
| req.ContentLength, _ = strconv.ParseInt(vv[0], 10, 64) |
| } else { |
| req.ContentLength = -1 |
| } |
| } |
| |
| rws := http2responseWriterStatePool.Get().(*http2responseWriterState) |
| bwSave := rws.bw |
| *rws = http2responseWriterState{} |
| rws.conn = sc |
| rws.bw = bwSave |
| rws.bw.Reset(http2chunkWriter{rws}) |
| rws.stream = rp.stream |
| rws.req = req |
| rws.body = body |
| |
| rw := &http2responseWriter{rws: rws} |
| return rw, req, nil |
| } |
| |
| // Run on its own goroutine. |
| func (sc *http2serverConn) runHandler(rw *http2responseWriter, req *Request, handler func(ResponseWriter, *Request)) { |
| defer rw.handlerDone() |
| |
| handler(rw, req) |
| } |
| |
| func http2handleHeaderListTooLong(w ResponseWriter, r *Request) { |
| // 10.5.1 Limits on Header Block Size: |
| // .. "A server that receives a larger header block than it is |
| // willing to handle can send an HTTP 431 (Request Header Fields Too |
| // Large) status code" |
| const statusRequestHeaderFieldsTooLarge = 431 // only in Go 1.6+ |
| w.WriteHeader(statusRequestHeaderFieldsTooLarge) |
| io.WriteString(w, "<h1>HTTP Error 431</h1><p>Request Header Field(s) Too Large</p>") |
| } |
| |
| // called from handler goroutines. |
| // h may be nil. |
| func (sc *http2serverConn) writeHeaders(st *http2stream, headerData *http2writeResHeaders) { |
| sc.serveG.checkNotOn() |
| var errc chan error |
| if headerData.h != nil { |
| |
| errc = http2errChanPool.Get().(chan error) |
| } |
| sc.writeFrameFromHandler(http2frameWriteMsg{ |
| write: headerData, |
| stream: st, |
| done: errc, |
| }) |
| if errc != nil { |
| select { |
| case <-errc: |
| |
| http2errChanPool.Put(errc) |
| case <-sc.doneServing: |
| |
| case <-st.cw: |
| |
| } |
| } |
| } |
| |
| // called from handler goroutines. |
| func (sc *http2serverConn) write100ContinueHeaders(st *http2stream) { |
| sc.writeFrameFromHandler(http2frameWriteMsg{ |
| write: http2write100ContinueHeadersFrame{st.id}, |
| stream: st, |
| }) |
| } |
| |
| // A bodyReadMsg tells the server loop that the http.Handler read n |
| // bytes of the DATA from the client on the given stream. |
| type http2bodyReadMsg struct { |
| st *http2stream |
| n int |
| } |
| |
| // called from handler goroutines. |
| // Notes that the handler for the given stream ID read n bytes of its body |
| // and schedules flow control tokens to be sent. |
| func (sc *http2serverConn) noteBodyReadFromHandler(st *http2stream, n int) { |
| sc.serveG.checkNotOn() |
| sc.bodyReadCh <- http2bodyReadMsg{st, n} |
| } |
| |
| func (sc *http2serverConn) noteBodyRead(st *http2stream, n int) { |
| sc.serveG.check() |
| sc.sendWindowUpdate(nil, n) |
| if st.state != http2stateHalfClosedRemote && st.state != http2stateClosed { |
| |
| sc.sendWindowUpdate(st, n) |
| } |
| } |
| |
| // st may be nil for conn-level |
| func (sc *http2serverConn) sendWindowUpdate(st *http2stream, n int) { |
| sc.serveG.check() |
| // "The legal range for the increment to the flow control |
| // window is 1 to 2^31-1 (2,147,483,647) octets." |
| // A Go Read call on 64-bit machines could in theory read |
| // a larger Read than this. Very unlikely, but we handle it here |
| // rather than elsewhere for now. |
| const maxUint31 = 1<<31 - 1 |
| for n >= maxUint31 { |
| sc.sendWindowUpdate32(st, maxUint31) |
| n -= maxUint31 |
| } |
| sc.sendWindowUpdate32(st, int32(n)) |
| } |
| |
| // st may be nil for conn-level |
| func (sc *http2serverConn) sendWindowUpdate32(st *http2stream, n int32) { |
| sc.serveG.check() |
| if n == 0 { |
| return |
| } |
| if n < 0 { |
| panic("negative update") |
| } |
| var streamID uint32 |
| if st != nil { |
| streamID = st.id |
| } |
| sc.writeFrame(http2frameWriteMsg{ |
| write: http2writeWindowUpdate{streamID: streamID, n: uint32(n)}, |
| stream: st, |
| }) |
| var ok bool |
| if st == nil { |
| ok = sc.inflow.add(n) |
| } else { |
| ok = st.inflow.add(n) |
| } |
| if !ok { |
| panic("internal error; sent too many window updates without decrements?") |
| } |
| } |
| |
| type http2requestBody struct { |
| stream *http2stream |
| conn *http2serverConn |
| closed bool |
| pipe *http2pipe // non-nil if we have a HTTP entity message body |
| needsContinue bool // need to send a 100-continue |
| } |
| |
| func (b *http2requestBody) Close() error { |
| if b.pipe != nil { |
| b.pipe.Close(http2errClosedBody) |
| } |
| b.closed = true |
| return nil |
| } |
| |
| func (b *http2requestBody) Read(p []byte) (n int, err error) { |
| if b.needsContinue { |
| b.needsContinue = false |
| b.conn.write100ContinueHeaders(b.stream) |
| } |
| if b.pipe == nil { |
| return 0, io.EOF |
| } |
| n, err = b.pipe.Read(p) |
| if n > 0 { |
| b.conn.noteBodyReadFromHandler(b.stream, n) |
| } |
| return |
| } |
| |
| // responseWriter is the http.ResponseWriter implementation. It's |
| // intentionally small (1 pointer wide) to minimize garbage. The |
| // responseWriterState pointer inside is zeroed at the end of a |
| // request (in handlerDone) and calls on the responseWriter thereafter |
| // simply crash (caller's mistake), but the much larger responseWriterState |
| // and buffers are reused between multiple requests. |
| type http2responseWriter struct { |
| rws *http2responseWriterState |
| } |
| |
| // Optional http.ResponseWriter interfaces implemented. |
| var ( |
| _ CloseNotifier = (*http2responseWriter)(nil) |
| _ Flusher = (*http2responseWriter)(nil) |
| _ http2stringWriter = (*http2responseWriter)(nil) |
| ) |
| |
| type http2responseWriterState struct { |
| // immutable within a request: |
| stream *http2stream |
| req *Request |
| body *http2requestBody // to close at end of request, if DATA frames didn't |
| conn *http2serverConn |
| |
| // TODO: adjust buffer writing sizes based on server config, frame size updates from peer, etc |
| bw *bufio.Writer // writing to a chunkWriter{this *responseWriterState} |
| |
| // mutated by http.Handler goroutine: |
| handlerHeader Header // nil until called |
| snapHeader Header // snapshot of handlerHeader at WriteHeader time |
| status int // status code passed to WriteHeader |
| wroteHeader bool // WriteHeader called (explicitly or implicitly). Not necessarily sent to user yet. |
| sentHeader bool // have we sent the header frame? |
| handlerDone bool // handler has finished |
| curWrite http2writeData |
| |
| closeNotifierMu sync.Mutex // guards closeNotifierCh |
| closeNotifierCh chan bool // nil until first used |
| } |
| |
| type http2chunkWriter struct{ rws *http2responseWriterState } |
| |
| func (cw http2chunkWriter) Write(p []byte) (n int, err error) { return cw.rws.writeChunk(p) } |
| |
| // writeChunk writes chunks from the bufio.Writer. But because |
| // bufio.Writer may bypass its chunking, sometimes p may be |
| // arbitrarily large. |
| // |
| // writeChunk is also responsible (on the first chunk) for sending the |
| // HEADER response. |
| func (rws *http2responseWriterState) writeChunk(p []byte) (n int, err error) { |
| if !rws.wroteHeader { |
| rws.writeHeader(200) |
| } |
| if !rws.sentHeader { |
| rws.sentHeader = true |
| var ctype, clen string // implicit ones, if we can calculate it |
| if rws.handlerDone && rws.snapHeader.Get("Content-Length") == "" { |
| clen = strconv.Itoa(len(p)) |
| } |
| if rws.snapHeader.Get("Content-Type") == "" { |
| ctype = DetectContentType(p) |
| } |
| endStream := rws.handlerDone && len(p) == 0 |
| rws.conn.writeHeaders(rws.stream, &http2writeResHeaders{ |
| streamID: rws.stream.id, |
| httpResCode: rws.status, |
| h: rws.snapHeader, |
| endStream: endStream, |
| contentType: ctype, |
| contentLength: clen, |
| }) |
| if endStream { |
| return 0, nil |
| } |
| } |
| if len(p) == 0 && !rws.handlerDone { |
| return 0, nil |
| } |
| curWrite := &rws.curWrite |
| curWrite.streamID = rws.stream.id |
| curWrite.p = p |
| curWrite.endStream = rws.handlerDone |
| if err := rws.conn.writeDataFromHandler(rws.stream, curWrite); err != nil { |
| return 0, err |
| } |
| return len(p), nil |
| } |
| |
| func (w *http2responseWriter) Flush() { |
| rws := w.rws |
| if rws == nil { |
| panic("Header called after Handler finished") |
| } |
| if rws.bw.Buffered() > 0 { |
| if err := rws.bw.Flush(); err != nil { |
| |
| return |
| } |
| } else { |
| |
| rws.writeChunk(nil) |
| } |
| } |
| |
| func (w *http2responseWriter) CloseNotify() <-chan bool { |
| rws := w.rws |
| if rws == nil { |
| panic("CloseNotify called after Handler finished") |
| } |
| rws.closeNotifierMu.Lock() |
| ch := rws.closeNotifierCh |
| if ch == nil { |
| ch = make(chan bool, 1) |
| rws.closeNotifierCh = ch |
| go func() { |
| rws.stream.cw.Wait() |
| ch <- true |
| }() |
| } |
| rws.closeNotifierMu.Unlock() |
| return ch |
| } |
| |
| func (w *http2responseWriter) Header() Header { |
| rws := w.rws |
| if rws == nil { |
| panic("Header called after Handler finished") |
| } |
| if rws.handlerHeader == nil { |
| rws.handlerHeader = make(Header) |
| } |
| return rws.handlerHeader |
| } |
| |
| func (w *http2responseWriter) WriteHeader(code int) { |
| rws := w.rws |
| if rws == nil { |
| panic("WriteHeader called after Handler finished") |
| } |
| rws.writeHeader(code) |
| } |
| |
| func (rws *http2responseWriterState) writeHeader(code int) { |
| if !rws.wroteHeader { |
| rws.wroteHeader = true |
| rws.status = code |
| if len(rws.handlerHeader) > 0 { |
| rws.snapHeader = http2cloneHeader(rws.handlerHeader) |
| } |
| } |
| } |
| |
| func http2cloneHeader(h Header) Header { |
| h2 := make(Header, len(h)) |
| for k, vv := range h { |
| vv2 := make([]string, len(vv)) |
| copy(vv2, vv) |
| h2[k] = vv2 |
| } |
| return h2 |
| } |
| |
| // The Life Of A Write is like this: |
| // |
| // * Handler calls w.Write or w.WriteString -> |
| // * -> rws.bw (*bufio.Writer) -> |
| // * (Handler migth call Flush) |
| // * -> chunkWriter{rws} |
| // * -> responseWriterState.writeChunk(p []byte) |
| // * -> responseWriterState.writeChunk (most of the magic; see comment there) |
| func (w *http2responseWriter) Write(p []byte) (n int, err error) { |
| return w.write(len(p), p, "") |
| } |
| |
| func (w *http2responseWriter) WriteString(s string) (n int, err error) { |
| return w.write(len(s), nil, s) |
| } |
| |
| // either dataB or dataS is non-zero. |
| func (w *http2responseWriter) write(lenData int, dataB []byte, dataS string) (n int, err error) { |
| rws := w.rws |
| if rws == nil { |
| panic("Write called after Handler finished") |
| } |
| if !rws.wroteHeader { |
| w.WriteHeader(200) |
| } |
| if dataB != nil { |
| return rws.bw.Write(dataB) |
| } else { |
| return rws.bw.WriteString(dataS) |
| } |
| } |
| |
| func (w *http2responseWriter) handlerDone() { |
| rws := w.rws |
| if rws == nil { |
| panic("handlerDone called twice") |
| } |
| rws.handlerDone = true |
| w.Flush() |
| w.rws = nil |
| http2responseWriterStatePool.Put(rws) |
| } |
| |
| type http2Transport struct { |
| Fallback RoundTripper |
| |
| // TODO: remove this and make more general with a TLS dial hook, like http |
| InsecureTLSDial bool |
| |
| connMu sync.Mutex |
| conns map[string][]*http2clientConn // key is host:port |
| } |
| |
| type http2clientConn struct { |
| t *http2Transport |
| tconn *tls.Conn |
| tlsState *tls.ConnectionState |
| connKey []string // key(s) this connection is cached in, in t.conns |
| |
| readerDone chan struct{} // closed on error |
| readerErr error // set before readerDone is closed |
| hdec *hpack.Decoder |
| nextRes *Response |
| |
| mu sync.Mutex |
| closed bool |
| goAway *http2GoAwayFrame // if non-nil, the GoAwayFrame we received |
| streams map[uint32]*http2clientStream |
| nextStreamID uint32 |
| bw *bufio.Writer |
| werr error // first write error that has occurred |
| br *bufio.Reader |
| fr *http2Framer |
| // Settings from peer: |
| maxFrameSize uint32 |
| maxConcurrentStreams uint32 |
| initialWindowSize uint32 |
| hbuf bytes.Buffer // HPACK encoder writes into this |
| henc *hpack.Encoder |
| } |
| |
| type http2clientStream struct { |
| ID uint32 |
| resc chan http2resAndError |
| pw *io.PipeWriter |
| pr *io.PipeReader |
| } |
| |
| type http2stickyErrWriter struct { |
| w io.Writer |
| err *error |
| } |
| |
| func (sew http2stickyErrWriter) Write(p []byte) (n int, err error) { |
| if *sew.err != nil { |
| return 0, *sew.err |
| } |
| n, err = sew.w.Write(p) |
| *sew.err = err |
| return |
| } |
| |
| func (t *http2Transport) RoundTrip(req *Request) (*Response, error) { |
| if req.URL.Scheme != "https" { |
| if t.Fallback == nil { |
| return nil, errors.New("http2: unsupported scheme and no Fallback") |
| } |
| return t.Fallback.RoundTrip(req) |
| } |
| |
| host, port, err := net.SplitHostPort(req.URL.Host) |
| if err != nil { |
| host = req.URL.Host |
| port = "443" |
| } |
| |
| for { |
| cc, err := t.getClientConn(host, port) |
| if err != nil { |
| return nil, err |
| } |
| res, err := cc.roundTrip(req) |
| if http2shouldRetryRequest(err) { |
| continue |
| } |
| if err != nil { |
| return nil, err |
| } |
| return res, nil |
| } |
| } |
| |
| // CloseIdleConnections closes any connections which were previously |
| // connected from previous requests but are now sitting idle. |
| // It does not interrupt any connections currently in use. |
| func (t *http2Transport) CloseIdleConnections() { |
| t.connMu.Lock() |
| defer t.connMu.Unlock() |
| for _, vv := range t.conns { |
| for _, cc := range vv { |
| cc.closeIfIdle() |
| } |
| } |
| } |
| |
| var http2errClientConnClosed = errors.New("http2: client conn is closed") |
| |
| func http2shouldRetryRequest(err error) bool { |
| |
| return err == http2errClientConnClosed |
| } |
| |
| func (t *http2Transport) removeClientConn(cc *http2clientConn) { |
| t.connMu.Lock() |
| defer t.connMu.Unlock() |
| for _, key := range cc.connKey { |
| vv, ok := t.conns[key] |
| if !ok { |
| continue |
| } |
| newList := http2filterOutClientConn(vv, cc) |
| if len(newList) > 0 { |
| t.conns[key] = newList |
| } else { |
| delete(t.conns, key) |
| } |
| } |
| } |
| |
| func http2filterOutClientConn(in []*http2clientConn, exclude *http2clientConn) []*http2clientConn { |
| out := in[:0] |
| for _, v := range in { |
| if v != exclude { |
| out = append(out, v) |
| } |
| } |
| return out |
| } |
| |
| func (t *http2Transport) getClientConn(host, port string) (*http2clientConn, error) { |
| t.connMu.Lock() |
| defer t.connMu.Unlock() |
| |
| key := net.JoinHostPort(host, port) |
| |
| for _, cc := range t.conns[key] { |
| if cc.canTakeNewRequest() { |
| return cc, nil |
| } |
| } |
| if t.conns == nil { |
| t.conns = make(map[string][]*http2clientConn) |
| } |
| cc, err := t.newClientConn(host, port, key) |
| if err != nil { |
| return nil, err |
| } |
| t.conns[key] = append(t.conns[key], cc) |
| return cc, nil |
| } |
| |
| func (t *http2Transport) newClientConn(host, port, key string) (*http2clientConn, error) { |
| cfg := &tls.Config{ |
| ServerName: host, |
| NextProtos: []string{http2NextProtoTLS}, |
| InsecureSkipVerify: t.InsecureTLSDial, |
| } |
| tconn, err := tls.Dial("tcp", net.JoinHostPort(host, port), cfg) |
| if err != nil { |
| return nil, err |
| } |
| if err := tconn.Handshake(); err != nil { |
| return nil, err |
| } |
| if !t.InsecureTLSDial { |
| if err := tconn.VerifyHostname(cfg.ServerName); err != nil { |
| return nil, err |
| } |
| } |
| state := tconn.ConnectionState() |
| if p := state.NegotiatedProtocol; p != http2NextProtoTLS { |
| |
| return nil, fmt.Errorf("bad protocol: %v", p) |
| } |
| if !state.NegotiatedProtocolIsMutual { |
| return nil, errors.New("could not negotiate protocol mutually") |
| } |
| if _, err := tconn.Write(http2clientPreface); err != nil { |
| return nil, err |
| } |
| |
| cc := &http2clientConn{ |
| t: t, |
| tconn: tconn, |
| connKey: []string{key}, |
| tlsState: &state, |
| readerDone: make(chan struct{}), |
| nextStreamID: 1, |
| maxFrameSize: 16 << 10, |
| initialWindowSize: 65535, |
| maxConcurrentStreams: 1000, |
| streams: make(map[uint32]*http2clientStream), |
| } |
| cc.bw = bufio.NewWriter(http2stickyErrWriter{tconn, &cc.werr}) |
| cc.br = bufio.NewReader(tconn) |
| cc.fr = http2NewFramer(cc.bw, cc.br) |
| cc.henc = hpack.NewEncoder(&cc.hbuf) |
| |
| cc.fr.WriteSettings() |
| |
| cc.fr.WriteWindowUpdate(0, 1<<30) |
| cc.bw.Flush() |
| if cc.werr != nil { |
| return nil, cc.werr |
| } |
| |
| f, err := cc.fr.ReadFrame() |
| if err != nil { |
| return nil, err |
| } |
| sf, ok := f.(*http2SettingsFrame) |
| if !ok { |
| return nil, fmt.Errorf("expected settings frame, got: %T", f) |
| } |
| cc.fr.WriteSettingsAck() |
| cc.bw.Flush() |
| |
| sf.ForeachSetting(func(s http2Setting) error { |
| switch s.ID { |
| case http2SettingMaxFrameSize: |
| cc.maxFrameSize = s.Val |
| case http2SettingMaxConcurrentStreams: |
| cc.maxConcurrentStreams = s.Val |
| case http2SettingInitialWindowSize: |
| cc.initialWindowSize = s.Val |
| default: |
| |
| log.Printf("Unhandled Setting: %v", s) |
| } |
| return nil |
| }) |
| |
| cc.hdec = hpack.NewDecoder(http2initialHeaderTableSize, cc.onNewHeaderField) |
| |
| go cc.readLoop() |
| return cc, nil |
| } |
| |
| func (cc *http2clientConn) setGoAway(f *http2GoAwayFrame) { |
| cc.mu.Lock() |
| defer cc.mu.Unlock() |
| cc.goAway = f |
| } |
| |
| func (cc *http2clientConn) canTakeNewRequest() bool { |
| cc.mu.Lock() |
| defer cc.mu.Unlock() |
| return cc.goAway == nil && |
| int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) && |
| cc.nextStreamID < 2147483647 |
| } |
| |
| func (cc *http2clientConn) closeIfIdle() { |
| cc.mu.Lock() |
| if len(cc.streams) > 0 { |
| cc.mu.Unlock() |
| return |
| } |
| cc.closed = true |
| |
| cc.mu.Unlock() |
| |
| cc.tconn.Close() |
| } |
| |
| func (cc *http2clientConn) roundTrip(req *Request) (*Response, error) { |
| cc.mu.Lock() |
| |
| if cc.closed { |
| cc.mu.Unlock() |
| return nil, http2errClientConnClosed |
| } |
| |
| cs := cc.newStream() |
| hasBody := false |
| |
| hdrs := cc.encodeHeaders(req) |
| first := true |
| for len(hdrs) > 0 { |
| chunk := hdrs |
| if len(chunk) > int(cc.maxFrameSize) { |
| chunk = chunk[:cc.maxFrameSize] |
| } |
| hdrs = hdrs[len(chunk):] |
| endHeaders := len(hdrs) == 0 |
| if first { |
| cc.fr.WriteHeaders(http2HeadersFrameParam{ |
| StreamID: cs.ID, |
| BlockFragment: chunk, |
| EndStream: !hasBody, |
| EndHeaders: endHeaders, |
| }) |
| first = false |
| } else { |
| cc.fr.WriteContinuation(cs.ID, endHeaders, chunk) |
| } |
| } |
| cc.bw.Flush() |
| werr := cc.werr |
| cc.mu.Unlock() |
| |
| if hasBody { |
| |
| } |
| |
| if werr != nil { |
| return nil, werr |
| } |
| |
| re := <-cs.resc |
| if re.err != nil { |
| return nil, re.err |
| } |
| res := re.res |
| res.Request = req |
| res.TLS = cc.tlsState |
| return res, nil |
| } |
| |
| // requires cc.mu be held. |
| func (cc *http2clientConn) encodeHeaders(req *Request) []byte { |
| cc.hbuf.Reset() |
| |
| host := req.Host |
| if host == "" { |
| host = req.URL.Host |
| } |
| |
| path := req.URL.Path |
| if path == "" { |
| path = "/" |
| } |
| |
| cc.writeHeader(":authority", host) |
| cc.writeHeader(":method", req.Method) |
| cc.writeHeader(":path", path) |
| cc.writeHeader(":scheme", "https") |
| |
| for k, vv := range req.Header { |
| lowKey := strings.ToLower(k) |
| if lowKey == "host" { |
| continue |
| } |
| for _, v := range vv { |
| cc.writeHeader(lowKey, v) |
| } |
| } |
| return cc.hbuf.Bytes() |
| } |
| |
| func (cc *http2clientConn) writeHeader(name, value string) { |
| log.Printf("sending %q = %q", name, value) |
| cc.henc.WriteField(hpack.HeaderField{Name: name, Value: value}) |
| } |
| |
| type http2resAndError struct { |
| res *Response |
| err error |
| } |
| |
| // requires cc.mu be held. |
| func (cc *http2clientConn) newStream() *http2clientStream { |
| cs := &http2clientStream{ |
| ID: cc.nextStreamID, |
| resc: make(chan http2resAndError, 1), |
| } |
| cc.nextStreamID += 2 |
| cc.streams[cs.ID] = cs |
| return cs |
| } |
| |
| func (cc *http2clientConn) streamByID(id uint32, andRemove bool) *http2clientStream { |
| cc.mu.Lock() |
| defer cc.mu.Unlock() |
| cs := cc.streams[id] |
| if andRemove { |
| delete(cc.streams, id) |
| } |
| return cs |
| } |
| |
| // runs in its own goroutine. |
| func (cc *http2clientConn) readLoop() { |
| defer cc.t.removeClientConn(cc) |
| defer close(cc.readerDone) |
| |
| activeRes := map[uint32]*http2clientStream{} |
| |
| defer func() { |
| err := cc.readerErr |
| if err == io.EOF { |
| err = io.ErrUnexpectedEOF |
| } |
| for _, cs := range activeRes { |
| cs.pw.CloseWithError(err) |
| } |
| }() |
| |
| // continueStreamID is the stream ID we're waiting for |
| // continuation frames for. |
| var continueStreamID uint32 |
| |
| for { |
| f, err := cc.fr.ReadFrame() |
| if err != nil { |
| cc.readerErr = err |
| return |
| } |
| log.Printf("Transport received %v: %#v", f.Header(), f) |
| |
| streamID := f.Header().StreamID |
| |
| _, isContinue := f.(*http2ContinuationFrame) |
| if isContinue { |
| if streamID != continueStreamID { |
| log.Printf("Protocol violation: got CONTINUATION with id %d; want %d", streamID, continueStreamID) |
| cc.readerErr = http2ConnectionError(http2ErrCodeProtocol) |
| return |
| } |
| } else if continueStreamID != 0 { |
| |
| log.Printf("Protocol violation: got %T for stream %d, want CONTINUATION for %d", f, streamID, continueStreamID) |
| cc.readerErr = http2ConnectionError(http2ErrCodeProtocol) |
| return |
| } |
| |
| if streamID%2 == 0 { |
| |
| continue |
| } |
| streamEnded := false |
| if ff, ok := f.(http2streamEnder); ok { |
| streamEnded = ff.StreamEnded() |
| } |
| |
| cs := cc.streamByID(streamID, streamEnded) |
| if cs == nil { |
| log.Printf("Received frame for untracked stream ID %d", streamID) |
| continue |
| } |
| |
| switch f := f.(type) { |
| case *http2HeadersFrame: |
| cc.nextRes = &Response{ |
| Proto: "HTTP/2.0", |
| ProtoMajor: 2, |
| Header: make(Header), |
| } |
| cs.pr, cs.pw = io.Pipe() |
| cc.hdec.Write(f.HeaderBlockFragment()) |
| case *http2ContinuationFrame: |
| cc.hdec.Write(f.HeaderBlockFragment()) |
| case *http2DataFrame: |
| log.Printf("DATA: %q", f.Data()) |
| cs.pw.Write(f.Data()) |
| case *http2GoAwayFrame: |
| cc.t.removeClientConn(cc) |
| if f.ErrCode != 0 { |
| |
| log.Printf("transport got GOAWAY with error code = %v", f.ErrCode) |
| } |
| cc.setGoAway(f) |
| default: |
| log.Printf("Transport: unhandled response frame type %T", f) |
| } |
| headersEnded := false |
| if he, ok := f.(http2headersEnder); ok { |
| headersEnded = he.HeadersEnded() |
| if headersEnded { |
| continueStreamID = 0 |
| } else { |
| continueStreamID = streamID |
| } |
| } |
| |
| if streamEnded { |
| cs.pw.Close() |
| delete(activeRes, streamID) |
| } |
| if headersEnded { |
| if cs == nil { |
| panic("couldn't find stream") |
| } |
| |
| cc.nextRes.Body = cs.pr |
| res := cc.nextRes |
| activeRes[streamID] = cs |
| cs.resc <- http2resAndError{res: res} |
| } |
| } |
| } |
| |
| func (cc *http2clientConn) onNewHeaderField(f hpack.HeaderField) { |
| |
| log.Printf("Header field: %+v", f) |
| if f.Name == ":status" { |
| code, err := strconv.Atoi(f.Value) |
| if err != nil { |
| panic("TODO: be graceful") |
| } |
| cc.nextRes.Status = f.Value + " " + StatusText(code) |
| cc.nextRes.StatusCode = code |
| return |
| } |
| if strings.HasPrefix(f.Name, ":") { |
| |
| return |
| } |
| cc.nextRes.Header.Add(CanonicalHeaderKey(f.Name), f.Value) |
| } |
| |
| // writeFramer is implemented by any type that is used to write frames. |
| type http2writeFramer interface { |
| writeFrame(http2writeContext) error |
| } |
| |
| // writeContext is the interface needed by the various frame writer |
| // types below. All the writeFrame methods below are scheduled via the |
| // frame writing scheduler (see writeScheduler in writesched.go). |
| // |
| // This interface is implemented by *serverConn. |
| // TODO: use it from the client code too, once it exists. |
| type http2writeContext interface { |
| Framer() *http2Framer |
| Flush() error |
| CloseConn() error |
| // HeaderEncoder returns an HPACK encoder that writes to the |
| // returned buffer. |
| HeaderEncoder() (*hpack.Encoder, *bytes.Buffer) |
| } |
| |
| // endsStream reports whether the given frame writer w will locally |
| // close the stream. |
| func http2endsStream(w http2writeFramer) bool { |
| switch v := w.(type) { |
| case *http2writeData: |
| return v.endStream |
| case *http2writeResHeaders: |
| return v.endStream |
| } |
| return false |
| } |
| |
| type http2flushFrameWriter struct{} |
| |
| func (http2flushFrameWriter) writeFrame(ctx http2writeContext) error { |
| return ctx.Flush() |
| } |
| |
| type http2writeSettings []http2Setting |
| |
| func (s http2writeSettings) writeFrame(ctx http2writeContext) error { |
| return ctx.Framer().WriteSettings([]http2Setting(s)...) |
| } |
| |
| type http2writeGoAway struct { |
| maxStreamID uint32 |
| code http2ErrCode |
| } |
| |
| func (p *http2writeGoAway) writeFrame(ctx http2writeContext) error { |
| err := ctx.Framer().WriteGoAway(p.maxStreamID, p.code, nil) |
| if p.code != 0 { |
| ctx.Flush() |
| time.Sleep(50 * time.Millisecond) |
| ctx.CloseConn() |
| } |
| return err |
| } |
| |
| type http2writeData struct { |
| streamID uint32 |
| p []byte |
| endStream bool |
| } |
| |
| func (w *http2writeData) String() string { |
| return fmt.Sprintf("writeData(stream=%d, p=%d, endStream=%v)", w.streamID, len(w.p), w.endStream) |
| } |
| |
| func (w *http2writeData) writeFrame(ctx http2writeContext) error { |
| return ctx.Framer().WriteData(w.streamID, w.endStream, w.p) |
| } |
| |
| func (se http2StreamError) writeFrame(ctx http2writeContext) error { |
| return ctx.Framer().WriteRSTStream(se.StreamID, se.Code) |
| } |
| |
| type http2writePingAck struct{ pf *http2PingFrame } |
| |
| func (w http2writePingAck) writeFrame(ctx http2writeContext) error { |
| return ctx.Framer().WritePing(true, w.pf.Data) |
| } |
| |
| type http2writeSettingsAck struct{} |
| |
| func (http2writeSettingsAck) writeFrame(ctx http2writeContext) error { |
| return ctx.Framer().WriteSettingsAck() |
| } |
| |
| // writeResHeaders is a request to write a HEADERS and 0+ CONTINUATION frames |
| // for HTTP response headers from a server handler. |
| type http2writeResHeaders struct { |
| streamID uint32 |
| httpResCode int |
| h Header // may be nil |
| endStream bool |
| |
| contentType string |
| contentLength string |
| } |
| |
| func (w *http2writeResHeaders) writeFrame(ctx http2writeContext) error { |
| enc, buf := ctx.HeaderEncoder() |
| buf.Reset() |
| enc.WriteField(hpack.HeaderField{Name: ":status", Value: http2httpCodeString(w.httpResCode)}) |
| for k, vv := range w.h { |
| k = http2lowerHeader(k) |
| for _, v := range vv { |
| |
| if k == "transfer-encoding" && v != "trailers" { |
| continue |
| } |
| enc.WriteField(hpack.HeaderField{Name: k, Value: v}) |
| } |
| } |
| if w.contentType != "" { |
| enc.WriteField(hpack.HeaderField{Name: "content-type", Value: w.contentType}) |
| } |
| if w.contentLength != "" { |
| enc.WriteField(hpack.HeaderField{Name: "content-length", Value: w.contentLength}) |
| } |
| |
| headerBlock := buf.Bytes() |
| if len(headerBlock) == 0 { |
| panic("unexpected empty hpack") |
| } |
| |
| // For now we're lazy and just pick the minimum MAX_FRAME_SIZE |
| // that all peers must support (16KB). Later we could care |
| // more and send larger frames if the peer advertised it, but |
| // there's little point. Most headers are small anyway (so we |
| // generally won't have CONTINUATION frames), and extra frames |
| // only waste 9 bytes anyway. |
| const maxFrameSize = 16384 |
| |
| first := true |
| for len(headerBlock) > 0 { |
| frag := headerBlock |
| if len(frag) > maxFrameSize { |
| frag = frag[:maxFrameSize] |
| } |
| headerBlock = headerBlock[len(frag):] |
| endHeaders := len(headerBlock) == 0 |
| var err error |
| if first { |
| first = false |
| err = ctx.Framer().WriteHeaders(http2HeadersFrameParam{ |
| StreamID: w.streamID, |
| BlockFragment: frag, |
| EndStream: w.endStream, |
| EndHeaders: endHeaders, |
| }) |
| } else { |
| err = ctx.Framer().WriteContinuation(w.streamID, endHeaders, frag) |
| } |
| if err != nil { |
| return err |
| } |
| } |
| return nil |
| } |
| |
| type http2write100ContinueHeadersFrame struct { |
| streamID uint32 |
| } |
| |
| func (w http2write100ContinueHeadersFrame) writeFrame(ctx http2writeContext) error { |
| enc, buf := ctx.HeaderEncoder() |
| buf.Reset() |
| enc.WriteField(hpack.HeaderField{Name: ":status", Value: "100"}) |
| return ctx.Framer().WriteHeaders(http2HeadersFrameParam{ |
| StreamID: w.streamID, |
| BlockFragment: buf.Bytes(), |
| EndStream: false, |
| EndHeaders: true, |
| }) |
| } |
| |
| type http2writeWindowUpdate struct { |
| streamID uint32 // or 0 for conn-level |
| n uint32 |
| } |
| |
| func (wu http2writeWindowUpdate) writeFrame(ctx http2writeContext) error { |
| return ctx.Framer().WriteWindowUpdate(wu.streamID, wu.n) |
| } |
| |
| // frameWriteMsg is a request to write a frame. |
| type http2frameWriteMsg struct { |
| // write is the interface value that does the writing, once the |
| // writeScheduler (below) has decided to select this frame |
| // to write. The write functions are all defined in write.go. |
| write http2writeFramer |
| |
| stream *http2stream // used for prioritization. nil for non-stream frames. |
| |
| // done, if non-nil, must be a buffered channel with space for |
| // 1 message and is sent the return value from write (or an |
| // earlier error) when the frame has been written. |
| done chan error |
| } |
| |
| // for debugging only: |
| func (wm http2frameWriteMsg) String() string { |
| var streamID uint32 |
| if wm.stream != nil { |
| streamID = wm.stream.id |
| } |
| var des string |
| if s, ok := wm.write.(fmt.Stringer); ok { |
| des = s.String() |
| } else { |
| des = fmt.Sprintf("%T", wm.write) |
| } |
| return fmt.Sprintf("[frameWriteMsg stream=%d, ch=%v, type: %v]", streamID, wm.done != nil, des) |
| } |
| |
| // writeScheduler tracks pending frames to write, priorities, and decides |
| // the next one to use. It is not thread-safe. |
| type http2writeScheduler struct { |
| // zero are frames not associated with a specific stream. |
| // They're sent before any stream-specific freams. |
| zero http2writeQueue |
| |
| // maxFrameSize is the maximum size of a DATA frame |
| // we'll write. Must be non-zero and between 16K-16M. |
| maxFrameSize uint32 |
| |
| // sq contains the stream-specific queues, keyed by stream ID. |
| // when a stream is idle, it's deleted from the map. |
| sq map[uint32]*http2writeQueue |
| |
| // canSend is a slice of memory that's reused between frame |
| // scheduling decisions to hold the list of writeQueues (from sq) |
| // which have enough flow control data to send. After canSend is |
| // built, the best is selected. |
| canSend []*http2writeQueue |
| |
| // pool of empty queues for reuse. |
| queuePool []*http2writeQueue |
| } |
| |
| func (ws *http2writeScheduler) putEmptyQueue(q *http2writeQueue) { |
| if len(q.s) != 0 { |
| panic("queue must be empty") |
| } |
| ws.queuePool = append(ws.queuePool, q) |
| } |
| |
| func (ws *http2writeScheduler) getEmptyQueue() *http2writeQueue { |
| ln := len(ws.queuePool) |
| if ln == 0 { |
| return new(http2writeQueue) |
| } |
| q := ws.queuePool[ln-1] |
| ws.queuePool = ws.queuePool[:ln-1] |
| return q |
| } |
| |
| func (ws *http2writeScheduler) empty() bool { return ws.zero.empty() && len(ws.sq) == 0 } |
| |
| func (ws *http2writeScheduler) add(wm http2frameWriteMsg) { |
| st := wm.stream |
| if st == nil { |
| ws.zero.push(wm) |
| } else { |
| ws.streamQueue(st.id).push(wm) |
| } |
| } |
| |
| func (ws *http2writeScheduler) streamQueue(streamID uint32) *http2writeQueue { |
| if q, ok := ws.sq[streamID]; ok { |
| return q |
| } |
| if ws.sq == nil { |
| ws.sq = make(map[uint32]*http2writeQueue) |
| } |
| q := ws.getEmptyQueue() |
| ws.sq[streamID] = q |
| return q |
| } |
| |
| // take returns the most important frame to write and removes it from the scheduler. |
| // It is illegal to call this if the scheduler is empty or if there are no connection-level |
| // flow control bytes available. |
| func (ws *http2writeScheduler) take() (wm http2frameWriteMsg, ok bool) { |
| if ws.maxFrameSize == 0 { |
| panic("internal error: ws.maxFrameSize not initialized or invalid") |
| } |
| |
| if !ws.zero.empty() { |
| return ws.zero.shift(), true |
| } |
| if len(ws.sq) == 0 { |
| return |
| } |
| |
| for id, q := range ws.sq { |
| if q.firstIsNoCost() { |
| return ws.takeFrom(id, q) |
| } |
| } |
| |
| if len(ws.canSend) != 0 { |
| panic("should be empty") |
| } |
| for _, q := range ws.sq { |
| if n := ws.streamWritableBytes(q); n > 0 { |
| ws.canSend = append(ws.canSend, q) |
| } |
| } |
| if len(ws.canSend) == 0 { |
| return |
| } |
| defer ws.zeroCanSend() |
| |
| q := ws.canSend[0] |
| |
| return ws.takeFrom(q.streamID(), q) |
| } |
| |
| // zeroCanSend is defered from take. |
| func (ws *http2writeScheduler) zeroCanSend() { |
| for i := range ws.canSend { |
| ws.canSend[i] = nil |
| } |
| ws.canSend = ws.canSend[:0] |
| } |
| |
| // streamWritableBytes returns the number of DATA bytes we could write |
| // from the given queue's stream, if this stream/queue were |
| // selected. It is an error to call this if q's head isn't a |
| // *writeData. |
| func (ws *http2writeScheduler) streamWritableBytes(q *http2writeQueue) int32 { |
| wm := q.head() |
| ret := wm.stream.flow.available() |
| if ret == 0 { |
| return 0 |
| } |
| if int32(ws.maxFrameSize) < ret { |
| ret = int32(ws.maxFrameSize) |
| } |
| if ret == 0 { |
| panic("internal error: ws.maxFrameSize not initialized or invalid") |
| } |
| wd := wm.write.(*http2writeData) |
| if len(wd.p) < int(ret) { |
| ret = int32(len(wd.p)) |
| } |
| return ret |
| } |
| |
| func (ws *http2writeScheduler) takeFrom(id uint32, q *http2writeQueue) (wm http2frameWriteMsg, ok bool) { |
| wm = q.head() |
| |
| if wd, ok := wm.write.(*http2writeData); ok && len(wd.p) > 0 { |
| allowed := wm.stream.flow.available() |
| if allowed == 0 { |
| |
| return http2frameWriteMsg{}, false |
| } |
| if int32(ws.maxFrameSize) < allowed { |
| allowed = int32(ws.maxFrameSize) |
| } |
| |
| if len(wd.p) > int(allowed) { |
| wm.stream.flow.take(allowed) |
| chunk := wd.p[:allowed] |
| wd.p = wd.p[allowed:] |
| |
| return http2frameWriteMsg{ |
| stream: wm.stream, |
| write: &http2writeData{ |
| streamID: wd.streamID, |
| p: chunk, |
| |
| endStream: false, |
| }, |
| |
| done: nil, |
| }, true |
| } |
| wm.stream.flow.take(int32(len(wd.p))) |
| } |
| |
| q.shift() |
| if q.empty() { |
| ws.putEmptyQueue(q) |
| delete(ws.sq, id) |
| } |
| return wm, true |
| } |
| |
| func (ws *http2writeScheduler) forgetStream(id uint32) { |
| q, ok := ws.sq[id] |
| if !ok { |
| return |
| } |
| delete(ws.sq, id) |
| |
| for i := range q.s { |
| q.s[i] = http2frameWriteMsg{} |
| } |
| q.s = q.s[:0] |
| ws.putEmptyQueue(q) |
| } |
| |
| type http2writeQueue struct { |
| s []http2frameWriteMsg |
| } |
| |
| // streamID returns the stream ID for a non-empty stream-specific queue. |
| func (q *http2writeQueue) streamID() uint32 { return q.s[0].stream.id } |
| |
| func (q *http2writeQueue) empty() bool { return len(q.s) == 0 } |
| |
| func (q *http2writeQueue) push(wm http2frameWriteMsg) { |
| q.s = append(q.s, wm) |
| } |
| |
| // head returns the next item that would be removed by shift. |
| func (q *http2writeQueue) head() http2frameWriteMsg { |
| if len(q.s) == 0 { |
| panic("invalid use of queue") |
| } |
| return q.s[0] |
| } |
| |
| func (q *http2writeQueue) shift() http2frameWriteMsg { |
| if len(q.s) == 0 { |
| panic("invalid use of queue") |
| } |
| wm := q.s[0] |
| |
| copy(q.s, q.s[1:]) |
| q.s[len(q.s)-1] = http2frameWriteMsg{} |
| q.s = q.s[:len(q.s)-1] |
| return wm |
| } |
| |
| func (q *http2writeQueue) firstIsNoCost() bool { |
| if df, ok := q.s[0].write.(*http2writeData); ok { |
| return len(df.p) == 0 |
| } |
| return true |
| } |