| // Copyright 2011 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| // HTTP client implementation. See RFC 7230 through 7235. |
| // |
| // This is the low-level Transport implementation of RoundTripper. |
| // The high-level interface is in client.go. |
| |
| package http |
| |
| import ( |
| "bufio" |
| "compress/gzip" |
| "container/list" |
| "context" |
| "crypto/tls" |
| "errors" |
| "fmt" |
| "io" |
| "log" |
| "net" |
| "net/http/httptrace" |
| "net/textproto" |
| "net/url" |
| "os" |
| "reflect" |
| "strings" |
| "sync" |
| "sync/atomic" |
| "time" |
| |
| "golang.org/x/net/http/httpguts" |
| "golang.org/x/net/http/httpproxy" |
| ) |
| |
// DefaultTransport is the default implementation of Transport and is
// used by DefaultClient. It establishes network connections as needed
// and caches them for reuse by subsequent calls. It uses HTTP proxies
// as directed by the environment variables HTTP_PROXY, HTTPS_PROXY and
// NO_PROXY (or the lowercase versions thereof); see ProxyFromEnvironment.
//
// Because it sets a custom DialContext, it also sets ForceAttemptHTTP2
// so that HTTP/2 is still attempted.
var DefaultTransport RoundTripper = &Transport{
	Proxy: ProxyFromEnvironment,
	DialContext: (&net.Dialer{
		Timeout:   30 * time.Second,
		KeepAlive: 30 * time.Second,
	}).DialContext,
	ForceAttemptHTTP2:     true,
	MaxIdleConns:          100,
	IdleConnTimeout:       90 * time.Second,
	TLSHandshakeTimeout:   10 * time.Second,
	ExpectContinueTimeout: 1 * time.Second,
}
| |
// DefaultMaxIdleConnsPerHost is the default value of Transport's
// MaxIdleConnsPerHost. It is the limit used when that field is zero.
const DefaultMaxIdleConnsPerHost = 2
| |
// Transport is an implementation of RoundTripper that supports HTTP,
// HTTPS, and HTTP proxies (for either HTTP or HTTPS with CONNECT).
//
// By default, Transport caches connections for future re-use.
// This may leave many open connections when accessing many hosts.
// This behavior can be managed using Transport's CloseIdleConnections method
// and the MaxIdleConnsPerHost and DisableKeepAlives fields.
//
// Transports should be reused instead of created as needed.
// Transports are safe for concurrent use by multiple goroutines.
//
// A Transport is a low-level primitive for making HTTP and HTTPS requests.
// For high-level functionality, such as cookies and redirects, see Client.
//
// Transport uses HTTP/1.1 for HTTP URLs and either HTTP/1.1 or HTTP/2
// for HTTPS URLs, depending on whether the server supports HTTP/2,
// and how the Transport is configured. The DefaultTransport supports HTTP/2.
// To explicitly enable HTTP/2 on a transport, use golang.org/x/net/http2
// and call ConfigureTransport. See the package docs for more about HTTP/2.
//
// Responses with status codes in the 1xx range are either handled
// automatically (100 expect-continue) or ignored. The one
// exception is HTTP status code 101 (Switching Protocols), which is
// considered a terminal status and returned by RoundTrip. To see the
// ignored 1xx responses, use the httptrace trace package's
// ClientTrace.Got1xxResponse.
//
// Transport only retries a request upon encountering a network error
// if the request is idempotent and either has no body or has its
// Request.GetBody defined. HTTP requests are considered idempotent if
// they have HTTP methods GET, HEAD, OPTIONS, or TRACE; or if their
// Header map contains an "Idempotency-Key" or "X-Idempotency-Key"
// entry. If the idempotency key value is a zero-length slice, the
// request is treated as idempotent but the header is not sent on the
// wire.
type Transport struct {
	// idleMu is held by CloseIdleConnections while it resets idleConn,
	// closeIdle and idleLRU. Presumably it guards all of the
	// idle-connection fields in this group (confirm against the rest of
	// the file).
	idleMu       sync.Mutex
	closeIdle    bool                                // user has requested to close all idle conns
	idleConn     map[connectMethodKey][]*persistConn // most recently used at end
	idleConnWait map[connectMethodKey]wantConnQueue  // waiting getConns
	idleLRU      connLRU

	// reqMu is held by cancelRequest while it looks up and deletes an
	// entry of reqCanceler, which maps a request's cancelKey to the func
	// that cancels it.
	reqMu       sync.Mutex
	reqCanceler map[cancelKey]func(error)

	altMu    sync.Mutex   // guards changing altProto only
	altProto atomic.Value // of nil or map[string]RoundTripper, key is URI scheme

	// Per-host connection accounting.
	// NOTE(review): presumably connsPerHostMu guards the two maps below
	// and they back MaxConnsPerHost; the code using them is not visible
	// here, so confirm.
	connsPerHostMu   sync.Mutex
	connsPerHost     map[connectMethodKey]int
	connsPerHostWait map[connectMethodKey]wantConnQueue // waiting getConns

	// Proxy specifies a function to return a proxy for a given
	// Request. If the function returns a non-nil error, the
	// request is aborted with the provided error.
	//
	// The proxy type is determined by the URL scheme. "http",
	// "https", and "socks5" are supported. If the scheme is empty,
	// "http" is assumed.
	//
	// If Proxy is nil or returns a nil *URL, no proxy is used.
	Proxy func(*Request) (*url.URL, error)

	// DialContext specifies the dial function for creating unencrypted TCP connections.
	// If DialContext is nil (and the deprecated Dial below is also nil),
	// then the transport dials using package net.
	//
	// DialContext runs concurrently with calls to RoundTrip.
	// A RoundTrip call that initiates a dial may end up using
	// a connection dialed previously when the earlier connection
	// becomes idle before the later DialContext completes.
	DialContext func(ctx context.Context, network, addr string) (net.Conn, error)

	// Dial specifies the dial function for creating unencrypted TCP connections.
	//
	// Dial runs concurrently with calls to RoundTrip.
	// A RoundTrip call that initiates a dial may end up using
	// a connection dialed previously when the earlier connection
	// becomes idle before the later Dial completes.
	//
	// Deprecated: Use DialContext instead, which allows the transport
	// to cancel dials as soon as they are no longer needed.
	// If both are set, DialContext takes priority.
	Dial func(network, addr string) (net.Conn, error)

	// DialTLSContext specifies an optional dial function for creating
	// TLS connections for non-proxied HTTPS requests.
	//
	// If DialTLSContext is nil (and the deprecated DialTLS below is also nil),
	// DialContext and TLSClientConfig are used.
	//
	// If DialTLSContext is set, the Dial and DialContext hooks are not used for HTTPS
	// requests and the TLSClientConfig and TLSHandshakeTimeout
	// are ignored. The returned net.Conn is assumed to already be
	// past the TLS handshake.
	DialTLSContext func(ctx context.Context, network, addr string) (net.Conn, error)

	// DialTLS specifies an optional dial function for creating
	// TLS connections for non-proxied HTTPS requests.
	//
	// Deprecated: Use DialTLSContext instead, which allows the transport
	// to cancel dials as soon as they are no longer needed.
	// If both are set, DialTLSContext takes priority.
	DialTLS func(network, addr string) (net.Conn, error)

	// TLSClientConfig specifies the TLS configuration to use with
	// tls.Client.
	// If nil, the default configuration is used.
	// If non-nil, HTTP/2 support may not be enabled by default.
	TLSClientConfig *tls.Config

	// TLSHandshakeTimeout specifies the maximum amount of time to
	// wait for a TLS handshake. Zero means no timeout.
	TLSHandshakeTimeout time.Duration

	// DisableKeepAlives, if true, disables HTTP keep-alives and
	// will only use the connection to the server for a single
	// HTTP request.
	//
	// This is unrelated to the similarly named TCP keep-alives.
	DisableKeepAlives bool

	// DisableCompression, if true, prevents the Transport from
	// requesting compression with an "Accept-Encoding: gzip"
	// request header when the Request contains no existing
	// Accept-Encoding value. If the Transport requests gzip on
	// its own and gets a gzipped response, it's transparently
	// decoded in the Response.Body. However, if the user
	// explicitly requested gzip it is not automatically
	// uncompressed.
	DisableCompression bool

	// MaxIdleConns controls the maximum number of idle (keep-alive)
	// connections across all hosts. Zero means no limit.
	MaxIdleConns int

	// MaxIdleConnsPerHost, if non-zero, controls the maximum idle
	// (keep-alive) connections to keep per-host. If zero,
	// DefaultMaxIdleConnsPerHost is used.
	MaxIdleConnsPerHost int

	// MaxConnsPerHost optionally limits the total number of
	// connections per host, including connections in the dialing,
	// active, and idle states. On limit violation, dials will block.
	//
	// Zero means no limit.
	MaxConnsPerHost int

	// IdleConnTimeout is the maximum amount of time an idle
	// (keep-alive) connection will remain idle before closing
	// itself.
	// Zero means no limit.
	IdleConnTimeout time.Duration

	// ResponseHeaderTimeout, if non-zero, specifies the amount of
	// time to wait for a server's response headers after fully
	// writing the request (including its body, if any). This
	// time does not include the time to read the response body.
	ResponseHeaderTimeout time.Duration

	// ExpectContinueTimeout, if non-zero, specifies the amount of
	// time to wait for a server's first response headers after fully
	// writing the request headers if the request has an
	// "Expect: 100-continue" header. Zero means no timeout and
	// causes the body to be sent immediately, without
	// waiting for the server to approve.
	// This time does not include the time to send the request header.
	ExpectContinueTimeout time.Duration

	// TLSNextProto specifies how the Transport switches to an
	// alternate protocol (such as HTTP/2) after a TLS ALPN
	// protocol negotiation. If Transport dials a TLS connection
	// with a non-empty protocol name and TLSNextProto contains a
	// map entry for that key (such as "h2"), then the func is
	// called with the request's authority (such as "example.com"
	// or "example.com:1234") and the TLS connection. The function
	// must return a RoundTripper that then handles the request.
	// If TLSNextProto is not nil, HTTP/2 support is not enabled
	// automatically.
	TLSNextProto map[string]func(authority string, c *tls.Conn) RoundTripper

	// ProxyConnectHeader optionally specifies headers to send to
	// proxies during CONNECT requests.
	// To set the header dynamically, see GetProxyConnectHeader.
	ProxyConnectHeader Header

	// GetProxyConnectHeader optionally specifies a func to return
	// headers to send to proxyURL during a CONNECT request to the
	// ip:port target.
	// If it returns an error, the Transport's RoundTrip fails with
	// that error. It can return (nil, nil) to not add headers.
	// If GetProxyConnectHeader is non-nil, ProxyConnectHeader is
	// ignored.
	GetProxyConnectHeader func(ctx context.Context, proxyURL *url.URL, target string) (Header, error)

	// MaxResponseHeaderBytes specifies a limit on how many
	// response bytes are allowed in the server's response
	// header.
	//
	// Zero means to use a default limit.
	MaxResponseHeaderBytes int64

	// WriteBufferSize specifies the size of the write buffer used
	// when writing to the transport.
	// If zero, a default (currently 4KB) is used.
	WriteBufferSize int

	// ReadBufferSize specifies the size of the read buffer used
	// when reading from the transport.
	// If zero, a default (currently 4KB) is used.
	ReadBufferSize int

	// nextProtoOnce guards initialization of TLSNextProto and
	// h2transport (via onceSetNextProtoDefaults)
	nextProtoOnce      sync.Once
	h2transport        h2Transport // non-nil if http2 wired up
	tlsNextProtoWasNil bool        // whether TLSNextProto was nil when the Once fired

	// ForceAttemptHTTP2 controls whether HTTP/2 is enabled when a non-zero
	// Dial, DialTLS, or DialContext func or TLSClientConfig is provided.
	// By default, use of any of those fields conservatively disables HTTP/2.
	// To use a custom dialer or TLS config and still attempt HTTP/2
	// upgrades, set this to true.
	ForceAttemptHTTP2 bool
}
| |
// A cancelKey is the key of the reqCanceler map.
// We wrap the *Request in this type since we want to use the original request,
// not any transient one created by roundTrip.
type cancelKey struct {
	// req is the request as handed to the Transport; roundTrip builds the
	// key before it swaps in a body-tracking copy of the request.
	req *Request
}
| |
| func (t *Transport) writeBufferSize() int { |
| if t.WriteBufferSize > 0 { |
| return t.WriteBufferSize |
| } |
| return 4 << 10 |
| } |
| |
| func (t *Transport) readBufferSize() int { |
| if t.ReadBufferSize > 0 { |
| return t.ReadBufferSize |
| } |
| return 4 << 10 |
| } |
| |
| // Clone returns a deep copy of t's exported fields. |
| func (t *Transport) Clone() *Transport { |
| t.nextProtoOnce.Do(t.onceSetNextProtoDefaults) |
| t2 := &Transport{ |
| Proxy: t.Proxy, |
| DialContext: t.DialContext, |
| Dial: t.Dial, |
| DialTLS: t.DialTLS, |
| DialTLSContext: t.DialTLSContext, |
| TLSHandshakeTimeout: t.TLSHandshakeTimeout, |
| DisableKeepAlives: t.DisableKeepAlives, |
| DisableCompression: t.DisableCompression, |
| MaxIdleConns: t.MaxIdleConns, |
| MaxIdleConnsPerHost: t.MaxIdleConnsPerHost, |
| MaxConnsPerHost: t.MaxConnsPerHost, |
| IdleConnTimeout: t.IdleConnTimeout, |
| ResponseHeaderTimeout: t.ResponseHeaderTimeout, |
| ExpectContinueTimeout: t.ExpectContinueTimeout, |
| ProxyConnectHeader: t.ProxyConnectHeader.Clone(), |
| GetProxyConnectHeader: t.GetProxyConnectHeader, |
| MaxResponseHeaderBytes: t.MaxResponseHeaderBytes, |
| ForceAttemptHTTP2: t.ForceAttemptHTTP2, |
| WriteBufferSize: t.WriteBufferSize, |
| ReadBufferSize: t.ReadBufferSize, |
| } |
| if t.TLSClientConfig != nil { |
| t2.TLSClientConfig = t.TLSClientConfig.Clone() |
| } |
| if !t.tlsNextProtoWasNil { |
| npm := map[string]func(authority string, c *tls.Conn) RoundTripper{} |
| for k, v := range t.TLSNextProto { |
| npm[k] = v |
| } |
| t2.TLSNextProto = npm |
| } |
| return t2 |
| } |
| |
// h2Transport is the interface we expect to be able to call from
// net/http against an *http2.Transport that's either bundled into
// h2_bundle.go or supplied by the user via x/net/http2.
//
// We name it with the "h2" prefix to stay out of the "http2" prefix
// namespace used by x/tools/cmd/bundle for h2_bundle.go.
type h2Transport interface {
	// CloseIdleConnections is invoked from Transport.CloseIdleConnections
	// when an HTTP/2 transport has been wired up.
	CloseIdleConnections()
}
| |
| func (t *Transport) hasCustomTLSDialer() bool { |
| return t.DialTLS != nil || t.DialTLSContext != nil |
| } |
| |
| // onceSetNextProtoDefaults initializes TLSNextProto. |
| // It must be called via t.nextProtoOnce.Do. |
| func (t *Transport) onceSetNextProtoDefaults() { |
| t.tlsNextProtoWasNil = (t.TLSNextProto == nil) |
| if strings.Contains(os.Getenv("GODEBUG"), "http2client=0") { |
| return |
| } |
| |
| // If they've already configured http2 with |
| // golang.org/x/net/http2 instead of the bundled copy, try to |
| // get at its http2.Transport value (via the "https" |
| // altproto map) so we can call CloseIdleConnections on it if |
| // requested. (Issue 22891) |
| altProto, _ := t.altProto.Load().(map[string]RoundTripper) |
| if rv := reflect.ValueOf(altProto["https"]); rv.IsValid() && rv.Type().Kind() == reflect.Struct && rv.Type().NumField() == 1 { |
| if v := rv.Field(0); v.CanInterface() { |
| if h2i, ok := v.Interface().(h2Transport); ok { |
| t.h2transport = h2i |
| return |
| } |
| } |
| } |
| |
| if t.TLSNextProto != nil { |
| // This is the documented way to disable http2 on a |
| // Transport. |
| return |
| } |
| if !t.ForceAttemptHTTP2 && (t.TLSClientConfig != nil || t.Dial != nil || t.DialContext != nil || t.hasCustomTLSDialer()) { |
| // Be conservative and don't automatically enable |
| // http2 if they've specified a custom TLS config or |
| // custom dialers. Let them opt-in themselves via |
| // http2.ConfigureTransport so we don't surprise them |
| // by modifying their tls.Config. Issue 14275. |
| // However, if ForceAttemptHTTP2 is true, it overrides the above checks. |
| return |
| } |
| if omitBundledHTTP2 { |
| return |
| } |
| t2, err := http2configureTransports(t) |
| if err != nil { |
| log.Printf("Error enabling Transport HTTP/2 support: %v", err) |
| return |
| } |
| t.h2transport = t2 |
| |
| // Auto-configure the http2.Transport's MaxHeaderListSize from |
| // the http.Transport's MaxResponseHeaderBytes. They don't |
| // exactly mean the same thing, but they're close. |
| // |
| // TODO: also add this to x/net/http2.Configure Transport, behind |
| // a +build go1.7 build tag: |
| if limit1 := t.MaxResponseHeaderBytes; limit1 != 0 && t2.MaxHeaderListSize == 0 { |
| const h2max = 1<<32 - 1 |
| if limit1 >= h2max { |
| t2.MaxHeaderListSize = h2max |
| } else { |
| t2.MaxHeaderListSize = uint32(limit1) |
| } |
| } |
| } |
| |
| // ProxyFromEnvironment returns the URL of the proxy to use for a |
| // given request, as indicated by the environment variables |
| // HTTP_PROXY, HTTPS_PROXY and NO_PROXY (or the lowercase versions |
| // thereof). HTTPS_PROXY takes precedence over HTTP_PROXY for https |
| // requests. |
| // |
| // The environment values may be either a complete URL or a |
| // "host[:port]", in which case the "http" scheme is assumed. |
| // An error is returned if the value is a different form. |
| // |
| // A nil URL and nil error are returned if no proxy is defined in the |
| // environment, or a proxy should not be used for the given request, |
| // as defined by NO_PROXY. |
| // |
| // As a special case, if req.URL.Host is "localhost" (with or without |
| // a port number), then a nil URL and nil error will be returned. |
| func ProxyFromEnvironment(req *Request) (*url.URL, error) { |
| return envProxyFunc()(req.URL) |
| } |
| |
| // ProxyURL returns a proxy function (for use in a Transport) |
| // that always returns the same URL. |
| func ProxyURL(fixedURL *url.URL) func(*Request) (*url.URL, error) { |
| return func(*Request) (*url.URL, error) { |
| return fixedURL, nil |
| } |
| } |
| |
// transportRequest is a wrapper around a *Request that adds
// optional extra headers to write and stores any error to return
// from roundTrip.
//
// roundTrip creates a fresh transportRequest for every attempt, since the
// value is modified while the request is processed.
type transportRequest struct {
	*Request                         // original request, not to be mutated
	extra     Header                 // extra headers to write, or nil
	trace     *httptrace.ClientTrace // optional
	cancelKey cancelKey              // key under which this request's canceler is registered

	mu  sync.Mutex // guards err
	err error      // first setError value for mapRoundTripError to consider
}
| |
| func (tr *transportRequest) extraHeaders() Header { |
| if tr.extra == nil { |
| tr.extra = make(Header) |
| } |
| return tr.extra |
| } |
| |
| func (tr *transportRequest) setError(err error) { |
| tr.mu.Lock() |
| if tr.err == nil { |
| tr.err = err |
| } |
| tr.mu.Unlock() |
| } |
| |
| // useRegisteredProtocol reports whether an alternate protocol (as registered |
| // with Transport.RegisterProtocol) should be respected for this request. |
| func (t *Transport) useRegisteredProtocol(req *Request) bool { |
| if req.URL.Scheme == "https" && req.requiresHTTP1() { |
| // If this request requires HTTP/1, don't use the |
| // "https" alternate protocol, which is used by the |
| // HTTP/2 code to take over requests if there's an |
| // existing cached HTTP/2 connection. |
| return false |
| } |
| return true |
| } |
| |
| // alternateRoundTripper returns the alternate RoundTripper to use |
| // for this request if the Request's URL scheme requires one, |
| // or nil for the normal case of using the Transport. |
| func (t *Transport) alternateRoundTripper(req *Request) RoundTripper { |
| if !t.useRegisteredProtocol(req) { |
| return nil |
| } |
| altProto, _ := t.altProto.Load().(map[string]RoundTripper) |
| return altProto[req.URL.Scheme] |
| } |
| |
// roundTrip implements a RoundTripper over HTTP.
//
// It first validates the request (non-nil URL and Header; for http and
// https, well-formed header field names and values; a valid method; a
// non-empty host), closing the request body before returning on each of
// those failures. A RoundTripper registered for the URL scheme gets the
// first chance to handle the request. Otherwise roundTrip loops: it obtains
// a connection with getConn, sends the request on it, and on failure either
// returns the error or rewinds the body and tries again, as decided by
// http2isNoCachedConnError and persistConn.shouldRetryRequest.
//
// On success the returned Response's Request field is set to the request
// the caller passed in.
func (t *Transport) roundTrip(req *Request) (*Response, error) {
	t.nextProtoOnce.Do(t.onceSetNextProtoDefaults)
	ctx := req.Context()
	trace := httptrace.ContextClientTrace(ctx)

	if req.URL == nil {
		req.closeBody()
		return nil, errors.New("http: nil Request.URL")
	}
	if req.Header == nil {
		req.closeBody()
		return nil, errors.New("http: nil Request.Header")
	}
	scheme := req.URL.Scheme
	isHTTP := scheme == "http" || scheme == "https"
	if isHTTP {
		// Header syntax is only checked for the schemes handled here;
		// other schemes are left to their registered RoundTripper.
		for k, vv := range req.Header {
			if !httpguts.ValidHeaderFieldName(k) {
				req.closeBody()
				return nil, fmt.Errorf("net/http: invalid header field name %q", k)
			}
			for _, v := range vv {
				if !httpguts.ValidHeaderFieldValue(v) {
					req.closeBody()
					return nil, fmt.Errorf("net/http: invalid header field value %q for key %v", v, k)
				}
			}
		}
	}

	// Keep the caller's request for the cancel key and for resp.Request;
	// from here on req may be a copy whose body tracks reads and closes.
	origReq := req
	cancelKey := cancelKey{origReq}
	req = setupRewindBody(req)

	if altRT := t.alternateRoundTripper(req); altRT != nil {
		if resp, err := altRT.RoundTrip(req); err != ErrSkipAltProtocol {
			return resp, err
		}
		// The alternate declined the request; it may have consumed
		// some of the body, so rewind before handling it here.
		var err error
		req, err = rewindBody(req)
		if err != nil {
			return nil, err
		}
	}
	if !isHTTP {
		req.closeBody()
		return nil, badStringError("unsupported protocol scheme", scheme)
	}
	if req.Method != "" && !validMethod(req.Method) {
		req.closeBody()
		return nil, fmt.Errorf("net/http: invalid method %q", req.Method)
	}
	if req.URL.Host == "" {
		req.closeBody()
		return nil, errors.New("http: no Host in request URL")
	}

	for {
		// Non-blocking check: give up before each attempt if the
		// request's context is already done.
		select {
		case <-ctx.Done():
			req.closeBody()
			return nil, ctx.Err()
		default:
		}

		// treq gets modified by roundTrip, so we need to recreate for each retry.
		treq := &transportRequest{Request: req, trace: trace, cancelKey: cancelKey}
		cm, err := t.connectMethodForRequest(treq)
		if err != nil {
			req.closeBody()
			return nil, err
		}

		// Get the cached or newly-created connection to either the
		// host (for http or https), the http proxy, or the http proxy
		// pre-CONNECTed to https server. In any case, we'll be ready
		// to send it requests.
		pconn, err := t.getConn(treq, cm)
		if err != nil {
			t.setReqCanceler(cancelKey, nil)
			req.closeBody()
			return nil, err
		}

		var resp *Response
		if pconn.alt != nil {
			// HTTP/2 path.
			t.setReqCanceler(cancelKey, nil) // not cancelable with CancelRequest
			resp, err = pconn.alt.RoundTrip(req)
		} else {
			resp, err = pconn.roundTrip(treq)
		}
		if err == nil {
			// Report the caller's request, not the body-wrapped copy.
			resp.Request = origReq
			return resp, nil
		}

		// Failed. Clean up and determine whether to retry.
		if http2isNoCachedConnError(err) {
			if t.removeIdleConn(pconn) {
				t.decConnsPerHost(pconn.cacheKey)
			}
		} else if !pconn.shouldRetryRequest(req, err) {
			// Issue 16465: return underlying net.Conn.Read error from peek,
			// as we've historically done.
			if e, ok := err.(transportReadFromServerError); ok {
				err = e.err
			}
			return nil, err
		}
		testHookRoundTripRetried()

		// Rewind the body if we're able to.
		req, err = rewindBody(req)
		if err != nil {
			return nil, err
		}
	}
}
| |
// errCannotRewind is returned by rewindBody when the request body has
// already been read or closed and the request has no GetBody func from
// which to obtain a fresh copy.
var errCannotRewind = errors.New("net/http: cannot rewind body after connection loss")
| |
// readTrackingBody wraps a request body and remembers whether Read or
// Close has ever been called on it. rewindBody consults those flags to
// decide whether the body must be replaced before a retry.
type readTrackingBody struct {
	io.ReadCloser
	didRead  bool // set by the first call to Read
	didClose bool // set by the first call to Close
}

