| // Code generated by golang.org/x/tools/cmd/bundle. DO NOT EDIT. |
| //go:generate bundle -o h2_bundle.go -prefix http2 -underscore golang.org/x/net/http2 |
| |
| // Package http2 implements the HTTP/2 protocol. |
| // |
| // This package is low-level and intended to be used directly by very |
| // few people. Most users will use it indirectly through the automatic |
| // use by the net/http package (from Go 1.6 and later). |
| // For use in earlier Go versions see ConfigureServer. (Transport support |
| // requires Go 1.6 or later) |
| // |
| // See https://http2.github.io/ for more information on HTTP/2. |
| // |
| // See https://http2.golang.org/ for a test server running this code. |
| // |
| |
| package http |
| |
| import ( |
| "bufio" |
| "bytes" |
| "compress/gzip" |
| "context" |
| "crypto/rand" |
| "crypto/tls" |
| "encoding/binary" |
| "errors" |
| "fmt" |
| "io" |
| "io/ioutil" |
| "log" |
| "math" |
| "net" |
| "net/http/httptrace" |
| "net/textproto" |
| "net/url" |
| "os" |
| "reflect" |
| "runtime" |
| "sort" |
| "strconv" |
| "strings" |
| "sync" |
| "time" |
| |
| "golang_org/x/net/http2/hpack" |
| "golang_org/x/net/idna" |
| "golang_org/x/net/lex/httplex" |
| ) |
| |
// ClientConnPool manages a pool of HTTP/2 client connections.
type http2ClientConnPool interface {
	// GetClientConn returns a client connection to use for req
	// against addr, or an error if none can be provided.
	GetClientConn(req *Request, addr string) (*http2ClientConn, error)

	// MarkDead tells the pool that the given connection must no
	// longer be handed out.
	MarkDead(*http2ClientConn)
}
| |
// clientConnPoolIdleCloser is the interface implemented by ClientConnPool
// implementations which can close their idle connections.
type http2clientConnPoolIdleCloser interface {
	http2ClientConnPool

	// closeIdleConnections closes the pool's connections that are
	// currently idle.
	closeIdleConnections()
}
| |
// Compile-time checks that both pool flavors implement
// clientConnPoolIdleCloser.
var (
	_ http2clientConnPoolIdleCloser = (*http2clientConnPool)(nil)
	_ http2clientConnPoolIdleCloser = http2noDialClientConnPool{}
)
| |
// clientConnPool is the pool of HTTP/2 client connections, keyed by
// address. All map fields are guarded by mu and are allocated lazily.
//
// TODO: use singleflight for dialing and addConnCalls?
type http2clientConnPool struct {
	t *http2Transport // transport used to dial new connections

	mu sync.Mutex // TODO: maybe switch to RWMutex
	// TODO: add support for sharing conns based on cert names
	// (e.g. share conn for googleapis.com and appspot.com)
	conns        map[string][]*http2ClientConn // key is host:port
	dialing      map[string]*http2dialCall     // currently in-flight dials
	keys         map[*http2ClientConn][]string // reverse index: the conns keys each connection is stored under
	addConnCalls map[string]*http2addConnCall  // in-flight addConnIfNeeded calls
}
| |
// GetClientConn returns a connection for req to addr, dialing a new
// one when no cached connection can take the request.
func (p *http2clientConnPool) GetClientConn(req *Request, addr string) (*http2ClientConn, error) {
	return p.getClientConn(req, addr, http2dialOnMiss)
}
| |
// Named values for the dialOnMiss argument of getClientConn.
const (
	http2dialOnMiss   = true  // dial a new connection when none is cached
	http2noDialOnMiss = false // report ErrNoCachedConn instead of dialing
)
| |
| func (p *http2clientConnPool) getClientConn(req *Request, addr string, dialOnMiss bool) (*http2ClientConn, error) { |
| if http2isConnectionCloseRequest(req) && dialOnMiss { |
| // It gets its own connection. |
| const singleUse = true |
| cc, err := p.t.dialClientConn(addr, singleUse) |
| if err != nil { |
| return nil, err |
| } |
| return cc, nil |
| } |
| p.mu.Lock() |
| for _, cc := range p.conns[addr] { |
| if cc.CanTakeNewRequest() { |
| p.mu.Unlock() |
| return cc, nil |
| } |
| } |
| if !dialOnMiss { |
| p.mu.Unlock() |
| return nil, http2ErrNoCachedConn |
| } |
| call := p.getStartDialLocked(addr) |
| p.mu.Unlock() |
| <-call.done |
| return call.res, call.err |
| } |
| |
// dialCall is an in-flight Transport dial call to a host.
// Multiple callers waiting on the same address share one dialCall and
// block on done.
type http2dialCall struct {
	p    *http2clientConnPool // pool that started the dial
	done chan struct{}        // closed when done
	res  *http2ClientConn     // valid after done is closed
	err  error                // valid after done is closed
}
| |
| // requires p.mu is held. |
| func (p *http2clientConnPool) getStartDialLocked(addr string) *http2dialCall { |
| if call, ok := p.dialing[addr]; ok { |
| |
| return call |
| } |
| call := &http2dialCall{p: p, done: make(chan struct{})} |
| if p.dialing == nil { |
| p.dialing = make(map[string]*http2dialCall) |
| } |
| p.dialing[addr] = call |
| go call.dial(addr) |
| return call |
| } |
| |
| // run in its own goroutine. |
| func (c *http2dialCall) dial(addr string) { |
| const singleUse = false // shared conn |
| c.res, c.err = c.p.t.dialClientConn(addr, singleUse) |
| close(c.done) |
| |
| c.p.mu.Lock() |
| delete(c.p.dialing, addr) |
| if c.err == nil { |
| c.p.addConnLocked(addr, c.res) |
| } |
| c.p.mu.Unlock() |
| } |
| |
| // addConnIfNeeded makes a NewClientConn out of c if a connection for key doesn't |
| // already exist. It coalesces concurrent calls with the same key. |
| // This is used by the http1 Transport code when it creates a new connection. Because |
| // the http1 Transport doesn't de-dup TCP dials to outbound hosts (because it doesn't know |
| // the protocol), it can get into a situation where it has multiple TLS connections. |
| // This code decides which ones live or die. |
| // The return value used is whether c was used. |
| // c is never closed. |
| func (p *http2clientConnPool) addConnIfNeeded(key string, t *http2Transport, c *tls.Conn) (used bool, err error) { |
| p.mu.Lock() |
| for _, cc := range p.conns[key] { |
| if cc.CanTakeNewRequest() { |
| p.mu.Unlock() |
| return false, nil |
| } |
| } |
| call, dup := p.addConnCalls[key] |
| if !dup { |
| if p.addConnCalls == nil { |
| p.addConnCalls = make(map[string]*http2addConnCall) |
| } |
| call = &http2addConnCall{ |
| p: p, |
| done: make(chan struct{}), |
| } |
| p.addConnCalls[key] = call |
| go call.run(t, key, c) |
| } |
| p.mu.Unlock() |
| |
| <-call.done |
| if call.err != nil { |
| return false, call.err |
| } |
| return !dup, nil |
| } |
| |
// addConnCall is an in-flight addConnIfNeeded call for one key.
// Callers coalesced onto it block on done.
type http2addConnCall struct {
	p    *http2clientConnPool // pool the new connection is added to
	done chan struct{}        // closed when done
	err  error                // set by run before done is closed
}
| |
| func (c *http2addConnCall) run(t *http2Transport, key string, tc *tls.Conn) { |
| cc, err := t.NewClientConn(tc) |
| |
| p := c.p |
| p.mu.Lock() |
| if err != nil { |
| c.err = err |
| } else { |
| p.addConnLocked(key, cc) |
| } |
| delete(p.addConnCalls, key) |
| p.mu.Unlock() |
| close(c.done) |
| } |
| |
| func (p *http2clientConnPool) addConn(key string, cc *http2ClientConn) { |
| p.mu.Lock() |
| p.addConnLocked(key, cc) |
| p.mu.Unlock() |
| } |
| |
| // p.mu must be held |
| func (p *http2clientConnPool) addConnLocked(key string, cc *http2ClientConn) { |
| for _, v := range p.conns[key] { |
| if v == cc { |
| return |
| } |
| } |
| if p.conns == nil { |
| p.conns = make(map[string][]*http2ClientConn) |
| } |
| if p.keys == nil { |
| p.keys = make(map[*http2ClientConn][]string) |
| } |
| p.conns[key] = append(p.conns[key], cc) |
| p.keys[cc] = append(p.keys[cc], key) |
| } |
| |
| func (p *http2clientConnPool) MarkDead(cc *http2ClientConn) { |
| p.mu.Lock() |
| defer p.mu.Unlock() |
| for _, key := range p.keys[cc] { |
| vv, ok := p.conns[key] |
| if !ok { |
| continue |
| } |
| newList := http2filterOutClientConn(vv, cc) |
| if len(newList) > 0 { |
| p.conns[key] = newList |
| } else { |
| delete(p.conns, key) |
| } |
| } |
| delete(p.keys, cc) |
| } |
| |
| func (p *http2clientConnPool) closeIdleConnections() { |
| p.mu.Lock() |
| defer p.mu.Unlock() |
| |
| for _, vv := range p.conns { |
| for _, cc := range vv { |
| cc.closeIfIdle() |
| } |
| } |
| } |
| |
| func http2filterOutClientConn(in []*http2ClientConn, exclude *http2ClientConn) []*http2ClientConn { |
| out := in[:0] |
| for _, v := range in { |
| if v != exclude { |
| out = append(out, v) |
| } |
| } |
| |
| if len(in) != len(out) { |
| in[len(in)-1] = nil |
| } |
| return out |
| } |
| |
// noDialClientConnPool is an implementation of http2.ClientConnPool
// which never dials. We let the HTTP/1.1 client dial and use its TLS
// connection instead.
type http2noDialClientConnPool struct{ *http2clientConnPool }

// GetClientConn returns a cached connection for addr and never dials;
// on a miss the embedded pool reports http2ErrNoCachedConn.
func (p http2noDialClientConnPool) GetClientConn(req *Request, addr string) (*http2ClientConn, error) {
	return p.getClientConn(req, addr, http2noDialOnMiss)
}
| |
| func http2configureTransport(t1 *Transport) (*http2Transport, error) { |
| connPool := new(http2clientConnPool) |
| t2 := &http2Transport{ |
| ConnPool: http2noDialClientConnPool{connPool}, |
| t1: t1, |
| } |
| connPool.t = t2 |
| if err := http2registerHTTPSProtocol(t1, http2noDialH2RoundTripper{t2}); err != nil { |
| return nil, err |
| } |
| if t1.TLSClientConfig == nil { |
| t1.TLSClientConfig = new(tls.Config) |
| } |
| if !http2strSliceContains(t1.TLSClientConfig.NextProtos, "h2") { |
| t1.TLSClientConfig.NextProtos = append([]string{"h2"}, t1.TLSClientConfig.NextProtos...) |
| } |
| if !http2strSliceContains(t1.TLSClientConfig.NextProtos, "http/1.1") { |
| t1.TLSClientConfig.NextProtos = append(t1.TLSClientConfig.NextProtos, "http/1.1") |
| } |
| upgradeFn := func(authority string, c *tls.Conn) RoundTripper { |
| addr := http2authorityAddr("https", authority) |
| if used, err := connPool.addConnIfNeeded(addr, t2, c); err != nil { |
| go c.Close() |
| return http2erringRoundTripper{err} |
| } else if !used { |
| |
| go c.Close() |
| } |
| return t2 |
| } |
| if m := t1.TLSNextProto; len(m) == 0 { |
| t1.TLSNextProto = map[string]func(string, *tls.Conn) RoundTripper{ |
| "h2": upgradeFn, |
| } |
| } else { |
| m["h2"] = upgradeFn |
| } |
| return t2, nil |
| } |
| |
| // registerHTTPSProtocol calls Transport.RegisterProtocol but |
| // convering panics into errors. |
| func http2registerHTTPSProtocol(t *Transport, rt RoundTripper) (err error) { |
| defer func() { |
| if e := recover(); e != nil { |
| err = fmt.Errorf("%v", e) |
| } |
| }() |
| t.RegisterProtocol("https", rt) |
| return nil |
| } |
| |
| // noDialH2RoundTripper is a RoundTripper which only tries to complete the request |
| // if there's already has a cached connection to the host. |
| type http2noDialH2RoundTripper struct{ t *http2Transport } |
| |
| func (rt http2noDialH2RoundTripper) RoundTrip(req *Request) (*Response, error) { |
| res, err := rt.t.RoundTrip(req) |
| if err == http2ErrNoCachedConn { |
| return nil, ErrSkipAltProtocol |
| } |
| return res, err |
| } |
| |
// An ErrCode is an unsigned 32-bit error code as defined in the HTTP/2 spec.
type http2ErrCode uint32

// Error codes known to this package.
const (
	http2ErrCodeNo                 http2ErrCode = 0x0
	http2ErrCodeProtocol           http2ErrCode = 0x1
	http2ErrCodeInternal           http2ErrCode = 0x2
	http2ErrCodeFlowControl        http2ErrCode = 0x3
	http2ErrCodeSettingsTimeout    http2ErrCode = 0x4
	http2ErrCodeStreamClosed       http2ErrCode = 0x5
	http2ErrCodeFrameSize          http2ErrCode = 0x6
	http2ErrCodeRefusedStream      http2ErrCode = 0x7
	http2ErrCodeCancel             http2ErrCode = 0x8
	http2ErrCodeCompression        http2ErrCode = 0x9
	http2ErrCodeConnect            http2ErrCode = 0xa
	http2ErrCodeEnhanceYourCalm    http2ErrCode = 0xb
	http2ErrCodeInadequateSecurity http2ErrCode = 0xc
	http2ErrCodeHTTP11Required     http2ErrCode = 0xd
)

// errCodeName maps each known error code to its upper-case name.
var http2errCodeName = map[http2ErrCode]string{
	http2ErrCodeNo:                 "NO_ERROR",
	http2ErrCodeProtocol:           "PROTOCOL_ERROR",
	http2ErrCodeInternal:           "INTERNAL_ERROR",
	http2ErrCodeFlowControl:        "FLOW_CONTROL_ERROR",
	http2ErrCodeSettingsTimeout:    "SETTINGS_TIMEOUT",
	http2ErrCodeStreamClosed:       "STREAM_CLOSED",
	http2ErrCodeFrameSize:          "FRAME_SIZE_ERROR",
	http2ErrCodeRefusedStream:      "REFUSED_STREAM",
	http2ErrCodeCancel:             "CANCEL",
	http2ErrCodeCompression:        "COMPRESSION_ERROR",
	http2ErrCodeConnect:            "CONNECT_ERROR",
	http2ErrCodeEnhanceYourCalm:    "ENHANCE_YOUR_CALM",
	http2ErrCodeInadequateSecurity: "INADEQUATE_SECURITY",
	http2ErrCodeHTTP11Required:     "HTTP_1_1_REQUIRED",
}

// String returns the name of e, or "unknown error code 0x<hex>" for a
// code that has no entry in errCodeName.
func (e http2ErrCode) String() string {
	name, known := http2errCodeName[e]
	if !known {
		return fmt.Sprintf("unknown error code 0x%x", uint32(e))
	}
	return name
}
| |
// ConnectionError is an error that results in the termination of the
// entire connection.
type http2ConnectionError http2ErrCode

// Error returns "connection error: " followed by the error code
// formatted via ErrCode.String.
func (e http2ConnectionError) Error() string {
	return fmt.Sprintf("connection error: %s", http2ErrCode(e))
}
| |
| // StreamError is an error that only affects one stream within an |
| // HTTP/2 connection. |
| type http2StreamError struct { |
| StreamID uint32 |
| Code http2ErrCode |
| Cause error // optional additional detail |
| } |
| |
| func http2streamError(id uint32, code http2ErrCode) http2StreamError { |
| return http2StreamError{StreamID: id, Code: code} |
| } |
| |
| func (e http2StreamError) Error() string { |
| if e.Cause != nil { |
| return fmt.Sprintf("stream error: stream ID %d; %v; %v", e.StreamID, e.Code, e.Cause) |
| } |
| return fmt.Sprintf("stream error: stream ID %d; %v", e.StreamID, e.Code) |
| } |
| |
// goAwayFlowError is the error for a connection-level flow control
// window that grew past its maximum size.
//
// 6.9.1 The Flow Control Window
// "If a sender receives a WINDOW_UPDATE that causes a flow control
// window to exceed this maximum it MUST terminate either the stream
// or the connection, as appropriate. For streams, [...]; for the
// connection, a GOAWAY frame with a FLOW_CONTROL_ERROR code."
type http2goAwayFlowError struct{}

// Error returns a fixed description of the violation.
func (http2goAwayFlowError) Error() string { return "connection exceeded flow control window size" }
| |
// Errors of this type are only returned by the frame parser functions.
// Framer.ReadFrame converts them into ConnectionError(Code) and keeps
// Reason available through Framer.ErrorDetail.
type http2connError struct {
	Code   http2ErrCode // error code to report for the connection
	Reason string       // human-readable detail
}

// Error formats the code and the reason.
func (e http2connError) Error() string {
	return fmt.Sprintf("http2: connection error: %v: %v", e.Code, e.Reason)
}
| |
// pseudoHeaderError is the error for an invalid pseudo-header; its
// value is the offending header name.
type http2pseudoHeaderError string

// Error returns the message with the header name quoted.
func (e http2pseudoHeaderError) Error() string {
	return fmt.Sprintf("invalid pseudo-header %q", string(e))
}

// duplicatePseudoHeaderError is the error for a pseudo-header that
// appears more than once; its value is the header name.
type http2duplicatePseudoHeaderError string

// Error returns the message with the header name quoted.
func (e http2duplicatePseudoHeaderError) Error() string {
	return fmt.Sprintf("duplicate pseudo-header %q", string(e))
}

// headerFieldNameError is the error for an invalid header field name;
// its value is the offending name.
type http2headerFieldNameError string

// Error returns the message with the field name quoted.
func (e http2headerFieldNameError) Error() string {
	return fmt.Sprintf("invalid header field name %q", string(e))
}

// headerFieldValueError is the error for an invalid header field
// value. Its string is interpolated, quoted, into the message.
type http2headerFieldValueError string

// Error returns the message with the stored string quoted.
func (e http2headerFieldValueError) Error() string {
	return fmt.Sprintf("invalid header field value %q", string(e))
}

// Sentinel errors for invalid combinations or ordering of header
// fields.
var (
	http2errMixPseudoHeaderTypes = errors.New("mix of request and response pseudo headers")
	http2errPseudoAfterRegular   = errors.New("pseudo header field after regular")
)
| |
// fixedBuffer is an io.ReadWriter backed by a fixed size buffer.
// It never allocates, but moves old data as new data is written.
type http2fixedBuffer struct {
	buf  []byte
	r, w int // unread data is buf[r:w]
}

// Errors returned by fixedBuffer's Read and Write.
var (
	http2errReadEmpty = errors.New("read from empty fixedBuffer")
	http2errWriteFull = errors.New("write on full fixedBuffer")
)

// Read copies bytes from the buffer into p.
// It is an error to read when no data is available.
func (b *http2fixedBuffer) Read(p []byte) (n int, err error) {
	unread := b.buf[b.r:b.w]
	if len(unread) == 0 {
		return 0, http2errReadEmpty
	}
	n = copy(p, unread)
	if b.r += n; b.r == b.w {
		// Fully drained: rewind so the next Write starts at the front.
		b.r, b.w = 0, 0
	}
	return n, nil
}

// Len returns the number of bytes of the unread portion of the buffer.
func (b *http2fixedBuffer) Len() int {
	return b.w - b.r
}

// Write copies bytes from p into the buffer.
// It is an error to write more data than the buffer can hold.
func (b *http2fixedBuffer) Write(p []byte) (n int, err error) {
	// Slide the unread data to the front, but only when there is
	// consumed space to reclaim and p does not fit in the tail.
	if tail := len(b.buf) - b.w; b.r > 0 && len(p) > tail {
		b.w = copy(b.buf, b.buf[b.r:b.w])
		b.r = 0
	}

	// Store as much of p as fits; a short copy means the buffer is full.
	n = copy(b.buf[b.w:], p)
	b.w += n
	if n < len(p) {
		err = http2errWriteFull
	}
	return n, err
}
| |
// flow is the flow control window's size.
type http2flow struct {
	// n is the number of DATA bytes we're allowed to send.
	// A flow is kept both on a conn and a per-stream.
	n int32

	// conn points to the shared connection-level flow that is
	// shared by all streams on that conn. It is nil for the flow
	// that's on the conn directly.
	conn *http2flow
}

// setConnFlow links f to the connection-level flow cf.
func (f *http2flow) setConnFlow(cf *http2flow) { f.conn = cf }

// available returns the number of bytes that may be taken from f: the
// smaller of f's own window and, when set, the connection-level window.
func (f *http2flow) available() int32 {
	n := f.n
	if f.conn != nil && f.conn.n < n {
		n = f.conn.n
	}
	return n
}

// take consumes n bytes from f and, when set, from the
// connection-level flow. It panics if n exceeds available().
func (f *http2flow) take(n int32) {
	if n > f.available() {
		panic("internal error: took too much")
	}
	f.n -= n
	if f.conn != nil {
		f.conn.n -= n
	}
}

// add adds n bytes (positive or negative) to the flow control window.
// It returns false, leaving the window unchanged, if the sum would
// exceed 2^31-1 or fall below -2^31.
//
// The sum is computed in 64 bits. The window may legitimately be
// negative, and a 32-bit "(2^31-1) - f.n" headroom computation wraps
// around in that case, which made valid additions fail.
func (f *http2flow) add(n int32) bool {
	const (
		maxWindow = 1<<31 - 1
		minWindow = -1 << 31
	)
	sum := int64(f.n) + int64(n)
	if sum > maxWindow || sum < minWindow {
		return false
	}
	f.n = int32(sum)
	return true
}
| |
// frameHeaderLen is the size in bytes of the header that precedes
// every frame.
const http2frameHeaderLen = 9

var http2padZeros = make([]byte, 255) // zeros for padding
| |
// A FrameType is a registered frame type as defined in
// http://http2.github.io/http2-spec/#rfc.section.11.2
type http2FrameType uint8

// The ten standard frame types.
const (
	http2FrameData         http2FrameType = 0x0
	http2FrameHeaders      http2FrameType = 0x1
	http2FramePriority     http2FrameType = 0x2
	http2FrameRSTStream    http2FrameType = 0x3
	http2FrameSettings     http2FrameType = 0x4
	http2FramePushPromise  http2FrameType = 0x5
	http2FramePing         http2FrameType = 0x6
	http2FrameGoAway       http2FrameType = 0x7
	http2FrameWindowUpdate http2FrameType = 0x8
	http2FrameContinuation http2FrameType = 0x9
)

// frameName maps each standard frame type to its upper-case name.
var http2frameName = map[http2FrameType]string{
	http2FrameData:         "DATA",
	http2FrameHeaders:      "HEADERS",
	http2FramePriority:     "PRIORITY",
	http2FrameRSTStream:    "RST_STREAM",
	http2FrameSettings:     "SETTINGS",
	http2FramePushPromise:  "PUSH_PROMISE",
	http2FramePing:         "PING",
	http2FrameGoAway:       "GOAWAY",
	http2FrameWindowUpdate: "WINDOW_UPDATE",
	http2FrameContinuation: "CONTINUATION",
}

// String returns the name of t, or "UNKNOWN_FRAME_TYPE_<n>" for a type
// that has no entry in frameName.
func (t http2FrameType) String() string {
	name, known := http2frameName[t]
	if !known {
		return fmt.Sprintf("UNKNOWN_FRAME_TYPE_%d", uint8(t))
	}
	return name
}
| |
// Flags is a bitmask of HTTP/2 flags.
// The meaning of flags varies depending on the frame type.
type http2Flags uint8

// Has reports whether f contains all (0 or more) flags in v.
func (f http2Flags) Has(v http2Flags) bool {
	// v is fully contained in f exactly when clearing f's bits from v
	// leaves nothing behind.
	missing := v &^ f
	return missing == 0
}
| |
// Frame-specific FrameHeader flag bits.
// The same bit value means different things for different frame types.
const (
	// Data Frame
	http2FlagDataEndStream http2Flags = 0x1
	http2FlagDataPadded    http2Flags = 0x8

	// Headers Frame
	http2FlagHeadersEndStream  http2Flags = 0x1
	http2FlagHeadersEndHeaders http2Flags = 0x4
	http2FlagHeadersPadded     http2Flags = 0x8
	http2FlagHeadersPriority   http2Flags = 0x20

	// Settings Frame
	http2FlagSettingsAck http2Flags = 0x1

	// Ping Frame
	http2FlagPingAck http2Flags = 0x1

	// Continuation Frame
	http2FlagContinuationEndHeaders http2Flags = 0x4

	// Push Promise Frame
	http2FlagPushPromiseEndHeaders http2Flags = 0x4
	http2FlagPushPromisePadded     http2Flags = 0x8
)
| |
// flagName maps a frame type to the names of the flag bits defined
// for it. FrameHeader.writeDebug uses it to print flags symbolically.
var http2flagName = map[http2FrameType]map[http2Flags]string{
	http2FrameData: {
		http2FlagDataEndStream: "END_STREAM",
		http2FlagDataPadded:    "PADDED",
	},
	http2FrameHeaders: {
		http2FlagHeadersEndStream:  "END_STREAM",
		http2FlagHeadersEndHeaders: "END_HEADERS",
		http2FlagHeadersPadded:     "PADDED",
		http2FlagHeadersPriority:   "PRIORITY",
	},
	http2FrameSettings: {
		http2FlagSettingsAck: "ACK",
	},
	http2FramePing: {
		http2FlagPingAck: "ACK",
	},
	http2FrameContinuation: {
		http2FlagContinuationEndHeaders: "END_HEADERS",
	},
	http2FramePushPromise: {
		http2FlagPushPromiseEndHeaders: "END_HEADERS",
		http2FlagPushPromisePadded:     "PADDED",
	},
}
| |
| // a frameParser parses a frame given its FrameHeader and payload |
| // bytes. The length of payload will always equal fh.Length (which |
| // might be 0). |
| type http2frameParser func(fh http2FrameHeader, payload []byte) (http2Frame, error) |
| |
| var http2frameParsers = map[http2FrameType]http2frameParser{ |
| http2FrameData: http2parseDataFrame, |
| http2FrameHeaders: http2parseHeadersFrame, |
| http2FramePriority: http2parsePriorityFrame, |
| http2FrameRSTStream: http2parseRSTStreamFrame, |
| http2FrameSettings: http2parseSettingsFrame, |
| http2FramePushPromise: http2parsePushPromise, |
| http2FramePing: http2parsePingFrame, |
| http2FrameGoAway: http2parseGoAwayFrame, |
| http2FrameWindowUpdate: http2parseWindowUpdateFrame, |
| http2FrameContinuation: http2parseContinuationFrame, |
| } |
| |
| func http2typeFrameParser(t http2FrameType) http2frameParser { |
| if f := http2frameParsers[t]; f != nil { |
| return f |
| } |
| return http2parseUnknownFrame |
| } |
| |
// A FrameHeader is the 9 byte header of all HTTP/2 frames.
//
// See http://http2.github.io/http2-spec/#FrameHeader
type http2FrameHeader struct {
	// valid is set by readFrameHeader and cleared by invalidate;
	// checkValid panics when it is false.
	valid bool // caller can access []byte fields in the Frame

	// Type is the 1 byte frame type. There are ten standard frame
	// types, but extension frame types may be written by WriteRawFrame
	// and will be returned by ReadFrame (as UnknownFrame).
	Type http2FrameType

	// Flags are the 1 byte of 8 potential bit flags per frame.
	// They are specific to the frame type.
	Flags http2Flags

	// Length is the length of the frame, not including the 9 byte header.
	// The maximum size is one byte less than 16MB (uint24), but only
	// frames up to 16KB are allowed without peer agreement.
	Length uint32

	// StreamID is which stream this frame is for. Certain frames
	// are not stream-specific, in which case this field is 0.
	StreamID uint32
}

// Header returns h. It exists so FrameHeaders can be embedded in other
// specific frame types and implement the Frame interface.
func (h http2FrameHeader) Header() http2FrameHeader { return h }
| |
| func (h http2FrameHeader) String() string { |
| var buf bytes.Buffer |
| buf.WriteString("[FrameHeader ") |
| h.writeDebug(&buf) |
| buf.WriteByte(']') |
| return buf.String() |
| } |
| |
| func (h http2FrameHeader) writeDebug(buf *bytes.Buffer) { |
| buf.WriteString(h.Type.String()) |
| if h.Flags != 0 { |
| buf.WriteString(" flags=") |
| set := 0 |
| for i := uint8(0); i < 8; i++ { |
| if h.Flags&(1<<i) == 0 { |
| continue |
| } |
| set++ |
| if set > 1 { |
| buf.WriteByte('|') |
| } |
| name := http2flagName[h.Type][http2Flags(1<<i)] |
| if name != "" { |
| buf.WriteString(name) |
| } else { |
| fmt.Fprintf(buf, "0x%x", 1<<i) |
| } |
| } |
| } |
| if h.StreamID != 0 { |
| fmt.Fprintf(buf, " stream=%d", h.StreamID) |
| } |
| fmt.Fprintf(buf, " len=%d", h.Length) |
| } |
| |
// checkValid panics if the frame has been invalidated, i.e. its
// buffers are no longer owned by the caller.
func (h *http2FrameHeader) checkValid() {
	if !h.valid {
		panic("Frame accessor called on non-owned Frame")
	}
}

// invalidate marks the frame as no longer valid; later checkValid
// calls will panic.
func (h *http2FrameHeader) invalidate() { h.valid = false }
| |
// frame header bytes.
// Used only by ReadFrameHeader.
// The pool stores *[]byte values, each frameHeaderLen bytes long.
var http2fhBytes = sync.Pool{
	New: func() interface{} {
		buf := make([]byte, http2frameHeaderLen)
		return &buf
	},
}

// ReadFrameHeader reads 9 bytes from r and returns a FrameHeader.
// Most users should use Framer.ReadFrame instead.
func http2ReadFrameHeader(r io.Reader) (http2FrameHeader, error) {
	// Borrow a scratch buffer for the raw header bytes and return it
	// to the pool once the header has been decoded.
	bufp := http2fhBytes.Get().(*[]byte)
	defer http2fhBytes.Put(bufp)
	return http2readFrameHeader(*bufp, r)
}
| |
| func http2readFrameHeader(buf []byte, r io.Reader) (http2FrameHeader, error) { |
| _, err := io.ReadFull(r, buf[:http2frameHeaderLen]) |
| if err != nil { |
| return http2FrameHeader{}, err |
| } |
| return http2FrameHeader{ |
| Length: (uint32(buf[0])<<16 | uint32(buf[1])<<8 | uint32(buf[2])), |
| Type: http2FrameType(buf[3]), |
| Flags: http2Flags(buf[4]), |
| StreamID: binary.BigEndian.Uint32(buf[5:]) & (1<<31 - 1), |
| valid: true, |
| }, nil |
| } |
| |
// A Frame is the base interface implemented by all frame types.
// Callers will generally type-assert the specific frame type:
// *HeadersFrame, *SettingsFrame, *WindowUpdateFrame, etc.
//
// Frames are only valid until the next call to Framer.ReadFrame.
type http2Frame interface {
	// Header returns the frame's FrameHeader.
	Header() http2FrameHeader

	// invalidate is called by Framer.ReadFrame to mark this
	// frame's buffers as being invalid, since the subsequent
	// frame will reuse them.
	invalidate()
}
| |
// A Framer reads and writes Frames.
type http2Framer struct {
	r         io.Reader  // source that ReadFrame reads from
	lastFrame http2Frame // most recently read frame; invalidated by the next ReadFrame
	errDetail error      // detail behind the last ReadFrame error; see ErrorDetail

	// lastHeaderStream is non-zero if the last frame was an
	// unfinished HEADERS/CONTINUATION.
	lastHeaderStream uint32

	maxReadSize uint32                    // largest frame payload ReadFrame accepts; see SetMaxReadFrameSize
	headerBuf   [http2frameHeaderLen]byte // scratch space for the frame header read by ReadFrame

	// TODO: let getReadBuf be configurable, and use a less memory-pinning
	// allocator in server.go to minimize memory pinned for many idle conns.
	// Will probably also need to make frame invalidation have a hook too.
	getReadBuf func(size uint32) []byte
	readBuf    []byte // cache for default getReadBuf

	maxWriteSize uint32 // zero means unlimited; TODO: implement

	w    io.Writer // destination that endWrite writes finished frames to
	wbuf []byte    // frame under construction: header followed by payload

	// AllowIllegalWrites permits the Framer's Write methods to
	// write frames that do not conform to the HTTP/2 spec. This
	// permits using the Framer to test other HTTP/2
	// implementations' conformance to the spec.
	// If false, the Write methods will prefer to return an error
	// rather than comply.
	AllowIllegalWrites bool

	// AllowIllegalReads permits the Framer's ReadFrame method
	// to return non-compliant frames or frame orders.
	// This is for testing and permits using the Framer to test
	// other HTTP/2 implementations' conformance to the spec.
	// It is not compatible with ReadMetaHeaders.
	AllowIllegalReads bool

	// ReadMetaHeaders if non-nil causes ReadFrame to merge
	// HEADERS and CONTINUATION frames together and return
	// MetaHeadersFrame instead.
	ReadMetaHeaders *hpack.Decoder

	// MaxHeaderListSize is the http2 MAX_HEADER_LIST_SIZE.
	// It's used only if ReadMetaHeaders is set; 0 means a sane default
	// (currently 16MB)
	// If the limit is hit, MetaHeadersFrame.Truncated is set true.
	MaxHeaderListSize uint32

	// logReads and logWrites enable debug logging of every frame
	// read and written, respectively.
	logReads, logWrites bool

	debugFramer       *http2Framer // only use for logging written writes
	debugFramerBuf    *bytes.Buffer
	debugReadLoggerf  func(string, ...interface{})
	debugWriteLoggerf func(string, ...interface{})
}
| |
| func (fr *http2Framer) maxHeaderListSize() uint32 { |
| if fr.MaxHeaderListSize == 0 { |
| return 16 << 20 |
| } |
| return fr.MaxHeaderListSize |
| } |
| |
| func (f *http2Framer) startWrite(ftype http2FrameType, flags http2Flags, streamID uint32) { |
| |
| f.wbuf = append(f.wbuf[:0], |
| 0, |
| 0, |
| 0, |
| byte(ftype), |
| byte(flags), |
| byte(streamID>>24), |
| byte(streamID>>16), |
| byte(streamID>>8), |
| byte(streamID)) |
| } |
| |
| func (f *http2Framer) endWrite() error { |
| |
| length := len(f.wbuf) - http2frameHeaderLen |
| if length >= (1 << 24) { |
| return http2ErrFrameTooLarge |
| } |
| _ = append(f.wbuf[:0], |
| byte(length>>16), |
| byte(length>>8), |
| byte(length)) |
| if f.logWrites { |
| f.logWrite() |
| } |
| |
| n, err := f.w.Write(f.wbuf) |
| if err == nil && n != len(f.wbuf) { |
| err = io.ErrShortWrite |
| } |
| return err |
| } |
| |
| func (f *http2Framer) logWrite() { |
| if f.debugFramer == nil { |
| f.debugFramerBuf = new(bytes.Buffer) |
| f.debugFramer = http2NewFramer(nil, f.debugFramerBuf) |
| f.debugFramer.logReads = false |
| |
| f.debugFramer.AllowIllegalReads = true |
| } |
| f.debugFramerBuf.Write(f.wbuf) |
| fr, err := f.debugFramer.ReadFrame() |
| if err != nil { |
| f.debugWriteLoggerf("http2: Framer %p: failed to decode just-written frame", f) |
| return |
| } |
| f.debugWriteLoggerf("http2: Framer %p: wrote %v", f, http2summarizeFrame(fr)) |
| } |
| |
// writeByte appends v to the frame being built.
func (f *http2Framer) writeByte(v byte) { f.wbuf = append(f.wbuf, v) }

// writeBytes appends v to the frame being built.
func (f *http2Framer) writeBytes(v []byte) { f.wbuf = append(f.wbuf, v...) }

// writeUint16 appends v in big-endian byte order.
func (f *http2Framer) writeUint16(v uint16) { f.wbuf = append(f.wbuf, byte(v>>8), byte(v)) }

