| // Copyright 2024 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package http_test |
| |
| import ( |
| "context" |
| "crypto/tls" |
| "errors" |
| "io" |
| "net" |
| "net/http" |
| "net/http/httptrace" |
| "strings" |
| "sync" |
| "testing" |
| "testing/synctest" |
| ) |
| |
| // Successive requests use the same HTTP/1 connection. |
| func TestTransportPoolConnReusePriorConnection(t *testing.T) { |
| synctest.Test(t, func(t *testing.T) { |
| dt := newTransportDialTester(t, http1Mode) |
| |
| // First request creates a new connection. |
| rt1 := dt.roundTrip() |
| c1 := dt.wantDial() |
| c1.finish(nil) |
| rt1.wantDone(c1, "HTTP/1.1") |
| rt1.finish() |
| |
| // Second request reuses the first connection. |
| rt2 := dt.roundTrip() |
| rt2.wantDone(c1, "HTTP/1.1") |
| rt2.finish() |
| }) |
| } |
| |
| // Two HTTP/1 requests made at the same time use different connections. |
| func TestTransportPoolConnCannotReuseConnectionInUse(t *testing.T) { |
| synctest.Test(t, func(t *testing.T) { |
| dt := newTransportDialTester(t, http1Mode) |
| |
| // First request creates a new connection. |
| rt1 := dt.roundTrip() |
| c1 := dt.wantDial() |
| c1.finish(nil) |
| rt1.wantDone(c1, "HTTP/1.1") |
| |
| // Second request is made while the first request is still using its connection, |
| // so it goes on a new connection. |
| rt2 := dt.roundTrip() |
| c2 := dt.wantDial() |
| c2.finish(nil) |
| rt2.wantDone(c2, "HTTP/1.1") |
| }) |
| } |
| |
| // When an HTTP/2 connection is at its stream limit |
| // a new request is made on a new connection. |
| func testTransportPoolConnHTTP2NoStrictMaxConcurrentRequests(t *testing.T) { |
| synctest.Test(t, func(t *testing.T) { |
| dt := newTransportDialTester(t, http2Mode, func(srv *http.Server) { |
| srv.HTTP2 = &http.HTTP2Config{ |
| MaxConcurrentStreams: 2, |
| } |
| }) |
| |
| // First request dials an HTTP/2 connection. |
| rt1 := dt.roundTrip() |
| c1 := dt.wantDial() |
| c1.finish(nil) |
| rt1.wantDone(c1, "HTTP/2.0") |
| |
| // Second request uses the existing connection. |
| rt2 := dt.roundTrip() |
| rt2.wantDone(c1, "HTTP/2.0") |
| |
| // Third request creates a new connection |
| rt3 := dt.roundTrip() |
| c2 := dt.wantDial() |
| c2.finish(nil) |
| rt3.wantDone(c2, "HTTP/2.0") |
| |
| rt1.finish() |
| rt2.finish() |
| rt3.finish() |
| |
| // With slots available on both connections, we prefer the oldest. |
| rt4 := dt.roundTrip() |
| rt4.wantDone(c1, "HTTP/2.0") |
| rt5 := dt.roundTrip() |
| rt5.wantDone(c1, "HTTP/2.0") |
| rt6 := dt.roundTrip() |
| rt6.wantDone(c2, "HTTP/2.0") |
| rt4.finish() |
| rt5.finish() |
| rt6.finish() |
| }) |
| } |
| |
| // When an HTTP/2 connection is at its stream limit |
| // and StrictMaxConcurrentRequests = true, |
| // a new request waits for a slot on the existing connection. |
| func TestTransportPoolConnHTTP2StrictMaxConcurrentRequests(t *testing.T) { |
| t.Skip("skipped until h2_bundle.go includes support for StrictMaxConcurrentRequests") |
| |
| synctest.Test(t, func(t *testing.T) { |
| dt := newTransportDialTester(t, http2Mode, func(srv *http.Server) { |
| srv.HTTP2.MaxConcurrentStreams = 2 |
| }, func(tr *http.Transport) { |
| tr.HTTP2 = &http.HTTP2Config{ |
| StrictMaxConcurrentRequests: true, |
| } |
| }) |
| |
| // First request dials an HTTP/2 connection. |
| rt1 := dt.roundTrip() |
| c1 := dt.wantDial() |
| c1.finish(nil) |
| rt1.wantDone(c1, "HTTP/2.0") |
| |
| // Second request uses the existing connection. |
| rt2 := dt.roundTrip() |
| rt2.wantDone(c1, "HTTP/2.0") |
| |
| // Third request blocks waiting for a slot on the existing connection. |
| rt3 := dt.roundTrip() |
| |
| // First request finishing unblocks the thirrd. |
| rt1.finish() |
| rt3.wantDone(c1, "HTTP/2.0") |
| |
| rt2.finish() |
| rt3.finish() |
| }) |
| } |
| |
| // A new request made while an HTTP/2 dial is in progress will start a second dial. |
| func TestTransportPoolConnHTTP2Startup(t *testing.T) { |
| synctest.Test(t, func(t *testing.T) { |
| dt := newTransportDialTester(t, http2Mode, func(srv *http.Server) {}) |
| |
| // Two requests start. |
| // Since the second request starts before the first dial finishes, it starts a second dial. |
| rt1 := dt.roundTrip() |
| rt2 := dt.roundTrip() |
| c1 := dt.wantDial() |
| c2 := dt.wantDial() |
| |
| // Both requests use the conn of the first dial to complete. |
| c1.finish(nil) |
| rt1.wantDone(c1, "HTTP/2.0") |
| rt2.wantDone(c1, "HTTP/2.0") |
| |
| rt1.finish() |
| rt2.finish() |
| c2.finish(nil) |
| }) |
| } |
| |
| // When a request finishes using an HTTP/1 connection, |
| // a pending request attempting to dial a new connection will use the newly-available one. |
| func TestTransportPoolConnConnectionBecomesAvailableDuringDial(t *testing.T) { |
| synctest.Test(t, func(t *testing.T) { |
| dt := newTransportDialTester(t, http1Mode) |
| |
| // First request creates a new connection. |
| rt1 := dt.roundTrip() |
| c1 := dt.wantDial() |
| c1.finish(nil) |
| rt1.wantDone(c1, "HTTP/1.1") |
| |
| // Second request is made while the first request is still using its connection. |
| // The first connection completes while the second Dial is in progress, so the |
| // second request uses the first connection. |
| rt2 := dt.roundTrip() |
| c2 := dt.wantDial() |
| rt1.finish() |
| rt2.wantDone(c1, "HTTP/1.1") |
| |
| // This section is a bit overfitted to the current Transport implementation: |
| // A third request starts. We have an in-progress dial that was started by rt2, |
| // but this new request (rt3) is going to ignore it and make a dial of its own. |
| // rt3 will use the first of these dials that completes. |
| rt3 := dt.roundTrip() |
| c3 := dt.wantDial() |
| c2.finish(nil) |
| rt3.wantDone(c2, "HTTP/1.1") |
| |
| c3.finish(nil) |
| }) |
| } |
| |
| // Connections are not reused when DisableKeepAlives = true. |
| func TestTransportPoolDisableKeepAlives(t *testing.T) { |
| synctest.Test(t, func(t *testing.T) { |
| dt := newTransportDialTester(t, http1Mode, func(tr *http.Transport) { |
| tr.DisableKeepAlives = true |
| }) |
| |
| // Two requests, each uses a separate connection. |
| for range 2 { |
| rt := dt.roundTrip() |
| c := dt.wantDial() |
| c.finish(nil) |
| rt.wantDone(c, "HTTP/1.1") |
| rt.finish() |
| } |
| }) |
| } |
| |
| // Canceling a request before its connection is created returns the conn to the pool. |
| func TestTransportPoolCancelRequestReusesConn(t *testing.T) { |
| synctest.Test(t, func(t *testing.T) { |
| dt := newTransportDialTester(t, http1Mode) |
| |
| // First request is canceled before its connection is created. |
| rt1 := dt.roundTrip() |
| c1 := dt.wantDial() |
| rt1.cancel() |
| rt1.wantError() |
| |
| // Second request uses the first connection. |
| rt2 := dt.roundTrip() |
| c2 := dt.wantDial() |
| c1.finish(nil) // first dial finishes |
| rt2.wantDone(c1, "HTTP/1.1") |
| rt2.finish() |
| |
| c2.finish(nil) // second dial finishes |
| }) |
| } |
| |
| // Connections are not reused when DisableKeepAlives = true. |
| func TestTransportPoolCancelRequestWithDisableKeepAlives(t *testing.T) { |
| synctest.Test(t, func(t *testing.T) { |
| dt := newTransportDialTester(t, http1Mode, func(tr *http.Transport) { |
| tr.DisableKeepAlives = true |
| }) |
| |
| // First request is canceled before its connection is created. |
| rt1 := dt.roundTrip() |
| c1 := dt.wantDial() |
| rt1.cancel() |
| rt1.wantError() |
| |
| // Dial finishes. DisableKeepAlives = true, so we discard the connection. |
| c1.finish(nil) |
| |
| // Second request is made on a new connection. |
| rt2 := dt.roundTrip() |
| c2 := dt.wantDial() |
| c2.finish(nil) |
| rt2.wantDone(c2, "HTTP/1.1") |
| rt2.finish() |
| }) |
| } |
| |
| // Connections are not reused after an error. |
| func TestTransportPoolConnectionBroken(t *testing.T) { |
| synctest.Test(t, func(t *testing.T) { |
| dt := newTransportDialTester(t, http1Mode) |
| |
| // First request creates a new connection. |
| // The connection breaks while sending the response. |
| rt1 := dt.roundTrip() |
| c1 := dt.wantDial() |
| c1.finish(nil) |
| rt1.wantDone(c1, "HTTP/1.1") |
| c1.fakeNetConn.Close() // break the connection |
| rt1.finish() |
| |
| // Second request is made on a new connection, since the first is broken. |
| rt2 := dt.roundTrip() |
| c2 := dt.wantDial() |
| c2.finish(nil) |
| rt2.wantDone(c2, "HTTP/1.1") |
| c2.fakeNetConn.Close() |
| rt2.finish() |
| }) |
| } |
| |
| // MaxIdleConnsPerHost limits the number of idle connections. |
| func TestTransportPoolClosesConnsPastMaxIdleConnsPerHost(t *testing.T) { |
| synctest.Test(t, func(t *testing.T) { |
| dt := newTransportDialTester(t, http1Mode, func(tr *http.Transport) { |
| tr.MaxIdleConnsPerHost = 1 |
| }) |
| |
| // First request creates a new connection. |
| rt1 := dt.roundTrip("host1.fake.tld") |
| c1 := dt.wantDial() |
| c1.finish(nil) |
| rt1.wantDone(c1, "HTTP/1.1") |
| |
| // Second request also creates a new connection. |
| rt2 := dt.roundTrip("host1.fake.tld") |
| c2 := dt.wantDial() |
| c2.finish(nil) |
| rt2.wantDone(c2, "HTTP/1.1") |
| |
| // Third request is to a different host. |
| rt3 := dt.roundTrip("host2.fake.tld") |
| c3 := dt.wantDial() |
| c3.finish(nil) |
| rt3.wantDone(c3, "HTTP/1.1") |
| |
| // All requests finish. One conn is in excess of MaxIdleConnsPerHost, and is closed. |
| rt3.finish() |
| rt2.finish() |
| rt1.finish() |
| c1.wantClosed() |
| |
| // Additional requests reuse the remaining connections. |
| rt4 := dt.roundTrip("host1.fake.tld") |
| rt4.wantDone(c2, "HTTP/1.1") |
| rt4.finish() |
| rt5 := dt.roundTrip("host2.fake.tld") |
| rt5.wantDone(c3, "HTTP/1.1") |
| rt5.finish() |
| }) |
| } |
| |
| // Current (but probably wrong) behavior: |
| // MaxIdleConnsPerHost doesn't apply to HTTP/2 connections. |
| func TestTransportPoolMaxIdleConnsPerHostHTTP2(t *testing.T) { |
| synctest.Test(t, func(t *testing.T) { |
| t.Skip("skipped until h2_bundle.go includes support for MaxConcurrentStreams") |
| |
| dt := newTransportDialTester(t, http2Mode, func(srv *http.Server) { |
| srv.HTTP2 = &http.HTTP2Config{ |
| MaxConcurrentStreams: 1, |
| } |
| }, func(tr *http.Transport) { |
| tr.MaxIdleConnsPerHost = 1 |
| }) |
| |
| // First request creates a new connection. |
| rt1 := dt.roundTrip() |
| c1 := dt.wantDial() |
| c1.finish(nil) |
| rt1.wantDone(c1, "HTTP/2.0") |
| |
| // Second request also creates a new connection. |
| rt2 := dt.roundTrip() |
| c2 := dt.wantDial() |
| c2.finish(nil) |
| rt2.wantDone(c2, "HTTP/2.0") |
| |
| // Both requests finish. |
| // We have two idle conns for this host, but we keep them both. |
| rt1.finish() |
| rt2.finish() |
| |
| // Two new requests use the existing connections. |
| rt3 := dt.roundTrip() |
| rt3.wantDone(c1, "HTTP/2.0") |
| rt4 := dt.roundTrip() |
| rt4.wantDone(c2, "HTTP/2.0") |
| }) |
| } |
| |
| // A transportDialTester manages a test of a connection's Dials. |
| type transportDialTester struct { |
| t *testing.T |
| cst *clientServerTest |
| |
| dialsMu sync.Mutex |
| dials []*transportDialTesterConn |
| |
| roundTripCount int |
| dialCount int |
| } |
| |
| // A transportDialTesterRoundTrip is a RoundTrip made as part of a dial test. |
| type transportDialTesterRoundTrip struct { |
| t *testing.T |
| |
| roundTripID int // distinguishes RoundTrips in logs |
| cancel context.CancelFunc // cancels the Request context |
| reqBody io.WriteCloser // write half of the Request.Body |
| respBodyClosed bool // set when the user calls Response.Body.Close |
| returned bool // set when RoundTrip returns |
| |
| res *http.Response |
| err error |
| conn *transportDialTesterConn |
| } |
| |
| // A transportDialTesterConn is a client connection created by the Transport as |
| // part of a dial test. |
| type transportDialTesterConn struct { |
| t *testing.T |
| |
| connID int // distinguished Dials in logs |
| ready chan error // sent on to complete the Dial |
| protos []string |
| closed chan struct{} |
| |
| *fakeNetConn |
| } |
| |
| func newTransportDialTester(t *testing.T, mode testMode, opts ...any) *transportDialTester { |
| t.Helper() |
| dt := &transportDialTester{ |
| t: t, |
| } |
| dialContext := func(_ context.Context, network, address string) (*transportDialTesterConn, error) { |
| c := &transportDialTesterConn{ |
| t: t, |
| ready: make(chan error), |
| closed: make(chan struct{}), |
| } |
| // Notify the test that a Dial has started, |
| // and wait for the test to notify us that it should complete. |
| dt.dialsMu.Lock() |
| dt.dials = append(dt.dials, c) |
| dt.dialsMu.Unlock() |
| |
| select { |
| case err := <-c.ready: |
| if err != nil { |
| return nil, err |
| } |
| case <-t.Context().Done(): |
| t.Errorf("test finished with dial in progress") |
| return nil, errors.New("test finished") |
| } |
| |
| c.fakeNetConn = dt.cst.li.connect() |
| t.Cleanup(func() { |
| c.fakeNetConn.Close() |
| }) |
| // Use the *transportDialTesterConn as the net.Conn, |
| // to let tests associate requests with connections. |
| return c, nil |
| } |
| dt.cst = newClientServerTest(t, mode, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { |
| // Write response headers when we receive a request. |
| http.NewResponseController(w).EnableFullDuplex() |
| w.WriteHeader(200) |
| http.NewResponseController(w).Flush() |
| // Wait for the client to send the request body, |
| // to synchronize with the rest of the test. |
| io.ReadAll(r.Body) |
| }), append([]any{optFakeNet, func(tr *http.Transport) { |
| tr.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) { |
| return dialContext(ctx, network, dt.cst.ts.Listener.Addr().String()) |
| } |
| tr.DialTLSContext = func(ctx context.Context, network, addr string) (net.Conn, error) { |
| conn, err := dialContext(ctx, network, dt.cst.ts.Listener.Addr().String()) |
| if err != nil { |
| return nil, err |
| } |
| config := &tls.Config{ |
| InsecureSkipVerify: true, |
| NextProtos: []string{"h2", "http/1.1"}, |
| } |
| if conn.protos != nil { |
| config.NextProtos = conn.protos |
| } |
| tc := tls.Client(conn, config) |
| if err := tc.Handshake(); err != nil { |
| return nil, err |
| } |
| return tc, nil |
| } |
| }}, opts...)...) |
| return dt |
| } |
| |
| // roundTrip starts a RoundTrip. |
| // It returns immediately, without waiting for the RoundTrip call to complete. |
| func (dt *transportDialTester) roundTrip(opts ...any) *transportDialTesterRoundTrip { |
| dt.t.Helper() |
| host := "fake.tld" |
| for _, o := range opts { |
| switch o := o.(type) { |
| case string: |
| host = o |
| default: |
| dt.t.Fatalf("unknown option type %T", o) |
| } |
| } |
| ctx, cancel := context.WithCancel(context.Background()) |
| pr, pw := io.Pipe() |
| dt.roundTripCount++ |
| rt := &transportDialTesterRoundTrip{ |
| t: dt.t, |
| roundTripID: dt.roundTripCount, |
| reqBody: pw, |
| cancel: cancel, |
| } |
| dt.t.Logf("RoundTrip %v: started", rt.roundTripID) |
| dt.t.Cleanup(func() { |
| rt.cancel() |
| rt.finish() |
| }) |
| go func() { |
| ctx = httptrace.WithClientTrace(ctx, &httptrace.ClientTrace{ |
| GotConn: func(info httptrace.GotConnInfo) { |
| c := info.Conn |
| if tlsConn, ok := c.(*tls.Conn); ok { |
| c = tlsConn.NetConn() |
| } |
| rt.conn = c.(*transportDialTesterConn) |
| }, |
| }) |
| proto, _, _ := strings.Cut(dt.cst.ts.URL, ":") |
| req, _ := http.NewRequestWithContext(ctx, "POST", proto+"://"+host, pr) |
| req.Header.Set("Content-Type", "text/plain") |
| rt.res, rt.err = dt.cst.tr.RoundTrip(req) |
| dt.t.Logf("RoundTrip %v: done (err:%v)", rt.roundTripID, rt.err) |
| rt.returned = true |
| }() |
| return rt |
| } |
| |
| // wantDone indicates that a RoundTrip should have returned. |
| func (rt *transportDialTesterRoundTrip) wantDone(c *transportDialTesterConn, wantProto string) { |
| rt.t.Helper() |
| synctest.Wait() |
| if !rt.returned { |
| rt.t.Fatalf("RoundTrip %v: still running, want to have returned", rt.roundTripID) |
| } |
| if rt.err != nil { |
| rt.t.Fatalf("RoundTrip %v: want success, got err %v", rt.roundTripID, rt.err) |
| } |
| if rt.conn != c { |
| rt.t.Fatalf("RoundTrip %v: want on conn %v, got conn %v", rt.roundTripID, c.connID, rt.conn.connID) |
| } |
| if got, want := rt.conn, c; got != want { |
| rt.t.Fatalf("RoundTrip %v: sent on conn %v, want conn %v", rt.roundTripID, got.connID, want.connID) |
| } |
| if got, want := rt.res.Proto, wantProto; got != want { |
| rt.t.Fatalf("RoundTrip %v: got protocol %q, want %q", rt.roundTripID, got, want) |
| } |
| } |
| |
| // wantError indicates that a RoundTrip should have returned with an error. |
| func (rt *transportDialTesterRoundTrip) wantError() { |
| rt.t.Helper() |
| synctest.Wait() |
| if !rt.returned { |
| rt.t.Fatalf("RoundTrip %v: still running, want to have returned", rt.roundTripID) |
| } |
| if rt.err == nil { |
| rt.t.Fatalf("RoundTrip %v: success, want error", rt.roundTripID) |
| } |
| } |
| |
| // finish completes a RoundTrip by sending the request body, consuming the response body, |
| // and closing the response body. |
| func (rt *transportDialTesterRoundTrip) finish() { |
| rt.t.Helper() |
| |
| synctest.Wait() |
| if !rt.returned { |
| rt.t.Fatalf("RoundTrip %v: still running, want to have returned", rt.roundTripID) |
| } |
| if rt.err != nil { |
| return |
| } |
| |
| if rt.respBodyClosed { |
| return |
| } |
| rt.respBodyClosed = true |
| rt.reqBody.Close() |
| io.ReadAll(rt.res.Body) |
| rt.res.Body.Close() |
| rt.t.Logf("RoundTrip %v: closed request body", rt.roundTripID) |
| } |
| |
| // wantDial waits for the Transport to start a Dial. |
| func (dt *transportDialTester) wantDial() *transportDialTesterConn { |
| dt.t.Helper() |
| synctest.Wait() |
| dt.dialsMu.Lock() |
| defer dt.dialsMu.Unlock() |
| if len(dt.dials) == 0 { |
| dt.t.Fatalf("no dial started, want one") |
| } |
| c := dt.dials[0] |
| dt.dials = dt.dials[1:] |
| dt.dialCount++ |
| c.connID = dt.dialCount |
| dt.t.Logf("Dial %v: started", c.connID) |
| return c |
| } |
| |
| // finish completes a Dial. |
| func (c *transportDialTesterConn) finish(err error) { |
| c.t.Helper() |
| c.t.Logf("Dial %v: finished (err:%v)", c.connID, err) |
| c.ready <- err |
| close(c.ready) |
| } |
| |
| func (c *transportDialTesterConn) wantClosed() { |
| c.t.Helper() |
| <-c.closed |
| } |
| |
| func (c *transportDialTesterConn) Close() error { |
| select { |
| case <-c.closed: |
| default: |
| c.t.Logf("Conn %v: closed", c.connID) |
| close(c.closed) |
| } |
| return nil |
| } |