// Read marks the body as read, then forwards to the wrapped ReadCloser.
// The flag is set even when the underlying Read returns no data or fails.
func (b *readTrackingBody) Read(p []byte) (n int, err error) {
	b.didRead = true
	n, err = b.ReadCloser.Read(p)
	return n, err
}

// Close marks the body as closed, then forwards to the wrapped ReadCloser.
func (b *readTrackingBody) Close() error {
	b.didClose = true
	err := b.ReadCloser.Close()
	return err
}
| |
| // setupRewindBody returns a new request with a custom body wrapper |
| // that can report whether the body needs rewinding. |
| // This lets rewindBody avoid an error result when the request |
| // does not have GetBody but the body hasn't been read at all yet. |
| func setupRewindBody(req *Request) *Request { |
| if req.Body == nil || req.Body == NoBody { |
| return req |
| } |
| newReq := *req |
| newReq.Body = &readTrackingBody{ReadCloser: req.Body} |
| return &newReq |
| } |
| |
| // rewindBody returns a new request with the body rewound. |
| // It returns req unmodified if the body does not need rewinding. |
| // rewindBody takes care of closing req.Body when appropriate |
| // (in all cases except when rewindBody returns req unmodified). |
| func rewindBody(req *Request) (rewound *Request, err error) { |
| if req.Body == nil || req.Body == NoBody || (!req.Body.(*readTrackingBody).didRead && !req.Body.(*readTrackingBody).didClose) { |
| return req, nil // nothing to rewind |
| } |
| if !req.Body.(*readTrackingBody).didClose { |
| req.closeBody() |
| } |
| if req.GetBody == nil { |
| return nil, errCannotRewind |
| } |
| body, err := req.GetBody() |
| if err != nil { |
| return nil, err |
| } |
| newReq := *req |
| newReq.Body = &readTrackingBody{ReadCloser: body} |
| return &newReq, nil |
| } |
| |
| // shouldRetryRequest reports whether we should retry sending a failed |
| // HTTP request on a new connection. The non-nil input error is the |
| // error from roundTrip. |
| func (pc *persistConn) shouldRetryRequest(req *Request, err error) bool { |
| if http2isNoCachedConnError(err) { |
| // Issue 16582: if the user started a bunch of |
| // requests at once, they can all pick the same conn |
| // and violate the server's max concurrent streams. |
| // Instead, match the HTTP/1 behavior for now and dial |
| // again to get a new TCP connection, rather than failing |
| // this request. |
| return true |
| } |
| if err == errMissingHost { |
| // User error. |
| return false |
| } |
| if !pc.isReused() { |
| // This was a fresh connection. There's no reason the server |
| // should've hung up on us. |
| // |
| // Also, if we retried now, we could loop forever |
| // creating new connections and retrying if the server |
| // is just hanging up on us because it doesn't like |
| // our request (as opposed to sending an error). |
| return false |
| } |
| if _, ok := err.(nothingWrittenError); ok { |
| // We never wrote anything, so it's safe to retry, if there's no body or we |
| // can "rewind" the body with GetBody. |
| return req.outgoingLength() == 0 || req.GetBody != nil |
| } |
| if !req.isReplayable() { |
| // Don't retry non-idempotent requests. |
| return false |
| } |
| if _, ok := err.(transportReadFromServerError); ok { |
| // We got some non-EOF net.Conn.Read failure reading |
| // the 1st response byte from the server. |
| return true |
| } |
| if err == errServerClosedIdle { |
| // The server replied with io.EOF while we were trying to |
| // read the response. Probably an unfortunately keep-alive |
| // timeout, just as the client was writing a request. |
| return true |
| } |
| return false // conservatively |
| } |
| |
// ErrSkipAltProtocol is a sentinel error value defined by Transport.RegisterProtocol.
// A registered RoundTripper returns it from RoundTrip to make the Transport
// handle that one request itself, as if the protocol were not registered.
var ErrSkipAltProtocol = errors.New("net/http: skip alternate protocol")
| |
| // RegisterProtocol registers a new protocol with scheme. |
| // The Transport will pass requests using the given scheme to rt. |
| // It is rt's responsibility to simulate HTTP request semantics. |
| // |
| // RegisterProtocol can be used by other packages to provide |
| // implementations of protocol schemes like "ftp" or "file". |
| // |
| // If rt.RoundTrip returns ErrSkipAltProtocol, the Transport will |
| // handle the RoundTrip itself for that one request, as if the |
| // protocol were not registered. |
| func (t *Transport) RegisterProtocol(scheme string, rt RoundTripper) { |
| t.altMu.Lock() |
| defer t.altMu.Unlock() |
| oldMap, _ := t.altProto.Load().(map[string]RoundTripper) |
| if _, exists := oldMap[scheme]; exists { |
| panic("protocol " + scheme + " already registered") |
| } |
| newMap := make(map[string]RoundTripper) |
| for k, v := range oldMap { |
| newMap[k] = v |
| } |
| newMap[scheme] = rt |
| t.altProto.Store(newMap) |
| } |
| |
| // CloseIdleConnections closes any connections which were previously |
| // connected from previous requests but are now sitting idle in |
| // a "keep-alive" state. It does not interrupt any connections currently |
| // in use. |
| func (t *Transport) CloseIdleConnections() { |
| t.nextProtoOnce.Do(t.onceSetNextProtoDefaults) |
| t.idleMu.Lock() |
| m := t.idleConn |
| t.idleConn = nil |
| t.closeIdle = true // close newly idle connections |
| t.idleLRU = connLRU{} |
| t.idleMu.Unlock() |
| for _, conns := range m { |
| for _, pconn := range conns { |
| pconn.close(errCloseIdleConns) |
| } |
| } |
| if t2 := t.h2transport; t2 != nil { |
| t2.CloseIdleConnections() |
| } |
| } |
| |
| // CancelRequest cancels an in-flight request by closing its connection. |
| // CancelRequest should only be called after RoundTrip has returned. |
| // |
| // Deprecated: Use Request.WithContext to create a request with a |
| // cancelable context instead. CancelRequest cannot cancel HTTP/2 |
| // requests. |
| func (t *Transport) CancelRequest(req *Request) { |
| t.cancelRequest(cancelKey{req}, errRequestCanceled) |
| } |
| |
| // Cancel an in-flight request, recording the error value. |
| // Returns whether the request was canceled. |
| func (t *Transport) cancelRequest(key cancelKey, err error) bool { |
| t.reqMu.Lock() |
| cancel := t.reqCanceler[key] |
| delete(t.reqCanceler, key) |
| t.reqMu.Unlock() |
| if cancel != nil { |
| cancel(err) |
| } |
| |
| return cancel != nil |
| } |
| |
| // |
| // Private implementation past this point. |
| // |
| |
var (
	// envProxyOnce guards the one-time initialization of envProxyFuncValue.
	envProxyOnce sync.Once
	// envProxyFuncValue is the proxy lookup function built from the
	// environment by envProxyFunc; it is nil until envProxyFunc first runs.
	envProxyFuncValue func(*url.URL) (*url.URL, error)
)
| |
| // defaultProxyConfig returns a ProxyConfig value looked up |
| // from the environment. This mitigates expensive lookups |
| // on some platforms (e.g. Windows). |
| func envProxyFunc() func(*url.URL) (*url.URL, error) { |
| envProxyOnce.Do(func() { |
| envProxyFuncValue = httpproxy.FromEnvironment().ProxyFunc() |
| }) |
| return envProxyFuncValue |
| } |
| |
| // resetProxyConfig is used by tests. |
| func resetProxyConfig() { |
| envProxyOnce = sync.Once{} |
| envProxyFuncValue = nil |
| } |
| |
| func (t *Transport) connectMethodForRequest(treq *transportRequest) (cm connectMethod, err error) { |
| cm.targetScheme = treq.URL.Scheme |
| cm.targetAddr = canonicalAddr(treq.URL) |
| if t.Proxy != nil { |
| cm.proxyURL, err = t.Proxy(treq.Request) |
| } |
| cm.onlyH1 = treq.requiresHTTP1() |
| return cm, err |
| } |
| |
| // proxyAuth returns the Proxy-Authorization header to set |
| // on requests, if applicable. |
| func (cm *connectMethod) proxyAuth() string { |
| if cm.proxyURL == nil { |
| return "" |
| } |
| if u := cm.proxyURL.User; u != nil { |
| username := u.Username() |
| password, _ := u.Password() |
| return "Basic " + basicAuth(username, password) |
| } |
| return "" |
| } |
| |
// error values for debugging and testing, not seen by users.
//
// The "putIdleConn" values explain why a connection was not kept in the
// idle pool: tryPutIdleConn returns errKeepAlivesDisabled when
// DisableKeepAlives is set or MaxIdleConnsPerHost is negative, and
// errConnBroken when the connection reports itself broken.
// errCloseIdleConns is the reason passed to connections closed by
// CloseIdleConnections.
var (
	errKeepAlivesDisabled = errors.New("http: putIdleConn: keep alives disabled")
	errConnBroken         = errors.New("http: putIdleConn: connection is in bad state")
	errCloseIdle          = errors.New("http: putIdleConn: CloseIdleConnections was called")
	errTooManyIdle        = errors.New("http: putIdleConn: too many idle connections")
	errTooManyIdleHost    = errors.New("http: putIdleConn: too many idle connections for host")
	errCloseIdleConns     = errors.New("http: CloseIdleConnections called")
	errReadLoopExiting    = errors.New("http: persistConn.readLoop exiting")
	errIdleConnTimeout    = errors.New("http: idle connection timeout")

	// errServerClosedIdle is not seen by users for idempotent requests, but may be
	// seen by a user if the server shuts down an idle connection and sends its FIN
	// in flight with already-written POST body bytes from the client.
	// See https://github.com/golang/go/issues/19943#issuecomment-355607646
	errServerClosedIdle = errors.New("http: server closed idle connection")
)
| |
// transportReadFromServerError is the error Transport.readLoop uses when
// its 1 byte peek read fails while a response is actually expected.
// Most of the time that is the inherent keep-alive shutdown race: the
// server closed the connection just as the client wrote. The wrapped err
// is usually io.EOF or a platform-specific ECONNRESET-style error, but it
// can also be an error from the user's own net.Conn.Read, so it is kept
// and made available to callers of Transport.RoundTrip.
type transportReadFromServerError struct {
	err error
}