// writeUint32 appends v in big-endian byte order.
func (f *http2Framer) writeUint32(v uint32) {
	f.wbuf = append(f.wbuf, byte(v>>24), byte(v>>16), byte(v>>8), byte(v))
}
| |
// Bounds on the maximum frame size.
const (
	http2minMaxFrameSize = 1 << 14   // 16KB
	http2maxFrameSize    = 1<<24 - 1 // largest length a 24-bit length field can hold
)
| |
| // NewFramer returns a Framer that writes frames to w and reads them from r. |
| func http2NewFramer(w io.Writer, r io.Reader) *http2Framer { |
| fr := &http2Framer{ |
| w: w, |
| r: r, |
| logReads: http2logFrameReads, |
| logWrites: http2logFrameWrites, |
| debugReadLoggerf: log.Printf, |
| debugWriteLoggerf: log.Printf, |
| } |
| fr.getReadBuf = func(size uint32) []byte { |
| if cap(fr.readBuf) >= int(size) { |
| return fr.readBuf[:size] |
| } |
| fr.readBuf = make([]byte, size) |
| return fr.readBuf |
| } |
| fr.SetMaxReadFrameSize(http2maxFrameSize) |
| return fr |
| } |
| |
| // SetMaxReadFrameSize sets the maximum size of a frame |
| // that will be read by a subsequent call to ReadFrame. |
| // It is the caller's responsibility to advertise this |
| // limit with a SETTINGS frame. |
| func (fr *http2Framer) SetMaxReadFrameSize(v uint32) { |
| if v > http2maxFrameSize { |
| v = http2maxFrameSize |
| } |
| fr.maxReadSize = v |
| } |
| |
// ErrorDetail returns a more detailed error of the last error
// returned by Framer.ReadFrame. For instance, if ReadFrame
// returns a StreamError with code PROTOCOL_ERROR, ErrorDetail
// will say exactly what was invalid. ErrorDetail is not guaranteed
// to return a non-nil value and like the rest of the http2 package,
// its return value is not protected by an API compatibility promise.
// ErrorDetail is reset at the start of the next call to ReadFrame.
func (fr *http2Framer) ErrorDetail() error {
	return fr.errDetail
}
| |
// ErrFrameTooLarge is returned from Framer.ReadFrame when the peer
// sends a frame that is larger than declared with SetMaxReadFrameSize.
// The write path also returns it when a frame's payload does not fit
// in the 24-bit length field.
var http2ErrFrameTooLarge = errors.New("http2: frame too large")
| |
| // terminalReadFrameError reports whether err is an unrecoverable |
| // error from ReadFrame and no other frames should be read. |
| func http2terminalReadFrameError(err error) bool { |
| if _, ok := err.(http2StreamError); ok { |
| return false |
| } |
| return err != nil |
| } |
| |
| // ReadFrame reads a single frame. The returned Frame is only valid |
| // until the next call to ReadFrame. |
| // |
| // If the frame is larger than previously set with SetMaxReadFrameSize, the |
| // returned error is ErrFrameTooLarge. Other errors may be of type |
| // ConnectionError, StreamError, or anything else from the underlying |
| // reader. |
| func (fr *http2Framer) ReadFrame() (http2Frame, error) { |
| fr.errDetail = nil |
| if fr.lastFrame != nil { |
| fr.lastFrame.invalidate() |
| } |
| fh, err := http2readFrameHeader(fr.headerBuf[:], fr.r) |
| if err != nil { |
| return nil, err |
| } |
| if fh.Length > fr.maxReadSize { |
| return nil, http2ErrFrameTooLarge |
| } |
| payload := fr.getReadBuf(fh.Length) |
| if _, err := io.ReadFull(fr.r, payload); err != nil { |
| return nil, err |
| } |
| f, err := http2typeFrameParser(fh.Type)(fh, payload) |
| if err != nil { |
| if ce, ok := err.(http2connError); ok { |
| return nil, fr.connError(ce.Code, ce.Reason) |
| } |
| return nil, err |
| } |
| if err := fr.checkFrameOrder(f); err != nil { |
| return nil, err |
| } |
| if fr.logReads { |
| fr.debugReadLoggerf("http2: Framer %p: read %v", fr, http2summarizeFrame(f)) |
| } |
| if fh.Type == http2FrameHeaders && fr.ReadMetaHeaders != nil { |
| return fr.readMetaFrame(f.(*http2HeadersFrame)) |
| } |
| return f, nil |
| } |
| |
// connError returns ConnectionError(code) but first
// stashes away a public reason so the caller can optionally relay it
// to the peer before hanging up on them. This might help others debug
// their implementations.
// The reason is retrievable through ErrorDetail.
func (fr *http2Framer) connError(code http2ErrCode, reason string) error {
	fr.errDetail = errors.New(reason)
	return http2ConnectionError(code)
}
| |
| // checkFrameOrder reports an error if f is an invalid frame to return |
| // next from ReadFrame. Mostly it checks whether HEADERS and |
| // CONTINUATION frames are contiguous. |
| func (fr *http2Framer) checkFrameOrder(f http2Frame) error { |
| last := fr.lastFrame |
| fr.lastFrame = f |
| if fr.AllowIllegalReads { |
| return nil |
| } |
| |
| fh := f.Header() |
| if fr.lastHeaderStream != 0 { |
| if fh.Type != http2FrameContinuation { |
| return fr.connError(http2ErrCodeProtocol, |
| fmt.Sprintf("got %s for stream %d; expected CONTINUATION following %s for stream %d", |
| fh.Type, fh.StreamID, |
| last.Header().Type, fr.lastHeaderStream)) |
| } |
| if fh.StreamID != fr.lastHeaderStream { |
| return fr.connError(http2ErrCodeProtocol, |
| fmt.Sprintf("got CONTINUATION for stream %d; expected stream %d", |
| fh.StreamID, fr.lastHeaderStream)) |
| } |
| } else if fh.Type == http2FrameContinuation { |
| return fr.connError(http2ErrCodeProtocol, fmt.Sprintf("unexpected CONTINUATION for stream %d", fh.StreamID)) |
| } |
| |
| switch fh.Type { |
| case http2FrameHeaders, http2FrameContinuation: |
| if fh.Flags.Has(http2FlagHeadersEndHeaders) { |
| fr.lastHeaderStream = 0 |
| } else { |
| fr.lastHeaderStream = fh.StreamID |
| } |
| } |
| |
| return nil |
| } |
| |
| // A DataFrame conveys arbitrary, variable-length sequences of octets |
| // associated with a stream. |
| // See http://http2.github.io/http2-spec/#rfc.section.6.1 |
| type http2DataFrame struct { |
| http2FrameHeader |
| data []byte |
| } |
| |
| func (f *http2DataFrame) StreamEnded() bool { |
| return f.http2FrameHeader.Flags.Has(http2FlagDataEndStream) |
| } |
| |
| // Data returns the frame's data octets, not including any padding |
| // size byte or padding suffix bytes. |
| // The caller must not retain the returned memory past the next |
| // call to ReadFrame. |
| func (f *http2DataFrame) Data() []byte { |
| f.checkValid() |
| return f.data |
| } |
| |
| func http2parseDataFrame(fh http2FrameHeader, payload []byte) (http2Frame, error) { |
| if fh.StreamID == 0 { |
| |
| return nil, http2connError{http2ErrCodeProtocol, "DATA frame with stream ID 0"} |
| } |
| f := &http2DataFrame{ |
| http2FrameHeader: fh, |
| } |
| var padSize byte |
| if fh.Flags.Has(http2FlagDataPadded) { |
| var err error |
| payload, padSize, err = http2readByte(payload) |
| if err != nil { |
| return nil, err |
| } |
| } |
| if int(padSize) > len(payload) { |
| |
| return nil, http2connError{http2ErrCodeProtocol, "pad size larger than data payload"} |
| } |
| f.data = payload[:len(payload)-int(padSize)] |
| return f, nil |
| } |
| |
// Errors returned by the Framer's Write methods when asked to write
// an invalid frame.
var (
	// http2errStreamID is returned for a stream ID that is zero or
	// has the high bit set.
	http2errStreamID = errors.New("invalid stream ID")

	// http2errDepStreamID is returned for a priority dependency
	// stream ID that has the high bit set.
	http2errDepStreamID = errors.New("invalid dependent stream ID")

	// http2errPadLength is returned when more than 255 bytes of
	// padding are supplied.
	http2errPadLength = errors.New("pad length too large")
)
| |
// validStreamIDOrZero reports whether streamID fits in 31 bits, that
// is, whether its high bit is clear. Zero is accepted.
func http2validStreamIDOrZero(streamID uint32) bool {
	const limit = 1 << 31
	return streamID < limit
}
| |
// validStreamID reports whether streamID is non-zero and fits in 31
// bits (its high bit is clear).
func http2validStreamID(streamID uint32) bool {
	if streamID == 0 {
		return false
	}
	const limit = 1 << 31
	return streamID < limit
}
| |
| // WriteData writes a DATA frame. |
| // |
| // It will perform exactly one Write to the underlying Writer. |
| // It is the caller's responsibility not to violate the maximum frame size |
| // and to not call other Write methods concurrently. |
| func (f *http2Framer) WriteData(streamID uint32, endStream bool, data []byte) error { |
| return f.WriteDataPadded(streamID, endStream, data, nil) |
| } |
| |
| // WriteData writes a DATA frame with optional padding. |
| // |
| // If pad is nil, the padding bit is not sent. |
| // The length of pad must not exceed 255 bytes. |
| // |
| // It will perform exactly one Write to the underlying Writer. |
| // It is the caller's responsibility not to violate the maximum frame size |
| // and to not call other Write methods concurrently. |
| func (f *http2Framer) WriteDataPadded(streamID uint32, endStream bool, data, pad []byte) error { |
| if !http2validStreamID(streamID) && !f.AllowIllegalWrites { |
| return http2errStreamID |
| } |
| if len(pad) > 255 { |
| return http2errPadLength |
| } |
| var flags http2Flags |
| if endStream { |
| flags |= http2FlagDataEndStream |
| } |
| if pad != nil { |
| flags |= http2FlagDataPadded |
| } |
| f.startWrite(http2FrameData, flags, streamID) |
| if pad != nil { |
| f.wbuf = append(f.wbuf, byte(len(pad))) |
| } |
| f.wbuf = append(f.wbuf, data...) |
| f.wbuf = append(f.wbuf, pad...) |
| return f.endWrite() |
| } |
| |
| // A SettingsFrame conveys configuration parameters that affect how |
| // endpoints communicate, such as preferences and constraints on peer |
| // behavior. |
| // |
| // See http://http2.github.io/http2-spec/#SETTINGS |
| type http2SettingsFrame struct { |
| http2FrameHeader |
| p []byte |
| } |
| |
| func http2parseSettingsFrame(fh http2FrameHeader, p []byte) (http2Frame, error) { |
| if fh.Flags.Has(http2FlagSettingsAck) && fh.Length > 0 { |
| |
| return nil, http2ConnectionError(http2ErrCodeFrameSize) |
| } |
| if fh.StreamID != 0 { |
| |
| return nil, http2ConnectionError(http2ErrCodeProtocol) |
| } |
| if len(p)%6 != 0 { |
| |
| return nil, http2ConnectionError(http2ErrCodeFrameSize) |
| } |
| f := &http2SettingsFrame{http2FrameHeader: fh, p: p} |
| if v, ok := f.Value(http2SettingInitialWindowSize); ok && v > (1<<31)-1 { |
| |
| return nil, http2ConnectionError(http2ErrCodeFlowControl) |
| } |
| return f, nil |
| } |
| |
| func (f *http2SettingsFrame) IsAck() bool { |
| return f.http2FrameHeader.Flags.Has(http2FlagSettingsAck) |
| } |
| |
| func (f *http2SettingsFrame) Value(s http2SettingID) (v uint32, ok bool) { |
| f.checkValid() |
| buf := f.p |
| for len(buf) > 0 { |
| settingID := http2SettingID(binary.BigEndian.Uint16(buf[:2])) |
| if settingID == s { |
| return binary.BigEndian.Uint32(buf[2:6]), true |
| } |
| buf = buf[6:] |
| } |
| return 0, false |
| } |
| |
| // ForeachSetting runs fn for each setting. |
| // It stops and returns the first error. |
| func (f *http2SettingsFrame) ForeachSetting(fn func(http2Setting) error) error { |
| f.checkValid() |
| buf := f.p |
| for len(buf) > 0 { |
| if err := fn(http2Setting{ |
| http2SettingID(binary.BigEndian.Uint16(buf[:2])), |
| binary.BigEndian.Uint32(buf[2:6]), |
| }); err != nil { |
| return err |
| } |
| buf = buf[6:] |
| } |
| return nil |
| } |
| |
| // WriteSettings writes a SETTINGS frame with zero or more settings |
| // specified and the ACK bit not set. |
| // |
| // It will perform exactly one Write to the underlying Writer. |
| // It is the caller's responsibility to not call other Write methods concurrently. |
| func (f *http2Framer) WriteSettings(settings ...http2Setting) error { |
| f.startWrite(http2FrameSettings, 0, 0) |
| for _, s := range settings { |
| f.writeUint16(uint16(s.ID)) |
| f.writeUint32(s.Val) |
| } |
| return f.endWrite() |
| } |
| |
| // WriteSettingsAck writes an empty SETTINGS frame with the ACK bit set. |
| // |
| // It will perform exactly one Write to the underlying Writer. |
| // It is the caller's responsibility to not call other Write methods concurrently. |
| func (f *http2Framer) WriteSettingsAck() error { |
| f.startWrite(http2FrameSettings, http2FlagSettingsAck, 0) |
| return f.endWrite() |
| } |
| |
| // A PingFrame is a mechanism for measuring a minimal round trip time |
| // from the sender, as well as determining whether an idle connection |
| // is still functional. |
| // See http://http2.github.io/http2-spec/#rfc.section.6.7 |
| type http2PingFrame struct { |
| http2FrameHeader |
| Data [8]byte |
| } |
| |
| func (f *http2PingFrame) IsAck() bool { return f.Flags.Has(http2FlagPingAck) } |
| |
| func http2parsePingFrame(fh http2FrameHeader, payload []byte) (http2Frame, error) { |
| if len(payload) != 8 { |
| return nil, http2ConnectionError(http2ErrCodeFrameSize) |
| } |
| if fh.StreamID != 0 { |
| return nil, http2ConnectionError(http2ErrCodeProtocol) |
| } |
| f := &http2PingFrame{http2FrameHeader: fh} |
| copy(f.Data[:], payload) |
| return f, nil |
| } |
| |
| func (f *http2Framer) WritePing(ack bool, data [8]byte) error { |
| var flags http2Flags |
| if ack { |
| flags = http2FlagPingAck |
| } |
| f.startWrite(http2FramePing, flags, 0) |
| f.writeBytes(data[:]) |
| return f.endWrite() |
| } |
| |
| // A GoAwayFrame informs the remote peer to stop creating streams on this connection. |
| // See http://http2.github.io/http2-spec/#rfc.section.6.8 |
| type http2GoAwayFrame struct { |
| http2FrameHeader |
| LastStreamID uint32 |
| ErrCode http2ErrCode |
| debugData []byte |
| } |
| |
| // DebugData returns any debug data in the GOAWAY frame. Its contents |
| // are not defined. |
| // The caller must not retain the returned memory past the next |
| // call to ReadFrame. |
| func (f *http2GoAwayFrame) DebugData() []byte { |
| f.checkValid() |
| return f.debugData |
| } |
| |
| func http2parseGoAwayFrame(fh http2FrameHeader, p []byte) (http2Frame, error) { |
| if fh.StreamID != 0 { |
| return nil, http2ConnectionError(http2ErrCodeProtocol) |
| } |
| if len(p) < 8 { |
| return nil, http2ConnectionError(http2ErrCodeFrameSize) |
| } |
| return &http2GoAwayFrame{ |
| http2FrameHeader: fh, |
| LastStreamID: binary.BigEndian.Uint32(p[:4]) & (1<<31 - 1), |
| ErrCode: http2ErrCode(binary.BigEndian.Uint32(p[4:8])), |
| debugData: p[8:], |
| }, nil |
| } |
| |
| func (f *http2Framer) WriteGoAway(maxStreamID uint32, code http2ErrCode, debugData []byte) error { |
| f.startWrite(http2FrameGoAway, 0, 0) |
| f.writeUint32(maxStreamID & (1<<31 - 1)) |
| f.writeUint32(uint32(code)) |
| f.writeBytes(debugData) |
| return f.endWrite() |
| } |
| |
| // An UnknownFrame is the frame type returned when the frame type is unknown |
| // or no specific frame type parser exists. |
| type http2UnknownFrame struct { |
| http2FrameHeader |
| p []byte |
| } |
| |
| // Payload returns the frame's payload (after the header). It is not |
| // valid to call this method after a subsequent call to |
| // Framer.ReadFrame, nor is it valid to retain the returned slice. |
| // The memory is owned by the Framer and is invalidated when the next |
| // frame is read. |
| func (f *http2UnknownFrame) Payload() []byte { |
| f.checkValid() |
| return f.p |
| } |
| |
| func http2parseUnknownFrame(fh http2FrameHeader, p []byte) (http2Frame, error) { |
| return &http2UnknownFrame{fh, p}, nil |
| } |
| |
| // A WindowUpdateFrame is used to implement flow control. |
| // See http://http2.github.io/http2-spec/#rfc.section.6.9 |
| type http2WindowUpdateFrame struct { |
| http2FrameHeader |
| Increment uint32 // never read with high bit set |
| } |
| |
| func http2parseWindowUpdateFrame(fh http2FrameHeader, p []byte) (http2Frame, error) { |
| if len(p) != 4 { |
| return nil, http2ConnectionError(http2ErrCodeFrameSize) |
| } |
| inc := binary.BigEndian.Uint32(p[:4]) & 0x7fffffff |
| if inc == 0 { |
| |
| if fh.StreamID == 0 { |
| return nil, http2ConnectionError(http2ErrCodeProtocol) |
| } |
| return nil, http2streamError(fh.StreamID, http2ErrCodeProtocol) |
| } |
| return &http2WindowUpdateFrame{ |
| http2FrameHeader: fh, |
| Increment: inc, |
| }, nil |
| } |
| |
| // WriteWindowUpdate writes a WINDOW_UPDATE frame. |
| // The increment value must be between 1 and 2,147,483,647, inclusive. |
| // If the Stream ID is zero, the window update applies to the |
| // connection as a whole. |
| func (f *http2Framer) WriteWindowUpdate(streamID, incr uint32) error { |
| |
| if (incr < 1 || incr > 2147483647) && !f.AllowIllegalWrites { |
| return errors.New("illegal window increment value") |
| } |
| f.startWrite(http2FrameWindowUpdate, 0, streamID) |
| f.writeUint32(incr) |
| return f.endWrite() |
| } |
| |
| // A HeadersFrame is used to open a stream and additionally carries a |
| // header block fragment. |
| type http2HeadersFrame struct { |
| http2FrameHeader |
| |
| // Priority is set if FlagHeadersPriority is set in the FrameHeader. |
| Priority http2PriorityParam |
| |
| headerFragBuf []byte // not owned |
| } |
| |
| func (f *http2HeadersFrame) HeaderBlockFragment() []byte { |
| f.checkValid() |
| return f.headerFragBuf |
| } |
| |
| func (f *http2HeadersFrame) HeadersEnded() bool { |
| return f.http2FrameHeader.Flags.Has(http2FlagHeadersEndHeaders) |
| } |
| |
| func (f *http2HeadersFrame) StreamEnded() bool { |
| return f.http2FrameHeader.Flags.Has(http2FlagHeadersEndStream) |
| } |
| |
| func (f *http2HeadersFrame) HasPriority() bool { |
| return f.http2FrameHeader.Flags.Has(http2FlagHeadersPriority) |
| } |
| |
| func http2parseHeadersFrame(fh http2FrameHeader, p []byte) (_ http2Frame, err error) { |
| hf := &http2HeadersFrame{ |
| http2FrameHeader: fh, |
| } |
| if fh.StreamID == 0 { |
| |
| return nil, http2connError{http2ErrCodeProtocol, "HEADERS frame with stream ID 0"} |
| } |
| var padLength uint8 |
| if fh.Flags.Has(http2FlagHeadersPadded) { |
| if p, padLength, err = http2readByte(p); err != nil { |
| return |
| } |
| } |
| if fh.Flags.Has(http2FlagHeadersPriority) { |
| var v uint32 |
| p, v, err = http2readUint32(p) |
| if err != nil { |
| return nil, err |
| } |
| hf.Priority.StreamDep = v & 0x7fffffff |
| hf.Priority.Exclusive = (v != hf.Priority.StreamDep) |
| p, hf.Priority.Weight, err = http2readByte(p) |
| if err != nil { |
| return nil, err |
| } |
| } |
| if len(p)-int(padLength) <= 0 { |
| return nil, http2streamError(fh.StreamID, http2ErrCodeProtocol) |
| } |
| hf.headerFragBuf = p[:len(p)-int(padLength)] |
| return hf, nil |
| } |
| |
| // HeadersFrameParam are the parameters for writing a HEADERS frame. |
| type http2HeadersFrameParam struct { |
| // StreamID is the required Stream ID to initiate. |
| StreamID uint32 |
| // BlockFragment is part (or all) of a Header Block. |
| BlockFragment []byte |
| |
| // EndStream indicates that the header block is the last that |
| // the endpoint will send for the identified stream. Setting |
| // this flag causes the stream to enter one of "half closed" |
| // states. |
| EndStream bool |
| |
| // EndHeaders indicates that this frame contains an entire |
| // header block and is not followed by any |
| // CONTINUATION frames. |
| EndHeaders bool |
| |
| // PadLength is the optional number of bytes of zeros to add |
| // to this frame. |
| PadLength uint8 |
| |
| // Priority, if non-zero, includes stream priority information |
| // in the HEADER frame. |
| Priority http2PriorityParam |
| } |
| |
| // WriteHeaders writes a single HEADERS frame. |
| // |
| // This is a low-level header writing method. Encoding headers and |
| // splitting them into any necessary CONTINUATION frames is handled |
| // elsewhere. |
| // |
| // It will perform exactly one Write to the underlying Writer. |
| // It is the caller's responsibility to not call other Write methods concurrently. |
| func (f *http2Framer) WriteHeaders(p http2HeadersFrameParam) error { |
| if !http2validStreamID(p.StreamID) && !f.AllowIllegalWrites { |
| return http2errStreamID |
| } |
| var flags http2Flags |
| if p.PadLength != 0 { |
| flags |= http2FlagHeadersPadded |
| } |
| if p.EndStream { |
| flags |= http2FlagHeadersEndStream |
| } |
| if p.EndHeaders { |
| flags |= http2FlagHeadersEndHeaders |
| } |
| if !p.Priority.IsZero() { |
| flags |= http2FlagHeadersPriority |
| } |
| f.startWrite(http2FrameHeaders, flags, p.StreamID) |
| if p.PadLength != 0 { |
| f.writeByte(p.PadLength) |
| } |
| if !p.Priority.IsZero() { |
| v := p.Priority.StreamDep |
| if !http2validStreamIDOrZero(v) && !f.AllowIllegalWrites { |
| return http2errDepStreamID |
| } |
| if p.Priority.Exclusive { |
| v |= 1 << 31 |
| } |
| f.writeUint32(v) |
| f.writeByte(p.Priority.Weight) |
| } |
| f.wbuf = append(f.wbuf, p.BlockFragment...) |
| f.wbuf = append(f.wbuf, http2padZeros[:p.PadLength]...) |
| return f.endWrite() |
| } |
| |
| // A PriorityFrame specifies the sender-advised priority of a stream. |
| // See http://http2.github.io/http2-spec/#rfc.section.6.3 |
| type http2PriorityFrame struct { |
| http2FrameHeader |
| http2PriorityParam |
| } |
| |
// PriorityParam are the stream prioritization parameters.
type http2PriorityParam struct {
	// StreamDep is a 31-bit stream identifier for the
	// stream that this stream depends on. Zero means no
	// dependency.
	StreamDep uint32

	// Exclusive is whether the dependency is exclusive.
	Exclusive bool

	// Weight is the stream's zero-indexed weight. It should be
	// set together with StreamDep, or neither should be set. Per
	// the spec, "Add one to the value to obtain a weight between
	// 1 and 256."
	Weight uint8
}

// IsZero reports whether every field of p holds its zero value.
func (p http2PriorityParam) IsZero() bool {
	return p.StreamDep == 0 && !p.Exclusive && p.Weight == 0
}
| |
| func http2parsePriorityFrame(fh http2FrameHeader, payload []byte) (http2Frame, error) { |
| if fh.StreamID == 0 { |
| return nil, http2connError{http2ErrCodeProtocol, "PRIORITY frame with stream ID 0"} |
| } |
| if len(payload) != 5 { |
| return nil, http2connError{http2ErrCodeFrameSize, fmt.Sprintf("PRIORITY frame payload size was %d; want 5", len(payload))} |
| } |
| v := binary.BigEndian.Uint32(payload[:4]) |
| streamID := v & 0x7fffffff |
| return &http2PriorityFrame{ |
| http2FrameHeader: fh, |
| http2PriorityParam: http2PriorityParam{ |
| Weight: payload[4], |
| StreamDep: streamID, |
| Exclusive: streamID != v, |
| }, |
| }, nil |
| } |
| |
| // WritePriority writes a PRIORITY frame. |
| // |
| // It will perform exactly one Write to the underlying Writer. |
| // It is the caller's responsibility to not call other Write methods concurrently. |
| func (f *http2Framer) WritePriority(streamID uint32, p http2PriorityParam) error { |
| if !http2validStreamID(streamID) && !f.AllowIllegalWrites { |
| return http2errStreamID |
| } |
| if !http2validStreamIDOrZero(p.StreamDep) { |
| return http2errDepStreamID |
| } |
| f.startWrite(http2FramePriority, 0, streamID) |
| v := p.StreamDep |
| if p.Exclusive { |
| v |= 1 << 31 |
| } |
| f.writeUint32(v) |
| f.writeByte(p.Weight) |
| return f.endWrite() |
| } |
| |
| // A RSTStreamFrame allows for abnormal termination of a stream. |
| // See http://http2.github.io/http2-spec/#rfc.section.6.4 |
| type http2RSTStreamFrame struct { |
| http2FrameHeader |
| ErrCode http2ErrCode |
| } |
| |
| func http2parseRSTStreamFrame(fh http2FrameHeader, p []byte) (http2Frame, error) { |
| if len(p) != 4 { |
| return nil, http2ConnectionError(http2ErrCodeFrameSize) |
| } |
| if fh.StreamID == 0 { |
| return nil, http2ConnectionError(http2ErrCodeProtocol) |
| } |
| return &http2RSTStreamFrame{fh, http2ErrCode(binary.BigEndian.Uint32(p[:4]))}, nil |
| } |
| |
| // WriteRSTStream writes a RST_STREAM frame. |
| // |
| // It will perform exactly one Write to the underlying Writer. |
| // It is the caller's responsibility to not call other Write methods concurrently. |
| func (f *http2Framer) WriteRSTStream(streamID uint32, code http2ErrCode) error { |
| if !http2validStreamID(streamID) && !f.AllowIllegalWrites { |
| return http2errStreamID |
| } |
| f.startWrite(http2FrameRSTStream, 0, streamID) |
| f.writeUint32(uint32(code)) |
| return f.endWrite() |
| } |
| |
| // A ContinuationFrame is used to continue a sequence of header block fragments. |
| // See http://http2.github.io/http2-spec/#rfc.section.6.10 |
| type http2ContinuationFrame struct { |
| http2FrameHeader |
| headerFragBuf []byte |
| } |
| |
| func http2parseContinuationFrame(fh http2FrameHeader, p []byte) (http2Frame, error) { |
| if fh.StreamID == 0 { |
| return nil, http2connError{http2ErrCodeProtocol, "CONTINUATION frame with stream ID 0"} |
| } |
| return &http2ContinuationFrame{fh, p}, nil |
| } |
| |
| func (f *http2ContinuationFrame) HeaderBlockFragment() []byte { |
| f.checkValid() |
| return f.headerFragBuf |
| } |
| |
| func (f *http2ContinuationFrame) HeadersEnded() bool { |
| return f.http2FrameHeader.Flags.Has(http2FlagContinuationEndHeaders) |
| } |
| |
| // WriteContinuation writes a CONTINUATION frame. |
| // |
| // It will perform exactly one Write to the underlying Writer. |
| // It is the caller's responsibility to not call other Write methods concurrently. |
| func (f *http2Framer) WriteContinuation(streamID uint32, endHeaders bool, headerBlockFragment []byte) error { |
| if !http2validStreamID(streamID) && !f.AllowIllegalWrites { |
| return http2errStreamID |
| } |
| var flags http2Flags |
| if endHeaders { |
| flags |= http2FlagContinuationEndHeaders |
| } |
| f.startWrite(http2FrameContinuation, flags, streamID) |
| f.wbuf = append(f.wbuf, headerBlockFragment...) |
| return f.endWrite() |
| } |
| |
| // A PushPromiseFrame is used to initiate a server stream. |
| // See http://http2.github.io/http2-spec/#rfc.section.6.6 |
| type http2PushPromiseFrame struct { |
| http2FrameHeader |
| PromiseID uint32 |
| headerFragBuf []byte // not owned |
| } |
| |
| func (f *http2PushPromiseFrame) HeaderBlockFragment() []byte { |
| f.checkValid() |
| return f.headerFragBuf |
| } |
| |
| func (f *http2PushPromiseFrame) HeadersEnded() bool { |
| return f.http2FrameHeader.Flags.Has(http2FlagPushPromiseEndHeaders) |
| } |
| |
| func http2parsePushPromise(fh http2FrameHeader, p []byte) (_ http2Frame, err error) { |
| pp := &http2PushPromiseFrame{ |
| http2FrameHeader: fh, |
| } |
| if pp.StreamID == 0 { |
| |
| return nil, http2ConnectionError(http2ErrCodeProtocol) |
| } |
| // The PUSH_PROMISE frame includes optional padding. |
| // Padding fields and flags are identical to those defined for DATA frames |
| var padLength uint8 |
| if fh.Flags.Has(http2FlagPushPromisePadded) { |
| if p, padLength, err = http2readByte(p); err != nil { |
| return |
| } |
| } |
| |
| p, pp.PromiseID, err = http2readUint32(p) |
| if err != nil { |
| return |
| } |
| pp.PromiseID = pp.PromiseID & (1<<31 - 1) |
| |
| if int(padLength) > len(p) { |
| |
| return nil, http2ConnectionError(http2ErrCodeProtocol) |
| } |
| pp.headerFragBuf = p[:len(p)-int(padLength)] |
| return pp, nil |
| } |
| |
// PushPromiseParam are the parameters for writing a PUSH_PROMISE frame.
type http2PushPromiseParam struct {
	// StreamID is the required Stream ID to initiate.
	StreamID uint32

	// PromiseID is the required Stream ID which this
	// Push Promises.
	PromiseID uint32

	// BlockFragment is part (or all) of a Header Block.
	BlockFragment []byte

	// EndHeaders indicates that this frame contains an entire
	// header block and is not followed by any
	// CONTINUATION frames.
	EndHeaders bool

	// PadLength is the optional number of bytes of zeros to add
	// to this frame. Zero means the frame is written unpadded.
	PadLength uint8
}
| |
| // WritePushPromise writes a single PushPromise Frame. |
| // |
| // As with Header Frames, This is the low level call for writing |
| // individual frames. Continuation frames are handled elsewhere. |
| // |
| // It will perform exactly one Write to the underlying Writer. |
| // It is the caller's responsibility to not call other Write methods concurrently. |
| func (f *http2Framer) WritePushPromise(p http2PushPromiseParam) error { |
| if !http2validStreamID(p.StreamID) && !f.AllowIllegalWrites { |
| return http2errStreamID |
| } |
| var flags http2Flags |
| if p.PadLength != 0 { |
| flags |= http2FlagPushPromisePadded |
| } |
| if p.EndHeaders { |
| flags |= http2FlagPushPromiseEndHeaders |
| } |
| f.startWrite(http2FramePushPromise, flags, p.StreamID) |
| if p.PadLength != 0 { |
| f.writeByte(p.PadLength) |
| } |
| if !http2validStreamID(p.PromiseID) && !f.AllowIllegalWrites { |
| return http2errStreamID |
| } |
| f.writeUint32(p.PromiseID) |
| f.wbuf = append(f.wbuf, p.BlockFragment...) |
| f.wbuf = append(f.wbuf, http2padZeros[:p.PadLength]...) |
| return f.endWrite() |
| } |
| |
| // WriteRawFrame writes a raw frame. This can be used to write |
| // extension frames unknown to this package. |
| func (f *http2Framer) WriteRawFrame(t http2FrameType, flags http2Flags, streamID uint32, payload []byte) error { |
| f.startWrite(t, flags, streamID) |
| f.writeBytes(payload) |
| return f.endWrite() |
| } |
| |
// readByte splits the first byte off p. It returns the rest of p and
// that byte, or io.ErrUnexpectedEOF (with a nil remainder) if p is
// empty.
func http2readByte(p []byte) (remain []byte, b byte, err error) {
	if len(p) > 0 {
		return p[1:], p[0], nil
	}
	return nil, 0, io.ErrUnexpectedEOF
}
| |
// readUint32 splits a big-endian uint32 off the front of p. It returns
// the rest of p and the decoded value, or io.ErrUnexpectedEOF (with a
// nil remainder) if p holds fewer than 4 bytes.
func http2readUint32(p []byte) (remain []byte, v uint32, err error) {
	if len(p) >= 4 {
		return p[4:], binary.BigEndian.Uint32(p[:4]), nil
	}
	return nil, 0, io.ErrUnexpectedEOF
}
| |
// streamEnder is implemented by frames that can report whether they
// end their stream.
type http2streamEnder interface {
	StreamEnded() bool
}

// headersEnder is implemented by frames that can report whether they
// end a header block.
type http2headersEnder interface {
	HeadersEnded() bool
}

// headersOrContinuation is implemented by frames that carry a header
// block fragment and can report whether they end the header block.
type http2headersOrContinuation interface {
	http2headersEnder
	HeaderBlockFragment() []byte
}
| |
| // A MetaHeadersFrame is the representation of one HEADERS frame and |
| // zero or more contiguous CONTINUATION frames and the decoding of |
| // their HPACK-encoded contents. |
| // |
| // This type of frame does not appear on the wire and is only returned |
| // by the Framer when Framer.ReadMetaHeaders is set. |
| type http2MetaHeadersFrame struct { |
| *http2HeadersFrame |
| |
| // Fields are the fields contained in the HEADERS and |
| // CONTINUATION frames. The underlying slice is owned by the |
| // Framer and must not be retained after the next call to |
| // ReadFrame. |
| // |
| // Fields are guaranteed to be in the correct http2 order and |
| // not have unknown pseudo header fields or invalid header |
| // field names or values. Required pseudo header fields may be |
| // missing, however. Use the MetaHeadersFrame.Pseudo accessor |
| // method access pseudo headers. |
| Fields []hpack.HeaderField |
| |
| // Truncated is whether the max header list size limit was hit |
| // and Fields is incomplete. The hpack decoder state is still |
| // valid, however. |
| Truncated bool |
| } |
| |
| // PseudoValue returns the given pseudo header field's value. |
| // The provided pseudo field should not contain the leading colon. |
| func (mh *http2MetaHeadersFrame) PseudoValue(pseudo string) string { |
| for _, hf := range mh.Fields { |
| if !hf.IsPseudo() { |
| return "" |
| } |
| if hf.Name[1:] == pseudo { |
| return hf.Value |
| } |
| } |
| return "" |
| } |
| |
| // RegularFields returns the regular (non-pseudo) header fields of mh. |
| // The caller does not own the returned slice. |
| func (mh *http2MetaHeadersFrame) RegularFields() []hpack.HeaderField { |
| for i, hf := range mh.Fields { |
| if !hf.IsPseudo() { |
| return mh.Fields[i:] |
| } |
| } |
| return nil |
| } |
| |
| // PseudoFields returns the pseudo header fields of mh. |
| // The caller does not own the returned slice. |
| func (mh *http2MetaHeadersFrame) PseudoFields() []hpack.HeaderField { |
| for i, hf := range mh.Fields { |
| if !hf.IsPseudo() { |
| return mh.Fields[:i] |
| } |
| } |
| return mh.Fields |
| } |
| |
| func (mh *http2MetaHeadersFrame) checkPseudos() error { |
| var isRequest, isResponse bool |
| pf := mh.PseudoFields() |
| for i, hf := range pf { |
| switch hf.Name { |
| case ":method", ":path", ":scheme", ":authority": |
| isRequest = true |
| case ":status": |
| isResponse = true |
| default: |
| return http2pseudoHeaderError(hf.Name) |
| } |
| |
| for _, hf2 := range pf[:i] { |
| if hf.Name == hf2.Name { |
| return http2duplicatePseudoHeaderError(hf.Name) |
| } |
| } |
| } |
| if isRequest && isResponse { |
| return http2errMixPseudoHeaderTypes |
| } |
| return nil |
| } |
| |
| func (fr *http2Framer) maxHeaderStringLen() int { |
| v := fr.maxHeaderListSize() |
| if uint32(int(v)) == v { |
| return int(v) |
| } |
| |
| return 0 |
| } |
| |
| // readMetaFrame returns 0 or more CONTINUATION frames from fr and |
| // merge them into into the provided hf and returns a MetaHeadersFrame |
| // with the decoded hpack values. |
| func (fr *http2Framer) readMetaFrame(hf *http2HeadersFrame) (*http2MetaHeadersFrame, error) { |
| if fr.AllowIllegalReads { |
| return nil, errors.New("illegal use of AllowIllegalReads with ReadMetaHeaders") |
| } |
| mh := &http2MetaHeadersFrame{ |
| http2HeadersFrame: hf, |
| } |
| var remainSize = fr.maxHeaderListSize() |
| var sawRegular bool |
| |
| var invalid error // pseudo header field errors |
| hdec := fr.ReadMetaHeaders |
| hdec.SetEmitEnabled(true) |
| hdec.SetMaxStringLength(fr.maxHeaderStringLen()) |
| hdec.SetEmitFunc(func(hf hpack.HeaderField) { |
| if http2VerboseLogs && fr.logReads { |
| fr.debugReadLoggerf("http2: decoded hpack field %+v", hf) |
| } |
| if !httplex.ValidHeaderFieldValue(hf.Value) { |
| invalid = http2headerFieldValueError(hf.Value) |
| } |
| isPseudo := strings.HasPrefix(hf.Name, ":") |
| if isPseudo { |
| if sawRegular { |
| invalid = http2errPseudoAfterRegular |
| } |
| } else { |
| sawRegular = true |
| if !http2validWireHeaderFieldName(hf.Name) { |
| invalid = http2headerFieldNameError(hf.Name) |
| } |
| } |
| |
| if invalid != nil { |
| hdec.SetEmitEnabled(false) |
| return |
| } |
| |
| size := hf.Size() |
| if size > remainSize { |
| hdec.SetEmitEnabled(false) |
| mh.Truncated = true |
| return |
| } |
| remainSize -= size |
| |
| mh.Fields = append(mh.Fields, hf) |
| }) |
| |
| defer hdec.SetEmitFunc(func(hf hpack.HeaderField) {}) |
| |
| var hc http2headersOrContinuation = hf |
| for { |
| frag := hc.HeaderBlockFragment() |
| if _, err := hdec.Write(frag); err != nil { |
| return nil, http2ConnectionError(http2ErrCodeCompression) |
| } |
| |
| if hc.HeadersEnded() { |
| break |
| } |
| if f, err := fr.ReadFrame(); err != nil { |
| return nil, err |
| } else { |
| hc = f.(*http2ContinuationFrame) |
| } |
| } |
| |
| mh.http2HeadersFrame.headerFragBuf = nil |
| mh.http2HeadersFrame.invalidate() |
| |
| if err := hdec.Close(); err != nil { |
| return nil, http2ConnectionError(http2ErrCodeCompression) |
| } |
| if invalid != nil { |
| fr.errDetail = invalid |
| if http2VerboseLogs { |
| log.Printf("http2: invalid header: %v", invalid) |
| } |
| return nil, http2StreamError{mh.StreamID, http2ErrCodeProtocol, invalid} |
| } |
| if err := mh.checkPseudos(); err != nil { |
| fr.errDetail = err |
| if http2VerboseLogs { |
| log.Printf("http2: invalid pseudo headers: %v", err) |
| } |
| return nil, http2StreamError{mh.StreamID, http2ErrCodeProtocol, err} |
| } |
| return mh, nil |
| } |
| |
| func http2summarizeFrame(f http2Frame) string { |
| var buf bytes.Buffer |
| f.Header().writeDebug(&buf) |
| switch f := f.(type) { |
| case *http2SettingsFrame: |
| n := 0 |
| f.ForeachSetting(func(s http2Setting) error { |
| n++ |
| if n == 1 { |
| buf.WriteString(", settings:") |
| } |
| fmt.Fprintf(&buf, " %v=%v,", s.ID, s.Val) |
| return nil |
| }) |
| if n > 0 { |
| buf.Truncate(buf.Len() - 1) |
| } |
| case *http2DataFrame: |
| data := f.Data() |
| const max = 256 |
| if len(data) > max { |
| data = data[:max] |
| } |
| fmt.Fprintf(&buf, " data=%q", data) |
| if len(f.Data()) > max { |
| fmt.Fprintf(&buf, " (%d bytes omitted)", len(f.Data())-max) |
| } |
| case *http2WindowUpdateFrame: |
| if f.StreamID == 0 { |
| buf.WriteString(" (conn)") |
| } |
| fmt.Fprintf(&buf, " incr=%v", f.Increment) |
| case *http2PingFrame: |
| fmt.Fprintf(&buf, " ping=%q", f.Data[:]) |
| case *http2GoAwayFrame: |
| fmt.Fprintf(&buf, " LastStreamID=%v ErrCode=%v Debug=%q", |
| f.LastStreamID, f.ErrCode, f.debugData) |
| case *http2RSTStreamFrame: |
| fmt.Fprintf(&buf, " ErrCode=%v", f.ErrCode) |
| } |
| return buf.String() |
| } |
| |
| func http2transportExpectContinueTimeout(t1 *Transport) time.Duration { |
| return t1.ExpectContinueTimeout |
| } |
| |
// isBadCipher reports whether the cipher is blacklisted by the HTTP/2 spec.
func http2isBadCipher(cipher uint16) bool {
	switch cipher {
	case
		// RSA key exchange
		tls.TLS_RSA_WITH_RC4_128_SHA,
		tls.TLS_RSA_WITH_3DES_EDE_CBC_SHA,
		tls.TLS_RSA_WITH_AES_128_CBC_SHA,
		tls.TLS_RSA_WITH_AES_256_CBC_SHA,
		tls.TLS_RSA_WITH_AES_128_GCM_SHA256,
		tls.TLS_RSA_WITH_AES_256_GCM_SHA384,
		// ECDHE-ECDSA with RC4 or CBC
		tls.TLS_ECDHE_ECDSA_WITH_RC4_128_SHA,
		tls.TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA,
		tls.TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA,
		// ECDHE-RSA with RC4, 3DES or CBC
		tls.TLS_ECDHE_RSA_WITH_RC4_128_SHA,
		tls.TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA,
		tls.TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA,
		tls.TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA:
		return true
	}
	return false
}
| |
// contextContext is the context type used by the helpers in this file.
// It embeds context.Context and declares no additional methods.
type http2contextContext interface {
	context.Context
}
| |
| func http2serverConnBaseContext(c net.Conn, opts *http2ServeConnOpts) (ctx http2contextContext, cancel func()) { |
| ctx, cancel = context.WithCancel(context.Background()) |
| ctx = context.WithValue(ctx, LocalAddrContextKey, c.LocalAddr()) |
| if hs := opts.baseConfig(); hs != nil { |
| ctx = context.WithValue(ctx, ServerContextKey, hs) |
| } |
| return |
| } |
| |
| func http2contextWithCancel(ctx http2contextContext) (_ http2contextContext, cancel func()) { |
| return context.WithCancel(ctx) |
| } |
| |
| func http2requestWithContext(req *Request, ctx http2contextContext) *Request { |
| return req.WithContext(ctx) |
| } |
| |
// clientTrace is a locally named type whose underlying type is
// httptrace.ClientTrace, so the trace helpers in this file can take a
// *http2clientTrace.
type http2clientTrace httptrace.ClientTrace
| |
| func http2reqContext(r *Request) context.Context { return r.Context() } |
| |
| func (t *http2Transport) idleConnTimeout() time.Duration { |
| if t.t1 != nil { |
| return t.t1.IdleConnTimeout |
| } |
| return 0 |
| } |
| |
| func http2setResponseUncompressed(res *Response) { res.Uncompressed = true } |
| |
| func http2traceGotConn(req *Request, cc *http2ClientConn) { |
| trace := httptrace.ContextClientTrace(req.Context()) |
| if trace == nil || trace.GotConn == nil { |
| return |
| } |
| ci := httptrace.GotConnInfo{Conn: cc.tconn} |
| cc.mu.Lock() |
| ci.Reused = cc.nextStreamID > 1 |
| ci.WasIdle = len(cc.streams) == 0 && ci.Reused |
| if ci.WasIdle && !cc.lastActive.IsZero() { |
| ci.IdleTime = time.Now().Sub(cc.lastActive) |
| } |
| cc.mu.Unlock() |
| |
| trace.GotConn(ci) |
| } |
| |
| func http2traceWroteHeaders(trace *http2clientTrace) { |
| if trace != nil && trace.WroteHeaders != nil { |
| trace.WroteHeaders() |
| } |
| } |
| |
| func http2traceGot100Continue(trace *http2clientTrace) { |
| if trace != nil && trace.Got100Continue != nil { |
| trace.Got100Continue() |
| } |
| } |
| |
| func http2traceWait100Continue(trace *http2clientTrace) { |
| if trace != nil && trace.Wait100Continue != nil { |
| trace.Wait100Continue() |
| } |
| } |
| |
| func http2traceWroteRequest(trace *http2clientTrace, err error) { |
| if trace != nil && trace.WroteRequest != nil { |
| trace.WroteRequest(httptrace.WroteRequestInfo{Err: err}) |
| } |
| } |
| |
| func http2traceFirstResponseByte(trace *http2clientTrace) { |
| if trace != nil && trace.GotFirstResponseByte != nil { |
| trace.GotFirstResponseByte() |
| } |
| } |
| |
| func http2requestTrace(req *Request) *http2clientTrace { |
| trace := httptrace.ContextClientTrace(req.Context()) |
| return (*http2clientTrace)(trace) |
| } |
| |
| // Ping sends a PING frame to the server and waits for the ack. |
| func (cc *http2ClientConn) Ping(ctx context.Context) error { |
| return cc.ping(ctx) |
| } |
| |
// cloneTLSConfig returns the result of c.Clone().
func http2cloneTLSConfig(c *tls.Config) *tls.Config {
	clone := c.Clone()
	return clone
}
| |
| var _ Pusher = (*http2responseWriter)(nil) |
| |
| // Push implements http.Pusher. |
| func (w *http2responseWriter) Push(target string, opts *PushOptions) error { |
| internalOpts := http2pushOptions{} |
| if opts != nil { |
| internalOpts.Method = opts.Method |
| internalOpts.Header = opts.Header |
| } |
| return w.push(target, internalOpts) |
| } |
| |
| func http2configureServer18(h1 *Server, h2 *http2Server) error { |
| if h2.IdleTimeout == 0 { |
| if h1.IdleTimeout != 0 { |
| h2.IdleTimeout = h1.IdleTimeout |
| } else { |
| h2.IdleTimeout = h1.ReadTimeout |
| } |
| } |
| return nil |
| } |
| |
| func http2shouldLogPanic(panicValue interface{}) bool { |
| return panicValue != nil && panicValue != ErrAbortHandler |
| } |
| |
| func http2reqGetBody(req *Request) func() (io.ReadCloser, error) { |
| return req.GetBody |
| } |
| |
| func http2reqBodyIsNoBody(body io.ReadCloser) bool { |
| return body == NoBody |
| } |
| |
| var http2DebugGoroutines = os.Getenv("DEBUG_HTTP2_GOROUTINES") == "1" |
| |
| type http2goroutineLock uint64 |
| |
| func http2newGoroutineLock() http2goroutineLock { |
| if !http2DebugGoroutines { |
| return 0 |
| } |
| return http2goroutineLock(http2curGoroutineID()) |
| } |
| |
| func (g http2goroutineLock) check() { |
| if !http2DebugGoroutines { |
| return |
| } |
| if http2curGoroutineID() != uint64(g) { |
| panic("running on the wrong goroutine") |
| } |
| } |
| |
| func (g http2goroutineLock) checkNotOn() { |
| if !http2DebugGoroutines { |
| return |
| } |
| if http2curGoroutineID() == uint64(g) { |
| panic("running on the wrong goroutine") |
| } |
| } |
| |
| var http2goroutineSpace = []byte("goroutine ") |
| |
| func http2curGoroutineID() uint64 { |
| bp := http2littleBuf.Get().(*[]byte) |
| defer http2littleBuf.Put(bp) |
| b := *bp |
| b = b[:runtime.Stack(b, false)] |
| |
| b = bytes.TrimPrefix(b, http2goroutineSpace) |
| i := bytes.IndexByte(b, ' ') |
| if i < 0 { |
| panic(fmt.Sprintf("No space found in %q", b)) |
| } |
| b = b[:i] |
| n, err := http2parseUintBytes(b, 10, 64) |
| if err != nil { |
| panic(fmt.Sprintf("Failed to parse goroutine ID out of %q: %v", b, err)) |
| } |
| return n |
| } |
| |
| var http2littleBuf = sync.Pool{ |
| New: func() interface{} { |
| buf := make([]byte, 64) |
| return &buf |
| }, |
| } |
| |
// parseUintBytes is like strconv.ParseUint, but using a []byte.
//
// base must be 0 or in the range [2, 36]. With base 0 the base is taken
// from the prefix of s: "0x"/"0X" selects 16, a leading "0" selects 8,
// anything else selects 10. bitSize 0 means strconv.IntSize.
//
// On failure the returned error is a *strconv.NumError whose Num is the
// original input. For range errors n is the maximum uint64; for all
// other errors n is 0.
func http2parseUintBytes(s []byte, base int, bitSize int) (n uint64, err error) {
	var cutoff, maxVal uint64

	if bitSize == 0 {
		bitSize = int(strconv.IntSize)
	}

	// s0 keeps the untouched input for error reporting; s may be
	// re-sliced below to skip a hex prefix.
	s0 := s
	switch {
	case len(s) < 1:
		err = strconv.ErrSyntax
		goto Error

	case 2 <= base && base <= 36:
		// Explicit valid base: nothing to adjust.

	case base == 0:
		// Infer the base from the prefix of s.
		switch {
		case s[0] == '0' && len(s) > 1 && (s[1] == 'x' || s[1] == 'X'):
			base = 16
			s = s[2:]
			if len(s) < 1 {
				err = strconv.ErrSyntax
				goto Error
			}
		case s[0] == '0':
			base = 8
		default:
			base = 10
		}

	default:
		err = errors.New("invalid base " + strconv.Itoa(base))
		goto Error
	}

	n = 0
	cutoff = http2cutoff64(base)
	maxVal = 1<<uint(bitSize) - 1

	for i := 0; i < len(s); i++ {
		var v byte
		d := s[i]
		switch {
		case '0' <= d && d <= '9':
			v = d - '0'
		case 'a' <= d && d <= 'z':
			v = d - 'a' + 10
		case 'A' <= d && d <= 'Z':
			v = d - 'A' + 10
		default:
			n = 0
			err = strconv.ErrSyntax
			goto Error
		}
		// The digit must be valid for the chosen base.
		if int(v) >= base {
			n = 0
			err = strconv.ErrSyntax
			goto Error
		}

		if n >= cutoff {
			// n*base would overflow uint64.
			n = 1<<64 - 1
			err = strconv.ErrRange
			goto Error
		}
		n *= uint64(base)

		n1 := n + uint64(v)
		if n1 < n || n1 > maxVal {
			// n+v wrapped around or exceeds the bitSize limit.
			n = 1<<64 - 1
			err = strconv.ErrRange
			goto Error
		}
		n = n1
	}

	return n, nil

Error:
	return n, &strconv.NumError{Func: "ParseUint", Num: string(s0), Err: err}
}
| |
// cutoff64 returns the first number n such that n*base >= 1<<64.
// For base values below 2 it returns 0.
func http2cutoff64(base int) uint64 {
	if base >= 2 {
		return (1<<64-1)/uint64(base) + 1
	}
	return 0
}
| |
| var ( |
| http2commonLowerHeader = map[string]string{} // Go-Canonical-Case -> lower-case |
| http2commonCanonHeader = map[string]string{} // lower-case -> Go-Canonical-Case |
| ) |
| |
| func init() { |
| for _, v := range []string{ |
| "accept", |
| "accept-charset", |
| "accept-encoding", |
| "accept-language", |
| "accept-ranges", |
| "age", |
| "access-control-allow-origin", |
| "allow", |
| "authorization", |
| "cache-control", |
| "content-disposition", |
| "content-encoding", |
| "content-language", |
| "content-length", |
| "content-location", |
| "content-range", |
| "content-type", |
| "cookie", |
| "date", |
| "etag", |
| "expect", |
| "expires", |
| "from", |
| "host", |
| "if-match", |
| "if-modified-since", |
| "if-none-match", |
| "if-unmodified-since", |
| "last-modified", |
| "link", |
| "location", |
| "max-forwards", |
| "proxy-authenticate", |
| "proxy-authorization", |
| "range", |
| "referer", |
| "refresh", |
| "retry-after", |
| "server", |
| "set-cookie", |
| "strict-transport-security", |
| "trailer", |
| "transfer-encoding", |
| "user-agent", |
| "vary", |
| "via", |
| "www-authenticate", |
| } { |
| chk := CanonicalHeaderKey(v) |
| http2commonLowerHeader[chk] = v |
| http2commonCanonHeader[v] = chk |
| } |
| } |
| |
| func http2lowerHeader(v string) string { |
| if s, ok := http2commonLowerHeader[v]; ok { |
| return s |
| } |
| return strings.ToLower(v) |
| } |
| |
// Debug switches. The first three are derived from the GODEBUG
// environment variable by init; inTests is not set in this block.
var (
	http2VerboseLogs    bool
	http2logFrameWrites bool
	http2logFrameReads  bool
	http2inTests        bool
)

// init reads GODEBUG: "http2debug=1" turns on verbose logs, and
// "http2debug=2" additionally turns on logging of frame reads and writes.
func init() {
	godebug := os.Getenv("GODEBUG")
	level1 := strings.Contains(godebug, "http2debug=1")
	level2 := strings.Contains(godebug, "http2debug=2")
	if level1 || level2 {
		http2VerboseLogs = true
	}
	if level2 {
		http2logFrameWrites = true
		http2logFrameReads = true
	}
}
| |
// Protocol-level constants used by both client and server code.
const (
	// ClientPreface is the string that must be sent by new
	// connections from clients.
	http2ClientPreface = "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n"

	// initialMaxFrameSize is the SETTINGS_MAX_FRAME_SIZE default.
	// See http://http2.github.io/http2-spec/#rfc.section.6.5.2
	http2initialMaxFrameSize = 16384

	// NextProtoTLS is the NPN/ALPN protocol negotiated during
	// HTTP/2's TLS setup.
	http2NextProtoTLS = "h2"

	// initialHeaderTableSize is the default header table size.
	// See http://http2.github.io/http2-spec/#SettingValues
	http2initialHeaderTableSize = 4096

	http2initialWindowSize = 65535 // 6.9.2 Initial Flow Control Window Size

	// defaultMaxReadFrameSize is the read frame size limit used when
	// no valid value is configured (1 MiB).
	http2defaultMaxReadFrameSize = 1 << 20
)

var (
	// clientPreface is http2ClientPreface as a byte slice.
	http2clientPreface = []byte(http2ClientPreface)
)
| |
// streamState is the state of an HTTP/2 stream.
type http2streamState int

// HTTP/2 stream states.
//
// See http://tools.ietf.org/html/rfc7540#section-5.1.
//
// For simplicity, the server code merges "reserved (local)" into
// "half-closed (remote)". This is one less state transition to track.
// The only downside is that we send PUSH_PROMISEs slightly less
// liberally than allowable. More discussion here:
// https://lists.w3.org/Archives/Public/ietf-http-wg/2016JulSep/0599.html
//
// "reserved (remote)" is omitted since the client code does not
// support server push.
const (
	http2stateIdle http2streamState = iota
	http2stateOpen
	http2stateHalfClosedLocal
	http2stateHalfClosedRemote
	http2stateClosed
)

// stateName maps each defined stream state to its display name.
var http2stateName = [...]string{
	http2stateIdle:             "Idle",
	http2stateOpen:             "Open",
	http2stateHalfClosedLocal:  "HalfClosedLocal",
	http2stateHalfClosedRemote: "HalfClosedRemote",
	http2stateClosed:           "Closed",
}

// String returns the display name of st.
//
// Values outside the defined states are rendered as
// "http2streamState(N)" instead of indexing past the name table, so
// formatting an unexpected state in a log message cannot panic.
func (st http2streamState) String() string {
	if st < 0 || int(st) >= len(http2stateName) {
		return fmt.Sprintf("http2streamState(%d)", int(st))
	}
	return http2stateName[st]
}
| |
| // Setting is a setting parameter: which setting it is, and its value. |
| type http2Setting struct { |
| // ID is which setting is being set. |
| // See http://http2.github.io/http2-spec/#SettingValues |
| ID http2SettingID |
| |
| // Val is the value. |
| Val uint32 |
| } |
| |
| func (s http2Setting) String() string { |
| return fmt.Sprintf("[%v = %d]", s.ID, s.Val) |
| } |
| |
| // Valid reports whether the setting is valid. |
| func (s http2Setting) Valid() error { |
| |
| switch s.ID { |
| case http2SettingEnablePush: |
| if s.Val != 1 && s.Val != 0 { |
| return http2ConnectionError(http2ErrCodeProtocol) |
| } |
| case http2SettingInitialWindowSize: |
| if s.Val > 1<<31-1 { |
| return http2ConnectionError(http2ErrCodeFlowControl) |
| } |
| case http2SettingMaxFrameSize: |
| if s.Val < 16384 || s.Val > 1<<24-1 { |
| return http2ConnectionError(http2ErrCodeProtocol) |
| } |
| } |
| return nil |
| } |
| |
// A SettingID is an HTTP/2 setting as defined in
// http://http2.github.io/http2-spec/#iana-settings
type http2SettingID uint16

// Setting identifiers known to this package.
const (
	http2SettingHeaderTableSize      http2SettingID = 0x1
	http2SettingEnablePush           http2SettingID = 0x2
	http2SettingMaxConcurrentStreams http2SettingID = 0x3
	http2SettingInitialWindowSize    http2SettingID = 0x4
	http2SettingMaxFrameSize         http2SettingID = 0x5
	http2SettingMaxHeaderListSize    http2SettingID = 0x6
)

// settingName maps each known setting identifier to its display name.
var http2settingName = map[http2SettingID]string{
	http2SettingHeaderTableSize:      "HEADER_TABLE_SIZE",
	http2SettingEnablePush:           "ENABLE_PUSH",
	http2SettingMaxConcurrentStreams: "MAX_CONCURRENT_STREAMS",
	http2SettingInitialWindowSize:    "INITIAL_WINDOW_SIZE",
	http2SettingMaxFrameSize:         "MAX_FRAME_SIZE",
	http2SettingMaxHeaderListSize:    "MAX_HEADER_LIST_SIZE",
}

// String returns the display name of s, or "UNKNOWN_SETTING_<n>" for
// identifiers that are not in the name table.
func (s http2SettingID) String() string {
	name, known := http2settingName[s]
	if !known {
		name = fmt.Sprintf("UNKNOWN_SETTING_%d", uint16(s))
	}
	return name
}
| |
// Sentinel errors for header fields that fail validation.
var (
	// errInvalidHeaderFieldName is the error for an invalid header field name.
	http2errInvalidHeaderFieldName = errors.New("http2: invalid header field name")
	// errInvalidHeaderFieldValue is the error for an invalid header field value.
	http2errInvalidHeaderFieldValue = errors.New("http2: invalid header field value")
)
| |
| // validWireHeaderFieldName reports whether v is a valid header field |
| // name (key). See httplex.ValidHeaderName for the base rules. |
| // |
| // Further, http2 says: |
| // "Just as in HTTP/1.x, header field names are strings of ASCII |
| // characters that are compared in a case-insensitive |
| // fashion. However, header field names MUST be converted to |
| // lowercase prior to their encoding in HTTP/2. " |
| func http2validWireHeaderFieldName(v string) bool { |
| if len(v) == 0 { |
| return false |
| } |
| for _, r := range v { |
| if !httplex.IsTokenRune(r) { |
| return false |
| } |
| if 'A' <= r && r <= 'Z' { |
| return false |
| } |
| } |
| return true |
| } |
| |
| var http2httpCodeStringCommon = map[int]string{} // n -> strconv.Itoa(n) |
| |
| func init() { |
| for i := 100; i <= 999; i++ { |
| if v := StatusText(i); v != "" { |
| http2httpCodeStringCommon[i] = strconv.Itoa(i) |
| } |
| } |
| } |
| |
| func http2httpCodeString(code int) string { |
| if s, ok := http2httpCodeStringCommon[code]; ok { |
| return s |
| } |
| return strconv.Itoa(code) |
| } |
| |
// stringWriter is the interface that wraps the WriteString method.
// (from pkg io)
type http2stringWriter interface {
	WriteString(s string) (n int, err error)
}
| |
// A gate lets two goroutines coordinate their activities.
// It is a channel of empty structs: Done sends one value and Wait
// receives one.
type http2gate chan struct{}

// Done sends a single value on the gate.
func (g http2gate) Done() {
	var token struct{}
	g <- token
}

// Wait receives a single value from the gate.
func (g http2gate) Wait() {
	<-g
}
| |
// A closeWaiter is like a sync.WaitGroup but only goes 1 to 0 (open to closed).
// It is implemented as a channel that is closed exactly once.
type http2closeWaiter chan struct{}

// Init makes a closeWaiter usable by allocating its channel.
// It exists so that a closeWaiter value can be placed inside a larger
// struct and initialized in place.
func (cw *http2closeWaiter) Init() {
	*cw = http2closeWaiter(make(chan struct{}))
}

// Close marks the closeWaiter as closed and unblocks any waiters.
func (cw http2closeWaiter) Close() {
	close(cw)
}

// Wait blocks until the closeWaiter has been closed.
func (cw http2closeWaiter) Wait() {
	<-cw
}
| |
// bufferedWriter is a buffered writer that writes to w.
// Its bufio.Writer is borrowed from a pool on the first Write and
// handed back on Flush, to minimize idle memory usage with many
// connections.
type http2bufferedWriter struct {
	w  io.Writer     // immutable
	bw *bufio.Writer // non-nil when data is buffered
}

// newBufferedWriter returns a bufferedWriter targeting w. No buffer is
// allocated until the first Write.
func http2newBufferedWriter(w io.Writer) *http2bufferedWriter {
	bufw := new(http2bufferedWriter)
	bufw.w = w
	return bufw
}

// bufWriterPoolBufferSize is the size of bufio.Writer's
// buffers created using bufWriterPool.
//
// TODO: pick a less arbitrary value? this is a bit under
// (3 x typical 1500 byte MTU) at least. Other than that,
// not much thought went into it.
const http2bufWriterPoolBufferSize = 4 << 10

// bufWriterPool holds detached bufio.Writers of
// bufWriterPoolBufferSize bytes for reuse across bufferedWriters.
var http2bufWriterPool = sync.Pool{
	New: func() interface{} {
		return bufio.NewWriterSize(nil, http2bufWriterPoolBufferSize)
	},
}

// Available returns how many bytes can be written before the buffer
// is full. With no buffer attached that is the full pool buffer size.
func (w *http2bufferedWriter) Available() int {
	if w.bw != nil {
		return w.bw.Available()
	}
	return http2bufWriterPoolBufferSize
}

// Write buffers p, first attaching a pooled bufio.Writer if none is
// currently attached.
func (w *http2bufferedWriter) Write(p []byte) (n int, err error) {
	if w.bw == nil {
		pooled := http2bufWriterPool.Get().(*bufio.Writer)
		pooled.Reset(w.w)
		w.bw = pooled
	}
	return w.bw.Write(p)
}

// Flush writes any buffered data to w and returns the bufio.Writer to
// the pool. It is a no-op returning nil when nothing is buffered.
func (w *http2bufferedWriter) Flush() error {
	if w.bw == nil {
		return nil
	}
	active := w.bw
	flushErr := active.Flush()
	// Detach the writer from w.w before it goes back to the pool.
	active.Reset(nil)
	http2bufWriterPool.Put(active)
	w.bw = nil
	return flushErr
}
| |
// mustUint31 converts v to uint32, panicking with "out of range" when v
// is negative.
//
// No upper-bound check is needed: the largest int32 is 2147483647,
// which is exactly the largest 31-bit value.
func http2mustUint31(v int32) uint32 {
	if v < 0 {
		panic("out of range")
	}
	return uint32(v)
}
| |
// bodyAllowedForStatus reports whether a given response status code
// permits a body. See RFC 2616, section 4.4.
//
// Bodies are disallowed for 1xx, 204 and 304; every other status is
// allowed.
func http2bodyAllowedForStatus(status int) bool {
	if status >= 100 && status <= 199 {
		return false
	}
	return status != 204 && status != 304
}
| |
// httpError is an error carrying a message and a timeout flag.
type http2httpError struct {
	msg     string
	timeout bool
}

// Error returns the stored message.
func (e *http2httpError) Error() string {
	return e.msg
}

// Timeout reports whether the error was flagged as a timeout.
func (e *http2httpError) Timeout() bool {
	return e.timeout
}

// Temporary always reports true.
func (e *http2httpError) Temporary() bool {
	return true
}

// errTimeout is the timeout error for awaiting response headers.
var http2errTimeout error = &http2httpError{
	msg:     "http2: timeout awaiting response headers",
	timeout: true,
}
| |
// connectionStater is implemented by connections that can report their
// TLS state through a ConnectionState method.
type http2connectionStater interface {
	ConnectionState() tls.ConnectionState
}
| |
// sorterPool recycles sorter values.
var http2sorterPool = sync.Pool{
	New: func() interface{} {
		return new(http2sorter)
	},
}

// sorter sorts a string slice in ascending order via its
// Len, Swap and Less methods.
type http2sorter struct {
	v []string // owned by sorter
}

// Len returns the number of strings held.
func (s *http2sorter) Len() int {
	return len(s.v)
}

// Swap exchanges the strings at indexes i and j.
func (s *http2sorter) Swap(i, j int) {
	vals := s.v
	vals[i], vals[j] = vals[j], vals[i]
}

// Less reports whether the string at i sorts before the string at j.
func (s *http2sorter) Less(i, j int) bool {
	return s.v[i] < s.v[j]
}
| |
| // Keys returns the sorted keys of h. |
| // |
| // The returned slice is only valid until s used again or returned to |
| // its pool. |
| func (s *http2sorter) Keys(h Header) []string { |
| keys := s.v[:0] |
| for k := range h { |
| keys = append(keys, k) |
| } |
| s.v = keys |
| sort.Sort(s) |
| return keys |
| } |
| |
| func (s *http2sorter) SortStrings(ss []string) { |
| |
| save := s.v |
| s.v = ss |
| sort.Sort(s) |
| s.v = save |
| } |
| |
// validPseudoPath reports whether v is a valid :path pseudo-header
// value. It must be either:
//
//     *) a non-empty string starting with '/', but not with "//",
//     *) the string '*', for OPTIONS requests.
//
// For now this is only used as a quick check for deciding when to clean
// up Opaque URLs before sending requests from the Transport.
// See golang.org/issue/16847
func http2validPseudoPath(v string) bool {
	if v == "*" {
		return true
	}
	if !strings.HasPrefix(v, "/") {
		return false
	}
	return !strings.HasPrefix(v, "//")
}
| |
// pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
// io.Pipe except there are no PipeReader/PipeWriter halves, and the
// underlying buffer is an interface. (io.Pipe is always unbuffered)
type http2pipe struct {
	mu       sync.Mutex
	c        sync.Cond       // c.L lazily initialized to &p.mu
	b        http2pipeBuffer // buffered data
	err      error           // read error once empty. non-nil means closed.
	breakErr error           // immediate read error (caller doesn't see rest of b)
	donec    chan struct{}   // closed on error
	readFn   func()          // optional code to run in Read before error
}

// pipeBuffer is the storage a pipe reads from and writes to.
type http2pipeBuffer interface {
	Len() int
	io.Writer
	io.Reader
}

// Len returns the number of bytes currently held in the buffer.
func (p *http2pipe) Len() int {
	p.mu.Lock()
	defer p.mu.Unlock()
	buffered := p.b.Len()
	return buffered
}

// Read waits until data is available and copies bytes
// from the buffer into d.
//
// A break error is returned immediately, even if data is buffered. A
// close error is returned only once the buffer is empty; just before
// the first such return the function registered with
// closeWithErrorAndCode, if any, is run once.
func (p *http2pipe) Read(d []byte) (n int, err error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.c.L == nil {
		p.c.L = &p.mu
	}
	for p.breakErr == nil {
		if p.b.Len() > 0 {
			return p.b.Read(d)
		}
		if p.err != nil {
			if p.readFn != nil {
				p.readFn()
				p.readFn = nil // run at most once
			}
			return 0, p.err
		}
		p.c.Wait()
	}
	return 0, p.breakErr
}

// errClosedPipeWrite is returned by Write after CloseWithError.
var http2errClosedPipeWrite = errors.New("write on closed buffer")

// Write copies bytes from d into the buffer and wakes a reader.
// It is an error to write more data than the buffer can hold.
// After the pipe has been closed it returns errClosedPipeWrite.
func (p *http2pipe) Write(d []byte) (n int, err error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.c.L == nil {
		p.c.L = &p.mu
	}
	defer p.c.Signal()
	if p.err == nil {
		return p.b.Write(d)
	}
	return 0, http2errClosedPipeWrite
}

// CloseWithError causes the next Read (waking up a current blocked
// Read if needed) to return the provided err after all data has been
// read.
//
// The error must be non-nil.
func (p *http2pipe) CloseWithError(err error) {
	p.closeWithError(&p.err, err, nil)
}

// BreakWithError causes the next Read (waking up a current blocked
// Read if needed) to return the provided err immediately, without
// waiting for unread data.
func (p *http2pipe) BreakWithError(err error) {
	p.closeWithError(&p.breakErr, err, nil)
}

// closeWithErrorAndCode is like CloseWithError but also sets some code to run
// in the caller's goroutine before returning the error.
func (p *http2pipe) closeWithErrorAndCode(err error, fn func()) {
	p.closeWithError(&p.err, err, fn)
}