// Unwrap returns the underlying read error so that errors.Is and errors.As
// can see through the wrapper.
func (e transportReadFromServerError) Unwrap() error {
	return e.err
}

// Error implements the error interface, prefixing the underlying error's
// text with a description of where the failure happened.
func (e transportReadFromServerError) Error() string {
	cause := e.Unwrap()
	return fmt.Sprintf("net/http: Transport failed to read from server: %v", cause)
}
| |
| func (t *Transport) putOrCloseIdleConn(pconn *persistConn) { |
| if err := t.tryPutIdleConn(pconn); err != nil { |
| pconn.close(err) |
| } |
| } |
| |
| func (t *Transport) maxIdleConnsPerHost() int { |
| if v := t.MaxIdleConnsPerHost; v != 0 { |
| return v |
| } |
| return DefaultMaxIdleConnsPerHost |
| } |
| |
// tryPutIdleConn adds pconn to the list of idle persistent connections awaiting
// a new request.
// If pconn is no longer needed or not in a good state, tryPutIdleConn returns
// an error explaining why it wasn't registered.
// tryPutIdleConn does not close pconn. Use putOrCloseIdleConn instead for that.
//
// A nil return means one of: pconn was handed directly to a waiting wantConn
// (HTTP/1), pconn is an HTTP/2 connection that is already in the idle LRU,
// or pconn was appended to the idle pool.
func (t *Transport) tryPutIdleConn(pconn *persistConn) error {
	// A negative MaxIdleConnsPerHost is treated the same as DisableKeepAlives.
	if t.DisableKeepAlives || t.MaxIdleConnsPerHost < 0 {
		return errKeepAlivesDisabled
	}
	if pconn.isBroken() {
		return errConnBroken
	}
	pconn.markReused()

	// Everything below reads or mutates the idle pool and waiter queues.
	t.idleMu.Lock()
	defer t.idleMu.Unlock()

	// HTTP/2 (pconn.alt != nil) connections do not come out of the idle list,
	// because multiple goroutines can use them simultaneously.
	// If this is an HTTP/2 connection being “returned,” we're done.
	if pconn.alt != nil && t.idleLRU.m[pconn] != nil {
		return nil
	}

	// Deliver pconn to goroutine waiting for idle connection, if any.
	// (They may be actively dialing, but this conn is ready first.
	// Chrome calls this socket late binding.
	// See https://www.chromium.org/developers/design-documents/network-stack#TOC-Connection-Management.)
	key := pconn.cacheKey
	if q, ok := t.idleConnWait[key]; ok {
		done := false
		if pconn.alt == nil {
			// HTTP/1.
			// Loop over the waiting list until we find a w that isn't done already, and hand it pconn.
			for q.len() > 0 {
				w := q.popFront()
				if w.tryDeliver(pconn, nil) {
					done = true
					break
				}
			}
		} else {
			// HTTP/2.
			// Can hand the same pconn to everyone in the waiting list,
			// and we still won't be done: we want to put it in the idle
			// list unconditionally, for any future clients too.
			for q.len() > 0 {
				w := q.popFront()
				w.tryDeliver(pconn, nil)
			}
		}
		// q is a copy of the map entry, so write the drained queue back
		// (or drop the entry entirely once it is empty).
		if q.len() == 0 {
			delete(t.idleConnWait, key)
		} else {
			t.idleConnWait[key] = q
		}
		if done {
			return nil
		}
	}

	// closeIdle is cleared again by queueForIdleConn; while it is set,
	// connections that become idle are refused.
	if t.closeIdle {
		return errCloseIdle
	}
	if t.idleConn == nil {
		t.idleConn = make(map[connectMethodKey][]*persistConn)
	}
	idles := t.idleConn[key]
	if len(idles) >= t.maxIdleConnsPerHost() {
		return errTooManyIdleHost
	}
	// The same connection must never be listed twice; treat that as a
	// fatal internal error.
	for _, exist := range idles {
		if exist == pconn {
			log.Fatalf("dup idle pconn %p in freelist", pconn)
		}
	}
	t.idleConn[key] = append(idles, pconn)
	t.idleLRU.add(pconn)
	// Enforce the transport-wide cap (MaxIdleConns == 0 means no cap) by
	// closing and removing the connection idleLRU.removeOldest reports.
	if t.MaxIdleConns != 0 && t.idleLRU.len() > t.MaxIdleConns {
		oldest := t.idleLRU.removeOldest()
		oldest.close(errTooManyIdle)
		t.removeIdleConnLocked(oldest)
	}

	// Set idle timer, but only for HTTP/1 (pconn.alt == nil).
	// The HTTP/2 implementation manages the idle timer itself
	// (see idleConnTimeout in h2_bundle.go).
	if t.IdleConnTimeout > 0 && pconn.alt == nil {
		if pconn.idleTimer != nil {
			pconn.idleTimer.Reset(t.IdleConnTimeout)
		} else {
			pconn.idleTimer = time.AfterFunc(t.IdleConnTimeout, pconn.closeConnIfStillIdle)
		}
	}
	// Record when the connection became idle; queueForIdleConn compares
	// this against IdleConnTimeout.
	pconn.idleAt = time.Now()
	return nil
}
| |
// queueForIdleConn queues w to receive the next idle connection for w.cm.
// As an optimization hint to the caller, queueForIdleConn reports whether
// it successfully delivered an already-idle connection.
//
// When keep-alives are disabled, or w is nil, nothing is queued and the
// result is false. Calling it with keep-alives enabled also clears
// t.closeIdle, even when w is nil.
func (t *Transport) queueForIdleConn(w *wantConn) (delivered bool) {
	if t.DisableKeepAlives {
		return false
	}

	t.idleMu.Lock()
	defer t.idleMu.Unlock()

	// Stop closing connections that become idle - we might want one.
	// (That is, undo the effect of t.CloseIdleConnections.)
	t.closeIdle = false

	if w == nil {
		// Happens in test hook.
		return false
	}

	// If IdleConnTimeout is set, calculate the oldest
	// persistConn.idleAt time we're willing to use a cached idle
	// conn.
	var oldTime time.Time
	if t.IdleConnTimeout > 0 {
		oldTime = time.Now().Add(-t.IdleConnTimeout)
	}

	// Look for most recently-used idle connection.
	if list, ok := t.idleConn[w.key]; ok {
		stop := false
		// NOTE: this local shadows the named result; it is returned
		// explicitly below when stop is set.
		delivered := false
		for len(list) > 0 && !stop {
			// Candidates are taken from the end of the slice.
			pconn := list[len(list)-1]

			// See whether this connection has been idle too long, considering
			// only the wall time (the Round(0)), in case this is a laptop or VM
			// coming out of suspend with previously cached idle connections.
			tooOld := !oldTime.IsZero() && pconn.idleAt.Round(0).Before(oldTime)
			if tooOld {
				// Async cleanup. Launch in its own goroutine (as if a
				// time.AfterFunc called it); it acquires idleMu, which we're
				// holding, and does a synchronous net.Conn.Close.
				go pconn.closeConnIfStillIdle()
			}
			if pconn.isBroken() || tooOld {
				// If either persistConn.readLoop has marked the connection
				// broken, but Transport.removeIdleConn has not yet removed it
				// from the idle list, or if this persistConn is too old (it was
				// idle too long), then ignore it and look for another. In both
				// cases it's already in the process of being closed.
				list = list[:len(list)-1]
				continue
			}
			delivered = w.tryDeliver(pconn, nil)
			if delivered {
				if pconn.alt != nil {
					// HTTP/2: multiple clients can share pconn.
					// Leave it in the list.
				} else {
					// HTTP/1: only one client can use pconn.
					// Remove it from the list.
					t.idleLRU.remove(pconn)
					list = list[:len(list)-1]
				}
			}
			// A usable connection was found; stop scanning whether or not
			// w accepted it.
			stop = true
		}
		// Store the (possibly shortened) list back, dropping the map entry
		// once no idle connections remain for this key.
		if len(list) > 0 {
			t.idleConn[w.key] = list
		} else {
			delete(t.idleConn, w.key)
		}
		if stop {
			return delivered
		}
	}

	// Register to receive next connection that becomes idle.
	if t.idleConnWait == nil {
		t.idleConnWait = make(map[connectMethodKey]wantConnQueue)
	}
	q := t.idleConnWait[w.key]
	// Drop waiters at the front that are no longer waiting before adding w.
	q.cleanFront()
	q.pushBack(w)
	t.idleConnWait[w.key] = q
	return false
}
| |
| // removeIdleConn marks pconn as dead. |
| func (t *Transport) removeIdleConn(pconn *persistConn) bool { |
| t.idleMu.Lock() |
| defer t.idleMu.Unlock() |
| return t.removeIdleConnLocked(pconn) |
| } |
| |
| // t.idleMu must be held. |
| func (t *Transport) removeIdleConnLocked(pconn *persistConn) bool { |
| if pconn.idleTimer != nil { |
| pconn.idleTimer.Stop() |
| } |
| t.idleLRU.remove(pconn) |
| key := pconn.cacheKey |
| pconns := t.idleConn[key] |
| var removed bool |
| switch len(pconns) { |
| case 0: |
| // Nothing |
| case 1: |
| if pconns[0] == pconn { |
| delete(t.idleConn, key) |
| removed = true |
| } |
| default: |
| for i, v := range pconns { |
| if v != pconn { |
| continue |
| } |
| // Slide down, keeping most recently-used |
| // conns at the end. |
| copy(pconns[i:], pconns[i+1:]) |
| t.idleConn[key] = pconns[:len(pconns)-1] |
| removed = true |
| break |
| } |
| } |
| return removed |
| } |
| |
| func (t *Transport) setReqCanceler(key cancelKey, fn func(error)) { |
| t.reqMu.Lock() |
| defer t.reqMu.Unlock() |
| if t.reqCanceler == nil { |
| t.reqCanceler = make(map[cancelKey]func(error)) |
| } |
| if fn != nil { |
| t.reqCanceler[key] = fn |
| } else { |
| delete(t.reqCanceler, key) |
| } |
| } |
| |
| // replaceReqCanceler replaces an existing cancel function. If there is no cancel function |
| // for the request, we don't set the function and return false. |
| // Since CancelRequest will clear the canceler, we can use the return value to detect if |
| // the request was canceled since the last setReqCancel call. |
| func (t *Transport) replaceReqCanceler(key cancelKey, fn func(error)) bool { |
| t.reqMu.Lock() |
| defer t.reqMu.Unlock() |
| _, ok := t.reqCanceler[key] |
| if !ok { |
| return false |
| } |
| if fn != nil { |
| t.reqCanceler[key] = fn |
| } else { |
| delete(t.reqCanceler, key) |
| } |
| return true |
| } |
| |
| var zeroDialer net.Dialer |
| |
| func (t *Transport) dial(ctx context.Context, network, addr string) (net.Conn, error) { |
| if t.DialContext != nil { |
| return t.DialContext(ctx, network, addr) |
| } |
| if t.Dial != nil { |
| c, err := t.Dial(network, addr) |
| if c == nil && err == nil { |
| err = errors.New("net/http: Transport.Dial hook returned (nil, nil)") |
| } |
| return c, err |
| } |
| return zeroDialer.DialContext(ctx, network, addr) |
| } |
| |
// A wantConn records state about a wanted connection
// (that is, an active call to getConn).
// The conn may be gotten by dialing or by finding an idle connection,
// or a cancellation may make the conn no longer wanted.
// These three options are racing against each other and use
// wantConn to coordinate and agree about the winning outcome.
//
// The outcome is recorded at most once through tryDeliver; cancel may later
// replace it with an error and hand back a delivered connection.
type wantConn struct {
	cm    connectMethod    // how the wanted connection is to be established
	key   connectMethodKey // cm.key()
	ctx   context.Context  // context for dial
	ready chan struct{}    // closed when pc, err pair is delivered

	// hooks for testing to know when dials are done
	// beforeDial is called in the getConn goroutine when the dial is queued.
	// afterDial is called when the dial is completed or cancelled.
	beforeDial func()
	afterDial  func()

	mu  sync.Mutex   // protects pc, err, close(ready)
	pc  *persistConn // delivered connection; reset to nil by cancel
	err error        // delivered error, or the error passed to cancel
}
| |
| // waiting reports whether w is still waiting for an answer (connection or error). |
| func (w *wantConn) waiting() bool { |
| select { |
| case <-w.ready: |
| return false |
| default: |
| return true |
| } |
| } |
| |
| // tryDeliver attempts to deliver pc, err to w and reports whether it succeeded. |
| func (w *wantConn) tryDeliver(pc *persistConn, err error) bool { |
| w.mu.Lock() |
| defer w.mu.Unlock() |
| |
| if w.pc != nil || w.err != nil { |
| return false |
| } |
| |
| w.pc = pc |
| w.err = err |
| if w.pc == nil && w.err == nil { |
| panic("net/http: internal error: misuse of tryDeliver") |
| } |
| close(w.ready) |
| return true |
| } |
| |
| // cancel marks w as no longer wanting a result (for example, due to cancellation). |
| // If a connection has been delivered already, cancel returns it with t.putOrCloseIdleConn. |
| func (w *wantConn) cancel(t *Transport, err error) { |
| w.mu.Lock() |
| if w.pc == nil && w.err == nil { |
| close(w.ready) // catch misbehavior in future delivery |
| } |
| pc := w.pc |
| w.pc = nil |
| w.err = err |
| w.mu.Unlock() |
| |
| if pc != nil { |
| t.putOrCloseIdleConn(pc) |
| } |
| } |
| |
| // A wantConnQueue is a queue of wantConns. |
| type wantConnQueue struct { |
| // This is a queue, not a deque. |
| // It is split into two stages - head[headPos:] and tail. |
| // popFront is trivial (headPos++) on the first stage, and |
| // pushBack is trivial (append) on the second stage. |
| // If the first stage is empty, popFront can swap the |
| // first and second stages to remedy the situation. |
| // |
| // This two-stage split is analogous to the use of two lists |
| // in Okasaki's purely functional queue but without the |
| // overhead of reversing the list when swapping stages. |
| head []*wantConn |
| headPos int |
| tail []*wantConn |
| } |
| |
| // len returns the number of items in the queue. |
| func (q *wantConnQueue) len() int { |
| return len(q.head) - q.headPos + len(q.tail) |
| } |
| |
| // pushBack adds w to the back of the queue. |
| func (q *wantConnQueue) pushBack(w *wantConn) { |
| q.tail = append(q.tail, w) |
| } |
| |
| // popFront removes and returns the wantConn at the front of the queue. |
| func (q *wantConnQueue) popFront() *wantConn { |
| if q.headPos >= len(q.head) { |
| if len(q.tail) == 0 { |
| return nil |
| } |
| // Pick up tail as new head, clear tail. |
| q.head, q.headPos, q.tail = q.tail, 0, q.head[:0] |
| } |
| w := q.head[q.headPos] |
| q.head[q.headPos] = nil |
| q.headPos++ |
| return w |
| } |
| |
| // peekFront returns the wantConn at the front of the queue without removing it. |
| func (q *wantConnQueue) peekFront() *wantConn { |
| if q.headPos < len(q.head) { |
| return q.head[q.headPos] |
| } |
| if len(q.tail) > 0 { |
| return q.tail[0] |
| } |
| return nil |
| } |
| |
| // cleanFront pops any wantConns that are no longer waiting from the head of the |
| // queue, reporting whether any were popped. |
| func (q *wantConnQueue) cleanFront() (cleaned bool) { |
| for { |
| w := q.peekFront() |
| if w == nil || w.waiting() { |
| return cleaned |
| } |
| q.popFront() |
| cleaned = true |
| } |
| } |
| |
| func (t *Transport) customDialTLS(ctx context.Context, network, addr string) (conn net.Conn, err error) { |
| if t.DialTLSContext != nil { |
| conn, err = t.DialTLSContext(ctx, network, addr) |
| } else { |
| conn, err = t.DialTLS(network, addr) |
| } |
| if conn == nil && err == nil { |
| err = errors.New("net/http: Transport.DialTLS or DialTLSContext returned (nil, nil)") |
| } |
| return |
| } |
| |
// getConn dials and creates a new persistConn to the target as
// specified in the connectMethod. This includes doing a proxy CONNECT
// and/or setting up TLS. If this doesn't return an error, the persistConn
// is ready to write requests to.
//
// An already-idle connection is used when queueForIdleConn can deliver one
// immediately. Otherwise the request is queued for a dial (and remains
// queued for the next idle connection), and getConn waits for whichever of
// delivery, req.Cancel, context cancellation, or the request canceler
// happens first.
func (t *Transport) getConn(treq *transportRequest, cm connectMethod) (pc *persistConn, err error) {
	req := treq.Request
	trace := treq.trace
	ctx := req.Context()
	if trace != nil && trace.GetConn != nil {
		trace.GetConn(cm.addr())
	}

	w := &wantConn{
		cm:         cm,
		key:        cm.key(),
		ctx:        ctx,
		ready:      make(chan struct{}, 1),
		beforeDial: testHookPrePendingDial,
		afterDial:  testHookPostPendingDial,
	}
	// On any error return, withdraw w so that a connection delivered to it
	// (now or later) is handed back via putOrCloseIdleConn instead of being
	// lost.
	defer func() {
		if err != nil {
			w.cancel(t, err)
		}
	}()

	// Queue for idle connection.
	if delivered := t.queueForIdleConn(w); delivered {
		pc := w.pc
		// Trace only for HTTP/1.
		// HTTP/2 calls trace.GotConn itself.
		if pc.alt == nil && trace != nil && trace.GotConn != nil {
			trace.GotConn(pc.gotIdleConnTrace(pc.idleAt))
		}
		// set request canceler to some non-nil function so we
		// can detect whether it was cleared between now and when
		// we enter roundTrip
		t.setReqCanceler(treq.cancelKey, func(error) {})
		return pc, nil
	}

	// cancelc is buffered so the canceler never blocks if getConn has
	// already returned.
	cancelc := make(chan error, 1)
	t.setReqCanceler(treq.cancelKey, func(err error) { cancelc <- err })

	// Queue for permission to dial.
	t.queueForDial(w)

	// Wait for completion or cancellation.
	select {
	case <-w.ready:
		// Trace success but only for HTTP/1.
		// HTTP/2 calls trace.GotConn itself.
		if w.pc != nil && w.pc.alt == nil && trace != nil && trace.GotConn != nil {
			trace.GotConn(httptrace.GotConnInfo{Conn: w.pc.conn, Reused: w.pc.isReused()})
		}
		if w.err != nil {
			// If the request has been cancelled, that's probably
			// what caused w.err; if so, prefer to return the
			// cancellation error (see golang.org/issue/16049).
			select {
			case <-req.Cancel:
				return nil, errRequestCanceledConn
			case <-req.Context().Done():
				return nil, req.Context().Err()
			case err := <-cancelc:
				if err == errRequestCanceled {
					err = errRequestCanceledConn
				}
				return nil, err
			default:
				// return below
			}
		}
		return w.pc, w.err
	case <-req.Cancel:
		return nil, errRequestCanceledConn
	case <-req.Context().Done():
		return nil, req.Context().Err()
	case err := <-cancelc:
		// Translate the generic cancellation error into the
		// connection-phase variant.
		if err == errRequestCanceled {
			err = errRequestCanceledConn
		}
		return nil, err
	}
}
| |
| // queueForDial queues w to wait for permission to begin dialing. |
| // Once w receives permission to dial, it will do so in a separate goroutine. |
| func (t *Transport) queueForDial(w *wantConn) { |
| w.beforeDial() |
| if t.MaxConnsPerHost <= 0 { |
| go t.dialConnFor(w) |
| return |
| } |
| |
| t.connsPerHostMu.Lock() |
| defer t.connsPerHostMu.Unlock() |
| |
| if n := t.connsPerHost[w.key]; n < t.MaxConnsPerHost { |
| if t.connsPerHost == nil { |
| t.connsPerHost = make(map[connectMethodKey]int) |
| } |
| t.connsPerHost[w.key] = n + 1 |
| go t.dialConnFor(w) |
| return |
| } |
| |
| if t.connsPerHostWait == nil { |
| t.connsPerHostWait = make(map[connectMethodKey]wantConnQueue) |
| } |
| q := t.connsPerHostWait[w.key] |
| q.cleanFront() |
| q.pushBack(w) |
| t.connsPerHostWait[w.key] = q |
| } |
| |
| // dialConnFor dials on behalf of w and delivers the result to w. |
| // dialConnFor has received permission to dial w.cm and is counted in t.connCount[w.cm.key()]. |
| // If the dial is cancelled or unsuccessful, dialConnFor decrements t.connCount[w.cm.key()]. |
| func (t *Transport) dialConnFor(w *wantConn) { |
| defer w.afterDial() |
| |
| pc, err := t.dialConn(w.ctx, w.cm) |
| delivered := w.tryDeliver(pc, err) |
| if err == nil && (!delivered || pc.alt != nil) { |
| // pconn was not passed to w, |
| // or it is HTTP/2 and can be shared. |
| // Add to the idle connection pool. |
| t.putOrCloseIdleConn(pc) |
| } |
| if err != nil { |
| t.decConnsPerHost(w.key) |
| } |
| } |
| |
| // decConnsPerHost decrements the per-host connection count for key, |
| // which may in turn give a different waiting goroutine permission to dial. |
| func (t *Transport) decConnsPerHost(key connectMethodKey) { |
| if t.MaxConnsPerHost <= 0 { |
| return |
| } |
| |
| t.connsPerHostMu.Lock() |
| defer t.connsPerHostMu.Unlock() |
| n := t.connsPerHost[key] |
| if n == 0 { |
| // Shouldn't happen, but if it does, the counting is buggy and could |
| // easily lead to a silent deadlock, so report the problem loudly. |
| panic("net/http: internal error: connCount underflow") |
| } |
| |
| // Can we hand this count to a goroutine still waiting to dial? |
| // (Some goroutines on the wait list may have timed out or |
| // gotten a connection another way. If they're all gone, |
| // we don't want to kick off any spurious dial operations.) |
| if q := t.connsPerHostWait[key]; q.len() > 0 { |
| done := false |
| for q.len() > 0 { |
| w := q.popFront() |
| if w.waiting() { |
| go t.dialConnFor(w) |
| done = true |
| break |
| } |
| } |
| if q.len() == 0 { |
| delete(t.connsPerHostWait, key) |
| } else { |
| // q is a value (like a slice), so we have to store |
| // the updated q back into the map. |
| t.connsPerHostWait[key] = q |
| } |
| if done { |
| return |
| } |
| } |
| |
| // Otherwise, decrement the recorded count. |
| if n--; n == 0 { |
| delete(t.connsPerHost, key) |
| } else { |
| t.connsPerHost[key] = n |
| } |
| } |
| |
// Add TLS to a persistent connection, i.e. negotiate a TLS session. If pconn is already a TLS
// tunnel, this function establishes a nested TLS session inside the encrypted channel.
// The remote endpoint's name may be overridden by TLSClientConfig.ServerName.
//
// name is used as the TLS server name when the config does not set one.
// trace, if non-nil, receives the TLSHandshakeStart and TLSHandshakeDone
// callbacks. On failure the underlying connection is closed and the error
// is returned; on success pconn.conn and pconn.tlsState are replaced with
// the TLS connection and its state.
func (pconn *persistConn) addTLS(name string, trace *httptrace.ClientTrace) error {
	// Initiate TLS and check remote host name against certificate.
	cfg := cloneTLSConfig(pconn.t.TLSClientConfig)
	if cfg.ServerName == "" {
		cfg.ServerName = name
	}
	// Connections restricted to HTTP/1 must not offer any ALPN protocols.
	if pconn.cacheKey.onlyH1 {
		cfg.NextProtos = nil
	}
	plainConn := pconn.conn
	tlsConn := tls.Client(plainConn, cfg)
	// errc has room for two values because both the timeout callback and
	// the handshake goroutine may send, while only one value is received.
	errc := make(chan error, 2)
	var timer *time.Timer // for canceling TLS handshake
	if d := pconn.t.TLSHandshakeTimeout; d != 0 {
		timer = time.AfterFunc(d, func() {
			errc <- tlsHandshakeTimeoutError{}
		})
	}
	go func() {
		if trace != nil && trace.TLSHandshakeStart != nil {
			trace.TLSHandshakeStart()
		}
		err := tlsConn.Handshake()
		if timer != nil {
			timer.Stop()
		}
		errc <- err
	}()
	// Whichever of the handshake result and the timeout arrives first wins.
	if err := <-errc; err != nil {
		plainConn.Close()
		if trace != nil && trace.TLSHandshakeDone != nil {
			trace.TLSHandshakeDone(tls.ConnectionState{}, err)
		}
		return err
	}
	cs := tlsConn.ConnectionState()
	if trace != nil && trace.TLSHandshakeDone != nil {
		trace.TLSHandshakeDone(cs, nil)
	}
	pconn.tlsState = &cs
	pconn.conn = tlsConn
	return nil
}
| |
// erringRoundTripper is implemented by values that can report an error
// through RoundTripErr. dialConn checks the result of a TLSNextProto
// upgrade function against it and, on a match, fails the dial with the
// reported error.
type erringRoundTripper interface {
	RoundTripErr() error
}
| |
// dialConn establishes a new connection for cm and returns it wrapped in a
// persistConn. In order, it:
//
//   - dials the first hop, either with the custom TLS dialer (for an https
//     first hop when one is configured) or with t.dial followed by addTLS
//     when the first hop is https;
//   - performs proxy setup: a SOCKS5 handshake, Proxy-Authorization header
//     injection for http targets, or a CONNECT request for https targets;
//   - adds TLS to the target when an https target is reached through a proxy;
//   - hands the connection to a matching TLSNextProto handler if a protocol
//     was negotiated, or otherwise starts the HTTP/1 read and write loops.
//
// First-hop dial errors are wrapped in a "proxyconnect" *net.OpError when a
// proxy is in use.
func (t *Transport) dialConn(ctx context.Context, cm connectMethod) (pconn *persistConn, err error) {
	pconn = &persistConn{
		t:             t,
		cacheKey:      cm.key(),
		reqch:         make(chan requestAndChan, 1),
		writech:       make(chan writeRequest, 1),
		closech:       make(chan struct{}),
		writeErrCh:    make(chan error, 1),
		writeLoopDone: make(chan struct{}),
	}
	trace := httptrace.ContextClientTrace(ctx)
	wrapErr := func(err error) error {
		if cm.proxyURL != nil {
			// Return a typed error, per Issue 16997
			return &net.OpError{Op: "proxyconnect", Net: "tcp", Err: err}
		}
		return err
	}
	if cm.scheme() == "https" && t.hasCustomTLSDialer() {
		var err error
		pconn.conn, err = t.customDialTLS(ctx, "tcp", cm.addr())
		if err != nil {
			return nil, wrapErr(err)
		}
		if tc, ok := pconn.conn.(*tls.Conn); ok {
			// Handshake here, in case DialTLS didn't. TLSNextProto below
			// depends on it for knowing the connection state.
			if trace != nil && trace.TLSHandshakeStart != nil {
				trace.TLSHandshakeStart()
			}
			if err := tc.Handshake(); err != nil {
				// Close asynchronously so a slow Close does not delay
				// returning the handshake error.
				go pconn.conn.Close()
				if trace != nil && trace.TLSHandshakeDone != nil {
					trace.TLSHandshakeDone(tls.ConnectionState{}, err)
				}
				return nil, err
			}
			cs := tc.ConnectionState()
			if trace != nil && trace.TLSHandshakeDone != nil {
				trace.TLSHandshakeDone(cs, nil)
			}
			pconn.tlsState = &cs
		}
	} else {
		conn, err := t.dial(ctx, "tcp", cm.addr())
		if err != nil {
			return nil, wrapErr(err)
		}
		pconn.conn = conn
		if cm.scheme() == "https" {
			// The first hop itself speaks TLS (an https target without
			// proxy, or an https proxy).
			var firstTLSHost string
			if firstTLSHost, _, err = net.SplitHostPort(cm.addr()); err != nil {
				return nil, wrapErr(err)
			}
			if err = pconn.addTLS(firstTLSHost, trace); err != nil {
				return nil, wrapErr(err)
			}
		}
	}

	// Proxy setup.
	switch {
	case cm.proxyURL == nil:
		// Do nothing. Not using a proxy.
	case cm.proxyURL.Scheme == "socks5":
		conn := pconn.conn
		d := socksNewDialer("tcp", conn.RemoteAddr().String())
		if u := cm.proxyURL.User; u != nil {
			auth := &socksUsernamePassword{
				Username: u.Username(),
			}
			auth.Password, _ = u.Password()
			d.AuthMethods = []socksAuthMethod{
				socksAuthMethodNotRequired,
				socksAuthMethodUsernamePassword,
			}
			d.Authenticate = auth.Authenticate
		}
		// Run the SOCKS5 handshake over the already-established connection.
		if _, err := d.DialWithConn(ctx, conn, "tcp", cm.targetAddr); err != nil {
			conn.Close()
			return nil, err
		}
	case cm.targetScheme == "http":
		// Plain HTTP through an HTTP(S) proxy: no tunnel is needed; only
		// the proxy credentials have to be added to each request.
		pconn.isProxy = true
		if pa := cm.proxyAuth(); pa != "" {
			pconn.mutateHeaderFunc = func(h Header) {
				h.Set("Proxy-Authorization", pa)
			}
		}
	case cm.targetScheme == "https":
		// HTTPS through an HTTP(S) proxy: open a tunnel with CONNECT.
		conn := pconn.conn
		var hdr Header
		if t.GetProxyConnectHeader != nil {
			var err error
			hdr, err = t.GetProxyConnectHeader(ctx, cm.proxyURL, cm.targetAddr)
			if err != nil {
				conn.Close()
				return nil, err
			}
		} else {
			hdr = t.ProxyConnectHeader
		}
		if hdr == nil {
			hdr = make(Header)
		}
		if pa := cm.proxyAuth(); pa != "" {
			// Clone first so the configured header is not modified.
			hdr = hdr.Clone()
			hdr.Set("Proxy-Authorization", pa)
		}
		connectReq := &Request{
			Method: "CONNECT",
			URL:    &url.URL{Opaque: cm.targetAddr},
			Host:   cm.targetAddr,
			Header: hdr,
		}

		// If there's no done channel (no deadline or cancellation
		// from the caller possible), at least set some (long)
		// timeout here. This will make sure we don't block forever
		// and leak a goroutine if the connection stops replying
		// after the TCP connect.
		connectCtx := ctx
		if ctx.Done() == nil {
			newCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
			defer cancel()
			connectCtx = newCtx
		}

		didReadResponse := make(chan struct{}) // closed after CONNECT write+read is done or fails
		var (
			resp *Response
			err  error // write or read error
		)
		// Write the CONNECT request & read the response.
		go func() {
			defer close(didReadResponse)
			err = connectReq.Write(conn)
			if err != nil {
				return
			}
			// Okay to use and discard buffered reader here, because
			// TLS server will not speak until spoken to.
			br := bufio.NewReader(conn)
			resp, err = ReadResponse(br, connectReq)
		}()
		select {
		case <-connectCtx.Done():
			// Closing the connection unblocks the goroutine above; wait
			// for it to finish before returning.
			conn.Close()
			<-didReadResponse
			return nil, connectCtx.Err()
		case <-didReadResponse:
			// resp or err now set
		}
		if err != nil {
			conn.Close()
			return nil, err
		}
		if resp.StatusCode != 200 {
			// Report the status text (everything after the status code)
			// as the error.
			f := strings.SplitN(resp.Status, " ", 2)
			conn.Close()
			if len(f) < 2 {
				return nil, errors.New("unknown status code")
			}
			return nil, errors.New(f[1])
		}
	}

	// An https target behind a proxy still needs its own TLS session,
	// negotiated with the target over the connection set up above.
	if cm.proxyURL != nil && cm.targetScheme == "https" {
		if err := pconn.addTLS(cm.tlsHost(), trace); err != nil {
			return nil, err
		}
	}

	// If TLS negotiated a protocol that has a registered TLSNextProto
	// handler, let that handler take over the connection.
	if s := pconn.tlsState; s != nil && s.NegotiatedProtocolIsMutual && s.NegotiatedProtocol != "" {
		if next, ok := t.TLSNextProto[s.NegotiatedProtocol]; ok {
			alt := next(cm.targetAddr, pconn.conn.(*tls.Conn))
			if e, ok := alt.(erringRoundTripper); ok {
				// pconn.conn was closed by next (http2configureTransports.upgradeFn).
				return nil, e.RoundTripErr()
			}
			return &persistConn{t: t, cacheKey: pconn.cacheKey, alt: alt}, nil
		}
	}

	// HTTP/1: set up buffered I/O and start the per-connection loops.
	pconn.br = bufio.NewReaderSize(pconn, t.readBufferSize())
	pconn.bw = bufio.NewWriterSize(persistConnWriter{pconn}, t.writeBufferSize())

	go pconn.readLoop()
	go pconn.writeLoop()
	return pconn, nil
}
| |
| // persistConnWriter is the io.Writer written to by pc.bw. |
| // It accumulates the number of bytes written to the underlying conn, |
| // so the retry logic can determine whether any bytes made it across |
| // the wire. |
| // This is exactly 1 pointer field wide so it can go into an interface |
| // without allocation. |
| type persistConnWriter struct { |
| pc *persistConn |
| } |
| |
| func (w persistConnWriter) Write(p []byte) (n int, err error) { |
| n, err = w.pc.conn.Write(p) |
| w.pc.nwrite += int64(n) |
| return |
| } |
| |
| // ReadFrom exposes persistConnWriter's underlying Conn to io.Copy and if |
| // the Conn implements io.ReaderFrom, it can take advantage of optimizations |
| // such as sendfile. |
| func (w persistConnWriter) ReadFrom(r io.Reader) (n int64, err error) { |
| n, err = io.Copy(w.pc.conn, r) |
| w.pc.nwrite += n |
| return |
| } |
| |
| var _ io.ReaderFrom = (*persistConnWriter)(nil) |
| |
| // connectMethod is the map key (in its String form) for keeping persistent |
| // TCP connections alive for subsequent HTTP requests. |
| // |
| // A connect method may be of the following types: |
| // |
| // connectMethod.key().String() Description |
| // ------------------------------ ------------------------- |
| // |http|foo.com http directly to server, no proxy |
| // |https|foo.com https directly to server, no proxy |
| // |https,h1|foo.com https directly to server w/o HTTP/2, no proxy |
| // http://proxy.com|https|foo.com http to proxy, then CONNECT to foo.com |
| // http://proxy.com|http http to proxy, http to anywhere after that |
| // socks5://proxy.com|http|foo.com socks5 to proxy, then http to foo.com |
| // socks5://proxy.com|https|foo.com socks5 to proxy, then https to foo.com |
| // https://proxy.com|https|foo.com https to proxy, then CONNECT to foo.com |
| // https://proxy.com|http https to proxy, http to anywhere after that |
| // |
| type connectMethod struct { |
| _ incomparable |
| proxyURL *url.URL // nil for no proxy, else full proxy URL |
| targetScheme string // "http" or "https" |
| // If proxyURL specifies an http or https proxy, and targetScheme is http (not https), |
| // then targetAddr is not included in the connect method key, because the socket can |
| // be reused for different targetAddr values. |
| targetAddr string |
| onlyH1 bool // whether to disable HTTP/2 and force HTTP/1 |
| } |
| |
| func (cm *connectMethod) key() connectMethodKey { |
| proxyStr := "" |
| targetAddr := cm.targetAddr |
| if cm.proxyURL != nil { |
| proxyStr = cm.proxyURL.String() |
| if (cm.proxyURL.Scheme == "http" || cm.proxyURL.Scheme == "https") && cm.targetScheme == "http" { |
| targetAddr = "" |
| } |
| } |
| return connectMethodKey{ |
| proxy: proxyStr, |
| scheme: cm.targetScheme, |
| addr: targetAddr, |
| onlyH1: cm.onlyH1, |
| } |
| } |
| |
| // scheme returns the first hop scheme: http, https, or socks5 |
| func (cm *connectMethod) scheme() string { |
| if cm.proxyURL != nil { |
| return cm.proxyURL.Scheme |
| } |
| return cm.targetScheme |
| } |
| |
| // addr returns the first hop "host:port" to which we need to TCP connect. |
| func (cm *connectMethod) addr() string { |
| if cm.proxyURL != nil { |
| return canonicalAddr(cm.proxyURL) |
| } |
| return cm.targetAddr |
| } |
| |
| // tlsHost returns the host name to match against the peer's |
| // TLS certificate. |
| func (cm *connectMethod) tlsHost() string { |
| h := cm.targetAddr |
| if hasPort(h) { |
| h = h[:strings.LastIndex(h, ":")] |
| } |
| return h |
| } |
| |
// connectMethodKey is the map key version of connectMethod, with a
// stringified proxy URL (or the empty string) instead of a pointer to
// a URL.
type connectMethodKey struct {
	proxy  string
	scheme string
	addr   string
	onlyH1 bool
}