// closeWithError stores err in *dst (either p.err or p.breakErr),
// records fn as the read hook and closes the done channel. Only the
// first error stored in a given destination wins; later calls for the
// same destination change nothing. A reader is signaled in either
// case. It panics if err is nil.
func (p *http2pipe) closeWithError(dst *error, err error, fn func()) {
	if err == nil {
		panic("err must be non-nil")
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.c.L == nil {
		p.c.L = &p.mu
	}
	defer p.c.Signal()
	if *dst == nil {
		p.readFn = fn
		*dst = err
		p.closeDoneLocked()
	}
}

// closeDoneLocked closes the done channel if it exists and is still
// open. It requires p.mu be held.
func (p *http2pipe) closeDoneLocked() {
	if p.donec != nil {
		select {
		case <-p.donec:
			// Already closed.
		default:
			close(p.donec)
		}
	}
}

// Err returns the error (if any) first set by BreakWithError or
// CloseWithError. A break error takes precedence over a close error.
func (p *http2pipe) Err() error {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.breakErr == nil {
		return p.err
	}
	return p.breakErr
}

// Done returns a channel which is closed if and when this pipe is closed
// with CloseWithError or BreakWithError. The channel is created on the
// first call; if the pipe already has an error by then, the channel is
// returned closed.
func (p *http2pipe) Done() <-chan struct{} {
	p.mu.Lock()
	defer p.mu.Unlock()
	if p.donec != nil {
		return p.donec
	}
	p.donec = make(chan struct{})
	if p.err != nil || p.breakErr != nil {
		p.closeDoneLocked()
	}
	return p.donec
}
| |
// Server-side timeouts, sizes and limits.
const (
	http2prefaceTimeout        = 10 * time.Second
	http2firstSettingsTimeout  = 2 * time.Second // should be in-flight with preface anyway
	http2handlerChunkWriteSize = 4 << 10         // buffer size of each response writer's bufio.Writer
	http2defaultMaxStreams     = 250             // TODO: make this 100 as the GFE seems to?
)

// Sentinel errors used by the server implementation.
var (
	http2errClientDisconnected = errors.New("client disconnected")
	http2errClosedBody         = errors.New("body closed by handler")
	http2errHandlerComplete    = errors.New("http2: request body closed due to handler exiting")
	http2errStreamClosed       = errors.New("http2: stream closed")
)
| |
| var http2responseWriterStatePool = sync.Pool{ |
| New: func() interface{} { |
| rws := &http2responseWriterState{} |
| rws.bw = bufio.NewWriterSize(http2chunkWriter{rws}, http2handlerChunkWriteSize) |
| return rws |
| }, |
| } |
| |
// Test hooks. Each is consulted only when non-nil.
var (
	// testHookOnConn is called when a new connection is handed to the
	// protocol handler installed by ConfigureServer.
	http2testHookOnConn func()
	// testHookGetServerConn is called by ServeConn with the new
	// serverConn just before it starts serving.
	http2testHookGetServerConn func(*http2serverConn)
	http2testHookOnPanicMu     *sync.Mutex // nil except in tests
	http2testHookOnPanic       func(sc *http2serverConn, panicVal interface{}) (rePanic bool)
)
| |
// Server is an HTTP/2 server.
// Its fields are configuration knobs; the zero value selects the
// defaults described on each field.
type http2Server struct {
	// MaxHandlers limits the number of http.Handler ServeHTTP goroutines
	// which may run at a time over all connections.
	// A negative or zero value means no limit.
	// TODO: implement
	MaxHandlers int

	// MaxConcurrentStreams optionally specifies the number of
	// concurrent streams that each client may have open at a
	// time. This is unrelated to the number of http.Handler goroutines
	// which may be active globally, which is MaxHandlers.
	// If zero, MaxConcurrentStreams defaults to at least 100, per
	// the HTTP/2 spec's recommendations.
	MaxConcurrentStreams uint32

	// MaxReadFrameSize optionally specifies the largest frame
	// this server is willing to read. A valid value is between
	// 16k and 16M, inclusive. If zero or otherwise invalid, a
	// default value is used.
	MaxReadFrameSize uint32

	// PermitProhibitedCipherSuites, if true, permits the use of
	// cipher suites prohibited by the HTTP/2 spec.
	PermitProhibitedCipherSuites bool

	// IdleTimeout specifies how long until idle clients should be
	// closed with a GOAWAY frame. PING frames are not considered
	// activity for the purposes of IdleTimeout.
	IdleTimeout time.Duration

	// NewWriteScheduler constructs a write scheduler for a connection.
	// If nil, a default scheduler is chosen.
	NewWriteScheduler func() http2WriteScheduler
}
| |
| func (s *http2Server) maxReadFrameSize() uint32 { |
| if v := s.MaxReadFrameSize; v >= http2minMaxFrameSize && v <= http2maxFrameSize { |
| return v |
| } |
| return http2defaultMaxReadFrameSize |
| } |
| |
| func (s *http2Server) maxConcurrentStreams() uint32 { |
| if v := s.MaxConcurrentStreams; v > 0 { |
| return v |
| } |
| return http2defaultMaxStreams |
| } |
| |
| // ConfigureServer adds HTTP/2 support to a net/http Server. |
| // |
| // The configuration conf may be nil. |
| // |
| // ConfigureServer must be called before s begins serving. |
| func http2ConfigureServer(s *Server, conf *http2Server) error { |
| if s == nil { |
| panic("nil *http.Server") |
| } |
| if conf == nil { |
| conf = new(http2Server) |
| } |
| if err := http2configureServer18(s, conf); err != nil { |
| return err |
| } |
| |
| if s.TLSConfig == nil { |
| s.TLSConfig = new(tls.Config) |
| } else if s.TLSConfig.CipherSuites != nil { |
| // If they already provided a CipherSuite list, return |
| // an error if it has a bad order or is missing |
| // ECDHE_RSA_WITH_AES_128_GCM_SHA256. |
| const requiredCipher = tls.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 |
| haveRequired := false |
| sawBad := false |
| for i, cs := range s.TLSConfig.CipherSuites { |
| if cs == requiredCipher { |
| haveRequired = true |
| } |
| if http2isBadCipher(cs) { |
| sawBad = true |
| } else if sawBad { |
| return fmt.Errorf("http2: TLSConfig.CipherSuites index %d contains an HTTP/2-approved cipher suite (%#04x), but it comes after unapproved cipher suites. With this configuration, clients that don't support previous, approved cipher suites may be given an unapproved one and reject the connection.", i, cs) |
| } |
| } |
| if !haveRequired { |
| return fmt.Errorf("http2: TLSConfig.CipherSuites is missing HTTP/2-required TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256") |
| } |
| } |
| |
| s.TLSConfig.PreferServerCipherSuites = true |
| |
| haveNPN := false |
| for _, p := range s.TLSConfig.NextProtos { |
| if p == http2NextProtoTLS { |
| haveNPN = true |
| break |
| } |
| } |
| if !haveNPN { |
| s.TLSConfig.NextProtos = append(s.TLSConfig.NextProtos, http2NextProtoTLS) |
| } |
| |
| if s.TLSNextProto == nil { |
| s.TLSNextProto = map[string]func(*Server, *tls.Conn, Handler){} |
| } |
| protoHandler := func(hs *Server, c *tls.Conn, h Handler) { |
| if http2testHookOnConn != nil { |
| http2testHookOnConn() |
| } |
| conf.ServeConn(c, &http2ServeConnOpts{ |
| Handler: h, |
| BaseConfig: hs, |
| }) |
| } |
| s.TLSNextProto[http2NextProtoTLS] = protoHandler |
| return nil |
| } |
| |
// ServeConnOpts are options for the Server.ServeConn method.
// A nil *ServeConnOpts is valid and selects the defaults.
type http2ServeConnOpts struct {
	// BaseConfig optionally sets the base configuration
	// for values. If nil, defaults are used.
	BaseConfig *Server

	// Handler specifies which handler to use for processing
	// requests. If nil, BaseConfig.Handler is used. If BaseConfig
	// or BaseConfig.Handler is nil, http.DefaultServeMux is used.
	Handler Handler
}
| |
| func (o *http2ServeConnOpts) baseConfig() *Server { |
| if o != nil && o.BaseConfig != nil { |
| return o.BaseConfig |
| } |
| return new(Server) |
| } |
| |
| func (o *http2ServeConnOpts) handler() Handler { |
| if o != nil { |
| if o.Handler != nil { |
| return o.Handler |
| } |
| if o.BaseConfig != nil && o.BaseConfig.Handler != nil { |
| return o.BaseConfig.Handler |
| } |
| } |
| return DefaultServeMux |
| } |
| |
// ServeConn serves HTTP/2 requests on the provided connection and
// blocks until the connection is no longer readable.
//
// ServeConn starts speaking HTTP/2 assuming that c has not had any
// reads or writes. It writes its initial settings frame and expects
// to be able to read the preface and settings frame from the
// client. If c has a ConnectionState method like a *tls.Conn, the
// ConnectionState is used to verify the TLS ciphersuite and to set
// the Request.TLS field in Handlers.
//
// ServeConn does not support h2c by itself. Any h2c support must be
// implemented in terms of providing a suitably-behaving net.Conn.
//
// The opts parameter is optional. If nil, default values are used.
func (s *http2Server) ServeConn(c net.Conn, opts *http2ServeConnOpts) {
	// The base context is canceled when ServeConn returns.
	baseCtx, cancel := http2serverConnBaseContext(c, opts)
	defer cancel()

	sc := &http2serverConn{
		srv:               s,
		hs:                opts.baseConfig(),
		conn:              c,
		baseCtx:           baseCtx,
		remoteAddrStr:     c.RemoteAddr().String(),
		bw:                http2newBufferedWriter(c),
		handler:           opts.handler(),
		streams:           make(map[uint32]*http2stream),
		readFrameCh:       make(chan http2readFrameResult),
		wantWriteFrameCh:  make(chan http2FrameWriteRequest, 8),
		wantStartPushCh:   make(chan http2startPushRequest, 8),
		wroteFrameCh:      make(chan http2frameWriteResult, 1),
		bodyReadCh:        make(chan http2bodyReadMsg),
		doneServing:       make(chan struct{}),
		clientMaxStreams:  math.MaxUint32,
		advMaxStreams:     s.maxConcurrentStreams(),
		initialWindowSize: http2initialWindowSize,
		maxFrameSize:      http2initialMaxFrameSize,
		headerTableSize:   http2initialHeaderTableSize,
		serveG:            http2newGoroutineLock(),
		pushEnabled:       true,
	}

	// When the base server has a WriteTimeout, clear any write deadline
	// already set on the connection (the zero time means no deadline).
	if sc.hs.WriteTimeout != 0 {
		sc.conn.SetWriteDeadline(time.Time{})
	}

	// Use the configured write scheduler factory, or fall back to the
	// random write scheduler.
	if s.NewWriteScheduler != nil {
		sc.writeSched = s.NewWriteScheduler()
	} else {
		sc.writeSched = http2NewRandomWriteScheduler()
	}

	// Both connection-level flow-control windows start at the
	// protocol's initial window size.
	sc.flow.add(http2initialWindowSize)
	sc.inflow.add(http2initialWindowSize)
	sc.hpackEncoder = hpack.NewEncoder(&sc.headerWriteBuf)

	// The framer writes through the buffered writer and reads directly
	// from the connection.
	fr := http2NewFramer(sc.bw, c)
	fr.ReadMetaHeaders = hpack.NewDecoder(http2initialHeaderTableSize, nil)
	fr.MaxHeaderListSize = sc.maxHeaderListSize()
	fr.SetMaxReadFrameSize(s.maxReadFrameSize())
	sc.framer = fr

	if tc, ok := c.(http2connectionStater); ok {
		// Snapshot the TLS state for this connection.
		sc.tlsState = new(tls.ConnectionState)
		*sc.tlsState = tc.ConnectionState()

		// Connections below TLS 1.2 are rejected.
		if sc.tlsState.Version < tls.VersionTLS12 {
			sc.rejectConn(http2ErrCodeInadequateSecurity, "TLS version too low")
			return
		}

		// NOTE(review): this branch is intentionally empty here; a
		// missing SNI server name is detected but not acted upon.
		if sc.tlsState.ServerName == "" {

		}

		// Reject cipher suites flagged by isBadCipher unless the server
		// was configured to permit them.
		if !s.PermitProhibitedCipherSuites && http2isBadCipher(sc.tlsState.CipherSuite) {

			sc.rejectConn(http2ErrCodeInadequateSecurity, fmt.Sprintf("Prohibited TLS 1.2 Cipher Suite: %x", sc.tlsState.CipherSuite))
			return
		}
	}

	if hook := http2testHookGetServerConn; hook != nil {
		hook(sc)
	}
	sc.serve()
}
| |
| func (sc *http2serverConn) rejectConn(err http2ErrCode, debug string) { |
| sc.vlogf("http2: server rejecting conn: %v, %s", err, debug) |
| |
| sc.framer.WriteGoAway(0, err, []byte(debug)) |
| sc.bw.Flush() |
| sc.conn.Close() |
| } |
| |
| type http2serverConn struct { |
| // Immutable: |
| srv *http2Server |
| hs *Server |
| conn net.Conn |
| bw *http2bufferedWriter // writing to conn |
| handler Handler |
| baseCtx http2contextContext |
| framer *http2Framer |
| doneServing chan struct{} // closed when serverConn.serve ends |
| readFrameCh chan http2readFrameResult // written by serverConn.readFrames |
| wantWriteFrameCh chan http2FrameWriteRequest // from handlers -> serve |
| wantStartPushCh chan http2startPushRequest // from handlers -> serve |
| wroteFrameCh chan http2frameWriteResult // from writeFrameAsync -> serve, tickles more frame writes |
| bodyReadCh chan http2bodyReadMsg // from handlers -> serve |
| testHookCh chan func(int) // code to run on the serve loop |
| flow http2flow // conn-wide (not stream-specific) outbound flow control |
| inflow http2flow // conn-wide inbound flow control |
| tlsState *tls.ConnectionState // shared by all handlers, like net/http |
| remoteAddrStr string |
| writeSched http2WriteScheduler |
| |
| // Everything following is owned by the serve loop; use serveG.check(): |
| serveG http2goroutineLock // used to verify funcs are on serve() |
| pushEnabled bool |
| sawFirstSettings bool // got the initial SETTINGS frame after the preface |
| needToSendSettingsAck bool |
| unackedSettings int // how many SETTINGS have we sent without ACKs? |
| clientMaxStreams uint32 // SETTINGS_MAX_CONCURRENT_STREAMS from client (our PUSH_PROMISE limit) |
| advMaxStreams uint32 // our SETTINGS_MAX_CONCURRENT_STREAMS advertised the client |
| curClientStreams uint32 // number of open streams initiated by the client |
| curPushedStreams uint32 // number of open streams initiated by server push |
| maxClientStreamID uint32 // max ever seen from client (odd), or 0 if there have been no client requests |
| maxPushPromiseID uint32 // ID of the last push promise (even), or 0 if there have been no pushes |
| streams map[uint32]*http2stream |
| initialWindowSize int32 |
| maxFrameSize int32 |
| headerTableSize uint32 |
| peerMaxHeaderListSize uint32 // zero means unknown (default) |
| canonHeader map[string]string // http2-lower-case -> Go-Canonical-Case |
| writingFrame bool // started writing a frame (on serve goroutine or separate) |
| writingFrameAsync bool // started a frame on its own goroutine but haven't heard back on wroteFrameCh |
| needsFrameFlush bool // last frame write wasn't a flush |
| inGoAway bool // we've started to or sent GOAWAY |
| inFrameScheduleLoop bool // whether we're in the scheduleFrameWrite loop |
| needToSendGoAway bool // we need to schedule a GOAWAY frame write |
| goAwayCode http2ErrCode |
| shutdownTimerCh <-chan time.Time // nil until used |
| shutdownTimer *time.Timer // nil until used |
| idleTimer *time.Timer // nil if unused |
| idleTimerCh <-chan time.Time // nil if unused |
| |
| // Owned by the writeFrameAsync goroutine: |
| headerWriteBuf bytes.Buffer |
| hpackEncoder *hpack.Encoder |
| } |
| |
| func (sc *http2serverConn) maxHeaderListSize() uint32 { |
| n := sc.hs.MaxHeaderBytes |
| if n <= 0 { |
| n = DefaultMaxHeaderBytes |
| } |
| // http2's count is in a slightly different unit and includes 32 bytes per pair. |
| // So, take the net/http.Server value and pad it up a bit, assuming 10 headers. |
| const perFieldOverhead = 32 // per http2 spec |
| const typicalHeaders = 10 // conservative |
| return uint32(n + typicalHeaders*perFieldOverhead) |
| } |
| |
| func (sc *http2serverConn) curOpenStreams() uint32 { |
| sc.serveG.check() |
| return sc.curClientStreams + sc.curPushedStreams |
| } |
| |
| // stream represents a stream. This is the minimal metadata needed by |
| // the serve goroutine. Most of the actual stream state is owned by |
| // the http.Handler's goroutine in the responseWriter. Because the |
| // responseWriter's responseWriterState is recycled at the end of a |
| // handler, this struct intentionally has no pointer to the |
| // *responseWriter{,State} itself, as the Handler ending nils out the |
| // responseWriter's state field. |
| type http2stream struct { |
| // immutable: |
| sc *http2serverConn |
| id uint32 |
| body *http2pipe // non-nil if expecting DATA frames |
| cw http2closeWaiter // closed wait stream transitions to closed state |
| ctx http2contextContext |
| cancelCtx func() |
| |
| // owned by serverConn's serve loop: |
| bodyBytes int64 // body bytes seen so far |
| declBodyBytes int64 // or -1 if undeclared |
| flow http2flow // limits writing from Handler to client |
| inflow http2flow // what the client is allowed to POST/etc to us |
| parent *http2stream // or nil |
| numTrailerValues int64 |
| weight uint8 |
| state http2streamState |
| resetQueued bool // RST_STREAM queued for write; set by sc.resetStream |
| gotTrailerHeader bool // HEADER frame for trailers was seen |
| wroteHeaders bool // whether we wrote headers (not status 100) |
| reqBuf []byte // if non-nil, body pipe buffer to return later at EOF |
| |
| trailer Header // accumulated trailers |
| reqTrailer Header // handler's Request.Trailer |
| } |
| |
| func (sc *http2serverConn) Framer() *http2Framer { return sc.framer } |
| |
| func (sc *http2serverConn) CloseConn() error { return sc.conn.Close() } |
| |
| func (sc *http2serverConn) Flush() error { return sc.bw.Flush() } |
| |
| func (sc *http2serverConn) HeaderEncoder() (*hpack.Encoder, *bytes.Buffer) { |
| return sc.hpackEncoder, &sc.headerWriteBuf |
| } |
| |
| func (sc *http2serverConn) state(streamID uint32) (http2streamState, *http2stream) { |
| sc.serveG.check() |
| |
| if st, ok := sc.streams[streamID]; ok { |
| return st.state, st |
| } |
| |
| if streamID%2 == 1 { |
| if streamID <= sc.maxClientStreamID { |
| return http2stateClosed, nil |
| } |
| } else { |
| if streamID <= sc.maxPushPromiseID { |
| return http2stateClosed, nil |
| } |
| } |
| return http2stateIdle, nil |
| } |
| |
| // setConnState calls the net/http ConnState hook for this connection, if configured. |
| // Note that the net/http package does StateNew and StateClosed for us. |
| // There is currently no plan for StateHijacked or hijacking HTTP/2 connections. |
| func (sc *http2serverConn) setConnState(state ConnState) { |
| if sc.hs.ConnState != nil { |
| sc.hs.ConnState(sc.conn, state) |
| } |
| } |
| |
| func (sc *http2serverConn) vlogf(format string, args ...interface{}) { |
| if http2VerboseLogs { |
| sc.logf(format, args...) |
| } |
| } |
| |
| func (sc *http2serverConn) logf(format string, args ...interface{}) { |
| if lg := sc.hs.ErrorLog; lg != nil { |
| lg.Printf(format, args...) |
| } else { |
| log.Printf(format, args...) |
| } |
| } |
| |
// errno returns the numeric value of v when v's underlying kind is
// uintptr, and 0 for everything else (including a nil error).
//
// TODO: remove this helper function once http2 can use build
// tags. See comment in isClosedConnError.
func http2errno(v error) uintptr {
	val := reflect.ValueOf(v)
	if val.Kind() != reflect.Uintptr {
		return 0
	}
	return uintptr(val.Uint())
}
| |
| // isClosedConnError reports whether err is an error from use of a closed |
| // network connection. |
| func http2isClosedConnError(err error) bool { |
| if err == nil { |
| return false |
| } |
| |
| str := err.Error() |
| if strings.Contains(str, "use of closed network connection") { |
| return true |
| } |
| |
| if runtime.GOOS == "windows" { |
| if oe, ok := err.(*net.OpError); ok && oe.Op == "read" { |
| if se, ok := oe.Err.(*os.SyscallError); ok && se.Syscall == "wsarecv" { |
| const WSAECONNABORTED = 10053 |
| const WSAECONNRESET = 10054 |
| if n := http2errno(se.Err); n == WSAECONNRESET || n == WSAECONNABORTED { |
| return true |
| } |
| } |
| } |
| } |
| return false |
| } |
| |
| func (sc *http2serverConn) condlogf(err error, format string, args ...interface{}) { |
| if err == nil { |
| return |
| } |
| if err == io.EOF || err == io.ErrUnexpectedEOF || http2isClosedConnError(err) { |
| |
| sc.vlogf(format, args...) |
| } else { |
| sc.logf(format, args...) |
| } |
| } |
| |
| func (sc *http2serverConn) canonicalHeader(v string) string { |
| sc.serveG.check() |
| cv, ok := http2commonCanonHeader[v] |
| if ok { |
| return cv |
| } |
| cv, ok = sc.canonHeader[v] |
| if ok { |
| return cv |
| } |
| if sc.canonHeader == nil { |
| sc.canonHeader = make(map[string]string) |
| } |
| cv = CanonicalHeaderKey(v) |
| sc.canonHeader[v] = cv |
| return cv |
| } |
| |
| type http2readFrameResult struct { |
| f http2Frame // valid until readMore is called |
| err error |
| |
| // readMore should be called once the consumer no longer needs or |
| // retains f. After readMore, f is invalid and more frames can be |
| // read. |
| readMore func() |
| } |
| |
| // readFrames is the loop that reads incoming frames. |
| // It takes care to only read one frame at a time, blocking until the |
| // consumer is done with the frame. |
| // It's run on its own goroutine. |
| func (sc *http2serverConn) readFrames() { |
| gate := make(http2gate) |
| gateDone := gate.Done |
| for { |
| f, err := sc.framer.ReadFrame() |
| select { |
| case sc.readFrameCh <- http2readFrameResult{f, err, gateDone}: |
| case <-sc.doneServing: |
| return |
| } |
| select { |
| case <-gate: |
| case <-sc.doneServing: |
| return |
| } |
| if http2terminalReadFrameError(err) { |
| return |
| } |
| } |
| } |
| |
| // frameWriteResult is the message passed from writeFrameAsync to the serve goroutine. |
| type http2frameWriteResult struct { |
| wr http2FrameWriteRequest // what was written (or attempted) |
| err error // result of the writeFrame call |
| } |
| |
| // writeFrameAsync runs in its own goroutine and writes a single frame |
| // and then reports when it's done. |
| // At most one goroutine can be running writeFrameAsync at a time per |
| // serverConn. |
| func (sc *http2serverConn) writeFrameAsync(wr http2FrameWriteRequest) { |
| err := wr.write.writeFrame(sc) |
| sc.wroteFrameCh <- http2frameWriteResult{wr, err} |
| } |
| |
| func (sc *http2serverConn) closeAllStreamsOnConnClose() { |
| sc.serveG.check() |
| for _, st := range sc.streams { |
| sc.closeStream(st, http2errClientDisconnected) |
| } |
| } |
| |
| func (sc *http2serverConn) stopShutdownTimer() { |
| sc.serveG.check() |
| if t := sc.shutdownTimer; t != nil { |
| t.Stop() |
| } |
| } |
| |
| func (sc *http2serverConn) notePanic() { |
| |
| if http2testHookOnPanicMu != nil { |
| http2testHookOnPanicMu.Lock() |
| defer http2testHookOnPanicMu.Unlock() |
| } |
| if http2testHookOnPanic != nil { |
| if e := recover(); e != nil { |
| if http2testHookOnPanic(sc, e) { |
| panic(e) |
| } |
| } |
| } |
| } |
| |
| func (sc *http2serverConn) serve() { |
| sc.serveG.check() |
| defer sc.notePanic() |
| defer sc.conn.Close() |
| defer sc.closeAllStreamsOnConnClose() |
| defer sc.stopShutdownTimer() |
| defer close(sc.doneServing) |
| |
| if http2VerboseLogs { |
| sc.vlogf("http2: server connection from %v on %p", sc.conn.RemoteAddr(), sc.hs) |
| } |
| |
| sc.writeFrame(http2FrameWriteRequest{ |
| write: http2writeSettings{ |
| {http2SettingMaxFrameSize, sc.srv.maxReadFrameSize()}, |
| {http2SettingMaxConcurrentStreams, sc.advMaxStreams}, |
| {http2SettingMaxHeaderListSize, sc.maxHeaderListSize()}, |
| }, |
| }) |
| sc.unackedSettings++ |
| |
| if err := sc.readPreface(); err != nil { |
| sc.condlogf(err, "http2: server: error reading preface from client %v: %v", sc.conn.RemoteAddr(), err) |
| return |
| } |
| |
| sc.setConnState(StateActive) |
| sc.setConnState(StateIdle) |
| |
| if sc.srv.IdleTimeout != 0 { |
| sc.idleTimer = time.NewTimer(sc.srv.IdleTimeout) |
| defer sc.idleTimer.Stop() |
| sc.idleTimerCh = sc.idleTimer.C |
| } |
| |
| var gracefulShutdownCh chan struct{} |
| if sc.hs != nil { |
| ch := http2h1ServerShutdownChan(sc.hs) |
| if ch != nil { |
| gracefulShutdownCh = make(chan struct{}) |
| go sc.awaitGracefulShutdown(ch, gracefulShutdownCh) |
| } |
| } |
| |
| go sc.readFrames() |
| |
| settingsTimer := time.NewTimer(http2firstSettingsTimeout) |
| loopNum := 0 |
| for { |
| loopNum++ |
| select { |
| case wr := <-sc.wantWriteFrameCh: |
| sc.writeFrame(wr) |
| case spr := <-sc.wantStartPushCh: |
| sc.startPush(spr) |
| case res := <-sc.wroteFrameCh: |
| sc.wroteFrame(res) |
| case res := <-sc.readFrameCh: |
| if !sc.processFrameFromReader(res) { |
| return |
| } |
| res.readMore() |
| if settingsTimer.C != nil { |
| settingsTimer.Stop() |
| settingsTimer.C = nil |
| } |
| case m := <-sc.bodyReadCh: |
| sc.noteBodyRead(m.st, m.n) |
| case <-settingsTimer.C: |
| sc.logf("timeout waiting for SETTINGS frames from %v", sc.conn.RemoteAddr()) |
| return |
| case <-gracefulShutdownCh: |
| gracefulShutdownCh = nil |
| sc.startGracefulShutdown() |
| case <-sc.shutdownTimerCh: |
| sc.vlogf("GOAWAY close timer fired; closing conn from %v", sc.conn.RemoteAddr()) |
| return |
| case <-sc.idleTimerCh: |
| sc.vlogf("connection is idle") |
| sc.goAway(http2ErrCodeNo) |
| case fn := <-sc.testHookCh: |
| fn(loopNum) |
| } |
| |
| if sc.inGoAway && sc.curOpenStreams() == 0 && !sc.needToSendGoAway && !sc.writingFrame { |
| return |
| } |
| } |
| } |
| |
| func (sc *http2serverConn) awaitGracefulShutdown(sharedCh <-chan struct{}, privateCh chan struct{}) { |
| select { |
| case <-sc.doneServing: |
| case <-sharedCh: |
| close(privateCh) |
| } |
| } |
| |
| // readPreface reads the ClientPreface greeting from the peer |
| // or returns an error on timeout or an invalid greeting. |
| func (sc *http2serverConn) readPreface() error { |
| errc := make(chan error, 1) |
| go func() { |
| |
| buf := make([]byte, len(http2ClientPreface)) |
| if _, err := io.ReadFull(sc.conn, buf); err != nil { |
| errc <- err |
| } else if !bytes.Equal(buf, http2clientPreface) { |
| errc <- fmt.Errorf("bogus greeting %q", buf) |
| } else { |
| errc <- nil |
| } |
| }() |
| timer := time.NewTimer(http2prefaceTimeout) |
| defer timer.Stop() |
| select { |
| case <-timer.C: |
| return errors.New("timeout waiting for client preface") |
| case err := <-errc: |
| if err == nil { |
| if http2VerboseLogs { |
| sc.vlogf("http2: server: client %v said hello", sc.conn.RemoteAddr()) |
| } |
| } |
| return err |
| } |
| } |
| |
// errChanPool recycles the one-element buffered error channels that are
// used to report the result of a frame write.
var http2errChanPool = sync.Pool{
	New: func() interface{} {
		ch := make(chan error, 1)
		return ch
	},
}
| |
| var http2writeDataPool = sync.Pool{ |
| New: func() interface{} { return new(http2writeData) }, |
| } |
| |
| // writeDataFromHandler writes DATA response frames from a handler on |
| // the given stream. |
| func (sc *http2serverConn) writeDataFromHandler(stream *http2stream, data []byte, endStream bool) error { |
| ch := http2errChanPool.Get().(chan error) |
| writeArg := http2writeDataPool.Get().(*http2writeData) |
| *writeArg = http2writeData{stream.id, data, endStream} |
| err := sc.writeFrameFromHandler(http2FrameWriteRequest{ |
| write: writeArg, |
| stream: stream, |
| done: ch, |
| }) |
| if err != nil { |
| return err |
| } |
| var frameWriteDone bool // the frame write is done (successfully or not) |
| select { |
| case err = <-ch: |
| frameWriteDone = true |
| case <-sc.doneServing: |
| return http2errClientDisconnected |
| case <-stream.cw: |
| |
| select { |
| case err = <-ch: |
| frameWriteDone = true |
| default: |
| return http2errStreamClosed |
| } |
| } |
| http2errChanPool.Put(ch) |
| if frameWriteDone { |
| http2writeDataPool.Put(writeArg) |
| } |
| return err |
| } |
| |
| // writeFrameFromHandler sends wr to sc.wantWriteFrameCh, but aborts |
| // if the connection has gone away. |
| // |
| // This must not be run from the serve goroutine itself, else it might |
| // deadlock writing to sc.wantWriteFrameCh (which is only mildly |
| // buffered and is read by serve itself). If you're on the serve |
| // goroutine, call writeFrame instead. |
| func (sc *http2serverConn) writeFrameFromHandler(wr http2FrameWriteRequest) error { |
| sc.serveG.checkNotOn() |
| select { |
| case sc.wantWriteFrameCh <- wr: |
| return nil |
| case <-sc.doneServing: |
| |
| return http2errClientDisconnected |
| } |
| } |
| |
| // writeFrame schedules a frame to write and sends it if there's nothing |
| // already being written. |
| // |
| // There is no pushback here (the serve goroutine never blocks). It's |
| // the http.Handlers that block, waiting for their previous frames to |
| // make it onto the wire |
| // |
| // If you're not on the serve goroutine, use writeFrameFromHandler instead. |
| func (sc *http2serverConn) writeFrame(wr http2FrameWriteRequest) { |
| sc.serveG.check() |
| |
| // If true, wr will not be written and wr.done will not be signaled. |
| var ignoreWrite bool |
| |
| if wr.StreamID() != 0 { |
| _, isReset := wr.write.(http2StreamError) |
| if state, _ := sc.state(wr.StreamID()); state == http2stateClosed && !isReset { |
| ignoreWrite = true |
| } |
| } |
| |
| switch wr.write.(type) { |
| case *http2writeResHeaders: |
| wr.stream.wroteHeaders = true |
| case http2write100ContinueHeadersFrame: |
| if wr.stream.wroteHeaders { |
| |
| if wr.done != nil { |
| panic("wr.done != nil for write100ContinueHeadersFrame") |
| } |
| ignoreWrite = true |
| } |
| } |
| |
| if !ignoreWrite { |
| sc.writeSched.Push(wr) |
| } |
| sc.scheduleFrameWrite() |
| } |
| |
| // startFrameWrite starts a goroutine to write wr (in a separate |
| // goroutine since that might block on the network), and updates the |
| // serve goroutine's state about the world, updated from info in wr. |
| func (sc *http2serverConn) startFrameWrite(wr http2FrameWriteRequest) { |
| sc.serveG.check() |
| if sc.writingFrame { |
| panic("internal error: can only be writing one frame at a time") |
| } |
| |
| st := wr.stream |
| if st != nil { |
| switch st.state { |
| case http2stateHalfClosedLocal: |
| switch wr.write.(type) { |
| case http2StreamError, http2handlerPanicRST, http2writeWindowUpdate: |
| |
| default: |
| panic(fmt.Sprintf("internal error: attempt to send frame on a half-closed-local stream: %v", wr)) |
| } |
| case http2stateClosed: |
| panic(fmt.Sprintf("internal error: attempt to send frame on a closed stream: %v", wr)) |
| } |
| } |
| if wpp, ok := wr.write.(*http2writePushPromise); ok { |
| var err error |
| wpp.promisedID, err = wpp.allocatePromisedID() |
| if err != nil { |
| sc.writingFrameAsync = false |
| wr.replyToWriter(err) |
| return |
| } |
| } |
| |
| sc.writingFrame = true |
| sc.needsFrameFlush = true |
| if wr.write.staysWithinBuffer(sc.bw.Available()) { |
| sc.writingFrameAsync = false |
| err := wr.write.writeFrame(sc) |
| sc.wroteFrame(http2frameWriteResult{wr, err}) |
| } else { |
| sc.writingFrameAsync = true |
| go sc.writeFrameAsync(wr) |
| } |
| } |
| |
// errHandlerPanicked is the error handed to any caller blocked in a
// read from Request.Body when the main goroutine panics. Most handlers
// read in the main ServeHTTP goroutine, so this shows up rarely.
var http2errHandlerPanicked = errors.New("http2: handler panicked")
| |
| // wroteFrame is called on the serve goroutine with the result of |
| // whatever happened on writeFrameAsync. |
| func (sc *http2serverConn) wroteFrame(res http2frameWriteResult) { |
| sc.serveG.check() |
| if !sc.writingFrame { |
| panic("internal error: expected to be already writing a frame") |
| } |
| sc.writingFrame = false |
| sc.writingFrameAsync = false |
| |
| wr := res.wr |
| |
| if http2writeEndsStream(wr.write) { |
| st := wr.stream |
| if st == nil { |
| panic("internal error: expecting non-nil stream") |
| } |
| switch st.state { |
| case http2stateOpen: |
| |
| st.state = http2stateHalfClosedLocal |
| sc.resetStream(http2streamError(st.id, http2ErrCodeCancel)) |
| case http2stateHalfClosedRemote: |
| sc.closeStream(st, http2errHandlerComplete) |
| } |
| } else { |
| switch v := wr.write.(type) { |
| case http2StreamError: |
| |
| if st, ok := sc.streams[v.StreamID]; ok { |
| sc.closeStream(st, v) |
| } |
| case http2handlerPanicRST: |
| sc.closeStream(wr.stream, http2errHandlerPanicked) |
| } |
| } |
| |
| wr.replyToWriter(res.err) |
| |
| sc.scheduleFrameWrite() |
| } |
| |
| // scheduleFrameWrite tickles the frame writing scheduler. |
| // |
| // If a frame is already being written, nothing happens. This will be called again |
| // when the frame is done being written. |
| // |
| // If a frame isn't being written we need to send one, the best frame |
| // to send is selected, preferring first things that aren't |
| // stream-specific (e.g. ACKing settings), and then finding the |
| // highest priority stream. |
| // |
| // If a frame isn't being written and there's nothing else to send, we |
| // flush the write buffer. |
| func (sc *http2serverConn) scheduleFrameWrite() { |
| sc.serveG.check() |
| if sc.writingFrame || sc.inFrameScheduleLoop { |
| return |
| } |
| sc.inFrameScheduleLoop = true |
| for !sc.writingFrameAsync { |
| if sc.needToSendGoAway { |
| sc.needToSendGoAway = false |
| sc.startFrameWrite(http2FrameWriteRequest{ |
| write: &http2writeGoAway{ |
| maxStreamID: sc.maxClientStreamID, |
| code: sc.goAwayCode, |
| }, |
| }) |
| continue |
| } |
| if sc.needToSendSettingsAck { |
| sc.needToSendSettingsAck = false |
| sc.startFrameWrite(http2FrameWriteRequest{write: http2writeSettingsAck{}}) |
| continue |
| } |
| if !sc.inGoAway || sc.goAwayCode == http2ErrCodeNo { |
| if wr, ok := sc.writeSched.Pop(); ok { |
| sc.startFrameWrite(wr) |
| continue |
| } |
| } |
| if sc.needsFrameFlush { |
| sc.startFrameWrite(http2FrameWriteRequest{write: http2flushFrameWriter{}}) |
| sc.needsFrameFlush = false |
| continue |
| } |
| break |
| } |
| sc.inFrameScheduleLoop = false |
| } |
| |
| // startGracefulShutdown sends a GOAWAY with ErrCodeNo to tell the |
| // client we're gracefully shutting down. The connection isn't closed |
| // until all current streams are done. |
| func (sc *http2serverConn) startGracefulShutdown() { |
| sc.goAwayIn(http2ErrCodeNo, 0) |
| } |
| |
| func (sc *http2serverConn) goAway(code http2ErrCode) { |
| sc.serveG.check() |
| var forceCloseIn time.Duration |
| if code != http2ErrCodeNo { |
| forceCloseIn = 250 * time.Millisecond |
| } else { |
| |
| forceCloseIn = 1 * time.Second |
| } |
| sc.goAwayIn(code, forceCloseIn) |
| } |
| |
| func (sc *http2serverConn) goAwayIn(code http2ErrCode, forceCloseIn time.Duration) { |
| sc.serveG.check() |
| if sc.inGoAway { |
| return |
| } |
| if forceCloseIn != 0 { |
| sc.shutDownIn(forceCloseIn) |
| } |
| sc.inGoAway = true |
| sc.needToSendGoAway = true |
| sc.goAwayCode = code |
| sc.scheduleFrameWrite() |
| } |
| |
| func (sc *http2serverConn) shutDownIn(d time.Duration) { |
| sc.serveG.check() |
| sc.shutdownTimer = time.NewTimer(d) |
| sc.shutdownTimerCh = sc.shutdownTimer.C |
| } |
| |
| func (sc *http2serverConn) resetStream(se http2StreamError) { |
| sc.serveG.check() |
| sc.writeFrame(http2FrameWriteRequest{write: se}) |
| if st, ok := sc.streams[se.StreamID]; ok { |
| st.resetQueued = true |
| } |
| } |
| |
| // processFrameFromReader processes the serve loop's read from readFrameCh from the |
| // frame-reading goroutine. |
| // processFrameFromReader returns whether the connection should be kept open. |
| func (sc *http2serverConn) processFrameFromReader(res http2readFrameResult) bool { |
| sc.serveG.check() |
| err := res.err |
| if err != nil { |
| if err == http2ErrFrameTooLarge { |
| sc.goAway(http2ErrCodeFrameSize) |
| return true |
| } |
| clientGone := err == io.EOF || err == io.ErrUnexpectedEOF || http2isClosedConnError(err) |
| if clientGone { |
| |
| return false |
| } |
| } else { |
| f := res.f |
| if http2VerboseLogs { |
| sc.vlogf("http2: server read frame %v", http2summarizeFrame(f)) |
| } |
| err = sc.processFrame(f) |
| if err == nil { |
| return true |
| } |
| } |
| |
| switch ev := err.(type) { |
| case http2StreamError: |
| sc.resetStream(ev) |
| return true |
| case http2goAwayFlowError: |
| sc.goAway(http2ErrCodeFlowControl) |
| return true |
| case http2ConnectionError: |
| sc.logf("http2: server connection error from %v: %v", sc.conn.RemoteAddr(), ev) |
| sc.goAway(http2ErrCode(ev)) |
| return true |
| default: |
| if res.err != nil { |
| sc.vlogf("http2: server closing client connection; error reading frame from client %s: %v", sc.conn.RemoteAddr(), err) |
| } else { |
| sc.logf("http2: server closing client connection: %v", err) |
| } |
| return false |
| } |
| } |
| |
| func (sc *http2serverConn) processFrame(f http2Frame) error { |
| sc.serveG.check() |
| |
| if !sc.sawFirstSettings { |
| if _, ok := f.(*http2SettingsFrame); !ok { |
| return http2ConnectionError(http2ErrCodeProtocol) |
| } |
| sc.sawFirstSettings = true |
| } |
| |
| switch f := f.(type) { |
| case *http2SettingsFrame: |
| return sc.processSettings(f) |
| case *http2MetaHeadersFrame: |
| return sc.processHeaders(f) |
| case *http2WindowUpdateFrame: |
| return sc.processWindowUpdate(f) |
| case *http2PingFrame: |
| return sc.processPing(f) |
| case *http2DataFrame: |
| return sc.processData(f) |
| case *http2RSTStreamFrame: |
| return sc.processResetStream(f) |
| case *http2PriorityFrame: |
| return sc.processPriority(f) |
| case *http2GoAwayFrame: |
| return sc.processGoAway(f) |
| case *http2PushPromiseFrame: |
| |
| return http2ConnectionError(http2ErrCodeProtocol) |
| default: |
| sc.vlogf("http2: server ignoring frame: %v", f.Header()) |
| return nil |
| } |
| } |
| |
| func (sc *http2serverConn) processPing(f *http2PingFrame) error { |
| sc.serveG.check() |
| if f.IsAck() { |
| |
| return nil |
| } |
| if f.StreamID != 0 { |
| |
| return http2ConnectionError(http2ErrCodeProtocol) |
| } |
| if sc.inGoAway && sc.goAwayCode != http2ErrCodeNo { |
| return nil |
| } |
| sc.writeFrame(http2FrameWriteRequest{write: http2writePingAck{f}}) |
| return nil |
| } |
| |
| func (sc *http2serverConn) processWindowUpdate(f *http2WindowUpdateFrame) error { |
| sc.serveG.check() |
| switch { |
| case f.StreamID != 0: |
| state, st := sc.state(f.StreamID) |
| if state == http2stateIdle { |
| |
| return http2ConnectionError(http2ErrCodeProtocol) |
| } |
| if st == nil { |
| |
| return nil |
| } |
| if !st.flow.add(int32(f.Increment)) { |
| return http2streamError(f.StreamID, http2ErrCodeFlowControl) |
| } |
| default: |
| if !sc.flow.add(int32(f.Increment)) { |
| return http2goAwayFlowError{} |
| } |
| } |
| sc.scheduleFrameWrite() |
| return nil |
| } |
| |
| func (sc *http2serverConn) processResetStream(f *http2RSTStreamFrame) error { |
| sc.serveG.check() |
| |
| state, st := sc.state(f.StreamID) |
| if state == http2stateIdle { |
| |
| return http2ConnectionError(http2ErrCodeProtocol) |
| } |
| if st != nil { |
| st.cancelCtx() |
| sc.closeStream(st, http2streamError(f.StreamID, f.ErrCode)) |
| } |
| return nil |
| } |
| |
| func (sc *http2serverConn) closeStream(st *http2stream, err error) { |
| sc.serveG.check() |
| if st.state == http2stateIdle || st.state == http2stateClosed { |
| panic(fmt.Sprintf("invariant; can't close stream in state %v", st.state)) |
| } |
| st.state = http2stateClosed |
| if st.isPushed() { |
| sc.curPushedStreams-- |
| } else { |
| sc.curClientStreams-- |
| } |
| delete(sc.streams, st.id) |
| if len(sc.streams) == 0 { |
| sc.setConnState(StateIdle) |
| if sc.srv.IdleTimeout != 0 { |
| sc.idleTimer.Reset(sc.srv.IdleTimeout) |
| } |
| if http2h1ServerKeepAlivesDisabled(sc.hs) { |
| sc.startGracefulShutdown() |
| } |
| } |
| if p := st.body; p != nil { |
| |
| sc.sendWindowUpdate(nil, p.Len()) |
| |
| p.CloseWithError(err) |
| } |
| st.cw.Close() |
| sc.writeSched.CloseStream(st.id) |
| } |
| |
| func (sc *http2serverConn) processSettings(f *http2SettingsFrame) error { |
| sc.serveG.check() |
| if f.IsAck() { |
| sc.unackedSettings-- |
| if sc.unackedSettings < 0 { |
| |
| return http2ConnectionError(http2ErrCodeProtocol) |
| } |
| return nil |
| } |
| if err := f.ForeachSetting(sc.processSetting); err != nil { |
| return err |
| } |
| sc.needToSendSettingsAck = true |
| sc.scheduleFrameWrite() |
| return nil |
| } |
| |
| func (sc *http2serverConn) processSetting(s http2Setting) error { |
| sc.serveG.check() |
| if err := s.Valid(); err != nil { |
| return err |
| } |
| if http2VerboseLogs { |
| sc.vlogf("http2: server processing setting %v", s) |
| } |
| switch s.ID { |
| case http2SettingHeaderTableSize: |
| sc.headerTableSize = s.Val |
| sc.hpackEncoder.SetMaxDynamicTableSize(s.Val) |
| case http2SettingEnablePush: |
| sc.pushEnabled = s.Val != 0 |
| case http2SettingMaxConcurrentStreams: |
| sc.clientMaxStreams = s.Val |
| case http2SettingInitialWindowSize: |
| return sc.processSettingInitialWindowSize(s.Val) |
| case http2SettingMaxFrameSize: |
| sc.maxFrameSize = int32(s.Val) |
| case http2SettingMaxHeaderListSize: |
| sc.peerMaxHeaderListSize = s.Val |
| default: |
| |
| if http2VerboseLogs { |
| sc.vlogf("http2: server ignoring unknown setting %v", s) |
| } |
| } |
| return nil |
| } |
| |
| func (sc *http2serverConn) processSettingInitialWindowSize(val uint32) error { |
| sc.serveG.check() |
| |
| old := sc.initialWindowSize |
| sc.initialWindowSize = int32(val) |
| growth := sc.initialWindowSize - old |
| for _, st := range sc.streams { |
| if !st.flow.add(growth) { |
| |
| return http2ConnectionError(http2ErrCodeFlowControl) |
| } |
| } |
| return nil |
| } |
| |
| func (sc *http2serverConn) processData(f *http2DataFrame) error { |
| sc.serveG.check() |
| if sc.inGoAway && sc.goAwayCode != http2ErrCodeNo { |
| return nil |
| } |
| data := f.Data() |
| |
| id := f.Header().StreamID |
| state, st := sc.state(id) |
| if id == 0 || state == http2stateIdle { |
| |
| return http2ConnectionError(http2ErrCodeProtocol) |
| } |
| if st == nil || state != http2stateOpen || st.gotTrailerHeader || st.resetQueued { |
| |
| if sc.inflow.available() < int32(f.Length) { |
| return http2streamError(id, http2ErrCodeFlowControl) |
| } |
| |
| sc.inflow.take(int32(f.Length)) |
| sc.sendWindowUpdate(nil, int(f.Length)) |
| |
| if st != nil && st.resetQueued { |
| |
| return nil |
| } |
| return http2streamError(id, http2ErrCodeStreamClosed) |
| } |
| if st.body == nil { |
| panic("internal error: should have a body in this state") |
| } |
| |
| if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes { |
| st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes)) |
| return http2streamError(id, http2ErrCodeStreamClosed) |
| } |
| if f.Length > 0 { |
| |
| if st.inflow.available() < int32(f.Length) { |
| return http2streamError(id, http2ErrCodeFlowControl) |
| } |
| st.inflow.take(int32(f.Length)) |
| |
| if len(data) > 0 { |
| wrote, err := st.body.Write(data) |
| if err != nil { |
| return http2streamError(id, http2ErrCodeStreamClosed) |
| } |
| if wrote != len(data) { |
| panic("internal error: bad Writer") |
| } |
| st.bodyBytes += int64(len(data)) |
| } |
| |
| if pad := int32(f.Length) - int32(len(data)); pad > 0 { |
| sc.sendWindowUpdate32(nil, pad) |
| sc.sendWindowUpdate32(st, pad) |
| } |
| } |
| if f.StreamEnded() { |
| st.endStream() |
| } |
| return nil |
| } |
| |
| func (sc *http2serverConn) processGoAway(f *http2GoAwayFrame) error { |
| sc.serveG.check() |
| if f.ErrCode != http2ErrCodeNo { |
| sc.logf("http2: received GOAWAY %+v, starting graceful shutdown", f) |
| } else { |
| sc.vlogf("http2: received GOAWAY %+v, starting graceful shutdown", f) |
| } |
| sc.startGracefulShutdown() |
| |
| sc.pushEnabled = false |
| return nil |
| } |
| |
| // isPushed reports whether the stream is server-initiated. |
| func (st *http2stream) isPushed() bool { |
| return st.id%2 == 0 |
| } |
| |
| // endStream closes a Request.Body's pipe. It is called when a DATA |
| // frame says a request body is over (or after trailers). |
| func (st *http2stream) endStream() { |
| sc := st.sc |
| sc.serveG.check() |
| |
| if st.declBodyBytes != -1 && st.declBodyBytes != st.bodyBytes { |
| st.body.CloseWithError(fmt.Errorf("request declared a Content-Length of %d but only wrote %d bytes", |
| st.declBodyBytes, st.bodyBytes)) |
| } else { |
| st.body.closeWithErrorAndCode(io.EOF, st.copyTrailersToHandlerRequest) |
| st.body.CloseWithError(io.EOF) |
| } |
| st.state = http2stateHalfClosedRemote |
| } |
| |
| // copyTrailersToHandlerRequest is run in the Handler's goroutine in |
| // its Request.Body.Read just before it gets io.EOF. |
| func (st *http2stream) copyTrailersToHandlerRequest() { |
| for k, vv := range st.trailer { |
| if _, ok := st.reqTrailer[k]; ok { |
| |
| st.reqTrailer[k] = vv |
| } |
| } |
| } |
| |
| func (sc *http2serverConn) processHeaders(f *http2MetaHeadersFrame) error { |
| sc.serveG.check() |
| id := f.StreamID |
| if sc.inGoAway { |
| |
| return nil |
| } |
| |
| if id%2 != 1 { |
| return http2ConnectionError(http2ErrCodeProtocol) |
| } |
| |
| if st := sc.streams[f.StreamID]; st != nil { |
| if st.resetQueued { |
| |
| return nil |
| } |
| return st.processTrailerHeaders(f) |
| } |
| |
| if id <= sc.maxClientStreamID { |
| return http2ConnectionError(http2ErrCodeProtocol) |
| } |
| sc.maxClientStreamID = id |
| |
| if sc.idleTimer != nil { |
| sc.idleTimer.Stop() |
| } |
| |
| if sc.curClientStreams+1 > sc.advMaxStreams { |
| if sc.unackedSettings == 0 { |
| |
| return http2streamError(id, http2ErrCodeProtocol) |
| } |
| |
| return http2streamError(id, http2ErrCodeRefusedStream) |
| } |
| |
| initialState := http2stateOpen |
| if f.StreamEnded() { |
| initialState = http2stateHalfClosedRemote |
| } |
| st := sc.newStream(id, 0, initialState) |
| |
| if f.HasPriority() { |
| if err := http2checkPriority(f.StreamID, f.Priority); err != nil { |
| return err |
| } |
| sc.writeSched.AdjustStream(st.id, f.Priority) |
| } |
| |
| rw, req, err := sc.newWriterAndRequest(st, f) |
| if err != nil { |
| return err |
| } |
| st.reqTrailer = req.Trailer |
| if st.reqTrailer != nil { |
| st.trailer = make(Header) |
| } |
| st.body = req.Body.(*http2requestBody).pipe |
| st.declBodyBytes = req.ContentLength |
| |
| handler := sc.handler.ServeHTTP |
| if f.Truncated { |
| |
| handler = http2handleHeaderListTooLong |
| } else if err := http2checkValidHTTP2RequestHeaders(req.Header); err != nil { |
| handler = http2new400Handler(err) |
| } |
| |
| if sc.hs.ReadTimeout != 0 { |
| sc.conn.SetReadDeadline(time.Time{}) |
| } |
| |
| go sc.runHandler(rw, req, handler) |
| return nil |
| } |
| |
| func (st *http2stream) processTrailerHeaders(f *http2MetaHeadersFrame) error { |
| sc := st.sc |
| sc.serveG.check() |
| if st.gotTrailerHeader { |
| return http2ConnectionError(http2ErrCodeProtocol) |
| } |
| st.gotTrailerHeader = true |
| if !f.StreamEnded() { |
| return http2streamError(st.id, http2ErrCodeProtocol) |
| } |
| |
| if len(f.PseudoFields()) > 0 { |
| return http2streamError(st.id, http2ErrCodeProtocol) |
| } |
| if st.trailer != nil { |
| for _, hf := range f.RegularFields() { |
| key := sc.canonicalHeader(hf.Name) |
| if !http2ValidTrailerHeader(key) { |
| |
| return http2streamError(st.id, http2ErrCodeProtocol) |
| } |
| st.trailer[key] = append(st.trailer[key], hf.Value) |
| } |
| } |
| st.endStream() |
| return nil |
| } |
| |
| func http2checkPriority(streamID uint32, p http2PriorityParam) error { |
| if streamID == p.StreamDep { |
| |
| return http2streamError(streamID, http2ErrCodeProtocol) |
| } |
| return nil |
| } |
| |
| func (sc *http2serverConn) processPriority(f *http2PriorityFrame) error { |
| if sc.inGoAway { |
| return nil |
| } |
| if err := http2checkPriority(f.StreamID, f.http2PriorityParam); err != nil { |
| return err |
| } |
| sc.writeSched.AdjustStream(f.StreamID, f.http2PriorityParam) |
| return nil |
| } |
| |
| func (sc *http2serverConn) newStream(id, pusherID uint32, state http2streamState) *http2stream { |
| sc.serveG.check() |
| if id == 0 { |
| panic("internal error: cannot create stream with id 0") |
| } |
| |
| ctx, cancelCtx := http2contextWithCancel(sc.baseCtx) |
| st := &http2stream{ |
| sc: sc, |
| id: id, |
| state: state, |
| ctx: ctx, |
| cancelCtx: cancelCtx, |
| } |
| st.cw.Init() |
| st.flow.conn = &sc.flow |
| st.flow.add(sc.initialWindowSize) |
| st.inflow.conn = &sc.inflow |
| st.inflow.add(http2initialWindowSize) |
| |
| sc.streams[id] = st |
| sc.writeSched.OpenStream(st.id, http2OpenStreamOptions{PusherID: pusherID}) |
| if st.isPushed() { |
| sc.curPushedStreams++ |
| } else { |
| sc.curClientStreams++ |
| } |
| if sc.curOpenStreams() == 1 { |
| sc.setConnState(StateActive) |
| } |
| |
| return st |
| } |
| |
| func (sc *http2serverConn) newWriterAndRequest(st *http2stream, f *http2MetaHeadersFrame) (*http2responseWriter, *Request, error) { |
| sc.serveG.check() |
| |
| rp := http2requestParam{ |
| method: f.PseudoValue("method"), |
| scheme: f.PseudoValue("scheme"), |
| authority: f.PseudoValue("authority"), |
| path: f.PseudoValue("path"), |
| } |
| |
| isConnect := rp.method == "CONNECT" |
| if isConnect { |
| if rp.path != "" || rp.scheme != "" || rp.authority == "" { |
| return nil, nil, http2streamError(f.StreamID, http2ErrCodeProtocol) |
| } |
| } else if rp.method == "" || rp.path == "" || (rp.scheme != "https" && rp.scheme != "http") { |
| |
| return nil, nil, http2streamError(f.StreamID, http2ErrCodeProtocol) |
| } |
| |
| bodyOpen := !f.StreamEnded() |
| if rp.method == "HEAD" && bodyOpen { |
| |
| return nil, nil, http2streamError(f.StreamID, http2ErrCodeProtocol) |
| } |
| |
| rp.header = make(Header) |
| for _, hf := range f.RegularFields() { |
| rp.header.Add(sc.canonicalHeader(hf.Name), hf.Value) |
| } |
| if rp.authority == "" { |
| rp.authority = rp.header.Get("Host") |
| } |
| |
| rw, req, err := sc.newWriterAndRequestNoBody(st, rp) |
| if err != nil { |
| return nil, nil, err |
| } |
| if bodyOpen { |
| st.reqBuf = http2getRequestBodyBuf() |
| req.Body.(*http2requestBody).pipe = &http2pipe{ |
| b: &http2fixedBuffer{buf: st.reqBuf}, |
| } |
| |
| if vv, ok := rp.header["Content-Length"]; ok { |
| req.ContentLength, _ = strconv.ParseInt(vv[0], 10, 64) |
| } else { |
| req.ContentLength = -1 |
| } |
| } |
| return rw, req, nil |
| } |
| |
| type http2requestParam struct { |
| method string |
| scheme, authority, path string |
| header Header |
| } |
| |
| func (sc *http2serverConn) newWriterAndRequestNoBody(st *http2stream, rp http2requestParam) (*http2responseWriter, *Request, error) { |
| sc.serveG.check() |
| |
| var tlsState *tls.ConnectionState // nil if not scheme https |
| if rp.scheme == "https" { |
| tlsState = sc.tlsState |
| } |
| |
| needsContinue := rp.header.Get("Expect") == "100-continue" |
| if needsContinue { |
| rp.header.Del("Expect") |
| } |
| |
| if cookies := rp.header["Cookie"]; len(cookies) > 1 { |
| rp.header.Set("Cookie", strings.Join(cookies, "; ")) |
| } |
| |
| // Setup Trailers |
| var trailer Header |
| for _, v := range rp.header["Trailer"] { |
| for _, key := range strings.Split(v, ",") { |
| key = CanonicalHeaderKey(strings.TrimSpace(key)) |
| switch key { |
| case "Transfer-Encoding", "Trailer", "Content-Length": |
| |
| default: |
| if trailer == nil { |
| trailer = make(Header) |
| } |
| trailer[key] = nil |
| } |
| } |
| } |
| delete(rp.header, "Trailer") |
| |
| var url_ *url.URL |
| var requestURI string |
| if rp.method == "CONNECT" { |
| url_ = &url.URL{Host: rp.authority} |
| requestURI = rp.authority |
| } else { |
| var err error |
| url_, err = url.ParseRequestURI(rp.path) |
| if err != nil { |
| return nil, nil, http2streamError(st.id, http2ErrCodeProtocol) |
| } |
| requestURI = rp.path |
| } |
| |
| body := &http2requestBody{ |
| conn: sc, |
| stream: st, |
| needsContinue: needsContinue, |
| } |
| req := &Request{ |
| Method: rp.method, |
| URL: url_, |
| RemoteAddr: sc.remoteAddrStr, |
| Header: rp.header, |
| RequestURI: requestURI, |
| Proto: "HTTP/2.0", |
| ProtoMajor: 2, |
| ProtoMinor: 0, |
| TLS: tlsState, |
| Host: rp.authority, |
| Body: body, |
| Trailer: trailer, |
| } |
| req = http2requestWithContext(req, st.ctx) |
| |
| rws := http2responseWriterStatePool.Get().(*http2responseWriterState) |
| bwSave := rws.bw |
| *rws = http2responseWriterState{} |
| rws.conn = sc |
| rws.bw = bwSave |
| rws.bw.Reset(http2chunkWriter{rws}) |
| rws.stream = st |
| rws.req = req |
| rws.body = body |
| |
| rw := &http2responseWriter{rws: rws} |
| return rw, req, nil |
| } |
| |
| var http2reqBodyCache = make(chan []byte, 8) |
| |
| func http2getRequestBodyBuf() []byte { |
| select { |
| case b := <-http2reqBodyCache: |
| return b |
| default: |
| return make([]byte, http2initialWindowSize) |
| } |
| } |
| |
| func http2putRequestBodyBuf(b []byte) { |
| select { |
| case http2reqBodyCache <- b: |
| default: |
| } |
| } |
| |
| // Run on its own goroutine. |
| func (sc *http2serverConn) runHandler(rw *http2responseWriter, req *Request, handler func(ResponseWriter, *Request)) { |
| didPanic := true |
| defer func() { |
| rw.rws.stream.cancelCtx() |
| if didPanic { |
| e := recover() |
| sc.writeFrameFromHandler(http2FrameWriteRequest{ |
| write: http2handlerPanicRST{rw.rws.stream.id}, |
| stream: rw.rws.stream, |
| }) |
| |
| if http2shouldLogPanic(e) { |
| const size = 64 << 10 |
| buf := make([]byte, size) |
| buf = buf[:runtime.Stack(buf, false)] |
| sc.logf("http2: panic serving %v: %v\n%s", sc.conn.RemoteAddr(), e, buf) |
| } |
| return |
| } |
| rw.handlerDone() |
| }() |
| handler(rw, req) |
| didPanic = false |
| } |
| |
| func http2handleHeaderListTooLong(w ResponseWriter, r *Request) { |
| // 10.5.1 Limits on Header Block Size: |
| // .. "A server that receives a larger header block than it is |
| // willing to handle can send an HTTP 431 (Request Header Fields Too |
| // Large) status code" |
| const statusRequestHeaderFieldsTooLarge = 431 // only in Go 1.6+ |
| w.WriteHeader(statusRequestHeaderFieldsTooLarge) |
| io.WriteString(w, "<h1>HTTP Error 431</h1><p>Request Header Field(s) Too Large</p>") |
| } |
| |
| // called from handler goroutines. |
| // h may be nil. |
| func (sc *http2serverConn) writeHeaders(st *http2stream, headerData *http2writeResHeaders) error { |
| sc.serveG.checkNotOn() |
| var errc chan error |
| if headerData.h != nil { |
| |
| errc = http2errChanPool.Get().(chan error) |
| } |
| if err := sc.writeFrameFromHandler(http2FrameWriteRequest{ |
| write: headerData, |
| stream: st, |
| done: errc, |
| }); err != nil { |
| return err |
| } |
| if errc != nil { |
| select { |
| case err := <-errc: |
| http2errChanPool.Put(errc) |
| return err |
| case <-sc.doneServing: |
| return http2errClientDisconnected |
| case <-st.cw: |
| return http2errStreamClosed |
| } |
| } |
| return nil |
| } |
| |
| // called from handler goroutines. |
| func (sc *http2serverConn) write100ContinueHeaders(st *http2stream) { |
| sc.writeFrameFromHandler(http2FrameWriteRequest{ |
| write: http2write100ContinueHeadersFrame{st.id}, |
| stream: st, |
| }) |
| } |
| |
| // A bodyReadMsg tells the server loop that the http.Handler read n |
| // bytes of the DATA from the client on the given stream. |
| type http2bodyReadMsg struct { |
| st *http2stream |
| n int |
| } |
| |
| // called from handler goroutines. |
| // Notes that the handler for the given stream ID read n bytes of its body |
| // and schedules flow control tokens to be sent. |
| func (sc *http2serverConn) noteBodyReadFromHandler(st *http2stream, n int, err error) { |
| sc.serveG.checkNotOn() |
| if n > 0 { |
| select { |
| case sc.bodyReadCh <- http2bodyReadMsg{st, n}: |
| case <-sc.doneServing: |
| } |
| } |
| if err == io.EOF { |
| if buf := st.reqBuf; buf != nil { |
| st.reqBuf = nil |
| http2putRequestBodyBuf(buf) |
| } |
| } |
| } |
| |
| func (sc *http2serverConn) noteBodyRead(st *http2stream, n int) { |
| sc.serveG.check() |
| sc.sendWindowUpdate(nil, n) |
| if st.state != http2stateHalfClosedRemote && st.state != http2stateClosed { |
| |
| sc.sendWindowUpdate(st, n) |
| } |
| } |
| |
| // st may be nil for conn-level |
| func (sc *http2serverConn) sendWindowUpdate(st *http2stream, n int) { |
| sc.serveG.check() |
| // "The legal range for the increment to the flow control |
| // window is 1 to 2^31-1 (2,147,483,647) octets." |
| // A Go Read call on 64-bit machines could in theory read |
| // a larger Read than this. Very unlikely, but we handle it here |
| // rather than elsewhere for now. |
| const maxUint31 = 1<<31 - 1 |
| for n >= maxUint31 { |
| sc.sendWindowUpdate32(st, maxUint31) |
| n -= maxUint31 |
| } |
| sc.sendWindowUpdate32(st, int32(n)) |
| } |
| |
| // st may be nil for conn-level |
| func (sc *http2serverConn) sendWindowUpdate32(st *http2stream, n int32) { |
| sc.serveG.check() |
| if n == 0 { |
| return |
| } |
| if n < 0 { |
| panic("negative update") |
| } |
| var streamID uint32 |
| if st != nil { |
| streamID = st.id |
| } |
| sc.writeFrame(http2FrameWriteRequest{ |
| write: http2writeWindowUpdate{streamID: streamID, n: uint32(n)}, |
| stream: st, |
| }) |
| var ok bool |
| if st == nil { |
| ok = sc.inflow.add(n) |
| } else { |
| ok = st.inflow.add(n) |
| } |
| if !ok { |
| panic("internal error; sent too many window updates without decrements?") |
| } |
| } |
| |
| // requestBody is the Handler's Request.Body type. |
| // Read and Close may be called concurrently. |
| type http2requestBody struct { |
| stream *http2stream |
| conn *http2serverConn |
| closed bool // for use by Close only |
| sawEOF bool // for use by Read only |
| pipe *http2pipe // non-nil if we have a HTTP entity message body |
| needsContinue bool // need to send a 100-continue |
| } |
| |
| func (b *http2requestBody) Close() error { |
| if b.pipe != nil && !b.closed { |
| b.pipe.BreakWithError(http2errClosedBody) |
| } |
| b.closed = true |
| return nil |
| } |
| |
| func (b *http2requestBody) Read(p []byte) (n int, err error) { |
| if b.needsContinue { |
| b.needsContinue = false |
| b.conn.write100ContinueHeaders(b.stream) |
| } |
| if b.pipe == nil || b.sawEOF { |
| return 0, io.EOF |
| } |
| n, err = b.pipe.Read(p) |
| if err == io.EOF { |
| b.sawEOF = true |
| } |
| if b.conn == nil && http2inTests { |
| return |
| } |
| b.conn.noteBodyReadFromHandler(b.stream, n, err) |
| return |
| } |
| |
| // responseWriter is the http.ResponseWriter implementation. It's |
| // intentionally small (1 pointer wide) to minimize garbage. The |
| // responseWriterState pointer inside is zeroed at the end of a |
| // request (in handlerDone) and calls on the responseWriter thereafter |
| // simply crash (caller's mistake), but the much larger responseWriterState |
| // and buffers are reused between multiple requests. |
| type http2responseWriter struct { |
| rws *http2responseWriterState |
| } |
| |
| // Optional http.ResponseWriter interfaces implemented. |
| var ( |
| _ CloseNotifier = (*http2responseWriter)(nil) |
| _ Flusher = (*http2responseWriter)(nil) |
| _ http2stringWriter = (*http2responseWriter)(nil) |
| ) |
| |
| type http2responseWriterState struct { |
| // immutable within a request: |
| stream *http2stream |
| req *Request |
| body *http2requestBody // to close at end of request, if DATA frames didn't |
| conn *http2serverConn |
| |
| // TODO: adjust buffer writing sizes based on server config, frame size updates from peer, etc |
| bw *bufio.Writer // writing to a chunkWriter{this *responseWriterState} |
| |
| // mutated by http.Handler goroutine: |
| handlerHeader Header // nil until called |
| snapHeader Header // snapshot of handlerHeader at WriteHeader time |
| trailers []string // set in writeChunk |
| status int // status code passed to WriteHeader |
| wroteHeader bool // WriteHeader called (explicitly or implicitly). Not necessarily sent to user yet. |
| sentHeader bool // have we sent the header frame? |
| handlerDone bool // handler has finished |
| |
| sentContentLen int64 // non-zero if handler set a Content-Length header |
| wroteBytes int64 |
| |
| closeNotifierMu sync.Mutex // guards closeNotifierCh |
| closeNotifierCh chan bool // nil until first used |
| } |
| |
| type http2chunkWriter struct{ rws *http2responseWriterState } |
| |
| func (cw http2chunkWriter) Write(p []byte) (n int, err error) { return cw.rws.writeChunk(p) } |
| |
| func (rws *http2responseWriterState) hasTrailers() bool { return len(rws.trailers) != 0 } |
| |
| // declareTrailer is called for each Trailer header when the |
| // response header is written. It notes that a header will need to be |
| // written in the trailers at the end of the response. |
| func (rws *http2responseWriterState) declareTrailer(k string) { |
| k = CanonicalHeaderKey(k) |
| if !http2ValidTrailerHeader(k) { |
| |
| rws.conn.logf("ignoring invalid trailer %q", k) |
| return |
| } |
| if !http2strSliceContains(rws.trailers, k) { |
| rws.trailers = append(rws.trailers, k) |
| } |
| } |
| |
| // writeChunk writes chunks from the bufio.Writer. But because |
| // bufio.Writer may bypass its chunking, sometimes p may be |
| // arbitrarily large. |
| // |
| // writeChunk is also responsible (on the first chunk) for sending the |
| // HEADER response. |
| func (rws *http2responseWriterState) writeChunk(p []byte) (n int, err error) { |
| if !rws.wroteHeader { |
| rws.writeHeader(200) |
| } |
| |
| isHeadResp := rws.req.Method == "HEAD" |
| if !rws.sentHeader { |
| rws.sentHeader = true |
| var ctype, clen string |
| if clen = rws.snapHeader.Get("Content-Length"); clen != "" { |
| rws.snapHeader.Del("Content-Length") |
| clen64, err := strconv.ParseInt(clen, 10, 64) |
| if err == nil && clen64 >= 0 { |
| rws.sentContentLen = clen64 |
| } else { |
| clen = "" |
| } |
| } |
| if clen == "" && rws.handlerDone && http2bodyAllowedForStatus(rws.status) && (len(p) > 0 || !isHeadResp) { |
| clen = strconv.Itoa(len(p)) |
| } |
| _, hasContentType := rws.snapHeader["Content-Type"] |
| if !hasContentType && http2bodyAllowedForStatus(rws.status) { |
| ctype = DetectContentType(p) |
| } |
| var date string |
| if _, ok := rws.snapHeader["Date"]; !ok { |
| |
| date = time.Now().UTC().Format(TimeFormat) |
| } |
| |
| for _, v := range rws.snapHeader["Trailer"] { |
| http2foreachHeaderElement(v, rws.declareTrailer) |
| } |
| |
| endStream := (rws.handlerDone && !rws.hasTrailers() && len(p) == 0) || isHeadResp |
| err = rws.conn.writeHeaders(rws.stream, &http2writeResHeaders{ |
| streamID: rws.stream.id, |
| httpResCode: rws.status, |
| h: rws.snapHeader, |
| endStream: endStream, |
| contentType: ctype, |
| contentLength: clen, |
| date: date, |
| }) |
| if err != nil { |
| return 0, err |
| } |
| if endStream { |
| return 0, nil |
| } |
| } |
| if isHeadResp { |
| return len(p), nil |
| } |
| if len(p) == 0 && !rws.handlerDone { |
| return 0, nil |
| } |
| |
| if rws.handlerDone { |
| rws.promoteUndeclaredTrailers() |
| } |
| |
| endStream := rws.handlerDone && !rws.hasTrailers() |
| if len(p) > 0 || endStream { |
| |
| if err := rws.conn.writeDataFromHandler(rws.stream, p, endStream); err != nil { |
| return 0, err |
| } |
| } |
| |
| if rws.handlerDone && rws.hasTrailers() { |
| err = rws.conn.writeHeaders(rws.stream, &http2writeResHeaders{ |
| streamID: rws.stream.id, |
| h: rws.handlerHeader, |
| trailers: rws.trailers, |
| endStream: true, |
| }) |
| return len(p), err |
| } |
| return len(p), nil |
| } |
| |
// TrailerPrefix is a magic prefix for ResponseWriter.Header map keys. A
// key that starts with it marks its map entry as a response trailer
// rather than a response header. After the ServeHTTP call finishes the
// prefix is stripped and the values are sent in the trailers.
//
// The mechanism exists only for trailers that are not known before the
// headers are written. When the set of trailers is fixed or known before
// the header is written, the normal Go trailers mechanism is preferred:
//    https://golang.org/pkg/net/http/#ResponseWriter
//    https://golang.org/pkg/net/http/#example_ResponseWriter_trailers
const http2TrailerPrefix = "Trailer:"
| |
| // promoteUndeclaredTrailers permits http.Handlers to set trailers |
| // after the header has already been flushed. Because the Go |
| // ResponseWriter interface has no way to set Trailers (only the |
| // Header), and because we didn't want to expand the ResponseWriter |
| // interface, and because nobody used trailers, and because RFC 2616 |
| // says you SHOULD (but not must) predeclare any trailers in the |
| // header, the official ResponseWriter rules said trailers in Go must |
| // be predeclared, and then we reuse the same ResponseWriter.Header() |
| // map to mean both Headers and Trailers. When it's time to write the |
| // Trailers, we pick out the fields of Headers that were declared as |
| // trailers. That worked for a while, until we found the first major |
| // user of Trailers in the wild: gRPC (using them only over http2), |
| // and gRPC libraries permit setting trailers mid-stream without |
| // predeclarnig them. So: change of plans. We still permit the old |
| // way, but we also permit this hack: if a Header() key begins with |
| // "Trailer:", the suffix of that key is a Trailer. Because ':' is an |
| // invalid token byte anyway, there is no ambiguity. (And it's already |
| // filtered out) It's mildly hacky, but not terrible. |
| // |
| // This method runs after the Handler is done and promotes any Header |
| // fields to be trailers. |
| func (rws *http2responseWriterState) promoteUndeclaredTrailers() { |
| for k, vv := range rws.handlerHeader { |
| if !strings.HasPrefix(k, http2TrailerPrefix) { |
| continue |
| } |
| trailerKey := strings.TrimPrefix(k, http2TrailerPrefix) |
| rws.declareTrailer(trailerKey) |
| rws.handlerHeader[CanonicalHeaderKey(trailerKey)] = vv |
| } |
| |
| if len(rws.trailers) > 1 { |
| sorter := http2sorterPool.Get().(*http2sorter) |
| sorter.SortStrings(rws.trailers) |
| http2sorterPool.Put(sorter) |
| } |
| } |
| |
| func (w *http2responseWriter) Flush() { |
| rws := w.rws |
| if rws == nil { |
| panic("Header called after Handler finished") |
| } |
| if rws.bw.Buffered() > 0 { |
| if err := rws.bw.Flush(); err != nil { |
| |
| return |
| } |
| } else { |
| |
| rws.writeChunk(nil) |
| } |
| } |
| |
| func (w *http2responseWriter) CloseNotify() <-chan bool { |
| rws := w.rws |
| if rws == nil { |
| panic("CloseNotify called after Handler finished") |
| } |
| rws.closeNotifierMu.Lock() |
| ch := rws.closeNotifierCh |
| if ch == nil { |
| ch = make(chan bool, 1) |
| rws.closeNotifierCh = ch |
| cw := rws.stream.cw |
| go func() { |
| cw.Wait() |
| ch <- true |
| }() |
| } |
| rws.closeNotifierMu.Unlock() |
| return ch |
| } |
| |
| func (w *http2responseWriter) Header() Header { |
| rws := w.rws |
| if rws == nil { |
| panic("Header called after Handler finished") |
| } |
| if rws.handlerHeader == nil { |
| rws.handlerHeader = make(Header) |
| } |
| return rws.handlerHeader |
| } |
| |
| func (w *http2responseWriter) WriteHeader(code int) { |
| rws := w.rws |
| if rws == nil { |
| panic("WriteHeader called after Handler finished") |
| } |
| rws.writeHeader(code) |
| } |
| |
| func (rws *http2responseWriterState) writeHeader(code int) { |
| if !rws.wroteHeader { |
| rws.wroteHeader = true |
| rws.status = code |
| if len(rws.handlerHeader) > 0 { |
| rws.snapHeader = http2cloneHeader(rws.handlerHeader) |
| } |
| } |
| } |
| |
| func http2cloneHeader(h Header) Header { |
| h2 := make(Header, len(h)) |
| for k, vv := range h { |
| vv2 := make([]string, len(vv)) |
| copy(vv2, vv) |
| h2[k] = vv2 |
| } |
| return h2 |
| } |
| |
| // The Life Of A Write is like this: |
| // |
| // * Handler calls w.Write or w.WriteString -> |
| // * -> rws.bw (*bufio.Writer) -> |
| // * (Handler migth call Flush) |
| // * -> chunkWriter{rws} |
| // * -> responseWriterState.writeChunk(p []byte) |
| // * -> responseWriterState.writeChunk (most of the magic; see comment there) |
| func (w *http2responseWriter) Write(p []byte) (n int, err error) { |
| return w.write(len(p), p, "") |
| } |
| |
| func (w *http2responseWriter) WriteString(s string) (n int, err error) { |
| return w.write(len(s), nil, s) |
| } |
| |
| // either dataB or dataS is non-zero. |
| func (w *http2responseWriter) write(lenData int, dataB []byte, dataS string) (n int, err error) { |
| rws := w.rws |
| if rws == nil { |
| panic("Write called after Handler finished") |
| } |
| if !rws.wroteHeader { |
| w.WriteHeader(200) |
| } |
| if !http2bodyAllowedForStatus(rws.status) { |
| return 0, ErrBodyNotAllowed |
| } |
| rws.wroteBytes += int64(len(dataB)) + int64(len(dataS)) |
| if rws.sentContentLen != 0 && rws.wroteBytes > rws.sentContentLen { |
| |
| return 0, errors.New("http2: handler wrote more than declared Content-Length") |
| } |
| |
| if dataB != nil { |
| return rws.bw.Write(dataB) |
| } else { |
| return rws.bw.WriteString(dataS) |
| } |
| } |
| |
| func (w *http2responseWriter) handlerDone() { |
| rws := w.rws |
| rws.handlerDone = true |
| w.Flush() |
| w.rws = nil |
| http2responseWriterStatePool.Put(rws) |
| } |
| |
// Errors returned for server push attempts.

// http2ErrRecursivePush is returned when a push is attempted from a stream
// that was itself pushed.
var http2ErrRecursivePush = errors.New("http2: recursive push not allowed")

// http2ErrPushLimitReached is returned when a promised stream ID cannot be
// allocated because a push limit has been hit.
var http2ErrPushLimitReached = errors.New("http2: push would exceed peer's SETTINGS_MAX_CONCURRENT_STREAMS")
| |
| // pushOptions is the internal version of http.PushOptions, which we |
| // cannot include here because it's only defined in Go 1.8 and later. |
| type http2pushOptions struct { |
| Method string |
| Header Header |
| } |
| |
| func (w *http2responseWriter) push(target string, opts http2pushOptions) error { |
| st := w.rws.stream |
| sc := st.sc |
| sc.serveG.checkNotOn() |
| |
| if st.isPushed() { |
| return http2ErrRecursivePush |
| } |
| |
| if opts.Method == "" { |
| opts.Method = "GET" |
| } |
| if opts.Header == nil { |
| opts.Header = Header{} |
| } |
| wantScheme := "http" |
| if w.rws.req.TLS != nil { |
| wantScheme = "https" |
| } |
| |
| u, err := url.Parse(target) |
| if err != nil { |
| return err |
| } |
| if u.Scheme == "" { |
| if !strings.HasPrefix(target, "/") { |
| return fmt.Errorf("target must be an absolute URL or an absolute path: %q", target) |
| } |
| u.Scheme = wantScheme |
| u.Host = w.rws.req.Host |
| } else { |
| if u.Scheme != wantScheme { |
| return fmt.Errorf("cannot push URL with scheme %q from request with scheme %q", u.Scheme, wantScheme) |
| } |
| if u.Host == "" { |
| return errors.New("URL must have a host") |
| } |
| } |
| for k := range opts.Header { |
| if strings.HasPrefix(k, ":") { |
| return fmt.Errorf("promised request headers cannot include pseudo header %q", k) |
| } |
| |
| switch strings.ToLower(k) { |
| case "content-length", "content-encoding", "trailer", "te", "expect", "host": |
| return fmt.Errorf("promised request headers cannot include %q", k) |
| } |
| } |
| if err := http2checkValidHTTP2RequestHeaders(opts.Header); err != nil { |
| return err |
| } |
| |
| if opts.Method != "GET" && opts.Method != "HEAD" { |
| return fmt.Errorf("method %q must be GET or HEAD", opts.Method) |
| } |
| |
| msg := http2startPushRequest{ |
| parent: st, |
| method: opts.Method, |
| url: u, |
| header: http2cloneHeader(opts.Header), |
| done: http2errChanPool.Get().(chan error), |
| } |
| |
| select { |
| case <-sc.doneServing: |
| return http2errClientDisconnected |
| case <-st.cw: |
| return http2errStreamClosed |
| case sc.wantStartPushCh <- msg: |
| } |
| |
| select { |
| case <-sc.doneServing: |
| return http2errClientDisconnected |
| case <-st.cw: |
| return http2errStreamClosed |
| case err := <-msg.done: |
| http2errChanPool.Put(msg.done) |
| return err |
| } |
| } |
| |
| type http2startPushRequest struct { |
| parent *http2stream |
| method string |
| url *url.URL |
| header Header |
| done chan error |
| } |
| |
| func (sc *http2serverConn) startPush(msg http2startPushRequest) { |
| sc.serveG.check() |
| |
| if msg.parent.state != http2stateOpen && msg.parent.state != http2stateHalfClosedRemote { |
| |
| msg.done <- http2errStreamClosed |
| return |
| } |
| |
| if !sc.pushEnabled { |
| msg.done <- ErrNotSupported |
| return |
| } |
| |
| allocatePromisedID := func() (uint32, error) { |
| sc.serveG.check() |
| |
| if !sc.pushEnabled { |
| return 0, ErrNotSupported |
| } |
| |
| if sc.curPushedStreams+1 > sc.clientMaxStreams { |
| return 0, http2ErrPushLimitReached |
| } |
| |
| if sc.maxPushPromiseID+2 >= 1<<31 { |
| sc.startGracefulShutdown() |
| return 0, http2ErrPushLimitReached |
| } |
| sc.maxPushPromiseID += 2 |
| promisedID := sc.maxPushPromiseID |
| |
| promised := sc.newStream(promisedID, msg.parent.id, http2stateHalfClosedRemote) |
| rw, req, err := sc.newWriterAndRequestNoBody(promised, http2requestParam{ |
| method: msg.method, |
| scheme: msg.url.Scheme, |
| authority: msg.url.Host, |
| path: msg.url.RequestURI(), |
| header: http2cloneHeader(msg.header), |
| }) |
| if err != nil { |
| |
| panic(fmt.Sprintf("newWriterAndRequestNoBody(%+v): %v", msg.url, err)) |
| } |
| |
| go sc.runHandler(rw, req, sc.handler.ServeHTTP) |
| return promisedID, nil |
| } |
| |
| sc.writeFrame(http2FrameWriteRequest{ |
| write: &http2writePushPromise{ |
| streamID: msg.parent.id, |
| method: msg.method, |
| url: msg.url, |
| h: msg.header, |
| allocatePromisedID: allocatePromisedID, |
| }, |
| stream: msg.parent, |
| done: msg.done, |
| }) |
| } |
| |
// foreachHeaderElement splits v according to the "#rule" construction in
// RFC 2616 section 2.1 and calls fn for each non-empty element. Leading
// and trailing whitespace is trimmed from v and from every element.
func http2foreachHeaderElement(v string, fn func(string)) {
	v = textproto.TrimString(v)
	switch {
	case v == "":
		// Nothing to report.
	case strings.IndexByte(v, ',') < 0:
		// Single element: no splitting needed.
		fn(v)
	default:
		for _, elem := range strings.Split(v, ",") {
			elem = textproto.TrimString(elem)
			if elem == "" {
				continue
			}
			fn(elem)
		}
	}
}
| |
// http2connHeaders lists, by canonical key, the connection-specific header
// fields that are not valid in an HTTP/2 request.
// From http://httpwg.org/specs/rfc7540.html#rfc.section.8.1.2.2
var http2connHeaders = []string{
	"Connection",
	"Keep-Alive",
	"Proxy-Connection",
	"Transfer-Encoding",
	"Upgrade",
}
| |
| // checkValidHTTP2RequestHeaders checks whether h is a valid HTTP/2 request, |
| // per RFC 7540 Section 8.1.2.2. |
| // The returned error is reported to users. |
| func http2checkValidHTTP2RequestHeaders(h Header) error { |
| for _, k := range http2connHeaders { |
| if _, ok := h[k]; ok { |
| return fmt.Errorf("request header %q is not valid in HTTP/2", k) |
| } |
| } |
| te := h["Te"] |
| if len(te) > 0 && (len(te) > 1 || (te[0] != "trailers" && te[0] != "")) { |
| return errors.New(`request header "TE" may only be "trailers" in HTTP/2`) |
| } |
| return nil |
| } |
| |
| func http2new400Handler(err error) HandlerFunc { |
| return func(w ResponseWriter, r *Request) { |
| Error(w, err.Error(), StatusBadRequest) |
| } |
| } |
| |
| // ValidTrailerHeader reports whether name is a valid header field name to appear |
| // in trailers. |
| // See: http://tools.ietf.org/html/rfc7230#section-4.1.2 |
| func http2ValidTrailerHeader(name string) bool { |
| name = CanonicalHeaderKey(name) |
| if strings.HasPrefix(name, "If-") || http2badTrailer[name] { |
| return false |
| } |
| return true |
| } |
| |
// http2badTrailer is the set of header field names, keyed by canonical
// name, that http2ValidTrailerHeader refuses as trailers.
var http2badTrailer = map[string]bool{
	"Authorization":       true,
	"Cache-Control":       true,
	"Connection":          true,
	"Content-Encoding":    true,
	"Content-Length":      true,
	"Content-Range":       true,
	"Content-Type":        true,
	"Expect":              true,
	"Host":                true,
	"Keep-Alive":          true,
	"Max-Forwards":        true,
	"Pragma":              true,
	"Proxy-Authenticate":  true,
	"Proxy-Authorization": true,
	"Proxy-Connection":    true,
	"Range":               true,
	"Realm":               true,
	"Te":                  true,
	"Trailer":             true,
	"Transfer-Encoding":   true,
	"Www-Authenticate":    true,
}
| |
| // h1ServerShutdownChan returns a channel that will be closed when the |
| // provided *http.Server wants to shut down. |
| // |
| // This is a somewhat hacky way to get at http1 innards. It works |
| // when the http2 code is bundled into the net/http package in the |
| // standard library. The alternatives ended up making the cmd/go tool |
| // depend on http Servers. This is the lightest option for now. |
| // This is tested via the TestServeShutdown* tests in net/http. |
| func http2h1ServerShutdownChan(hs *Server) <-chan struct{} { |
| if fn := http2testh1ServerShutdownChan; fn != nil { |
| return fn(hs) |
| } |
| var x interface{} = hs |
| type I interface { |
| getDoneChan() <-chan struct{} |
| } |
| if hs, ok := x.(I); ok { |
| return hs.getDoneChan() |
| } |
| return nil |
| } |
| |
| // optional test hook for h1ServerShutdownChan. |
| var http2testh1ServerShutdownChan func(hs *Server) <-chan struct{} |
| |
| // h1ServerKeepAlivesDisabled reports whether hs has its keep-alives |
| // disabled. See comments on h1ServerShutdownChan above for why |
| // the code is written this way. |
| func http2h1ServerKeepAlivesDisabled(hs *Server) bool { |
| var x interface{} = hs |
| type I interface { |
| doKeepAlives() bool |
| } |
| if hs, ok := x.(I); ok { |
| return !hs.doKeepAlives() |
| } |
| return false |
| } |
| |
const (
	// transportDefaultConnFlow is the number of connection-level flow
	// control tokens granted to the server at start-up, on top of the
	// default 64k.
	http2transportDefaultConnFlow = 1 << 30

	// transportDefaultStreamFlow is both the stream-level flow control
	// window announced to the peer and the number of bytes buffered
	// per stream.
	http2transportDefaultStreamFlow = 4 << 20

	// transportDefaultStreamMinRefresh is the smallest increment for
	// which a stream-level WINDOW_UPDATE is sent.
	http2transportDefaultStreamMinRefresh = 4 << 10

	// defaultUserAgent is the User-Agent value sent when the request
	// does not carry one of its own.
	http2defaultUserAgent = "Go-http-client/2.0"
)
| |
| // Transport is an HTTP/2 Transport. |
| // |
| // A Transport internally caches connections to servers. It is safe |
| // for concurrent use by multiple goroutines. |
| type http2Transport struct { |
| // DialTLS specifies an optional dial function for creating |
| // TLS connections for requests. |
| // |
| // If DialTLS is nil, tls.Dial is used. |
| // |
| // If the returned net.Conn has a ConnectionState method like tls.Conn, |
| // it will be used to set http.Response.TLS. |
| DialTLS func(network, addr string, cfg *tls.Config) (net.Conn, error) |
| |
| // TLSClientConfig specifies the TLS configuration to use with |
| // tls.Client. If nil, the default configuration is used. |
| TLSClientConfig *tls.Config |
| |
| // ConnPool optionally specifies an alternate connection pool to use. |
| // If nil, the default is used. |
| ConnPool http2ClientConnPool |
| |
| // DisableCompression, if true, prevents the Transport from |
| // requesting compression with an "Accept-Encoding: gzip" |
| // request header when the Request contains no existing |
| // Accept-Encoding value. If the Transport requests gzip on |
| // its own and gets a gzipped response, it's transparently |
| // decoded in the Response.Body. However, if the user |
| // explicitly requested gzip it is not automatically |
| // uncompressed. |
| DisableCompression bool |
| |
| // AllowHTTP, if true, permits HTTP/2 requests using the insecure, |
| // plain-text "http" scheme. Note that this does not enable h2c support. |
| AllowHTTP bool |
| |
| // MaxHeaderListSize is the http2 SETTINGS_MAX_HEADER_LIST_SIZE to |
| // send in the initial settings frame. It is how many bytes |
| // of response headers are allow. Unlike the http2 spec, zero here |
| // means to use a default limit (currently 10MB). If you actually |
| // want to advertise an ulimited value to the peer, Transport |
| // interprets the highest possible value here (0xffffffff or 1<<32-1) |
| // to mean no limit. |
| MaxHeaderListSize uint32 |
| |
| // t1, if non-nil, is the standard library Transport using |
| // this transport. Its settings are used (but not its |
| // RoundTrip method, etc). |
| t1 *Transport |
| |
| connPoolOnce sync.Once |
| connPoolOrDef http2ClientConnPool // non-nil version of ConnPool |
| } |
| |
| func (t *http2Transport) maxHeaderListSize() uint32 { |
| if t.MaxHeaderListSize == 0 { |
| return 10 << 20 |
| } |
| if t.MaxHeaderListSize == 0xffffffff { |
| return 0 |
| } |
| return t.MaxHeaderListSize |
| } |
| |
| func (t *http2Transport) disableCompression() bool { |
| return t.DisableCompression || (t.t1 != nil && t.t1.DisableCompression) |
| } |
| |
| var http2errTransportVersion = errors.New("http2: ConfigureTransport is only supported starting at Go 1.6") |
| |
| // ConfigureTransport configures a net/http HTTP/1 Transport to use HTTP/2. |
| // It requires Go 1.6 or later and returns an error if the net/http package is too old |
| // or if t1 has already been HTTP/2-enabled. |
| func http2ConfigureTransport(t1 *Transport) error { |
| _, err := http2configureTransport(t1) |
| return err |
| } |
| |
| func (t *http2Transport) connPool() http2ClientConnPool { |
| t.connPoolOnce.Do(t.initConnPool) |
| return t.connPoolOrDef |
| } |
| |
| func (t *http2Transport) initConnPool() { |
| if t.ConnPool != nil { |
| t.connPoolOrDef = t.ConnPool |
| } else { |
| t.connPoolOrDef = &http2clientConnPool{t: t} |
| } |
| } |
| |
| // ClientConn is the state of a single HTTP/2 client connection to an |
| // HTTP/2 server. |
| type http2ClientConn struct { |
| t *http2Transport |
| tconn net.Conn // usually *tls.Conn, except specialized impls |
| tlsState *tls.ConnectionState // nil only for specialized impls |
| singleUse bool // whether being used for a single http.Request |
| |
| // readLoop goroutine fields: |
| readerDone chan struct{} // closed on error |
| readerErr error // set before readerDone is closed |
| |
| idleTimeout time.Duration // or 0 for never |
| idleTimer *time.Timer |
| |
| mu sync.Mutex // guards following |
| cond *sync.Cond // hold mu; broadcast on flow/closed changes |
| flow http2flow // our conn-level flow control quota (cs.flow is per stream) |
| inflow http2flow // peer's conn-level flow control |
| closed bool |
| wantSettingsAck bool // we sent a SETTINGS frame and haven't heard back |
| goAway *http2GoAwayFrame // if non-nil, the GoAwayFrame we received |
| goAwayDebug string // goAway frame's debug data, retained as a string |
| streams map[uint32]*http2clientStream // client-initiated |
| nextStreamID uint32 |
| pings map[[8]byte]chan struct{} // in flight ping data to notification channel |
| bw *bufio.Writer |
| br *bufio.Reader |
| fr *http2Framer |
| lastActive time.Time |
| // Settings from peer: (also guarded by mu) |
| maxFrameSize uint32 |
| maxConcurrentStreams uint32 |
| initialWindowSize uint32 |
| |
| hbuf bytes.Buffer // HPACK encoder writes into this |
| henc *hpack.Encoder |
| freeBuf [][]byte |
| |
| wmu sync.Mutex // held while writing; acquire AFTER mu if holding both |
| werr error // first write error that has occurred |
| } |
| |
| // clientStream is the state for a single HTTP/2 stream. One of these |
| // is created for each Transport.RoundTrip call. |
| type http2clientStream struct { |
| cc *http2ClientConn |
| req *Request |
| trace *http2clientTrace // or nil |
| ID uint32 |
| resc chan http2resAndError |
| bufPipe http2pipe // buffered pipe with the flow-controlled response payload |
| startedWrite bool // started request body write; guarded by cc.mu |
| requestedGzip bool |
| on100 func() // optional code to run if get a 100 continue response |
| |
| flow http2flow // guarded by cc.mu |
| inflow http2flow // guarded by cc.mu |
| bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read |
| readErr error // sticky read error; owned by transportResponseBody.Read |
| stopReqBody error // if non-nil, stop writing req body; guarded by cc.mu |
| didReset bool // whether we sent a RST_STREAM to the server; guarded by cc.mu |
| |
| peerReset chan struct{} // closed on peer reset |
| resetErr error // populated before peerReset is closed |
| |
| done chan struct{} // closed when stream remove from cc.streams map; close calls guarded by cc.mu |
| |
| // owned by clientConnReadLoop: |
| firstByte bool // got the first response byte |
| pastHeaders bool // got first MetaHeadersFrame (actual headers) |
| pastTrailers bool // got optional second MetaHeadersFrame (trailers) |
| |
| trailer Header // accumulated trailers |
| resTrailer *Header // client's Response.Trailer |
| } |
| |
| // awaitRequestCancel runs in its own goroutine and waits for the user |
| // to cancel a RoundTrip request, its context to expire, or for the |
| // request to be done (any way it might be removed from the cc.streams |
| // map: peer reset, successful completion, TCP connection breakage, |
| // etc) |
| func (cs *http2clientStream) awaitRequestCancel(req *Request) { |
| ctx := http2reqContext(req) |
| if req.Cancel == nil && ctx.Done() == nil { |
| return |
| } |
| select { |
| case <-req.Cancel: |
| cs.cancelStream() |
| cs.bufPipe.CloseWithError(http2errRequestCanceled) |
| case <-ctx.Done(): |
| cs.cancelStream() |
| cs.bufPipe.CloseWithError(ctx.Err()) |
| case <-cs.done: |
| } |
| } |
| |
| func (cs *http2clientStream) cancelStream() { |
| cs.cc.mu.Lock() |
| didReset := cs.didReset |
| cs.didReset = true |
| cs.cc.mu.Unlock() |
| |
| if !didReset { |
| cs.cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) |
| } |
| } |
| |
| // checkResetOrDone reports any error sent in a RST_STREAM frame by the |
| // server, or errStreamClosed if the stream is complete. |
| func (cs *http2clientStream) checkResetOrDone() error { |
| select { |
| case <-cs.peerReset: |
| return cs.resetErr |
| case <-cs.done: |
| return http2errStreamClosed |
| default: |
| return nil |
| } |
| } |
| |
| func (cs *http2clientStream) abortRequestBodyWrite(err error) { |
| if err == nil { |
| panic("nil error") |
| } |
| cc := cs.cc |
| cc.mu.Lock() |
| cs.stopReqBody = err |
| cc.cond.Broadcast() |
| cc.mu.Unlock() |
| } |
| |
// stickyErrWriter wraps an io.Writer and mirrors the result of each
// write into *err. Once *err is non-nil, further writes are refused
// with that same error without touching the underlying writer.
type http2stickyErrWriter struct {
	w   io.Writer
	err *error // shared error slot; set by Write
}