// String renders k as "proxy|scheme|addr", with ",h1" appended to the
// scheme part when onlyH1 is set.
func (k connectMethodKey) String() string {
	// Only used by tests.
	h1 := ""
	if k.onlyH1 {
		h1 = ",h1"
	}
	return fmt.Sprintf("%s|%s%s|%s", k.proxy, k.scheme, h1, k.addr)
}
| |
// persistConn wraps a connection, usually a persistent one
// (but may be used for non-keep-alive requests as well)
type persistConn struct {
	// alt optionally specifies the TLS NextProto RoundTripper.
	// This is used for HTTP/2 today and future protocols later.
	// If it's non-nil, the rest of the fields are unused.
	alt RoundTripper

	t         *Transport       // the Transport this connection belongs to
	cacheKey  connectMethodKey // key under which the connection is pooled
	conn      net.Conn         // underlying connection; the TLS connection once TLS has been added
	tlsState  *tls.ConnectionState
	br        *bufio.Reader       // from conn
	bw        *bufio.Writer       // to conn
	nwrite    int64               // bytes written
	reqch     chan requestAndChan // written by roundTrip; read by readLoop
	writech   chan writeRequest   // written by roundTrip; read by writeLoop
	closech   chan struct{}       // closed when conn closed
	isProxy   bool
	sawEOF    bool  // whether we've seen EOF from conn; owned by readLoop
	readLimit int64 // bytes allowed to be read; owned by readLoop
	// writeErrCh passes the request write error (usually nil)
	// from the writeLoop goroutine to the readLoop which passes
	// it off to the res.Body reader, which then uses it to decide
	// whether or not a connection can be reused. Issue 7569.
	writeErrCh chan error

	writeLoopDone chan struct{} // closed when write loop ends

	// Both guarded by Transport.idleMu:
	idleAt    time.Time   // time it last became idle
	idleTimer *time.Timer // holding an AfterFunc to close it

	mu                   sync.Mutex // guards following fields
	numExpectedResponses int
	closed               error // set non-nil when conn is closed, before closech is closed
	canceledErr          error // set non-nil if conn is canceled
	broken               bool  // an error has happened on this connection; marked broken so it's not reused.
	reused               bool  // whether conn has had successful request/response and is being reused.
	// mutateHeaderFunc is an optional func to modify extra
	// headers on each outbound request before it's written. (the
	// original Request given to RoundTrip is not modified)
	mutateHeaderFunc func(Header)
}
| |
| func (pc *persistConn) maxHeaderResponseSize() int64 { |
| if v := pc.t.MaxResponseHeaderBytes; v != 0 { |
| return v |
| } |
| return 10 << 20 // conservative default; same as http2 |
| } |
| |
| func (pc *persistConn) Read(p []byte) (n int, err error) { |
| if pc.readLimit <= 0 { |
| return 0, fmt.Errorf("read limit of %d bytes exhausted", pc.maxHeaderResponseSize()) |
| } |
| if int64(len(p)) > pc.readLimit { |
| p = p[:pc.readLimit] |
| } |
| n, err = pc.conn.Read(p) |
| if err == io.EOF { |
| pc.sawEOF = true |
| } |
| pc.readLimit -= int64(n) |
| return |
| } |
| |
| // isBroken reports whether this connection is in a known broken state. |
| func (pc *persistConn) isBroken() bool { |
| pc.mu.Lock() |
| b := pc.closed != nil |
| pc.mu.Unlock() |
| return b |
| } |
| |
| // canceled returns non-nil if the connection was closed due to |
| // CancelRequest or due to context cancellation. |
| func (pc *persistConn) canceled() error { |
| pc.mu.Lock() |
| defer pc.mu.Unlock() |
| return pc.canceledErr |
| } |
| |
| // isReused reports whether this connection has been used before. |
| func (pc *persistConn) isReused() bool { |
| pc.mu.Lock() |
| r := pc.reused |
| pc.mu.Unlock() |
| return r |
| } |
| |
| func (pc *persistConn) gotIdleConnTrace(idleAt time.Time) (t httptrace.GotConnInfo) { |
| pc.mu.Lock() |
| defer pc.mu.Unlock() |
| t.Reused = pc.reused |
| t.Conn = pc.conn |
| t.WasIdle = true |
| if !idleAt.IsZero() { |
| t.IdleTime = time.Since(idleAt) |
| } |
| return |
| } |
| |
// cancelRequest records err as the connection's cancellation error
// (later reported by canceled) and closes the connection with
// errRequestCanceled. Both steps happen under pc.mu.
func (pc *persistConn) cancelRequest(err error) {
	pc.mu.Lock()
	defer pc.mu.Unlock()
	pc.canceledErr = err
	pc.closeLocked(errRequestCanceled)
}
| |
| // closeConnIfStillIdle closes the connection if it's still sitting idle. |
| // This is what's called by the persistConn's idleTimer, and is run in its |
| // own goroutine. |
| func (pc *persistConn) closeConnIfStillIdle() { |
| t := pc.t |
| t.idleMu.Lock() |
| defer t.idleMu.Unlock() |
| if _, ok := t.idleLRU.m[pc]; !ok { |
| // Not idle. |
| return |
| } |
| t.removeIdleConnLocked(pc) |
| pc.close(errIdleConnTimeout) |
| } |
| |
| // mapRoundTripError returns the appropriate error value for |
| // persistConn.roundTrip. |
| // |
| // The provided err is the first error that (*persistConn).roundTrip |
| // happened to receive from its select statement. |
| // |
| // The startBytesWritten value should be the value of pc.nwrite before the roundTrip |
| // started writing the request. |
| func (pc *persistConn) mapRoundTripError(req *transportRequest, startBytesWritten int64, err error) error { |
| if err == nil { |
| return nil |
| } |
| |
| // Wait for the writeLoop goroutine to terminate to avoid data |
| // races on callers who mutate the request on failure. |
| // |
| // When resc in pc.roundTrip and hence rc.ch receives a responseAndError |
| // with a non-nil error it implies that the persistConn is either closed |
| // or closing. Waiting on pc.writeLoopDone is hence safe as all callers |
| // close closech which in turn ensures writeLoop returns. |
| <-pc.writeLoopDone |
| |
| // If the request was canceled, that's better than network |
| // failures that were likely the result of tearing down the |
| // connection. |
| if cerr := pc.canceled(); cerr != nil { |
| return cerr |
| } |
| |
| // See if an error was set explicitly. |
| req.mu.Lock() |
| reqErr := req.err |
| req.mu.Unlock() |
| if reqErr != nil { |
| return reqErr |
| } |
| |
| if err == errServerClosedIdle { |
| // Don't decorate |
| return err |
| } |
| |
| if _, ok := err.(transportReadFromServerError); ok { |
| // Don't decorate |
| return err |
| } |
| if pc.isBroken() { |
| if pc.nwrite == startBytesWritten { |
| return nothingWrittenError{err} |
| } |
| return fmt.Errorf("net/http: HTTP/1.x transport connection broken: %v", err) |
| } |
| return err |
| } |
| |
// errCallerOwnsConn is an internal sentinel error used when we hand
// off a writable response.Body to the caller. We use this to prevent
// closing a net.Conn that is now owned by the caller: closeLocked
// skips conn.Close when it is given this error.
var errCallerOwnsConn = errors.New("read loop ending; caller owns writable underlying conn")
| |
// readLoop reads responses from the connection and hands each one to the
// roundTrip call that is waiting for it (received via pc.reqch, answered
// on rc.ch). It keeps looping while the connection can be reused; when
// it returns, the deferred function closes the connection with closeErr
// and removes it from the Transport's idle pool.
func (pc *persistConn) readLoop() {
	closeErr := errReadLoopExiting // default value, if not changed below
	defer func() {
		pc.close(closeErr)
		pc.t.removeIdleConn(pc)
	}()

	// tryPutIdleConn attempts to return pc to the idle pool and reports
	// whether that succeeded. On failure the error becomes closeErr. The
	// trace's PutIdleConn hook (if any) is told the outcome, except for
	// errKeepAlivesDisabled.
	tryPutIdleConn := func(trace *httptrace.ClientTrace) bool {
		if err := pc.t.tryPutIdleConn(pc); err != nil {
			closeErr = err
			if trace != nil && trace.PutIdleConn != nil && err != errKeepAlivesDisabled {
				trace.PutIdleConn(err)
			}
			return false
		}
		if trace != nil && trace.PutIdleConn != nil {
			trace.PutIdleConn(nil)
		}
		return true
	}

	// eofc is used to block caller goroutines reading from Response.Body
	// at EOF until this goroutine has (potentially) added the connection
	// back to the idle pool.
	eofc := make(chan struct{})
	defer close(eofc) // unblock reader on errors

	// Read this once, before loop starts. (to avoid races in tests)
	testHookMu.Lock()
	testHookReadLoopBeforeNextRead := testHookReadLoopBeforeNextRead
	testHookMu.Unlock()

	alive := true
	for alive {
		// Each response's headers get a fresh byte budget.
		pc.readLimit = pc.maxHeaderResponseSize()
		_, err := pc.br.Peek(1)

		pc.mu.Lock()
		if pc.numExpectedResponses == 0 {
			// Peek returned although no request is outstanding: close the
			// connection (see readLoopPeekFailLocked) and stop.
			pc.readLoopPeekFailLocked(err)
			pc.mu.Unlock()
			return
		}
		pc.mu.Unlock()

		rc := <-pc.reqch
		trace := httptrace.ContextClientTrace(rc.req.Context())

		var resp *Response
		if err == nil {
			resp, err = pc.readResponse(rc, trace)
		} else {
			// The initial Peek failed; tag the error as a read-from-server
			// failure and use it as the close reason.
			err = transportReadFromServerError{err}
			closeErr = err
		}

		if err != nil {
			if pc.readLimit <= 0 {
				err = fmt.Errorf("net/http: server response headers exceeded %d bytes; aborted", pc.maxHeaderResponseSize())
			}

			select {
			case rc.ch <- responseAndError{err: err}:
			case <-rc.callerGone:
				return
			}
			return
		}
		pc.readLimit = maxInt64 // effectively no limit for response bodies

		pc.mu.Lock()
		pc.numExpectedResponses--
		pc.mu.Unlock()

		bodyWritable := resp.bodyIsWritable()
		hasBody := rc.req.Method != "HEAD" && resp.ContentLength != 0

		if resp.Close || rc.req.Close || resp.StatusCode <= 199 || bodyWritable {
			// Don't do keep-alive on error if either party requested a close
			// or we get an unexpected informational (1xx) response.
			// StatusCode 100 is already handled above.
			alive = false
		}

		if !hasBody || bodyWritable {
			replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil)

			// Put the idle conn back into the pool before we send the response
			// so if they process it quickly and make another request, they'll
			// get this same conn. But we use the unbuffered channel 'rc'
			// to guarantee that persistConn.roundTrip got out of its select
			// potentially waiting for this persistConn to close.
			alive = alive &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				replaced && tryPutIdleConn(trace)

			if bodyWritable {
				// The caller takes over the conn; closeLocked must not close it.
				closeErr = errCallerOwnsConn
			}

			select {
			case rc.ch <- responseAndError{res: resp}:
			case <-rc.callerGone:
				return
			}

			// Now that they've read from the unbuffered channel, they're safely
			// out of the select that also waits on this goroutine to die, so
			// we're allowed to exit now if needed (if alive is false)
			testHookReadLoopBeforeNextRead()
			continue
		}

		// The response has a body. Wrap it so this loop learns when the
		// caller has finished with it: waitForBodyRead receives true if the
		// body was read to io.EOF, and false on an early Close or any other
		// terminating error.
		waitForBodyRead := make(chan bool, 2)
		body := &bodyEOFSignal{
			body: resp.Body,
			earlyCloseFn: func() error {
				waitForBodyRead <- false
				<-eofc // will be closed by deferred call at the end of the function
				return nil

			},
			fn: func(err error) error {
				isEOF := err == io.EOF
				waitForBodyRead <- isEOF
				if isEOF {
					<-eofc // see comment above eofc declaration
				} else if err != nil {
					// Prefer reporting a cancellation over the raw read error.
					if cerr := pc.canceled(); cerr != nil {
						return cerr
					}
				}
				return err
			},
		}

		resp.Body = body
		if rc.addedGzip && strings.EqualFold(resp.Header.Get("Content-Encoding"), "gzip") {
			// The Transport asked for gzip itself, so decode transparently and
			// drop the headers that described the compressed form.
			resp.Body = &gzipReader{body: body}
			resp.Header.Del("Content-Encoding")
			resp.Header.Del("Content-Length")
			resp.ContentLength = -1
			resp.Uncompressed = true
		}

		select {
		case rc.ch <- responseAndError{res: resp}:
		case <-rc.callerGone:
			return
		}

		// Before looping back to the top of this function and peeking on
		// the bufio.Reader, wait for the caller goroutine to finish
		// reading the response body. (or for cancellation or death)
		select {
		case bodyEOF := <-waitForBodyRead:
			replaced := pc.t.replaceReqCanceler(rc.cancelKey, nil) // before pc might return to idle pool
			alive = alive &&
				bodyEOF &&
				!pc.sawEOF &&
				pc.wroteRequest() &&
				replaced && tryPutIdleConn(trace)
			if bodyEOF {
				// Release the body reader that is blocked in fn on <-eofc.
				eofc <- struct{}{}
			}
		case <-rc.req.Cancel:
			alive = false
			pc.t.CancelRequest(rc.req)
		case <-rc.req.Context().Done():
			alive = false
			pc.t.cancelRequest(rc.cancelKey, rc.req.Context().Err())
		case <-pc.closech:
			alive = false
		}

		testHookReadLoopBeforeNextRead()
	}
}
| |
| func (pc *persistConn) readLoopPeekFailLocked(peekErr error) { |
| if pc.closed != nil { |
| return |
| } |
| if n := pc.br.Buffered(); n > 0 { |
| buf, _ := pc.br.Peek(n) |
| if is408Message(buf) { |
| pc.closeLocked(errServerClosedIdle) |
| return |
| } else { |
| log.Printf("Unsolicited response received on idle HTTP channel starting with %q; err=%v", buf, peekErr) |
| } |
| } |
| if peekErr == io.EOF { |
| // common case. |
| pc.closeLocked(errServerClosedIdle) |
| } else { |
| pc.closeLocked(fmt.Errorf("readLoopPeekFailLocked: %v", peekErr)) |
| } |
| } |
| |
// is408Message reports whether buf has the prefix of an
// HTTP 408 Request Timeout response, i.e. "HTTP/1.", any single
// minor-version byte, then " 408".
// See golang.org/issue/32310.
func is408Message(buf []byte) bool {
	const prefixLen = len("HTTP/1.x 408")
	if len(buf) < prefixLen {
		return false
	}
	// Byte 7 (the minor version) is deliberately not inspected.
	return string(buf[:7]) == "HTTP/1." && string(buf[8:12]) == " 408"
}
| |
| // readResponse reads an HTTP response (or two, in the case of "Expect: |
| // 100-continue") from the server. It returns the final non-100 one. |
| // trace is optional. |
| func (pc *persistConn) readResponse(rc requestAndChan, trace *httptrace.ClientTrace) (resp *Response, err error) { |
| if trace != nil && trace.GotFirstResponseByte != nil { |
| if peek, err := pc.br.Peek(1); err == nil && len(peek) == 1 { |
| trace.GotFirstResponseByte() |
| } |
| } |
| num1xx := 0 // number of informational 1xx headers received |
| const max1xxResponses = 5 // arbitrary bound on number of informational responses |
| |
| continueCh := rc.continueCh |
| for { |
| resp, err = ReadResponse(pc.br, rc.req) |
| if err != nil { |
| return |
| } |
| resCode := resp.StatusCode |
| if continueCh != nil { |
| if resCode == 100 { |
| if trace != nil && trace.Got100Continue != nil { |
| trace.Got100Continue() |
| } |
| continueCh <- struct{}{} |
| continueCh = nil |
| } else if resCode >= 200 { |
| close(continueCh) |
| continueCh = nil |
| } |
| } |
| is1xx := 100 <= resCode && resCode <= 199 |
| // treat 101 as a terminal status, see issue 26161 |
| is1xxNonTerminal := is1xx && resCode != StatusSwitchingProtocols |
| if is1xxNonTerminal { |
| num1xx++ |
| if num1xx > max1xxResponses { |
| return nil, errors.New("net/http: too many 1xx informational responses") |
| } |
| pc.readLimit = pc.maxHeaderResponseSize() // reset the limit |
| if trace != nil && trace.Got1xxResponse != nil { |
| if err := trace.Got1xxResponse(resCode, textproto.MIMEHeader(resp.Header)); err != nil { |
| return nil, err |
| } |
| } |
| continue |
| } |
| break |
| } |
| if resp.isProtocolSwitch() { |
| resp.Body = newReadWriteCloserBody(pc.br, pc.conn) |
| } |
| |
| resp.TLS = pc.tlsState |
| return |
| } |
| |
| // waitForContinue returns the function to block until |
| // any response, timeout or connection close. After any of them, |
| // the function returns a bool which indicates if the body should be sent. |
| func (pc *persistConn) waitForContinue(continueCh <-chan struct{}) func() bool { |
| if continueCh == nil { |
| return nil |
| } |
| return func() bool { |
| timer := time.NewTimer(pc.t.ExpectContinueTimeout) |
| defer timer.Stop() |
| |
| select { |
| case _, ok := <-continueCh: |
| return ok |
| case <-timer.C: |
| return true |
| case <-pc.closech: |
| return false |
| } |
| } |
| } |
| |
| func newReadWriteCloserBody(br *bufio.Reader, rwc io.ReadWriteCloser) io.ReadWriteCloser { |
| body := &readWriteCloserBody{ReadWriteCloser: rwc} |
| if br.Buffered() != 0 { |
| body.br = br |
| } |
| return body |
| } |
| |
| // readWriteCloserBody is the Response.Body type used when we want to |
| // give users write access to the Body through the underlying |
| // connection (TCP, unless using custom dialers). This is then |
| // the concrete type for a Response.Body on the 101 Switching |
| // Protocols response, as used by WebSockets, h2c, etc. |
| type readWriteCloserBody struct { |
| _ incomparable |
| br *bufio.Reader // used until empty |
| io.ReadWriteCloser |
| } |
| |
| func (b *readWriteCloserBody) Read(p []byte) (n int, err error) { |
| if b.br != nil { |
| if n := b.br.Buffered(); len(p) > n { |
| p = p[:n] |
| } |
| n, err = b.br.Read(p) |
| if b.br.Buffered() == 0 { |
| b.br = nil |
| } |
| return n, err |
| } |
| return b.ReadWriteCloser.Read(p) |
| } |
| |
// nothingWrittenError wraps a write error which ended up writing zero bytes.
type nothingWrittenError struct {
	error
}

// Unwrap returns the wrapped error so that errors.Is and errors.As can
// see through nothingWrittenError to the underlying cause.
func (nwe nothingWrittenError) Unwrap() error {
	return nwe.error
}
| |
| func (pc *persistConn) writeLoop() { |
| defer close(pc.writeLoopDone) |
| for { |
| select { |
| case wr := <-pc.writech: |
| startBytesWritten := pc.nwrite |
| err := wr.req.Request.write(pc.bw, pc.isProxy, wr.req.extra, pc.waitForContinue(wr.continueCh)) |
| if bre, ok := err.(requestBodyReadError); ok { |
| err = bre.error |
| // Errors reading from the user's |
| // Request.Body are high priority. |
| // Set it here before sending on the |
| // channels below or calling |
| // pc.close() which tears down |
| // connections and causes other |
| // errors. |
| wr.req.setError(err) |
| } |
| if err == nil { |
| err = pc.bw.Flush() |
| } |
| if err != nil { |
| if pc.nwrite == startBytesWritten { |
| err = nothingWrittenError{err} |
| } |
| } |
| pc.writeErrCh <- err // to the body reader, which might recycle us |
| wr.ch <- err // to the roundTrip function |
| if err != nil { |
| pc.close(err) |
| return |
| } |
| case <-pc.closech: |
| return |
| } |
| } |
| } |
| |
// maxWriteWaitBeforeConnReuse is how long a Transport RoundTrip
// will wait to see the result of writing the request after getting a
// response from the server. See comments in (*persistConn).wroteRequest.
const maxWriteWaitBeforeConnReuse = 50 * time.Millisecond
| |
| // wroteRequest is a check before recycling a connection that the previous write |
| // (from writeLoop above) happened and was successful. |
| func (pc *persistConn) wroteRequest() bool { |
| select { |
| case err := <-pc.writeErrCh: |
| // Common case: the write happened well before the response, so |
| // avoid creating a timer. |
| return err == nil |
| default: |
| // Rare case: the request was written in writeLoop above but |
| // before it could send to pc.writeErrCh, the reader read it |
| // all, processed it, and called us here. In this case, give the |
| // write goroutine a bit of time to finish its send. |
| // |
| // Less rare case: We also get here in the legitimate case of |
| // Issue 7569, where the writer is still writing (or stalled), |
| // but the server has already replied. In this case, we don't |
| // want to wait too long, and we want to return false so this |
| // connection isn't re-used. |
| t := time.NewTimer(maxWriteWaitBeforeConnReuse) |
| defer t.Stop() |
| select { |
| case err := <-pc.writeErrCh: |
| return err == nil |
| case <-t.C: |
| return false |
| } |
| } |
| } |
| |
// responseAndError is how the goroutine reading from an HTTP/1 server
// communicates with the goroutine doing the RoundTrip.
// Exactly one of res and err is set; roundTrip panics otherwise.
type responseAndError struct {
	_   incomparable
	res *Response // the response read from the server; nil when err is set
	err error     // the failure to report; nil when res is set
}
| |
// requestAndChan is what roundTrip sends to readLoop over pc.reqch to
// announce a request and say where its response should be delivered.
type requestAndChan struct {
	_         incomparable
	req       *Request
	cancelKey cancelKey              // key used with the Transport's request-canceler functions
	ch        chan responseAndError // unbuffered; always send in select on callerGone

	// whether the Transport (as opposed to the user client code)
	// added the Accept-Encoding gzip header. If the Transport
	// set it, only then do we transparently decode the gzip.
	addedGzip bool

	// Optional blocking chan for Expect: 100-continue (for send).
	// If the request has an "Expect: 100-continue" header and
	// the server responds 100 Continue, readLoop sends a value
	// to writeLoop via this chan.
	continueCh chan<- struct{}

	callerGone <-chan struct{} // closed when roundTrip caller has returned
}
| |
// A writeRequest is sent by roundTrip to the writeLoop's goroutine
// (over pc.writech) to write a request, while roundTrip concurrently
// waits on both the write result and the server's reply.
type writeRequest struct {
	req *transportRequest
	ch  chan<- error // receives the result of the write (nil on success)

	// Optional blocking chan for Expect: 100-continue (for receive).
	// If not nil, writeLoop blocks sending request body until
	// it receives from this chan.
	continueCh <-chan struct{}
}
| |
// httpError is a simple error carrying a fixed message and a flag saying
// whether it represents a timeout.
type httpError struct {
	err     string
	timeout bool
}