// Write forwards p to the wrapped writer unless an earlier error is
// already recorded, and stores the outcome of the write in *sew.err.
func (sew http2stickyErrWriter) Write(p []byte) (n int, err error) {
	if prev := *sew.err; prev != nil {
		return 0, prev
	}
	n, err = sew.w.Write(p)
	*sew.err = err
	return n, err
}
| |
// ErrNoCachedConn is the error documented on
// RoundTripOpt.OnlyCachedConn for the case where no cached connection
// is available.
var http2ErrNoCachedConn = errors.New("http2: no cached connection was available")

// RoundTripOpt carries the options for the Transport.RoundTripOpt method.
type http2RoundTripOpt struct {
	// OnlyCachedConn controls whether RoundTripOpt may open a new
	// TCP connection. When it is true and no cached connection is
	// available, RoundTripOpt returns ErrNoCachedConn.
	OnlyCachedConn bool
}
| |
| func (t *http2Transport) RoundTrip(req *Request) (*Response, error) { |
| return t.RoundTripOpt(req, http2RoundTripOpt{}) |
| } |
| |
| // authorityAddr returns a given authority (a host/IP, or host:port / ip:port) |
| // and returns a host:port. The port 443 is added if needed. |
| func http2authorityAddr(scheme string, authority string) (addr string) { |
| host, port, err := net.SplitHostPort(authority) |
| if err != nil { |
| port = "443" |
| if scheme == "http" { |
| port = "80" |
| } |
| host = authority |
| } |
| if a, err := idna.ToASCII(host); err == nil { |
| host = a |
| } |
| |
| if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") { |
| return host + ":" + port |
| } |
| return net.JoinHostPort(host, port) |
| } |
| |
| // RoundTripOpt is like RoundTrip, but takes options. |
| func (t *http2Transport) RoundTripOpt(req *Request, opt http2RoundTripOpt) (*Response, error) { |
| if !(req.URL.Scheme == "https" || (req.URL.Scheme == "http" && t.AllowHTTP)) { |
| return nil, errors.New("http2: unsupported scheme") |
| } |
| |
| addr := http2authorityAddr(req.URL.Scheme, req.URL.Host) |
| for { |
| cc, err := t.connPool().GetClientConn(req, addr) |
| if err != nil { |
| t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err) |
| return nil, err |
| } |
| http2traceGotConn(req, cc) |
| res, err := cc.RoundTrip(req) |
| if err != nil { |
| if req, err = http2shouldRetryRequest(req, err); err == nil { |
| continue |
| } |
| } |
| if err != nil { |
| t.vlogf("RoundTrip failure: %v", err) |
| return nil, err |
| } |
| return res, nil |
| } |
| } |
| |
| // CloseIdleConnections closes any connections which were previously |
| // connected from previous requests but are now sitting idle. |
| // It does not interrupt any connections currently in use. |
| func (t *http2Transport) CloseIdleConnections() { |
| if cp, ok := t.connPool().(http2clientConnPoolIdleCloser); ok { |
| cp.closeIdleConnections() |
| } |
| } |
| |
// Sentinel errors for the state of a client connection. They are
// compared by identity (see shouldRetryRequest).
var (
	http2errClientConnClosed   = errors.New("http2: client conn is closed")
	http2errClientConnUnusable = errors.New("http2: client conn not usable")

	// GOAWAY-related errors; the second variant is used once some of
	// the request body has already been written.
	http2errClientConnGotGoAway                 = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
	http2errClientConnGotGoAwayAfterSomeReqBody = errors.New("http2: Transport received Server's graceful shutdown GOAWAY; some request body already written")
)
| |
| // shouldRetryRequest is called by RoundTrip when a request fails to get |
| // response headers. It is always called with a non-nil error. |
| // It returns either a request to retry (either the same request, or a |
| // modified clone), or an error if the request can't be replayed. |
| func http2shouldRetryRequest(req *Request, err error) (*Request, error) { |
| switch err { |
| default: |
| return nil, err |
| case http2errClientConnUnusable, http2errClientConnGotGoAway: |
| return req, nil |
| case http2errClientConnGotGoAwayAfterSomeReqBody: |
| |
| if req.Body == nil || http2reqBodyIsNoBody(req.Body) { |
| return req, nil |
| } |
| |
| getBody := http2reqGetBody(req) |
| if getBody == nil { |
| return nil, errors.New("http2: Transport: peer server initiated graceful shutdown after some of Request.Body was written; define Request.GetBody to avoid this error") |
| } |
| body, err := getBody() |
| if err != nil { |
| return nil, err |
| } |
| newReq := *req |
| newReq.Body = body |
| return &newReq, nil |
| } |
| } |
| |
| func (t *http2Transport) dialClientConn(addr string, singleUse bool) (*http2ClientConn, error) { |
| host, _, err := net.SplitHostPort(addr) |
| if err != nil { |
| return nil, err |
| } |
| tconn, err := t.dialTLS()("tcp", addr, t.newTLSConfig(host)) |
| if err != nil { |
| return nil, err |
| } |
| return t.newClientConn(tconn, singleUse) |
| } |
| |
| func (t *http2Transport) newTLSConfig(host string) *tls.Config { |
| cfg := new(tls.Config) |
| if t.TLSClientConfig != nil { |
| *cfg = *http2cloneTLSConfig(t.TLSClientConfig) |
| } |
| if !http2strSliceContains(cfg.NextProtos, http2NextProtoTLS) { |
| cfg.NextProtos = append([]string{http2NextProtoTLS}, cfg.NextProtos...) |
| } |
| if cfg.ServerName == "" { |
| cfg.ServerName = host |
| } |
| return cfg |
| } |
| |
| func (t *http2Transport) dialTLS() func(string, string, *tls.Config) (net.Conn, error) { |
| if t.DialTLS != nil { |
| return t.DialTLS |
| } |
| return t.dialTLSDefault |
| } |
| |
| func (t *http2Transport) dialTLSDefault(network, addr string, cfg *tls.Config) (net.Conn, error) { |
| cn, err := tls.Dial(network, addr, cfg) |
| if err != nil { |
| return nil, err |
| } |
| if err := cn.Handshake(); err != nil { |
| return nil, err |
| } |
| if !cfg.InsecureSkipVerify { |
| if err := cn.VerifyHostname(cfg.ServerName); err != nil { |
| return nil, err |
| } |
| } |
| state := cn.ConnectionState() |
| if p := state.NegotiatedProtocol; p != http2NextProtoTLS { |
| return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", p, http2NextProtoTLS) |
| } |
| if !state.NegotiatedProtocolIsMutual { |
| return nil, errors.New("http2: could not negotiate protocol mutually") |
| } |
| return cn, nil |
| } |
| |
| // disableKeepAlives reports whether connections should be closed as |
| // soon as possible after handling the first request. |
| func (t *http2Transport) disableKeepAlives() bool { |
| return t.t1 != nil && t.t1.DisableKeepAlives |
| } |
| |
| func (t *http2Transport) expectContinueTimeout() time.Duration { |
| if t.t1 == nil { |
| return 0 |
| } |
| return http2transportExpectContinueTimeout(t.t1) |
| } |
| |
| func (t *http2Transport) NewClientConn(c net.Conn) (*http2ClientConn, error) { |
| return t.newClientConn(c, false) |
| } |
| |
| func (t *http2Transport) newClientConn(c net.Conn, singleUse bool) (*http2ClientConn, error) { |
| cc := &http2ClientConn{ |
| t: t, |
| tconn: c, |
| readerDone: make(chan struct{}), |
| nextStreamID: 1, |
| maxFrameSize: 16 << 10, |
| initialWindowSize: 65535, |
| maxConcurrentStreams: 1000, |
| streams: make(map[uint32]*http2clientStream), |
| singleUse: singleUse, |
| wantSettingsAck: true, |
| pings: make(map[[8]byte]chan struct{}), |
| } |
| if d := t.idleConnTimeout(); d != 0 { |
| cc.idleTimeout = d |
| cc.idleTimer = time.AfterFunc(d, cc.onIdleTimeout) |
| } |
| if http2VerboseLogs { |
| t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr()) |
| } |
| |
| cc.cond = sync.NewCond(&cc.mu) |
| cc.flow.add(int32(http2initialWindowSize)) |
| |
| cc.bw = bufio.NewWriter(http2stickyErrWriter{c, &cc.werr}) |
| cc.br = bufio.NewReader(c) |
| cc.fr = http2NewFramer(cc.bw, cc.br) |
| cc.fr.ReadMetaHeaders = hpack.NewDecoder(http2initialHeaderTableSize, nil) |
| cc.fr.MaxHeaderListSize = t.maxHeaderListSize() |
| |
| cc.henc = hpack.NewEncoder(&cc.hbuf) |
| |
| if cs, ok := c.(http2connectionStater); ok { |
| state := cs.ConnectionState() |
| cc.tlsState = &state |
| } |
| |
| initialSettings := []http2Setting{ |
| {ID: http2SettingEnablePush, Val: 0}, |
| {ID: http2SettingInitialWindowSize, Val: http2transportDefaultStreamFlow}, |
| } |
| if max := t.maxHeaderListSize(); max != 0 { |
| initialSettings = append(initialSettings, http2Setting{ID: http2SettingMaxHeaderListSize, Val: max}) |
| } |
| |
| cc.bw.Write(http2clientPreface) |
| cc.fr.WriteSettings(initialSettings...) |
| cc.fr.WriteWindowUpdate(0, http2transportDefaultConnFlow) |
| cc.inflow.add(http2transportDefaultConnFlow + http2initialWindowSize) |
| cc.bw.Flush() |
| if cc.werr != nil { |
| return nil, cc.werr |
| } |
| |
| go cc.readLoop() |
| return cc, nil |
| } |
| |
| func (cc *http2ClientConn) setGoAway(f *http2GoAwayFrame) { |
| cc.mu.Lock() |
| defer cc.mu.Unlock() |
| |
| old := cc.goAway |
| cc.goAway = f |
| |
| if cc.goAwayDebug == "" { |
| cc.goAwayDebug = string(f.DebugData()) |
| } |
| if old != nil && old.ErrCode != http2ErrCodeNo { |
| cc.goAway.ErrCode = old.ErrCode |
| } |
| last := f.LastStreamID |
| for streamID, cs := range cc.streams { |
| if streamID > last { |
| select { |
| case cs.resc <- http2resAndError{err: http2errClientConnGotGoAway}: |
| default: |
| } |
| } |
| } |
| } |
| |
| func (cc *http2ClientConn) CanTakeNewRequest() bool { |
| cc.mu.Lock() |
| defer cc.mu.Unlock() |
| return cc.canTakeNewRequestLocked() |
| } |
| |
| func (cc *http2ClientConn) canTakeNewRequestLocked() bool { |
| if cc.singleUse && cc.nextStreamID > 1 { |
| return false |
| } |
| return cc.goAway == nil && !cc.closed && |
| int64(len(cc.streams)+1) < int64(cc.maxConcurrentStreams) && |
| cc.nextStreamID < math.MaxInt32 |
| } |
| |
| // onIdleTimeout is called from a time.AfterFunc goroutine. It will |
| // only be called when we're idle, but because we're coming from a new |
| // goroutine, there could be a new request coming in at the same time, |
| // so this simply calls the synchronized closeIfIdle to shut down this |
| // connection. The timer could just call closeIfIdle, but this is more |
| // clear. |
| func (cc *http2ClientConn) onIdleTimeout() { |
| cc.closeIfIdle() |
| } |
| |
| func (cc *http2ClientConn) closeIfIdle() { |
| cc.mu.Lock() |
| if len(cc.streams) > 0 { |
| cc.mu.Unlock() |
| return |
| } |
| cc.closed = true |
| nextID := cc.nextStreamID |
| |
| cc.mu.Unlock() |
| |
| if http2VerboseLogs { |
| cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2) |
| } |
| cc.tconn.Close() |
| } |
| |
| const http2maxAllocFrameSize = 512 << 10 |
| |
| // frameBuffer returns a scratch buffer suitable for writing DATA frames. |
| // They're capped at the min of the peer's max frame size or 512KB |
| // (kinda arbitrarily), but definitely capped so we don't allocate 4GB |
| // bufers. |
| func (cc *http2ClientConn) frameScratchBuffer() []byte { |
| cc.mu.Lock() |
| size := cc.maxFrameSize |
| if size > http2maxAllocFrameSize { |
| size = http2maxAllocFrameSize |
| } |
| for i, buf := range cc.freeBuf { |
| if len(buf) >= int(size) { |
| cc.freeBuf[i] = nil |
| cc.mu.Unlock() |
| return buf[:size] |
| } |
| } |
| cc.mu.Unlock() |
| return make([]byte, size) |
| } |
| |
| func (cc *http2ClientConn) putFrameScratchBuffer(buf []byte) { |
| cc.mu.Lock() |
| defer cc.mu.Unlock() |
| const maxBufs = 4 // arbitrary; 4 concurrent requests per conn? investigate. |
| if len(cc.freeBuf) < maxBufs { |
| cc.freeBuf = append(cc.freeBuf, buf) |
| return |
| } |
| for i, old := range cc.freeBuf { |
| if old == nil { |
| cc.freeBuf[i] = buf |
| return |
| } |
| } |
| |
| } |
| |
// errRequestCanceled duplicates net/http's errRequestCanceled, which
// is not exported. Having the same message at least makes the two
// DeepEqual in tests that compare h1 and h2 behavior.
var http2errRequestCanceled = errors.New("net/http: request canceled")
| |
| func http2commaSeparatedTrailers(req *Request) (string, error) { |
| keys := make([]string, 0, len(req.Trailer)) |
| for k := range req.Trailer { |
| k = CanonicalHeaderKey(k) |
| switch k { |
| case "Transfer-Encoding", "Trailer", "Content-Length": |
| return "", &http2badStringError{"invalid Trailer key", k} |
| } |
| keys = append(keys, k) |
| } |
| if len(keys) > 0 { |
| sort.Strings(keys) |
| return strings.Join(keys, ","), nil |
| } |
| return "", nil |
| } |
| |
| func (cc *http2ClientConn) responseHeaderTimeout() time.Duration { |
| if cc.t.t1 != nil { |
| return cc.t.t1.ResponseHeaderTimeout |
| } |
| |
| return 0 |
| } |
| |
| // checkConnHeaders checks whether req has any invalid connection-level headers. |
| // per RFC 7540 section 8.1.2.2: Connection-Specific Header Fields. |
| // Certain headers are special-cased as okay but not transmitted later. |
| func http2checkConnHeaders(req *Request) error { |
| if v := req.Header.Get("Upgrade"); v != "" { |
| return fmt.Errorf("http2: invalid Upgrade request header: %q", req.Header["Upgrade"]) |
| } |
| if vv := req.Header["Transfer-Encoding"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && vv[0] != "chunked") { |
| return fmt.Errorf("http2: invalid Transfer-Encoding request header: %q", vv) |
| } |
| if vv := req.Header["Connection"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && vv[0] != "close" && vv[0] != "keep-alive") { |
| return fmt.Errorf("http2: invalid Connection request header: %q", vv) |
| } |
| return nil |
| } |
| |
| // actualContentLength returns a sanitized version of |
| // req.ContentLength, where 0 actually means zero (not unknown) and -1 |
| // means unknown. |
| func http2actualContentLength(req *Request) int64 { |
| if req.Body == nil { |
| return 0 |
| } |
| if req.ContentLength != 0 { |
| return req.ContentLength |
| } |
| return -1 |
| } |
| |
| func (cc *http2ClientConn) RoundTrip(req *Request) (*Response, error) { |
| if err := http2checkConnHeaders(req); err != nil { |
| return nil, err |
| } |
| if cc.idleTimer != nil { |
| cc.idleTimer.Stop() |
| } |
| |
| trailers, err := http2commaSeparatedTrailers(req) |
| if err != nil { |
| return nil, err |
| } |
| hasTrailers := trailers != "" |
| |
| cc.mu.Lock() |
| cc.lastActive = time.Now() |
| if cc.closed || !cc.canTakeNewRequestLocked() { |
| cc.mu.Unlock() |
| return nil, http2errClientConnUnusable |
| } |
| |
| body := req.Body |
| hasBody := body != nil |
| contentLen := http2actualContentLength(req) |
| |
| // TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere? |
| var requestedGzip bool |
| if !cc.t.disableCompression() && |
| req.Header.Get("Accept-Encoding") == "" && |
| req.Header.Get("Range") == "" && |
| req.Method != "HEAD" { |
| |
| requestedGzip = true |
| } |
| |
| hdrs, err := cc.encodeHeaders(req, requestedGzip, trailers, contentLen) |
| if err != nil { |
| cc.mu.Unlock() |
| return nil, err |
| } |
| |
| cs := cc.newStream() |
| cs.req = req |
| cs.trace = http2requestTrace(req) |
| cs.requestedGzip = requestedGzip |
| bodyWriter := cc.t.getBodyWriterState(cs, body) |
| cs.on100 = bodyWriter.on100 |
| |
| cc.wmu.Lock() |
| endStream := !hasBody && !hasTrailers |
| werr := cc.writeHeaders(cs.ID, endStream, hdrs) |
| cc.wmu.Unlock() |
| http2traceWroteHeaders(cs.trace) |
| cc.mu.Unlock() |
| |
| if werr != nil { |
| if hasBody { |
| req.Body.Close() |
| bodyWriter.cancel() |
| } |
| cc.forgetStreamID(cs.ID) |
| |
| http2traceWroteRequest(cs.trace, werr) |
| return nil, werr |
| } |
| |
| var respHeaderTimer <-chan time.Time |
| if hasBody { |
| bodyWriter.scheduleBodyWrite() |
| } else { |
| http2traceWroteRequest(cs.trace, nil) |
| if d := cc.responseHeaderTimeout(); d != 0 { |
| timer := time.NewTimer(d) |
| defer timer.Stop() |
| respHeaderTimer = timer.C |
| } |
| } |
| |
| readLoopResCh := cs.resc |
| bodyWritten := false |
| ctx := http2reqContext(req) |
| |
| handleReadLoopResponse := func(re http2resAndError) (*Response, error) { |
| res := re.res |
| if re.err != nil || res.StatusCode > 299 { |
| |
| bodyWriter.cancel() |
| cs.abortRequestBodyWrite(http2errStopReqBodyWrite) |
| } |
| if re.err != nil { |
| if re.err == http2errClientConnGotGoAway { |
| cc.mu.Lock() |
| if cs.startedWrite { |
| re.err = http2errClientConnGotGoAwayAfterSomeReqBody |
| } |
| cc.mu.Unlock() |
| } |
| cc.forgetStreamID(cs.ID) |
| return nil, re.err |
| } |
| res.Request = req |
| res.TLS = cc.tlsState |
| return res, nil |
| } |
| |
| for { |
| select { |
| case re := <-readLoopResCh: |
| return handleReadLoopResponse(re) |
| case <-respHeaderTimer: |
| cc.forgetStreamID(cs.ID) |
| if !hasBody || bodyWritten { |
| cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) |
| } else { |
| bodyWriter.cancel() |
| cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel) |
| } |
| return nil, http2errTimeout |
| case <-ctx.Done(): |
| cc.forgetStreamID(cs.ID) |
| if !hasBody || bodyWritten { |
| cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) |
| } else { |
| bodyWriter.cancel() |
| cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel) |
| } |
| return nil, ctx.Err() |
| case <-req.Cancel: |
| cc.forgetStreamID(cs.ID) |
| if !hasBody || bodyWritten { |
| cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) |
| } else { |
| bodyWriter.cancel() |
| cs.abortRequestBodyWrite(http2errStopReqBodyWriteAndCancel) |
| } |
| return nil, http2errRequestCanceled |
| case <-cs.peerReset: |
| |
| return nil, cs.resetErr |
| case err := <-bodyWriter.resc: |
| |
| select { |
| case re := <-readLoopResCh: |
| return handleReadLoopResponse(re) |
| default: |
| } |
| if err != nil { |
| return nil, err |
| } |
| bodyWritten = true |
| if d := cc.responseHeaderTimeout(); d != 0 { |
| timer := time.NewTimer(d) |
| defer timer.Stop() |
| respHeaderTimer = timer.C |
| } |
| } |
| } |
| } |
| |
| // requires cc.wmu be held |
| func (cc *http2ClientConn) writeHeaders(streamID uint32, endStream bool, hdrs []byte) error { |
| first := true |
| frameSize := int(cc.maxFrameSize) |
| for len(hdrs) > 0 && cc.werr == nil { |
| chunk := hdrs |
| if len(chunk) > frameSize { |
| chunk = chunk[:frameSize] |
| } |
| hdrs = hdrs[len(chunk):] |
| endHeaders := len(hdrs) == 0 |
| if first { |
| cc.fr.WriteHeaders(http2HeadersFrameParam{ |
| StreamID: streamID, |
| BlockFragment: chunk, |
| EndStream: endStream, |
| EndHeaders: endHeaders, |
| }) |
| first = false |
| } else { |
| cc.fr.WriteContinuation(streamID, endHeaders, chunk) |
| } |
| } |
| |
| cc.bw.Flush() |
| return cc.werr |
| } |
| |
// Internal error values used to stop a request body write; they do not
// escape to callers.
var (
	// errStopReqBodyWrite: stop writing the request body, without
	// sending a cancel.
	http2errStopReqBodyWrite = errors.New("http2: aborting request body write")

	// errStopReqBodyWriteAndCancel: stop writing the request body and
	// send a stream reset with the cancel code.
	http2errStopReqBodyWriteAndCancel = errors.New("http2: canceling request")
)
| |
| func (cs *http2clientStream) writeRequestBody(body io.Reader, bodyCloser io.Closer) (err error) { |
| cc := cs.cc |
| sentEnd := false |
| buf := cc.frameScratchBuffer() |
| defer cc.putFrameScratchBuffer(buf) |
| |
| defer func() { |
| http2traceWroteRequest(cs.trace, err) |
| |
| cerr := bodyCloser.Close() |
| if err == nil { |
| err = cerr |
| } |
| }() |
| |
| req := cs.req |
| hasTrailers := req.Trailer != nil |
| |
| var sawEOF bool |
| for !sawEOF { |
| n, err := body.Read(buf) |
| if err == io.EOF { |
| sawEOF = true |
| err = nil |
| } else if err != nil { |
| return err |
| } |
| |
| remain := buf[:n] |
| for len(remain) > 0 && err == nil { |
| var allowed int32 |
| allowed, err = cs.awaitFlowControl(len(remain)) |
| switch { |
| case err == http2errStopReqBodyWrite: |
| return err |
| case err == http2errStopReqBodyWriteAndCancel: |
| cc.writeStreamReset(cs.ID, http2ErrCodeCancel, nil) |
| return err |
| case err != nil: |
| return err |
| } |
| cc.wmu.Lock() |
| data := remain[:allowed] |
| remain = remain[allowed:] |
| sentEnd = sawEOF && len(remain) == 0 && !hasTrailers |
| err = cc.fr.WriteData(cs.ID, sentEnd, data) |
| if err == nil { |
| |
| err = cc.bw.Flush() |
| } |
| cc.wmu.Unlock() |
| } |
| if err != nil { |
| return err |
| } |
| } |
| |
| if sentEnd { |
| |
| return nil |
| } |
| |
| var trls []byte |
| if hasTrailers { |
| cc.mu.Lock() |
| defer cc.mu.Unlock() |
| trls = cc.encodeTrailers(req) |
| } |
| |
| cc.wmu.Lock() |
| defer cc.wmu.Unlock() |
| |
| if len(trls) > 0 { |
| err = cc.writeHeaders(cs.ID, true, trls) |
| } else { |
| err = cc.fr.WriteData(cs.ID, true, nil) |
| } |
| if ferr := cc.bw.Flush(); ferr != nil && err == nil { |
| err = ferr |
| } |
| return err |
| } |
| |
| // awaitFlowControl waits for [1, min(maxBytes, cc.cs.maxFrameSize)] flow |
| // control tokens from the server. |
| // It returns either the non-zero number of tokens taken or an error |
| // if the stream is dead. |
| func (cs *http2clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) { |
| cc := cs.cc |
| cc.mu.Lock() |
| defer cc.mu.Unlock() |
| for { |
| if cc.closed { |
| return 0, http2errClientConnClosed |
| } |
| if cs.stopReqBody != nil { |
| return 0, cs.stopReqBody |
| } |
| if err := cs.checkResetOrDone(); err != nil { |
| return 0, err |
| } |
| if a := cs.flow.available(); a > 0 { |
| take := a |
| if int(take) > maxBytes { |
| |
| take = int32(maxBytes) |
| } |
| if take > int32(cc.maxFrameSize) { |
| take = int32(cc.maxFrameSize) |
| } |
| cs.flow.take(take) |
| return take, nil |
| } |
| cc.cond.Wait() |
| } |
| } |
| |
// badStringError reports an unacceptable string together with a short
// description of what is wrong with it.
type http2badStringError struct {
	what string // description of the problem
	str  string // the offending string
}