// Error returns the stored message.
func (e *httpError) Error() string {
	return e.err
}

// Timeout reports whether the error was flagged as a timeout.
func (e *httpError) Timeout() bool {
	return e.timeout
}

// Temporary always reports true.
func (e *httpError) Temporary() bool {
	return true
}
| |
// errTimeout is the error roundTrip returns when the response header
// timer fires before a response arrives. Its Timeout method reports true.
var errTimeout error = &httpError{err: "net/http: timeout awaiting response headers", timeout: true}

// errRequestCanceled is set to be identical to the one from h2 to facilitate
// testing.
var errRequestCanceled = http2errRequestCanceled
var errRequestCanceledConn = errors.New("net/http: request canceled while waiting for connection") // TODO: unify?
| |
// nop does nothing. It is the default value of the test hooks below.
func nop() {}

// testHooks. Always non-nil.
var (
	testHookEnterRoundTrip   = nop // called at the start of persistConn.roundTrip
	testHookWaitResLoop      = nop // called on each iteration of roundTrip's wait loop
	testHookRoundTripRetried = nop
	testHookPrePendingDial   = nop
	testHookPostPendingDial  = nop

	testHookMu                     sync.Locker = fakeLocker{} // guards following
	testHookReadLoopBeforeNextRead             = nop
)
| |
// roundTrip sends req on this connection and waits for the outcome.
//
// It hands the request to writeLoop (via pc.writech) and registers it with
// readLoop (via pc.reqch), then waits in a select loop for whichever comes
// first: a write error, the connection closing, the response-header
// timeout, a response or read error from readLoop, or cancellation through
// req.Cancel or the request's context.
//
// It returns either a non-nil response or a non-nil error, never both.
func (pc *persistConn) roundTrip(req *transportRequest) (resp *Response, err error) {
	testHookEnterRoundTrip()
	if !pc.t.replaceReqCanceler(req.cancelKey, pc.cancelRequest) {
		// The canceler could not be installed; give the connection back
		// and report the request as canceled.
		pc.t.putOrCloseIdleConn(pc)
		return nil, errRequestCanceled
	}
	pc.mu.Lock()
	pc.numExpectedResponses++
	headerFn := pc.mutateHeaderFunc
	pc.mu.Unlock()

	if headerFn != nil {
		headerFn(req.extraHeaders())
	}

	// Ask for a compressed version if the caller didn't set their
	// own value for Accept-Encoding. We only attempt to
	// uncompress the gzip stream if we were the layer that
	// requested it.
	requestedGzip := false
	if !pc.t.DisableCompression &&
		req.Header.Get("Accept-Encoding") == "" &&
		req.Header.Get("Range") == "" &&
		req.Method != "HEAD" {
		// Request gzip only, not deflate. Deflate is ambiguous and
		// not as universally supported anyway.
		// See: https://zlib.net/zlib_faq.html#faq39
		//
		// Note that we don't request this for HEAD requests,
		// due to a bug in nginx:
		//   https://trac.nginx.org/nginx/ticket/358
		//   https://golang.org/issue/5522
		//
		// We don't request gzip if the request is for a range, since
		// auto-decoding a portion of a gzipped document will just fail
		// anyway. See https://golang.org/issue/8923
		requestedGzip = true
		req.extraHeaders().Set("Accept-Encoding", "gzip")
	}

	// continueCh is shared by readLoop (send side) and writeLoop (receive
	// side) to coordinate "Expect: 100-continue". It stays nil otherwise.
	var continueCh chan struct{}
	if req.ProtoAtLeast(1, 1) && req.Body != nil && req.expectsContinue() {
		continueCh = make(chan struct{}, 1)
	}

	if pc.t.DisableKeepAlives &&
		!req.wantsClose() &&
		!isProtocolSwitchHeader(req.Header) {
		req.extraHeaders().Set("Connection", "close")
	}

	// gone is closed when this function returns; readLoop selects on it
	// (as callerGone) so it never blocks sending to a caller that has left.
	gone := make(chan struct{})
	defer close(gone)

	// On any error return, clear the request's canceler.
	defer func() {
		if err != nil {
			pc.t.setReqCanceler(req.cancelKey, nil)
		}
	}()

	const debugRoundTrip = false

	// Write the request concurrently with waiting for a response,
	// in case the server decides to reply before reading our full
	// request body.
	startBytesWritten := pc.nwrite
	writeErrCh := make(chan error, 1)
	pc.writech <- writeRequest{req, writeErrCh, continueCh}

	resc := make(chan responseAndError)
	pc.reqch <- requestAndChan{
		req:        req.Request,
		cancelKey:  req.cancelKey,
		ch:         resc,
		addedGzip:  requestedGzip,
		continueCh: continueCh,
		callerGone: gone,
	}

	// Channels that have served their purpose are set to nil below so
	// their select cases can no longer fire.
	var respHeaderTimer <-chan time.Time
	cancelChan := req.Request.Cancel
	ctxDoneChan := req.Context().Done()
	pcClosed := pc.closech
	canceled := false
	for {
		testHookWaitResLoop()
		select {
		case err := <-writeErrCh:
			if debugRoundTrip {
				req.logf("writeErrCh resv: %T/%#v", err, err)
			}
			if err != nil {
				pc.close(fmt.Errorf("write error: %v", err))
				return nil, pc.mapRoundTripError(req, startBytesWritten, err)
			}
			// The request was written successfully; start the
			// response-header timer if one is configured.
			if d := pc.t.ResponseHeaderTimeout; d > 0 {
				if debugRoundTrip {
					req.logf("starting timer for %v", d)
				}
				timer := time.NewTimer(d)
				defer timer.Stop() // prevent leaks
				respHeaderTimer = timer.C
			}
		case <-pcClosed:
			pcClosed = nil
			// Return the close error if this request was canceled or if the
			// canceler could be cleared here; otherwise keep waiting for
			// the other cases.
			if canceled || pc.t.replaceReqCanceler(req.cancelKey, nil) {
				if debugRoundTrip {
					req.logf("closech recv: %T %#v", pc.closed, pc.closed)
				}
				return nil, pc.mapRoundTripError(req, startBytesWritten, pc.closed)
			}
		case <-respHeaderTimer:
			if debugRoundTrip {
				req.logf("timeout waiting for response headers.")
			}
			pc.close(errTimeout)
			return nil, errTimeout
		case re := <-resc:
			if (re.res == nil) == (re.err == nil) {
				panic(fmt.Sprintf("internal error: exactly one of res or err should be set; nil=%v", re.res == nil))
			}
			if debugRoundTrip {
				req.logf("resc recv: %p, %T/%#v", re.res, re.err, re.err)
			}
			if re.err != nil {
				return nil, pc.mapRoundTripError(req, startBytesWritten, re.err)
			}
			return re.res, nil
		case <-cancelChan:
			canceled = pc.t.cancelRequest(req.cancelKey, errRequestCanceled)
			cancelChan = nil
		case <-ctxDoneChan:
			canceled = pc.t.cancelRequest(req.cancelKey, req.Context().Err())
			cancelChan = nil
			ctxDoneChan = nil
		}
	}
}
| |
| // tLogKey is a context WithValue key for test debugging contexts containing |
| // a t.Logf func. See export_test.go's Request.WithT method. |
| type tLogKey struct{} |
| |
| func (tr *transportRequest) logf(format string, args ...interface{}) { |
| if logf, ok := tr.Request.Context().Value(tLogKey{}).(func(string, ...interface{})); ok { |
| logf(time.Now().Format(time.RFC3339Nano)+": "+format, args...) |
| } |
| } |
| |
| // markReused marks this connection as having been successfully used for a |
| // request and response. |
| func (pc *persistConn) markReused() { |
| pc.mu.Lock() |
| pc.reused = true |
| pc.mu.Unlock() |
| } |
| |
// close closes the underlying TCP connection and closes
// the pc.closech channel. It acquires pc.mu and delegates to closeLocked.
//
// The provided err is only for testing and debugging; in normal
// circumstances it should never be seen by users. It must be non-nil:
// closeLocked panics on a nil error.
func (pc *persistConn) close(err error) {
	pc.mu.Lock()
	defer pc.mu.Unlock()
	pc.closeLocked(err)
}
| |
// closeLocked marks the connection broken and, on the first call only,
// records err as the close reason, releases the per-host connection
// count, and (for HTTP/1 connections) closes the conn and pc.closech.
// Later calls keep the original close reason. The caller must hold pc.mu.
//
// It panics if err is nil.
func (pc *persistConn) closeLocked(err error) {
	if err == nil {
		panic("nil error")
	}
	pc.broken = true
	if pc.closed == nil {
		pc.closed = err
		pc.t.decConnsPerHost(pc.cacheKey)
		// Close HTTP/1 (pc.alt == nil) connection.
		// HTTP/2 closes its connection itself.
		if pc.alt == nil {
			// With errCallerOwnsConn the conn has been handed to the caller,
			// so leave it open; closech is still closed.
			if err != errCallerOwnsConn {
				pc.conn.Close()
			}
			close(pc.closech)
		}
	}
	pc.mutateHeaderFunc = nil
}
| |
// portMap maps a URL scheme to the port canonicalAddr uses when the
// URL does not specify one.
var portMap = map[string]string{
	"http":   "80",
	"https":  "443",
	"socks5": "1080",
}
| |
| // canonicalAddr returns url.Host but always with a ":port" suffix |
| func canonicalAddr(url *url.URL) string { |
| addr := url.Hostname() |
| if v, err := idnaASCII(addr); err == nil { |
| addr = v |
| } |
| port := url.Port() |
| if port == "" { |
| port = portMap[url.Scheme] |
| } |
| return net.JoinHostPort(addr, port) |
| } |
| |
// bodyEOFSignal is used by the HTTP/1 transport when reading response
// bodies to make sure we see the end of a response body before
// proceeding and reading on the connection again.
//
// It wraps a ReadCloser but runs fn (if non-nil) at most
// once, right before its final (error-producing) Read or Close call
// returns. fn should return the new error to return from Read or Close.
//
// If earlyCloseFn is non-nil and Close is called before io.EOF is
// seen, earlyCloseFn is called instead of fn, and its return value is
// the return value from Close.
type bodyEOFSignal struct {
	body         io.ReadCloser
	mu           sync.Mutex        // guards following 4 fields
	closed       bool              // whether Close has been called
	rerr         error             // sticky Read error
	fn           func(error) error // receives the error from the final Read, or from body.Close
	earlyCloseFn func() error      // optional alt Close func used if io.EOF not seen
}

var errReadOnClosedResBody = errors.New("http: read on closed response body")

// Read reads from the wrapped body. After Close it fails with
// errReadOnClosedResBody, and after a failed Read it keeps returning that
// first error. The first error from the underlying body is remembered and
// passed through fn before being returned.
func (es *bodyEOFSignal) Read(p []byte) (n int, err error) {
	es.mu.Lock()
	wasClosed, sticky := es.closed, es.rerr
	es.mu.Unlock()
	switch {
	case wasClosed:
		return 0, errReadOnClosedResBody
	case sticky != nil:
		return 0, sticky
	}

	n, err = es.body.Read(p)
	if err == nil {
		return n, nil
	}

	es.mu.Lock()
	defer es.mu.Unlock()
	if es.rerr == nil {
		es.rerr = err
	}
	return n, es.condfn(err)
}

// Close marks the body closed; second and later calls return nil. If
// io.EOF has not been seen and earlyCloseFn is set, its result is
// returned. Otherwise the wrapped body is closed and the resulting error
// is passed through fn.
func (es *bodyEOFSignal) Close() error {
	es.mu.Lock()
	defer es.mu.Unlock()
	if es.closed {
		return nil
	}
	es.closed = true

	sawEOF := es.rerr == io.EOF
	if !sawEOF && es.earlyCloseFn != nil {
		return es.earlyCloseFn()
	}
	return es.condfn(es.body.Close())
}