// Error renders the description followed by the quoted offending string.
func (e *http2badStringError) Error() string {
	msg := fmt.Sprintf("%s %q", e.what, e.str)
	return msg
}
| |
| // requires cc.mu be held. |
| func (cc *http2ClientConn) encodeHeaders(req *Request, addGzipHeader bool, trailers string, contentLength int64) ([]byte, error) { |
| cc.hbuf.Reset() |
| |
| host := req.Host |
| if host == "" { |
| host = req.URL.Host |
| } |
| host, err := httplex.PunycodeHostPort(host) |
| if err != nil { |
| return nil, err |
| } |
| |
| var path string |
| if req.Method != "CONNECT" { |
| path = req.URL.RequestURI() |
| if !http2validPseudoPath(path) { |
| orig := path |
| path = strings.TrimPrefix(path, req.URL.Scheme+"://"+host) |
| if !http2validPseudoPath(path) { |
| if req.URL.Opaque != "" { |
| return nil, fmt.Errorf("invalid request :path %q from URL.Opaque = %q", orig, req.URL.Opaque) |
| } else { |
| return nil, fmt.Errorf("invalid request :path %q", orig) |
| } |
| } |
| } |
| } |
| |
| for k, vv := range req.Header { |
| if !httplex.ValidHeaderFieldName(k) { |
| return nil, fmt.Errorf("invalid HTTP header name %q", k) |
| } |
| for _, v := range vv { |
| if !httplex.ValidHeaderFieldValue(v) { |
| return nil, fmt.Errorf("invalid HTTP header value %q for header %q", v, k) |
| } |
| } |
| } |
| |
| cc.writeHeader(":authority", host) |
| cc.writeHeader(":method", req.Method) |
| if req.Method != "CONNECT" { |
| cc.writeHeader(":path", path) |
| cc.writeHeader(":scheme", req.URL.Scheme) |
| } |
| if trailers != "" { |
| cc.writeHeader("trailer", trailers) |
| } |
| |
| var didUA bool |
| for k, vv := range req.Header { |
| lowKey := strings.ToLower(k) |
| switch lowKey { |
| case "host", "content-length": |
| |
| continue |
| case "connection", "proxy-connection", "transfer-encoding", "upgrade", "keep-alive": |
| |
| continue |
| case "user-agent": |
| |
| didUA = true |
| if len(vv) < 1 { |
| continue |
| } |
| vv = vv[:1] |
| if vv[0] == "" { |
| continue |
| } |
| } |
| for _, v := range vv { |
| cc.writeHeader(lowKey, v) |
| } |
| } |
| if http2shouldSendReqContentLength(req.Method, contentLength) { |
| cc.writeHeader("content-length", strconv.FormatInt(contentLength, 10)) |
| } |
| if addGzipHeader { |
| cc.writeHeader("accept-encoding", "gzip") |
| } |
| if !didUA { |
| cc.writeHeader("user-agent", http2defaultUserAgent) |
| } |
| return cc.hbuf.Bytes(), nil |
| } |
| |
// shouldSendReqContentLength reports whether the http2.Transport should send
// a "content-length" request header. This logic is basically a copy of the
// net/http transferWriter.shouldSendContentLength.
//
// contentLength is the corrected length: 0 means a body of exactly zero
// bytes and -1 means the length is unknown. A positive length is always
// sent, an unknown length never is, and a zero length is sent only for
// the POST, PUT and PATCH methods.
func http2shouldSendReqContentLength(method string, contentLength int64) bool {
	switch {
	case contentLength > 0:
		return true
	case contentLength < 0:
		return false
	}
	// contentLength == 0: only some methods announce an explicit zero.
	return method == "POST" || method == "PUT" || method == "PATCH"
}
| |
| // requires cc.mu be held. |
| func (cc *http2ClientConn) encodeTrailers(req *Request) []byte { |
| cc.hbuf.Reset() |
| for k, vv := range req.Trailer { |
| |
| lowKey := strings.ToLower(k) |
| for _, v := range vv { |
| cc.writeHeader(lowKey, v) |
| } |
| } |
| return cc.hbuf.Bytes() |
| } |
| |
| func (cc *http2ClientConn) writeHeader(name, value string) { |
| if http2VerboseLogs { |
| log.Printf("http2: Transport encoding header %q = %q", name, value) |
| } |
| cc.henc.WriteField(hpack.HeaderField{Name: name, Value: value}) |
| } |
| |
| type http2resAndError struct { |
| res *Response |
| err error |
| } |
| |
| // requires cc.mu be held. |
| func (cc *http2ClientConn) newStream() *http2clientStream { |
| cs := &http2clientStream{ |
| cc: cc, |
| ID: cc.nextStreamID, |
| resc: make(chan http2resAndError, 1), |
| peerReset: make(chan struct{}), |
| done: make(chan struct{}), |
| } |
| cs.flow.add(int32(cc.initialWindowSize)) |
| cs.flow.setConnFlow(&cc.flow) |
| cs.inflow.add(http2transportDefaultStreamFlow) |
| cs.inflow.setConnFlow(&cc.inflow) |
| cc.nextStreamID += 2 |
| cc.streams[cs.ID] = cs |
| return cs |
| } |
| |
| func (cc *http2ClientConn) forgetStreamID(id uint32) { |
| cc.streamByID(id, true) |
| } |
| |
| func (cc *http2ClientConn) streamByID(id uint32, andRemove bool) *http2clientStream { |
| cc.mu.Lock() |
| defer cc.mu.Unlock() |
| cs := cc.streams[id] |
| if andRemove && cs != nil && !cc.closed { |
| cc.lastActive = time.Now() |
| delete(cc.streams, id) |
| if len(cc.streams) == 0 && cc.idleTimer != nil { |
| cc.idleTimer.Reset(cc.idleTimeout) |
| } |
| close(cs.done) |
| cc.cond.Broadcast() |
| } |
| return cs |
| } |
| |
| // clientConnReadLoop is the state owned by the clientConn's frame-reading readLoop. |
| type http2clientConnReadLoop struct { |
| cc *http2ClientConn |
| activeRes map[uint32]*http2clientStream // keyed by streamID |
| closeWhenIdle bool |
| } |
| |
| // readLoop runs in its own goroutine and reads and dispatches frames. |
| func (cc *http2ClientConn) readLoop() { |
| rl := &http2clientConnReadLoop{ |
| cc: cc, |
| activeRes: make(map[uint32]*http2clientStream), |
| } |
| |
| defer rl.cleanup() |
| cc.readerErr = rl.run() |
| if ce, ok := cc.readerErr.(http2ConnectionError); ok { |
| cc.wmu.Lock() |
| cc.fr.WriteGoAway(0, http2ErrCode(ce), nil) |
| cc.wmu.Unlock() |
| } |
| } |
| |
| // GoAwayError is returned by the Transport when the server closes the |
| // TCP connection after sending a GOAWAY frame. |
| type http2GoAwayError struct { |
| LastStreamID uint32 |
| ErrCode http2ErrCode |
| DebugData string |
| } |
| |
| func (e http2GoAwayError) Error() string { |
| return fmt.Sprintf("http2: server sent GOAWAY and closed the connection; LastStreamID=%v, ErrCode=%v, debug=%q", |
| e.LastStreamID, e.ErrCode, e.DebugData) |
| } |
| |
// isEOFOrNetReadError reports whether err is io.EOF or a *net.OpError
// whose Op is "read".
func http2isEOFOrNetReadError(err error) bool {
	if err == io.EOF {
		return true
	}
	if opErr, isOpErr := err.(*net.OpError); isOpErr {
		return opErr.Op == "read"
	}
	return false
}
| |
| func (rl *http2clientConnReadLoop) cleanup() { |
| cc := rl.cc |
| defer cc.tconn.Close() |
| defer cc.t.connPool().MarkDead(cc) |
| defer close(cc.readerDone) |
| |
| if cc.idleTimer != nil { |
| cc.idleTimer.Stop() |
| } |
| |
| err := cc.readerErr |
| cc.mu.Lock() |
| if cc.goAway != nil && http2isEOFOrNetReadError(err) { |
| err = http2GoAwayError{ |
| LastStreamID: cc.goAway.LastStreamID, |
| ErrCode: cc.goAway.ErrCode, |
| DebugData: cc.goAwayDebug, |
| } |
| } else if err == io.EOF { |
| err = io.ErrUnexpectedEOF |
| } |
| for _, cs := range rl.activeRes { |
| cs.bufPipe.CloseWithError(err) |
| } |
| for _, cs := range cc.streams { |
| select { |
| case cs.resc <- http2resAndError{err: err}: |
| default: |
| } |
| close(cs.done) |
| } |
| cc.closed = true |
| cc.cond.Broadcast() |
| cc.mu.Unlock() |
| } |
| |
| func (rl *http2clientConnReadLoop) run() error { |
| cc := rl.cc |
| rl.closeWhenIdle = cc.t.disableKeepAlives() || cc.singleUse |
| gotReply := false |
| gotSettings := false |
| for { |
| f, err := cc.fr.ReadFrame() |
| if err != nil { |
| cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err) |
| } |
| if se, ok := err.(http2StreamError); ok { |
| if cs := cc.streamByID(se.StreamID, true); cs != nil { |
| cs.cc.writeStreamReset(cs.ID, se.Code, err) |
| if se.Cause == nil { |
| se.Cause = cc.fr.errDetail |
| } |
| rl.endStreamError(cs, se) |
| } |
| continue |
| } else if err != nil { |
| return err |
| } |
| if http2VerboseLogs { |
| cc.vlogf("http2: Transport received %s", http2summarizeFrame(f)) |
| } |
| if !gotSettings { |
| if _, ok := f.(*http2SettingsFrame); !ok { |
| cc.logf("protocol error: received %T before a SETTINGS frame", f) |
| return http2ConnectionError(http2ErrCodeProtocol) |
| } |
| gotSettings = true |
| } |
| maybeIdle := false |
| |
| switch f := f.(type) { |
| case *http2MetaHeadersFrame: |
| err = rl.processHeaders(f) |
| maybeIdle = true |
| gotReply = true |
| case *http2DataFrame: |
| err = rl.processData(f) |
| maybeIdle = true |
| case *http2GoAwayFrame: |
| err = rl.processGoAway(f) |
| maybeIdle = true |
| case *http2RSTStreamFrame: |
| err = rl.processResetStream(f) |
| maybeIdle = true |
| case *http2SettingsFrame: |
| err = rl.processSettings(f) |
| case *http2PushPromiseFrame: |
| err = rl.processPushPromise(f) |
| case *http2WindowUpdateFrame: |
| err = rl.processWindowUpdate(f) |
| case *http2PingFrame: |
| err = rl.processPing(f) |
| default: |
| cc.logf("Transport: unhandled response frame type %T", f) |
| } |
| if err != nil { |
| if http2VerboseLogs { |
| cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, http2summarizeFrame(f), err) |
| } |
| return err |
| } |
| if rl.closeWhenIdle && gotReply && maybeIdle && len(rl.activeRes) == 0 { |
| cc.closeIfIdle() |
| } |
| } |
| } |
| |
| func (rl *http2clientConnReadLoop) processHeaders(f *http2MetaHeadersFrame) error { |
| cc := rl.cc |
| cs := cc.streamByID(f.StreamID, f.StreamEnded()) |
| if cs == nil { |
| |
| return nil |
| } |
| if !cs.firstByte { |
| if cs.trace != nil { |
| |
| http2traceFirstResponseByte(cs.trace) |
| } |
| cs.firstByte = true |
| } |
| if !cs.pastHeaders { |
| cs.pastHeaders = true |
| } else { |
| return rl.processTrailers(cs, f) |
| } |
| |
| res, err := rl.handleResponse(cs, f) |
| if err != nil { |
| if _, ok := err.(http2ConnectionError); ok { |
| return err |
| } |
| |
| cs.cc.writeStreamReset(f.StreamID, http2ErrCodeProtocol, err) |
| cs.resc <- http2resAndError{err: err} |
| return nil |
| } |
| if res == nil { |
| |
| return nil |
| } |
| if res.Body != http2noBody { |
| rl.activeRes[cs.ID] = cs |
| } |
| cs.resTrailer = &res.Trailer |
| cs.resc <- http2resAndError{res: res} |
| return nil |
| } |
| |
| // may return error types nil, or ConnectionError. Any other error value |
| // is a StreamError of type ErrCodeProtocol. The returned error in that case |
| // is the detail. |
| // |
| // As a special case, handleResponse may return (nil, nil) to skip the |
| // frame (currently only used for 100 expect continue). This special |
| // case is going away after Issue 13851 is fixed. |
| func (rl *http2clientConnReadLoop) handleResponse(cs *http2clientStream, f *http2MetaHeadersFrame) (*Response, error) { |
| if f.Truncated { |
| return nil, http2errResponseHeaderListSize |
| } |
| |
| status := f.PseudoValue("status") |
| if status == "" { |
| return nil, errors.New("missing status pseudo header") |
| } |
| statusCode, err := strconv.Atoi(status) |
| if err != nil { |
| return nil, errors.New("malformed non-numeric status pseudo header") |
| } |
| |
| if statusCode == 100 { |
| http2traceGot100Continue(cs.trace) |
| if cs.on100 != nil { |
| cs.on100() |
| } |
| cs.pastHeaders = false |
| return nil, nil |
| } |
| |
| header := make(Header) |
| res := &Response{ |
| Proto: "HTTP/2.0", |
| ProtoMajor: 2, |
| Header: header, |
| StatusCode: statusCode, |
| Status: status + " " + StatusText(statusCode), |
| } |
| for _, hf := range f.RegularFields() { |
| key := CanonicalHeaderKey(hf.Name) |
| if key == "Trailer" { |
| t := res.Trailer |
| if t == nil { |
| t = make(Header) |
| res.Trailer = t |
| } |
| http2foreachHeaderElement(hf.Value, func(v string) { |
| t[CanonicalHeaderKey(v)] = nil |
| }) |
| } else { |
| header[key] = append(header[key], hf.Value) |
| } |
| } |
| |
| streamEnded := f.StreamEnded() |
| isHead := cs.req.Method == "HEAD" |
| if !streamEnded || isHead { |
| res.ContentLength = -1 |
| if clens := res.Header["Content-Length"]; len(clens) == 1 { |
| if clen64, err := strconv.ParseInt(clens[0], 10, 64); err == nil { |
| res.ContentLength = clen64 |
| } else { |
| |
| } |
| } else if len(clens) > 1 { |
| |
| } |
| } |
| |
| if streamEnded || isHead { |
| res.Body = http2noBody |
| return res, nil |
| } |
| |
| buf := new(bytes.Buffer) |
| cs.bufPipe = http2pipe{b: buf} |
| cs.bytesRemain = res.ContentLength |
| res.Body = http2transportResponseBody{cs} |
| go cs.awaitRequestCancel(cs.req) |
| |
| if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" { |
| res.Header.Del("Content-Encoding") |
| res.Header.Del("Content-Length") |
| res.ContentLength = -1 |
| res.Body = &http2gzipReader{body: res.Body} |
| http2setResponseUncompressed(res) |
| } |
| return res, nil |
| } |
| |
| func (rl *http2clientConnReadLoop) processTrailers(cs *http2clientStream, f *http2MetaHeadersFrame) error { |
| if cs.pastTrailers { |
| |
| return http2ConnectionError(http2ErrCodeProtocol) |
| } |
| cs.pastTrailers = true |
| if !f.StreamEnded() { |
| |
| return http2ConnectionError(http2ErrCodeProtocol) |
| } |
| if len(f.PseudoFields()) > 0 { |
| |
| return http2ConnectionError(http2ErrCodeProtocol) |
| } |
| |
| trailer := make(Header) |
| for _, hf := range f.RegularFields() { |
| key := CanonicalHeaderKey(hf.Name) |
| trailer[key] = append(trailer[key], hf.Value) |
| } |
| cs.trailer = trailer |
| |
| rl.endStream(cs) |
| return nil |
| } |
| |
| // transportResponseBody is the concrete type of Transport.RoundTrip's |
| // Response.Body. It is an io.ReadCloser. On Read, it reads from cs.body. |
| // On Close it sends RST_STREAM if EOF wasn't already seen. |
| type http2transportResponseBody struct { |
| cs *http2clientStream |
| } |
| |
| func (b http2transportResponseBody) Read(p []byte) (n int, err error) { |
| cs := b.cs |
| cc := cs.cc |
| |
| if cs.readErr != nil { |
| return 0, cs.readErr |
| } |
| n, err = b.cs.bufPipe.Read(p) |
| if cs.bytesRemain != -1 { |
| if int64(n) > cs.bytesRemain { |
| n = int(cs.bytesRemain) |
| if err == nil { |
| err = errors.New("net/http: server replied with more than declared Content-Length; truncated") |
| cc.writeStreamReset(cs.ID, http2ErrCodeProtocol, err) |
| } |
| cs.readErr = err |
| return int(cs.bytesRemain), err |
| } |
| cs.bytesRemain -= int64(n) |
| if err == io.EOF && cs.bytesRemain > 0 { |
| err = io.ErrUnexpectedEOF |
| cs.readErr = err |
| return n, err |
| } |
| } |
| if n == 0 { |
| |
| return |
| } |
| |
| cc.mu.Lock() |
| defer cc.mu.Unlock() |
| |
| var connAdd, streamAdd int32 |
| |
| if v := cc.inflow.available(); v < http2transportDefaultConnFlow/2 { |
| connAdd = http2transportDefaultConnFlow - v |
| cc.inflow.add(connAdd) |
| } |
| if err == nil { |
| |
| v := int(cs.inflow.available()) + cs.bufPipe.Len() |
| if v < http2transportDefaultStreamFlow-http2transportDefaultStreamMinRefresh { |
| streamAdd = int32(http2transportDefaultStreamFlow - v) |
| cs.inflow.add(streamAdd) |
| } |
| } |
| if connAdd != 0 || streamAdd != 0 { |
| cc.wmu.Lock() |
| defer cc.wmu.Unlock() |
| if connAdd != 0 { |
| cc.fr.WriteWindowUpdate(0, http2mustUint31(connAdd)) |
| } |
| if streamAdd != 0 { |
| cc.fr.WriteWindowUpdate(cs.ID, http2mustUint31(streamAdd)) |
| } |
| cc.bw.Flush() |
| } |
| return |
| } |
| |
| var http2errClosedResponseBody = errors.New("http2: response body closed") |
| |
| func (b http2transportResponseBody) Close() error { |
| cs := b.cs |
| cc := cs.cc |
| |
| serverSentStreamEnd := cs.bufPipe.Err() == io.EOF |
| unread := cs.bufPipe.Len() |
| |
| if unread > 0 || !serverSentStreamEnd { |
| cc.mu.Lock() |
| cc.wmu.Lock() |
| if !serverSentStreamEnd { |
| cc.fr.WriteRSTStream(cs.ID, http2ErrCodeCancel) |
| } |
| |
| if unread > 0 { |
| cc.inflow.add(int32(unread)) |
| cc.fr.WriteWindowUpdate(0, uint32(unread)) |
| } |
| cc.bw.Flush() |
| cc.wmu.Unlock() |
| cc.mu.Unlock() |
| } |
| |
| cs.bufPipe.BreakWithError(http2errClosedResponseBody) |
| return nil |
| } |
| |
| func (rl *http2clientConnReadLoop) processData(f *http2DataFrame) error { |
| cc := rl.cc |
| cs := cc.streamByID(f.StreamID, f.StreamEnded()) |
| data := f.Data() |
| if cs == nil { |
| cc.mu.Lock() |
| neverSent := cc.nextStreamID |
| cc.mu.Unlock() |
| if f.StreamID >= neverSent { |
| |
| cc.logf("http2: Transport received unsolicited DATA frame; closing connection") |
| return http2ConnectionError(http2ErrCodeProtocol) |
| } |
| |
| if f.Length > 0 { |
| cc.mu.Lock() |
| cc.inflow.add(int32(f.Length)) |
| cc.mu.Unlock() |
| |
| cc.wmu.Lock() |
| cc.fr.WriteWindowUpdate(0, uint32(f.Length)) |
| cc.bw.Flush() |
| cc.wmu.Unlock() |
| } |
| return nil |
| } |
| if f.Length > 0 { |
| if len(data) > 0 && cs.bufPipe.b == nil { |
| |
| cc.logf("http2: Transport received DATA frame for closed stream; closing connection") |
| return http2ConnectionError(http2ErrCodeProtocol) |
| } |
| |
| cc.mu.Lock() |
| if cs.inflow.available() >= int32(f.Length) { |
| cs.inflow.take(int32(f.Length)) |
| } else { |
| cc.mu.Unlock() |
| return http2ConnectionError(http2ErrCodeFlowControl) |
| } |
| |
| if pad := int32(f.Length) - int32(len(data)); pad > 0 { |
| cs.inflow.add(pad) |
| cc.inflow.add(pad) |
| cc.wmu.Lock() |
| cc.fr.WriteWindowUpdate(0, uint32(pad)) |
| cc.fr.WriteWindowUpdate(cs.ID, uint32(pad)) |
| cc.bw.Flush() |
| cc.wmu.Unlock() |
| } |
| didReset := cs.didReset |
| cc.mu.Unlock() |
| |
| if len(data) > 0 && !didReset { |
| if _, err := cs.bufPipe.Write(data); err != nil { |
| rl.endStreamError(cs, err) |
| return err |
| } |
| } |
| } |
| |
| if f.StreamEnded() { |
| rl.endStream(cs) |
| } |
| return nil |
| } |
| |
// http2errInvalidTrailers is a sentinel error value describing invalid
// trailers.
var http2errInvalidTrailers = errors.New("http2: invalid trailers")
| |
| func (rl *http2clientConnReadLoop) endStream(cs *http2clientStream) { |
| |
| rl.endStreamError(cs, nil) |
| } |
| |
| func (rl *http2clientConnReadLoop) endStreamError(cs *http2clientStream, err error) { |
| var code func() |
| if err == nil { |
| err = io.EOF |
| code = cs.copyTrailers |
| } |
| cs.bufPipe.closeWithErrorAndCode(err, code) |
| delete(rl.activeRes, cs.ID) |
| if http2isConnectionCloseRequest(cs.req) { |
| rl.closeWhenIdle = true |
| } |
| |
| select { |
| case cs.resc <- http2resAndError{err: err}: |
| default: |
| } |
| } |
| |
| func (cs *http2clientStream) copyTrailers() { |
| for k, vv := range cs.trailer { |
| t := cs.resTrailer |
| if *t == nil { |
| *t = make(Header) |
| } |
| (*t)[k] = vv |
| } |
| } |
| |
| func (rl *http2clientConnReadLoop) processGoAway(f *http2GoAwayFrame) error { |
| cc := rl.cc |
| cc.t.connPool().MarkDead(cc) |
| if f.ErrCode != 0 { |
| |
| cc.vlogf("transport got GOAWAY with error code = %v", f.ErrCode) |
| } |
| cc.setGoAway(f) |
| return nil |
| } |
| |
| func (rl *http2clientConnReadLoop) processSettings(f *http2SettingsFrame) error { |
| cc := rl.cc |
| cc.mu.Lock() |
| defer cc.mu.Unlock() |
| |
| if f.IsAck() { |
| if cc.wantSettingsAck { |
| cc.wantSettingsAck = false |
| return nil |
| } |
| return http2ConnectionError(http2ErrCodeProtocol) |
| } |
| |
| err := f.ForeachSetting(func(s http2Setting) error { |
| switch s.ID { |
| case http2SettingMaxFrameSize: |
| cc.maxFrameSize = s.Val |
| case http2SettingMaxConcurrentStreams: |
| cc.maxConcurrentStreams = s.Val |
| case http2SettingInitialWindowSize: |
| |
| if s.Val > math.MaxInt32 { |
| return http2ConnectionError(http2ErrCodeFlowControl) |
| } |
| |
| delta := int32(s.Val) - int32(cc.initialWindowSize) |
| for _, cs := range cc.streams { |
| cs.flow.add(delta) |
| } |
| cc.cond.Broadcast() |
| |
| cc.initialWindowSize = s.Val |
| default: |
| |
| cc.vlogf("Unhandled Setting: %v", s) |
| } |
| return nil |
| }) |
| if err != nil { |
| return err |
| } |
| |
| cc.wmu.Lock() |
| defer cc.wmu.Unlock() |
| |
| cc.fr.WriteSettingsAck() |
| cc.bw.Flush() |
| return cc.werr |
| } |
| |
| func (rl *http2clientConnReadLoop) processWindowUpdate(f *http2WindowUpdateFrame) error { |
| cc := rl.cc |
| cs := cc.streamByID(f.StreamID, false) |
| if f.StreamID != 0 && cs == nil { |
| return nil |
| } |
| |
| cc.mu.Lock() |
| defer cc.mu.Unlock() |
| |
| fl := &cc.flow |
| if cs != nil { |
| fl = &cs.flow |
| } |
| if !fl.add(int32(f.Increment)) { |
| return http2ConnectionError(http2ErrCodeFlowControl) |
| } |
| cc.cond.Broadcast() |
| return nil |
| } |
| |
| func (rl *http2clientConnReadLoop) processResetStream(f *http2RSTStreamFrame) error { |
| cs := rl.cc.streamByID(f.StreamID, true) |
| if cs == nil { |
| |
| return nil |
| } |
| select { |
| case <-cs.peerReset: |
| |
| default: |
| err := http2streamError(cs.ID, f.ErrCode) |
| cs.resetErr = err |
| close(cs.peerReset) |
| cs.bufPipe.CloseWithError(err) |
| cs.cc.cond.Broadcast() |
| } |
| delete(rl.activeRes, cs.ID) |
| return nil |
| } |
| |
| // Ping sends a PING frame to the server and waits for the ack. |
| // Public implementation is in go17.go and not_go17.go |
| func (cc *http2ClientConn) ping(ctx http2contextContext) error { |
| c := make(chan struct{}) |
| // Generate a random payload |
| var p [8]byte |
| for { |
| if _, err := rand.Read(p[:]); err != nil { |
| return err |
| } |
| cc.mu.Lock() |
| |
| if _, found := cc.pings[p]; !found { |
| cc.pings[p] = c |
| cc.mu.Unlock() |
| break |
| } |
| cc.mu.Unlock() |
| } |
| cc.wmu.Lock() |
| if err := cc.fr.WritePing(false, p); err != nil { |
| cc.wmu.Unlock() |
| return err |
| } |
| if err := cc.bw.Flush(); err != nil { |
| cc.wmu.Unlock() |
| return err |
| } |
| cc.wmu.Unlock() |
| select { |
| case <-c: |
| return nil |
| case <-ctx.Done(): |
| return ctx.Err() |
| case <-cc.readerDone: |
| |
| return cc.readerErr |
| } |
| } |
| |
| func (rl *http2clientConnReadLoop) processPing(f *http2PingFrame) error { |
| if f.IsAck() { |
| cc := rl.cc |
| cc.mu.Lock() |
| defer cc.mu.Unlock() |
| |
| if c, ok := cc.pings[f.Data]; ok { |
| close(c) |
| delete(cc.pings, f.Data) |
| } |
| return nil |
| } |
| cc := rl.cc |
| cc.wmu.Lock() |
| defer cc.wmu.Unlock() |
| if err := cc.fr.WritePing(true, f.Data); err != nil { |
| return err |
| } |
| return cc.bw.Flush() |
| } |
| |
| func (rl *http2clientConnReadLoop) processPushPromise(f *http2PushPromiseFrame) error { |
| |
| return http2ConnectionError(http2ErrCodeProtocol) |
| } |
| |
| func (cc *http2ClientConn) writeStreamReset(streamID uint32, code http2ErrCode, err error) { |
| |
| cc.wmu.Lock() |
| cc.fr.WriteRSTStream(streamID, code) |
| cc.bw.Flush() |
| cc.wmu.Unlock() |
| } |
| |
// Sentinel errors for problems with response header blocks.
var (
	// http2errResponseHeaderListSize reports a response header list
	// larger than the advertised limit.
	http2errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit")

	// http2errPseudoTrailers reports a pseudo header found in trailers.
	http2errPseudoTrailers = errors.New("http2: invalid pseudo header in trailers")
)
| |
| func (cc *http2ClientConn) logf(format string, args ...interface{}) { |
| cc.t.logf(format, args...) |
| } |
| |
| func (cc *http2ClientConn) vlogf(format string, args ...interface{}) { |
| cc.t.vlogf(format, args...) |
| } |
| |
| func (t *http2Transport) vlogf(format string, args ...interface{}) { |
| if http2VerboseLogs { |
| t.logf(format, args...) |
| } |
| } |
| |
| func (t *http2Transport) logf(format string, args ...interface{}) { |
| log.Printf(format, args...) |
| } |
| |
// http2noBody is a shared, empty io.ReadCloser. Code in this file
// compares response bodies against it by identity to detect responses
// without a body.
var http2noBody io.ReadCloser = ioutil.NopCloser(bytes.NewReader(nil))
| |
// strSliceContains reports whether s is an element of ss.
func http2strSliceContains(ss []string, s string) bool {
	for i := range ss {
		if ss[i] == s {
			return true
		}
	}
	return false
}
| |
| type http2erringRoundTripper struct{ err error } |
| |
| func (rt http2erringRoundTripper) RoundTrip(*Request) (*Response, error) { return nil, rt.err } |
| |
// gzipReader wraps a response body so it can lazily
// call gzip.NewReader on the first call to Read
type http2gzipReader struct {
	body io.ReadCloser // underlying Response.Body
	zr   *gzip.Reader  // lazily-initialized gzip reader
	zerr error         // sticky error
}

// Read decompresses data from the underlying body into p. The gzip reader
// is created on the first call; if creating it fails, the error is
// remembered and returned by every later call.
func (gz *http2gzipReader) Read(p []byte) (n int, err error) {
	if gz.zerr != nil {
		return 0, gz.zerr
	}
	if gz.zr != nil {
		return gz.zr.Read(p)
	}

	zr, err := gzip.NewReader(gz.body)
	if err != nil {
		gz.zerr = err
		return 0, err
	}
	gz.zr = zr
	return zr.Read(p)
}

// Close closes the underlying body.
func (gz *http2gzipReader) Close() error {
	return gz.body.Close()
}
| |
// http2errorReader is an io.Reader that never yields data: every Read
// returns zero bytes and the configured error.
type http2errorReader struct {
	err error // the error returned from every Read
}

// Read ignores p and returns (0, r.err).
func (r http2errorReader) Read(p []byte) (n int, err error) {
	return 0, r.err
}
| |
| // bodyWriterState encapsulates various state around the Transport's writing |
| // of the request body, particularly regarding doing delayed writes of the body |
| // when the request contains "Expect: 100-continue". |
| type http2bodyWriterState struct { |
| cs *http2clientStream |
| timer *time.Timer // if non-nil, we're doing a delayed write |
| fnonce *sync.Once // to call fn with |
| fn func() // the code to run in the goroutine, writing the body |
| resc chan error // result of fn's execution |
| delay time.Duration // how long we should delay a delayed write for |
| } |
| |
| func (t *http2Transport) getBodyWriterState(cs *http2clientStream, body io.Reader) (s http2bodyWriterState) { |
| s.cs = cs |
| if body == nil { |
| return |
| } |
| resc := make(chan error, 1) |
| s.resc = resc |
| s.fn = func() { |
| cs.cc.mu.Lock() |
| cs.startedWrite = true |
| cs.cc.mu.Unlock() |
| resc <- cs.writeRequestBody(body, cs.req.Body) |
| } |
| s.delay = t.expectContinueTimeout() |
| if s.delay == 0 || |
| !httplex.HeaderValuesContainsToken( |
| cs.req.Header["Expect"], |
| "100-continue") { |
| return |
| } |
| s.fnonce = new(sync.Once) |
| |
| // Arm the timer with a very large duration, which we'll |
| // intentionally lower later. It has to be large now because |
| // we need a handle to it before writing the headers, but the |
| // s.delay value is defined to not start until after the |
| // request headers were written. |
| const hugeDuration = 365 * 24 * time.Hour |
| s.timer = time.AfterFunc(hugeDuration, func() { |
| s.fnonce.Do(s.fn) |
| }) |
| return |
| } |
| |
| func (s http2bodyWriterState) cancel() { |
| if s.timer != nil { |
| s.timer.Stop() |
| } |
| } |
| |
| func (s http2bodyWriterState) on100() { |
| if s.timer == nil { |
| |
| return |
| } |
| s.timer.Stop() |
| go func() { s.fnonce.Do(s.fn) }() |
| } |
| |
| // scheduleBodyWrite starts writing the body, either immediately (in |
| // the common case) or after the delay timeout. It should not be |
| // called until after the headers have been written. |
| func (s http2bodyWriterState) scheduleBodyWrite() { |
| if s.timer == nil { |
| |
| go s.fn() |
| return |
| } |
| http2traceWait100Continue(s.cs.trace) |
| if s.timer.Stop() { |
| s.timer.Reset(s.delay) |
| } |
| } |
| |
| // isConnectionCloseRequest reports whether req should use its own |
| // connection for a single request and then close the connection. |
| func http2isConnectionCloseRequest(req *Request) bool { |
| return req.Close || httplex.HeaderValuesContainsToken(req.Header["Connection"], "close") |
| } |
| |
| // writeFramer is implemented by any type that is used to write frames. |
| type http2writeFramer interface { |
| writeFrame(http2writeContext) error |
| |
| // staysWithinBuffer reports whether this writer promises that |
| // it will only write less than or equal to size bytes, and it |
| // won't Flush the write context. |
| staysWithinBuffer(size int) bool |
| } |
| |
| // writeContext is the interface needed by the various frame writer |
| // types below. All the writeFrame methods below are scheduled via the |
| // frame writing scheduler (see writeScheduler in writesched.go). |
| // |
| // This interface is implemented by *serverConn. |
| // |
| // TODO: decide whether to a) use this in the client code (which didn't |
| // end up using this yet, because it has a simpler design, not |
| // currently implementing priorities), or b) delete this and |
| // make the server code a bit more concrete. |
| type http2writeContext interface { |
| Framer() *http2Framer |
| Flush() error |
| CloseConn() error |
| // HeaderEncoder returns an HPACK encoder that writes to the |
| // returned buffer. |
| HeaderEncoder() (*hpack.Encoder, *bytes.Buffer) |
| } |
| |
| // writeEndsStream reports whether w writes a frame that will transition |
| // the stream to a half-closed local state. This returns false for RST_STREAM, |
| // which closes the entire stream (not just the local half). |
| func http2writeEndsStream(w http2writeFramer) bool { |
| switch v := w.(type) { |
| case *http2writeData: |
| return v.endStream |
| case *http2writeResHeaders: |
| return v.endStream |
| case nil: |
| |
| panic("writeEndsStream called on nil writeFramer") |
| } |
| return false |
| } |
| |
| type http2flushFrameWriter struct{} |
| |
| func (http2flushFrameWriter) writeFrame(ctx http2writeContext) error { |
| return ctx.Flush() |
| } |
| |
| func (http2flushFrameWriter) staysWithinBuffer(max int) bool { return false } |
| |
| type http2writeSettings []http2Setting |
| |
| func (s http2writeSettings) staysWithinBuffer(max int) bool { |
| const settingSize = 6 // uint16 + uint32 |
| return http2frameHeaderLen+settingSize*len(s) <= max |
| |
| } |
| |
| func (s http2writeSettings) writeFrame(ctx http2writeContext) error { |
| return ctx.Framer().WriteSettings([]http2Setting(s)...) |
| } |
| |
| type http2writeGoAway struct { |
| maxStreamID uint32 |
| code http2ErrCode |
| } |
| |
| func (p *http2writeGoAway) writeFrame(ctx http2writeContext) error { |
| err := ctx.Framer().WriteGoAway(p.maxStreamID, p.code, nil) |
| if p.code != 0 { |
| ctx.Flush() |
| time.Sleep(50 * time.Millisecond) |
| ctx.CloseConn() |
| } |
| return err |
| } |
| |
| func (*http2writeGoAway) staysWithinBuffer(max int) bool { return false } |
| |
| type http2writeData struct { |
| streamID uint32 |
| p []byte |
| endStream bool |
| } |
| |
| func (w *http2writeData) String() string { |
| return fmt.Sprintf("writeData(stream=%d, p=%d, endStream=%v)", w.streamID, len(w.p), w.endStream) |
| } |
| |
| func (w *http2writeData) writeFrame(ctx http2writeContext) error { |
| return ctx.Framer().WriteData(w.streamID, w.endStream, w.p) |
| } |
| |
| func (w *http2writeData) staysWithinBuffer(max int) bool { |
| return http2frameHeaderLen+len(w.p) <= max |
| } |
| |
| // handlerPanicRST is the message sent from handler goroutines when |
| // the handler panics. |
| type http2handlerPanicRST struct { |
| StreamID uint32 |
| } |
| |
| func (hp http2handlerPanicRST) writeFrame(ctx http2writeContext) error { |
| return ctx.Framer().WriteRSTStream(hp.StreamID, http2ErrCodeInternal) |
| } |
| |
| func (hp http2handlerPanicRST) staysWithinBuffer(max int) bool { return http2frameHeaderLen+4 <= max } |
| |
| func (se http2StreamError) writeFrame(ctx http2writeContext) error { |
| return ctx.Framer().WriteRSTStream(se.StreamID, se.Code) |
| } |
| |
| func (se http2StreamError) staysWithinBuffer(max int) bool { return http2frameHeaderLen+4 <= max } |
| |
| type http2writePingAck struct{ pf *http2PingFrame } |
| |
| func (w http2writePingAck) writeFrame(ctx http2writeContext) error { |
| return ctx.Framer().WritePing(true, w.pf.Data) |
| } |
| |
| func (w http2writePingAck) staysWithinBuffer(max int) bool { |
| return http2frameHeaderLen+len(w.pf.Data) <= max |
| } |
| |
| type http2writeSettingsAck struct{} |
| |
| func (http2writeSettingsAck) writeFrame(ctx http2writeContext) error { |
| return ctx.Framer().WriteSettingsAck() |
| } |
| |
| func (http2writeSettingsAck) staysWithinBuffer(max int) bool { return http2frameHeaderLen <= max } |
| |
| // splitHeaderBlock splits headerBlock into fragments so that each fragment fits |
| // in a single frame, then calls fn for each fragment. firstFrag/lastFrag are true |
| // for the first/last fragment, respectively. |
| func http2splitHeaderBlock(ctx http2writeContext, headerBlock []byte, fn func(ctx http2writeContext, frag []byte, firstFrag, lastFrag bool) error) error { |
| // For now we're lazy and just pick the minimum MAX_FRAME_SIZE |
| // that all peers must support (16KB). Later we could care |
| // more and send larger frames if the peer advertised it, but |
| // there's little point. Most headers are small anyway (so we |
| // generally won't have CONTINUATION frames), and extra frames |
| // only waste 9 bytes anyway. |
| const maxFrameSize = 16384 |
| |
| first := true |
| for len(headerBlock) > 0 { |
| frag := headerBlock |
| if len(frag) > maxFrameSize { |
| frag = frag[:maxFrameSize] |
| } |
| headerBlock = headerBlock[len(frag):] |
| if err := fn(ctx, frag, first, len(headerBlock) == 0); err != nil { |
| return err |
| } |
| first = false |
| } |
| return nil |
| } |
| |
| // writeResHeaders is a request to write a HEADERS and 0+ CONTINUATION frames |
| // for HTTP response headers or trailers from a server handler. |
| type http2writeResHeaders struct { |
| streamID uint32 |
| httpResCode int // 0 means no ":status" line |
| h Header // may be nil |
| trailers []string // if non-nil, which keys of h to write. nil means all. |
| endStream bool |
| |
| date string |
| contentType string |
| contentLength string |
| } |
| |
| func http2encKV(enc *hpack.Encoder, k, v string) { |
| if http2VerboseLogs { |
| log.Printf("http2: server encoding header %q = %q", k, v) |
| } |
| enc.WriteField(hpack.HeaderField{Name: k, Value: v}) |
| } |
| |
| func (w *http2writeResHeaders) staysWithinBuffer(max int) bool { |
| |
| return false |
| } |
| |
| func (w *http2writeResHeaders) writeFrame(ctx http2writeContext) error { |
| enc, buf := ctx.HeaderEncoder() |
| buf.Reset() |
| |
| if w.httpResCode != 0 { |
| http2encKV(enc, ":status", http2httpCodeString(w.httpResCode)) |
| } |
| |
| http2encodeHeaders(enc, w.h, w.trailers) |
| |
| if w.contentType != "" { |
| http2encKV(enc, "content-type", w.contentType) |
| } |
| if w.contentLength != "" { |
| http2encKV(enc, "content-length", w.contentLength) |
| } |
| if w.date != "" { |
| http2encKV(enc, "date", w.date) |
| } |
| |
| headerBlock := buf.Bytes() |
| if len(headerBlock) == 0 && w.trailers == nil { |
| panic("unexpected empty hpack") |
| } |
| |
| return http2splitHeaderBlock(ctx, headerBlock, w.writeHeaderBlock) |
| } |
| |
| func (w *http2writeResHeaders) writeHeaderBlock(ctx http2writeContext, frag []byte, firstFrag, lastFrag bool) error { |
| if firstFrag { |
| return ctx.Framer().WriteHeaders(http2HeadersFrameParam{ |
| StreamID: w.streamID, |
| BlockFragment: frag, |
| EndStream: w.endStream, |
| EndHeaders: lastFrag, |
| }) |
| } else { |
| return ctx.Framer().WriteContinuation(w.streamID, lastFrag, frag) |
| } |
| } |
| |
// writePushPromise is a request to write a PUSH_PROMISE and 0+ CONTINUATION frames.
// The promised request's pseudo-headers are derived from method and url;
// h supplies the remaining request headers.
type http2writePushPromise struct {
	streamID uint32   // pusher stream
	method   string   // for :method
	url      *url.URL // for :scheme, :authority, :path
	h        Header

	// Creates an ID for a pushed stream. This runs on serveG just before
	// the frame is written. The returned ID is copied to promisedID.
	allocatePromisedID func() (uint32, error)
	promisedID         uint32 // sent as PromiseID in the PUSH_PROMISE frame
}
| |
| func (w *http2writePushPromise) staysWithinBuffer(max int) bool { |
| |
| return false |
| } |
| |
| func (w *http2writePushPromise) writeFrame(ctx http2writeContext) error { |
| enc, buf := ctx.HeaderEncoder() |
| buf.Reset() |
| |
| http2encKV(enc, ":method", w.method) |
| http2encKV(enc, ":scheme", w.url.Scheme) |
| http2encKV(enc, ":authority", w.url.Host) |
| http2encKV(enc, ":path", w.url.RequestURI()) |
| http2encodeHeaders(enc, w.h, nil) |
| |
| headerBlock := buf.Bytes() |
| if len(headerBlock) == 0 { |
| panic("unexpected empty hpack") |
| } |
| |
| return http2splitHeaderBlock(ctx, headerBlock, w.writeHeaderBlock) |
| } |
| |
| func (w *http2writePushPromise) writeHeaderBlock(ctx http2writeContext, frag []byte, firstFrag, lastFrag bool) error { |
| if firstFrag { |
| return ctx.Framer().WritePushPromise(http2PushPromiseParam{ |
| StreamID: w.streamID, |
| PromiseID: w.promisedID, |
| BlockFragment: frag, |
| EndHeaders: lastFrag, |
| }) |
| } else { |
| return ctx.Framer().WriteContinuation(w.streamID, lastFrag, frag) |
| } |
| } |
| |
| type http2write100ContinueHeadersFrame struct { |
| streamID uint32 |
| } |
| |
| func (w http2write100ContinueHeadersFrame) writeFrame(ctx http2writeContext) error { |
| enc, buf := ctx.HeaderEncoder() |
| buf.Reset() |
| http2encKV(enc, ":status", "100") |
| return ctx.Framer().WriteHeaders(http2HeadersFrameParam{ |
| StreamID: w.streamID, |
| BlockFragment: buf.Bytes(), |
| EndStream: false, |
| EndHeaders: true, |
| }) |
| } |
| |
| func (w http2write100ContinueHeadersFrame) staysWithinBuffer(max int) bool { |
| |
| return 9+2*(len(":status")+len("100")) <= max |
| } |
| |
| type http2writeWindowUpdate struct { |
| streamID uint32 // or 0 for conn-level |
| n uint32 |
| } |
| |
| func (wu http2writeWindowUpdate) staysWithinBuffer(max int) bool { return http2frameHeaderLen+4 <= max } |
| |
| func (wu http2writeWindowUpdate) writeFrame(ctx http2writeContext) error { |
| return ctx.Framer().WriteWindowUpdate(wu.streamID, wu.n) |
| } |
| |
| // encodeHeaders encodes an http.Header. If keys is not nil, then (k, h[k]) |
| // is encoded only only if k is in keys. |
| func http2encodeHeaders(enc *hpack.Encoder, h Header, keys []string) { |
| if keys == nil { |
| sorter := http2sorterPool.Get().(*http2sorter) |
| |
| defer http2sorterPool.Put(sorter) |
| keys = sorter.Keys(h) |
| } |
| for _, k := range keys { |
| vv := h[k] |
| k = http2lowerHeader(k) |
| if !http2validWireHeaderFieldName(k) { |
| |
| continue |
| } |
| isTE := k == "transfer-encoding" |
| for _, v := range vv { |
| if !httplex.ValidHeaderFieldValue(v) { |
| |
| continue |
| } |
| |
| if isTE && v != "trailers" { |
| continue |
| } |
| http2encKV(enc, k, v) |
| } |
| } |
| } |
| |
// WriteScheduler is the interface implemented by HTTP/2 write schedulers.
// Methods are never called concurrently.
//
// This file provides two implementations, constructed by
// NewPriorityWriteScheduler and NewRandomWriteScheduler.
type http2WriteScheduler interface {
	// OpenStream opens a new stream in the write scheduler.
	// It is illegal to call this with streamID=0 or with a streamID that is
	// already open -- the call may panic.
	OpenStream(streamID uint32, options http2OpenStreamOptions)

	// CloseStream closes a stream in the write scheduler. Any frames queued on
	// this stream should be discarded. It is illegal to call this on a stream
	// that is not open -- the call may panic.
	CloseStream(streamID uint32)

	// AdjustStream adjusts the priority of the given stream. This may be called
	// on a stream that has not yet been opened or has been closed. Note that
	// RFC 7540 allows PRIORITY frames to be sent on streams in any state. See:
	// https://tools.ietf.org/html/rfc7540#section-5.1
	AdjustStream(streamID uint32, priority http2PriorityParam)

	// Push queues a frame in the scheduler. In most cases, this will not be
	// called with wr.StreamID()!=0 unless that stream is currently open. The one
	// exception is RST_STREAM frames, which may be sent on idle or closed streams.
	Push(wr http2FrameWriteRequest)

	// Pop dequeues the next frame to write. Returns false if no frames can
	// be written. Frames with a given wr.StreamID() are Pop'd in the same
	// order they are Push'd.
	Pop() (wr http2FrameWriteRequest, ok bool)
}
| |
// OpenStreamOptions specifies extra options for WriteScheduler.OpenStream.
type http2OpenStreamOptions struct {
	// PusherID is zero if the stream was initiated by the client. Otherwise,
	// PusherID names the stream that pushed the newly opened stream.
	// The priority write scheduler uses it to pick the new stream's initial
	// parent in the priority tree.
	PusherID uint32
}
| |
// FrameWriteRequest is a request to write a frame.
type http2FrameWriteRequest struct {
	// write is the interface value that does the writing, once the
	// WriteScheduler has selected this frame to write. The write
	// functions are all defined in write.go.
	// replyToWriter sets it to nil after reporting the result on done.
	write http2writeFramer

	// stream is the stream on which this frame will be written.
	// nil for non-stream frames like PING and SETTINGS.
	stream *http2stream

	// done, if non-nil, must be a buffered channel with space for
	// 1 message and is sent the return value from write (or an
	// earlier error) when the frame has been written.
	done chan error
}
| |
| // StreamID returns the id of the stream this frame will be written to. |
| // 0 is used for non-stream frames such as PING and SETTINGS. |
| func (wr http2FrameWriteRequest) StreamID() uint32 { |
| if wr.stream == nil { |
| if se, ok := wr.write.(http2StreamError); ok { |
| |
| return se.StreamID |
| } |
| return 0 |
| } |
| return wr.stream.id |
| } |
| |
| // DataSize returns the number of flow control bytes that must be consumed |
| // to write this entire frame. This is 0 for non-DATA frames. |
| func (wr http2FrameWriteRequest) DataSize() int { |
| if wd, ok := wr.write.(*http2writeData); ok { |
| return len(wd.p) |
| } |
| return 0 |
| } |
| |
| // Consume consumes min(n, available) bytes from this frame, where available |
| // is the number of flow control bytes available on the stream. Consume returns |
| // 0, 1, or 2 frames, where the integer return value gives the number of frames |
| // returned. |
| // |
| // If flow control prevents consuming any bytes, this returns (_, _, 0). If |
| // the entire frame was consumed, this returns (wr, _, 1). Otherwise, this |
| // returns (consumed, rest, 2), where 'consumed' contains the consumed bytes and |
| // 'rest' contains the remaining bytes. The consumed bytes are deducted from the |
| // underlying stream's flow control budget. |
| func (wr http2FrameWriteRequest) Consume(n int32) (http2FrameWriteRequest, http2FrameWriteRequest, int) { |
| var empty http2FrameWriteRequest |
| |
| wd, ok := wr.write.(*http2writeData) |
| if !ok || len(wd.p) == 0 { |
| return wr, empty, 1 |
| } |
| |
| allowed := wr.stream.flow.available() |
| if n < allowed { |
| allowed = n |
| } |
| if wr.stream.sc.maxFrameSize < allowed { |
| allowed = wr.stream.sc.maxFrameSize |
| } |
| if allowed <= 0 { |
| return empty, empty, 0 |
| } |
| if len(wd.p) > int(allowed) { |
| wr.stream.flow.take(allowed) |
| consumed := http2FrameWriteRequest{ |
| stream: wr.stream, |
| write: &http2writeData{ |
| streamID: wd.streamID, |
| p: wd.p[:allowed], |
| |
| endStream: false, |
| }, |
| |
| done: nil, |
| } |
| rest := http2FrameWriteRequest{ |
| stream: wr.stream, |
| write: &http2writeData{ |
| streamID: wd.streamID, |
| p: wd.p[allowed:], |
| endStream: wd.endStream, |
| }, |
| done: wr.done, |
| } |
| return consumed, rest, 2 |
| } |
| |
| wr.stream.flow.take(int32(len(wd.p))) |
| return wr, empty, 1 |
| } |
| |
| // String is for debugging only. |
| func (wr http2FrameWriteRequest) String() string { |
| var des string |
| if s, ok := wr.write.(fmt.Stringer); ok { |
| des = s.String() |
| } else { |
| des = fmt.Sprintf("%T", wr.write) |
| } |
| return fmt.Sprintf("[FrameWriteRequest stream=%d, ch=%v, writer=%v]", wr.StreamID(), wr.done != nil, des) |
| } |
| |
| // replyToWriter sends err to wr.done and panics if the send must block |
| // This does nothing if wr.done is nil. |
| func (wr *http2FrameWriteRequest) replyToWriter(err error) { |
| if wr.done == nil { |
| return |
| } |
| select { |
| case wr.done <- err: |
| default: |
| panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wr.write)) |
| } |
| wr.write = nil |
| } |
| |
| // writeQueue is used by implementations of WriteScheduler. |
| type http2writeQueue struct { |
| s []http2FrameWriteRequest |
| } |
| |
| func (q *http2writeQueue) empty() bool { return len(q.s) == 0 } |
| |
| func (q *http2writeQueue) push(wr http2FrameWriteRequest) { |
| q.s = append(q.s, wr) |
| } |
| |
| func (q *http2writeQueue) shift() http2FrameWriteRequest { |
| if len(q.s) == 0 { |
| panic("invalid use of queue") |
| } |
| wr := q.s[0] |
| |
| copy(q.s, q.s[1:]) |
| q.s[len(q.s)-1] = http2FrameWriteRequest{} |
| q.s = q.s[:len(q.s)-1] |
| return wr |
| } |
| |
| // consume consumes up to n bytes from q.s[0]. If the frame is |
| // entirely consumed, it is removed from the queue. If the frame |
| // is partially consumed, the frame is kept with the consumed |
| // bytes removed. Returns true iff any bytes were consumed. |
| func (q *http2writeQueue) consume(n int32) (http2FrameWriteRequest, bool) { |
| if len(q.s) == 0 { |
| return http2FrameWriteRequest{}, false |
| } |
| consumed, rest, numresult := q.s[0].Consume(n) |
| switch numresult { |
| case 0: |
| return http2FrameWriteRequest{}, false |
| case 1: |
| q.shift() |
| case 2: |
| q.s[0] = rest |
| } |
| return consumed, true |
| } |
| |
| type http2writeQueuePool []*http2writeQueue |
| |
| // put inserts an unused writeQueue into the pool. |
| func (p *http2writeQueuePool) put(q *http2writeQueue) { |
| for i := range q.s { |
| q.s[i] = http2FrameWriteRequest{} |
| } |
| q.s = q.s[:0] |
| *p = append(*p, q) |
| } |
| |
| // get returns an empty writeQueue. |
| func (p *http2writeQueuePool) get() *http2writeQueue { |
| ln := len(*p) |
| if ln == 0 { |
| return new(http2writeQueue) |
| } |
| x := ln - 1 |
| q := (*p)[x] |
| (*p)[x] = nil |
| *p = (*p)[:x] |
| return q |
| } |
| |
// RFC 7540, Section 5.3.5: the default weight is 16.
// Node weights are stored as the actual weight minus one, hence 15.
const http2priorityDefaultWeight = 15 // 16 = 15 + 1
| |
// PriorityWriteSchedulerConfig configures a priorityWriteScheduler.
// When NewPriorityWriteScheduler is given a nil config it uses
// MaxClosedNodesInTree=10, MaxIdleNodesInTree=10 and no write throttling.
type http2PriorityWriteSchedulerConfig struct {
	// MaxClosedNodesInTree controls the maximum number of closed streams to
	// retain in the priority tree. Setting this to zero saves a small amount
	// of memory at the cost of performance.
	//
	// See RFC 7540, Section 5.3.4:
	//   "It is possible for a stream to become closed while prioritization
	//   information ... is in transit. ... This potentially creates suboptimal
	//   prioritization, since the stream could be given a priority that is
	//   different from what is intended. To avoid these problems, an endpoint
	//   SHOULD retain stream prioritization state for a period after streams
	//   become closed. The longer state is retained, the lower the chance that
	//   streams are assigned incorrect or default priority values."
	MaxClosedNodesInTree int

	// MaxIdleNodesInTree controls the maximum number of idle streams to
	// retain in the priority tree. Setting this to zero saves a small amount
	// of memory at the cost of performance.
	//
	// See RFC 7540, Section 5.3.4:
	//   "Similarly, streams that are in the "idle" state can be assigned
	//   priority or become a parent of other streams. This allows for the
	//   creation of a grouping node in the dependency tree, which enables
	//   more flexible expressions of priority. Idle streams begin with a
	//   default priority (Section 5.3.5)."
	MaxIdleNodesInTree int

	// ThrottleOutOfOrderWrites enables write throttling to help ensure that
	// data is delivered in priority order. This works around a race where
	// stream B depends on stream A and both streams are about to call Write
	// to queue DATA frames. If B wins the race, a naive scheduler would eagerly
	// write as much data from B as possible, but this is suboptimal because A
	// is a higher-priority stream. With throttling enabled, we write a small
	// amount of data from B to minimize the amount of bandwidth that B can
	// steal from A.
	ThrottleOutOfOrderWrites bool
}
| |
| // NewPriorityWriteScheduler constructs a WriteScheduler that schedules |
| // frames by following HTTP/2 priorities as described in RFC 7340 Section 5.3. |
| // If cfg is nil, default options are used. |
| func http2NewPriorityWriteScheduler(cfg *http2PriorityWriteSchedulerConfig) http2WriteScheduler { |
| if cfg == nil { |
| |
| cfg = &http2PriorityWriteSchedulerConfig{ |
| MaxClosedNodesInTree: 10, |
| MaxIdleNodesInTree: 10, |
| ThrottleOutOfOrderWrites: false, |
| } |
| } |
| |
| ws := &http2priorityWriteScheduler{ |
| nodes: make(map[uint32]*http2priorityNode), |
| maxClosedNodesInTree: cfg.MaxClosedNodesInTree, |
| maxIdleNodesInTree: cfg.MaxIdleNodesInTree, |
| enableWriteThrottle: cfg.ThrottleOutOfOrderWrites, |
| } |
| ws.nodes[0] = &ws.root |
| if cfg.ThrottleOutOfOrderWrites { |
| ws.writeThrottleLimit = 1024 |
| } else { |
| ws.writeThrottleLimit = math.MaxInt32 |
| } |
| return ws |
| } |
| |
// priorityNodeState records whether the stream behind a priority tree node
// is open, closed, or idle.
type http2priorityNodeState int

const (
	// priorityNodeOpen is the zero value; OpenStream puts nodes in this state.
	http2priorityNodeOpen http2priorityNodeState = iota
	// priorityNodeClosed is set by CloseStream.
	http2priorityNodeClosed
	// priorityNodeIdle marks a node created by AdjustStream for a stream
	// that has not been opened yet.
	http2priorityNodeIdle
)
| |
// priorityNode is a node in an HTTP/2 priority tree.
// Each node is associated with a single stream ID.
// See RFC 7540, Section 5.3.
type http2priorityNode struct {
	q            http2writeQueue        // queue of pending frames to write
	id           uint32                 // id of the stream, or 0 for the root of the tree
	weight       uint8                  // the actual weight is weight+1, so the value is in [1,256]
	state        http2priorityNodeState // open | closed | idle
	bytes        int64                  // number of bytes written by this node, or 0 if closed
	subtreeBytes int64                  // sum(node.bytes) of all nodes in this subtree

	// These links form the priority tree.
	// setParent maintains them; new children are inserted at the head of
	// the parent's kids list.
	parent     *http2priorityNode
	kids       *http2priorityNode // start of the kids list
	prev, next *http2priorityNode // doubly-linked list of siblings
}
| |
| func (n *http2priorityNode) setParent(parent *http2priorityNode) { |
| if n == parent { |
| panic("setParent to self") |
| } |
| if n.parent == parent { |
| return |
| } |
| |
| if parent := n.parent; parent != nil { |
| if n.prev == nil { |
| parent.kids = n.next |
| } else { |
| n.prev.next = n.next |
| } |
| if n.next != nil { |
| n.next.prev = n.prev |
| } |
| } |
| |
| n.parent = parent |
| if parent == nil { |
| n.next = nil |
| n.prev = nil |
| } else { |
| n.next = parent.kids |
| n.prev = nil |
| if n.next != nil { |
| n.next.prev = n |
| } |
| parent.kids = n |
| } |
| } |
| |
| func (n *http2priorityNode) addBytes(b int64) { |
| n.bytes += b |
| for ; n != nil; n = n.parent { |
| n.subtreeBytes += b |
| } |
| } |
| |
| // walkReadyInOrder iterates over the tree in priority order, calling f for each node |
| // with a non-empty write queue. When f returns true, this funcion returns true and the |
| // walk halts. tmp is used as scratch space for sorting. |
| // |
| // f(n, openParent) takes two arguments: the node to visit, n, and a bool that is true |
| // if any ancestor p of n is still open (ignoring the root node). |
| func (n *http2priorityNode) walkReadyInOrder(openParent bool, tmp *[]*http2priorityNode, f func(*http2priorityNode, bool) bool) bool { |
| if !n.q.empty() && f(n, openParent) { |
| return true |
| } |
| if n.kids == nil { |
| return false |
| } |
| |
| if n.id != 0 { |
| openParent = openParent || (n.state == http2priorityNodeOpen) |
| } |
| |
| w := n.kids.weight |
| needSort := false |
| for k := n.kids.next; k != nil; k = k.next { |
| if k.weight != w { |
| needSort = true |
| break |
| } |
| } |
| if !needSort { |
| for k := n.kids; k != nil; k = k.next { |
| if k.walkReadyInOrder(openParent, tmp, f) { |
| return true |
| } |
| } |
| return false |
| } |
| |
| *tmp = (*tmp)[:0] |
| for n.kids != nil { |
| *tmp = append(*tmp, n.kids) |
| n.kids.setParent(nil) |
| } |
| sort.Sort(http2sortPriorityNodeSiblings(*tmp)) |
| for i := len(*tmp) - 1; i >= 0; i-- { |
| (*tmp)[i].setParent(n) |
| } |
| for k := n.kids; k != nil; k = k.next { |
| if k.walkReadyInOrder(openParent, tmp, f) { |
| return true |
| } |
| } |
| return false |
| } |
| |
| type http2sortPriorityNodeSiblings []*http2priorityNode |
| |
| func (z http2sortPriorityNodeSiblings) Len() int { return len(z) } |
| |
| func (z http2sortPriorityNodeSiblings) Swap(i, k int) { z[i], z[k] = z[k], z[i] } |
| |
| func (z http2sortPriorityNodeSiblings) Less(i, k int) bool { |
| |
| wi, bi := float64(z[i].weight+1), float64(z[i].subtreeBytes) |
| wk, bk := float64(z[k].weight+1), float64(z[k].subtreeBytes) |
| if bi == 0 && bk == 0 { |
| return wi >= wk |
| } |
| if bk == 0 { |
| return false |
| } |
| return bi/bk <= wi/wk |
| } |
| |
// priorityWriteScheduler is the WriteScheduler returned by
// NewPriorityWriteScheduler. It keeps one priorityNode per known stream in a
// priority tree and pops frames by walking that tree.
type http2priorityWriteScheduler struct {
	// root is the root of the priority tree, where root.id = 0.
	// The root queues control frames that are not associated with any stream.
	root http2priorityNode

	// nodes maps stream ids to priority tree nodes.
	nodes map[uint32]*http2priorityNode

	// maxID is the maximum stream id in nodes.
	maxID uint32

	// lists of nodes that have been closed or are idle, but are kept in
	// the tree for improved prioritization. When the lengths exceed either
	// maxClosedNodesInTree or maxIdleNodesInTree, old nodes are discarded.
	closedNodes, idleNodes []*http2priorityNode

	// From the config.
	maxClosedNodesInTree int
	maxIdleNodesInTree   int
	writeThrottleLimit   int32 // adjusted by Pop as frames are written
	enableWriteThrottle  bool

	// tmp is scratch space for priorityNode.walkReadyInOrder to reduce allocations.
	tmp []*http2priorityNode

	// pool of empty queues for reuse.
	queuePool http2writeQueuePool
}
| |
| func (ws *http2priorityWriteScheduler) OpenStream(streamID uint32, options http2OpenStreamOptions) { |
| |
| if curr := ws.nodes[streamID]; curr != nil { |
| if curr.state != http2priorityNodeIdle { |
| panic(fmt.Sprintf("stream %d already opened", streamID)) |
| } |
| curr.state = http2priorityNodeOpen |
| return |
| } |
| |
| parent := ws.nodes[options.PusherID] |
| if parent == nil { |
| parent = &ws.root |
| } |
| n := &http2priorityNode{ |
| q: *ws.queuePool.get(), |
| id: streamID, |
| weight: http2priorityDefaultWeight, |
| state: http2priorityNodeOpen, |
| } |
| n.setParent(parent) |
| ws.nodes[streamID] = n |
| if streamID > ws.maxID { |
| ws.maxID = streamID |
| } |
| } |
| |
| func (ws *http2priorityWriteScheduler) CloseStream(streamID uint32) { |
| if streamID == 0 { |
| panic("violation of WriteScheduler interface: cannot close stream 0") |
| } |
| if ws.nodes[streamID] == nil { |
| panic(fmt.Sprintf("violation of WriteScheduler interface: unknown stream %d", streamID)) |
| } |
| if ws.nodes[streamID].state != http2priorityNodeOpen { |
| panic(fmt.Sprintf("violation of WriteScheduler interface: stream %d already closed", streamID)) |
| } |
| |
| n := ws.nodes[streamID] |
| n.state = http2priorityNodeClosed |
| n.addBytes(-n.bytes) |
| |
| q := n.q |
| ws.queuePool.put(&q) |
| n.q.s = nil |
| if ws.maxClosedNodesInTree > 0 { |
| ws.addClosedOrIdleNode(&ws.closedNodes, ws.maxClosedNodesInTree, n) |
| } else { |
| ws.removeNode(n) |
| } |
| } |
| |
| func (ws *http2priorityWriteScheduler) AdjustStream(streamID uint32, priority http2PriorityParam) { |
| if streamID == 0 { |
| panic("adjustPriority on root") |
| } |
| |
| n := ws.nodes[streamID] |
| if n == nil { |
| if streamID <= ws.maxID || ws.maxIdleNodesInTree == 0 { |
| return |
| } |
| ws.maxID = streamID |
| n = &http2priorityNode{ |
| q: *ws.queuePool.get(), |
| id: streamID, |
| weight: http2priorityDefaultWeight, |
| state: http2priorityNodeIdle, |
| } |
| n.setParent(&ws.root) |
| ws.nodes[streamID] = n |
| ws.addClosedOrIdleNode(&ws.idleNodes, ws.maxIdleNodesInTree, n) |
| } |
| |
| parent := ws.nodes[priority.StreamDep] |
| if parent == nil { |
| n.setParent(&ws.root) |
| n.weight = http2priorityDefaultWeight |
| return |
| } |
| |
| if n == parent { |
| return |
| } |
| |
| for x := parent.parent; x != nil; x = x.parent { |
| if x == n { |
| parent.setParent(n.parent) |
| break |
| } |
| } |
| |
| if priority.Exclusive { |
| k := parent.kids |
| for k != nil { |
| next := k.next |
| if k != n { |
| k.setParent(n) |
| } |
| k = next |
| } |
| } |
| |
| n.setParent(parent) |
| n.weight = priority.Weight |
| } |
| |
| func (ws *http2priorityWriteScheduler) Push(wr http2FrameWriteRequest) { |
| var n *http2priorityNode |
| if id := wr.StreamID(); id == 0 { |
| n = &ws.root |
| } else { |
| n = ws.nodes[id] |
| if n == nil { |
| |
| if wr.DataSize() > 0 { |
| panic("add DATA on non-open stream") |
| } |
| n = &ws.root |
| } |
| } |
| n.q.push(wr) |
| } |
| |
| func (ws *http2priorityWriteScheduler) Pop() (wr http2FrameWriteRequest, ok bool) { |
| ws.root.walkReadyInOrder(false, &ws.tmp, func(n *http2priorityNode, openParent bool) bool { |
| limit := int32(math.MaxInt32) |
| if openParent { |
| limit = ws.writeThrottleLimit |
| } |
| wr, ok = n.q.consume(limit) |
| if !ok { |
| return false |
| } |
| n.addBytes(int64(wr.DataSize())) |
| |
| if openParent { |
| ws.writeThrottleLimit += 1024 |
| if ws.writeThrottleLimit < 0 { |
| ws.writeThrottleLimit = math.MaxInt32 |
| } |
| } else if ws.enableWriteThrottle { |
| ws.writeThrottleLimit = 1024 |
| } |
| return true |
| }) |
| return wr, ok |
| } |
| |
| func (ws *http2priorityWriteScheduler) addClosedOrIdleNode(list *[]*http2priorityNode, maxSize int, n *http2priorityNode) { |
| if maxSize == 0 { |
| return |
| } |
| if len(*list) == maxSize { |
| |
| ws.removeNode((*list)[0]) |
| x := (*list)[1:] |
| copy(*list, x) |
| *list = (*list)[:len(x)] |
| } |
| *list = append(*list, n) |
| } |
| |
| func (ws *http2priorityWriteScheduler) removeNode(n *http2priorityNode) { |
| for k := n.kids; k != nil; k = k.next { |
| k.setParent(n.parent) |
| } |
| n.setParent(nil) |
| delete(ws.nodes, n.id) |
| } |
| |
| // NewRandomWriteScheduler constructs a WriteScheduler that ignores HTTP/2 |
| // priorities. Control frames like SETTINGS and PING are written before DATA |
| // frames, but if no control frames are queued and multiple streams have queued |
| // HEADERS or DATA frames, Pop selects a ready stream arbitrarily. |
| func http2NewRandomWriteScheduler() http2WriteScheduler { |
| return &http2randomWriteScheduler{sq: make(map[uint32]*http2writeQueue)} |
| } |
| |
// randomWriteScheduler is the WriteScheduler returned by
// NewRandomWriteScheduler. It keeps one queue per stream and ignores
// priorities.
type http2randomWriteScheduler struct {
	// zero are frames not associated with a specific stream.
	// Pop drains this queue before looking at any stream queue.
	zero http2writeQueue

	// sq contains the stream-specific queues, keyed by stream ID.
	// When a stream is idle or closed, it's deleted from the map.
	sq map[uint32]*http2writeQueue

	// pool of empty queues for reuse.
	queuePool http2writeQueuePool
}
| |
| func (ws *http2randomWriteScheduler) OpenStream(streamID uint32, options http2OpenStreamOptions) { |
| |
| } |
| |
| func (ws *http2randomWriteScheduler) CloseStream(streamID uint32) { |
| q, ok := ws.sq[streamID] |
| if !ok { |
| return |
| } |
| delete(ws.sq, streamID) |
| ws.queuePool.put(q) |
| } |
| |
| func (ws *http2randomWriteScheduler) AdjustStream(streamID uint32, priority http2PriorityParam) { |
| |
| } |
| |
| func (ws *http2randomWriteScheduler) Push(wr http2FrameWriteRequest) { |
| id := wr.StreamID() |
| if id == 0 { |
| ws.zero.push(wr) |
| return |
| } |
| q, ok := ws.sq[id] |
| if !ok { |
| q = ws.queuePool.get() |
| ws.sq[id] = q |
| } |
| q.push(wr) |
| } |
| |
| func (ws *http2randomWriteScheduler) Pop() (http2FrameWriteRequest, bool) { |
| |
| if !ws.zero.empty() { |
| return ws.zero.shift(), true |
| } |
| |
| for _, q := range ws.sq { |
| if wr, ok := q.consume(math.MaxInt32); ok { |
| return wr, true |
| } |
| } |
| return http2FrameWriteRequest{}, false |
| } |