// condfn runs fn on err at most once and returns its result; when fn has
// already run (or was never set) err is returned unchanged.
// caller must hold es.mu.
func (es *bodyEOFSignal) condfn(err error) error {
	hook := es.fn
	if hook == nil {
		return err
	}
	err = hook(err)
	es.fn = nil
	return err
}
| |
| // gzipReader wraps a response body so it can lazily |
| // call gzip.NewReader on the first call to Read |
| type gzipReader struct { |
| _ incomparable |
| body *bodyEOFSignal // underlying HTTP/1 response body framing |
| zr *gzip.Reader // lazily-initialized gzip reader |
| zerr error // any error from gzip.NewReader; sticky |
| } |
| |
| func (gz *gzipReader) Read(p []byte) (n int, err error) { |
| if gz.zr == nil { |
| if gz.zerr == nil { |
| gz.zr, gz.zerr = gzip.NewReader(gz.body) |
| } |
| if gz.zerr != nil { |
| return 0, gz.zerr |
| } |
| } |
| |
| gz.body.mu.Lock() |
| if gz.body.closed { |
| err = errReadOnClosedResBody |
| } |
| gz.body.mu.Unlock() |
| |
| if err != nil { |
| return 0, err |
| } |
| return gz.zr.Read(p) |
| } |
| |
| func (gz *gzipReader) Close() error { |
| return gz.body.Close() |
| } |
| |
// tlsHandshakeTimeoutError is the error value describing a TLS handshake
// timeout. It reports itself as both a timeout and temporary.
type tlsHandshakeTimeoutError struct{}

// Timeout always reports true.
func (tlsHandshakeTimeoutError) Timeout() bool {
	return true
}

// Temporary always reports true.
func (tlsHandshakeTimeoutError) Temporary() bool {
	return true
}

// Error returns the fixed timeout message.
func (tlsHandshakeTimeoutError) Error() string {
	return "net/http: TLS handshake timeout"
}
| |
// fakeLocker is a sync.Locker which does nothing. It's used to guard
// test-only fields when not under test, to avoid runtime atomic
// overhead.
type fakeLocker struct{}

// Lock is a no-op.
func (fakeLocker) Lock() {
	// Intentionally empty.
}

// Unlock is a no-op.
func (fakeLocker) Unlock() {
	// Intentionally empty.
}
| |
// cloneTLSConfig returns a shallow clone of cfg, or a new zero tls.Config if
// cfg is nil. This is safe to call even if cfg is in active use by a TLS
// client or server.
func cloneTLSConfig(cfg *tls.Config) *tls.Config {
	if cfg != nil {
		return cfg.Clone()
	}
	return new(tls.Config)
}
| |
| type connLRU struct { |
| ll *list.List // list.Element.Value type is of *persistConn |
| m map[*persistConn]*list.Element |
| } |
| |
| // add adds pc to the head of the linked list. |
| func (cl *connLRU) add(pc *persistConn) { |
| if cl.ll == nil { |
| cl.ll = list.New() |
| cl.m = make(map[*persistConn]*list.Element) |
| } |
| ele := cl.ll.PushFront(pc) |
| if _, ok := cl.m[pc]; ok { |
| panic("persistConn was already in LRU") |
| } |
| cl.m[pc] = ele |
| } |
| |
| func (cl *connLRU) removeOldest() *persistConn { |
| ele := cl.ll.Back() |
| pc := ele.Value.(*persistConn) |
| cl.ll.Remove(ele) |
| delete(cl.m, pc) |
| return pc |
| } |
| |
| // remove removes pc from cl. |
| func (cl *connLRU) remove(pc *persistConn) { |
| if ele, ok := cl.m[pc]; ok { |
| cl.ll.Remove(ele) |
| delete(cl.m, pc) |
| } |
| } |
| |
| // len returns the number of items in the cache. |
| func (cl *connLRU) len() int { |
| return len(cl.m) |
| } |