http2: add wrapped ClientConn

Implement ClientConn in terms of net/http, including support for
user-provided Transport.ConnPools.

For #78508

Change-Id: I5245620594538ab9befad888823ed40c6a6a6964
Reviewed-on: https://go-review.googlesource.com/c/net/+/771142
LUCI-TryBot-Result: golang-scoped@luci-project-accounts.iam.gserviceaccount.com <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Nicholas Husin <husin@google.com>
Reviewed-by: Nicholas Husin <nsh@golang.org>
Auto-Submit: Damien Neil <dneil@google.com>
diff --git a/http2/transport_api_test.go b/http2/transport_api_test.go index b146adb..5116e08 100644 --- a/http2/transport_api_test.go +++ b/http2/transport_api_test.go
@@ -650,6 +650,70 @@
 	})
 }
 
+// TestAPITransportClientConnPending tests that ClientConnState.StreamsPending counts a request waiting for a concurrency slot.
+func TestAPITransportClientConnPending(t *testing.T) {
+	synctest.Test(t, func(t *testing.T) {
+		tt := newTestTransport(t, roundTripXNetHTTP2, func(tr2 *http2.Transport) {
+			tr2.StrictMaxConcurrentStreams = true
+		})
+
+		nc := tt.li.newConn()
+		cc, err := tt.tr.NewClientConn(nc)
+		if err != nil {
+			t.Fatalf("NewClientConn: %v", err)
+		}
+
+		tc1 := tt.getConn()
+		tc1.wantFrameType(http2.FrameSettings)
+		tc1.wantFrameType(http2.FrameWindowUpdate)
+		tc1.writeSettings(http2.Setting{
+			ID:  http2.SettingMaxConcurrentStreams,
+			Val: 1,
+		})
+		tc1.wantFrameType(http2.FrameSettings) // ACK
+
+		synctest.Wait()
+		wantClientConnState(t, cc.State(), http2.ClientConnState{
+			MaxConcurrentStreams: 1,
+		})
+
+		// Send a request, consuming the only concurrency slot (MaxConcurrentStreams is 1).
+		req1, _ := http.NewRequest("GET", "https://dummy.tld/", nil)
+		rt1 := newTestRoundTrip(t, req1, cc.RoundTrip)
+		tc1.wantFrameType(http2.FrameHeaders)
+		wantClientConnState(t, cc.State(), http2.ClientConnState{
+			MaxConcurrentStreams: 1,
+			StreamsActive:        1,
+		})
+
+		// Send another request, which is counted in StreamsPending rather than StreamsActive.
+		req2, _ := http.NewRequest("GET", "https://dummy.tld/", nil)
+		_ = newTestRoundTrip(t, req2, cc.RoundTrip)
+		tc1.wantIdle()
+		wantClientConnState(t, cc.State(), http2.ClientConnState{
+			MaxConcurrentStreams: 1,
+			StreamsActive:        1,
+			StreamsPending:       1,
+		})
+
+		// The first request completes; the second request's HEADERS frame is then sent and StreamsPending returns to zero.
+		tc1.writeHeaders(http2.HeadersFrameParam{
+			StreamID:   1,
+			EndHeaders: true,
+			EndStream:  true,
+			BlockFragment: tc1.makeHeaderBlockFragment(
+				":status", "200",
+			),
+		})
+		rt1.wantStatus(200)
+		tc1.wantFrameType(http2.FrameHeaders)
+		wantClientConnState(t, cc.State(), http2.ClientConnState{
+			MaxConcurrentStreams: 1,
+			StreamsActive:        1,
+		})
+	})
+}
+
 // TestAPIClientConnPing tests the ClientConn.Ping method.
 func TestAPIClientConnPing(t *testing.T) {
 	synctest.Test(t, func(t *testing.T) {
diff --git a/http2/transport_wrap.go b/http2/transport_wrap.go index f669a4c..69496e4 100644 --- a/http2/transport_wrap.go +++ b/http2/transport_wrap.go
@@ -12,8 +12,10 @@
 	"context"
 	"crypto/tls"
 	"errors"
+	"math"
 	"net"
 	"net/http"
+	"net/http/httptrace"
 	"slices"
 	"sync"
 	"time"
@@ -163,7 +165,7 @@
 	// When the Transport has a user-provided connection pool (unusual, deprecated),
 	// we need to handle picking a connection, retrys, etc.
 	if t.ConnPool != nil {
-		return nil, errors.New("TODO")
+		return t.roundTripViaPool(req, opt, t.ConnPool)
 	}
 
 	// Setting this context key lets net/http know that if it is necessary to dial
@@ -184,47 +186,224 @@
 }
 
+// newUserClientConn wraps c in a ClientConn backed by a net/http ClientConn
+// created by t.t1, and installs cc.stateHook as that conn's state hook.
 func (t *Transport) newUserClientConn(c net.Conn) (*ClientConn, error) {
-	return nil, errors.New("TODO")
+	// http.Transport's NewClientConn doesn't provide a supported way to create
+	// a connection from a net.Conn. (This might be useful to add in the future?)
+	// We're going to craftily sneak one in via the context key, with the
+	// scheme of "http/2" telling NewClientConn to look for it.
+	ctx := context.WithValue(context.Background(), netConnContextKey{}, c)
+
+	nhcc, err := t.t1.NewClientConn(ctx, "http/2", "")
+	if err != nil {
+		return nil, err
+	}
+	cc := &ClientConn{cc: nhcc, tr: t, tconn: c}
+	nhcc.SetStateHook(cc.stateHook)
+	return cc, nil
 }
 
 // ClientConn is the state of a single HTTP/2 client connection to an
 // HTTP/2 server.
 type ClientConn struct {
+	cc         *http.ClientConn
+	tconn      net.Conn
+	tr         *Transport
+	doNotReuse bool
+
+	// mu is held by the methods below whenever they read or update
+	// doNotReuse or the bookkeeping fields closing through shutdownc.
+	mu            sync.Mutex
+	closing       bool
+	closed        bool
+	roundTrips    int
+	reserved      int
+	starting      int
+	pending       int
+	maxConcurrent int
+	lastIdle      time.Time
+	shutdownc     chan struct{}
 	atomicReused uint32 // whether conn is being reused; atomic
 }
 
+// roundTrip sends req on the wrapped net/http connection, updating the
+// reserved, pending, and starting counters around the call. It returns
+// errClientConnUnusable without sending if the conn is marked do-not-reuse.
 func (cc *ClientConn) roundTrip(req *http.Request) (*http.Response, error) {
-	return nil, nil
+	err := func() error {
+		cc.mu.Lock()
+		defer cc.mu.Unlock()
+		if cc.doNotReuse {
+			return errClientConnUnusable
+		}
+		cc.roundTrips++
+		if cc.reserved > 0 {
+			// We've already reserved a concurrency slot for this request.
+			cc.reserved--
+		} else if cc.cc.Reserve() != nil {
+			// We don't seem to have an available concurrency slot,
+			// so bump the pending count (requests waiting for a slot).
+			cc.pending++
+		}
+		// ClientConn.Shutdown will not shut down the conn while
+		// cc.starting > 0 or cc.cc.InFlight() > 0.
+		//
+		// The starting state covers the gap between us deciding to
+		// start sending the request, and actually sending it.
+		cc.starting++
+		return nil
+	}()
+	if err != nil {
+		return nil, err
+	}
+	resp, err := cc.cc.RoundTrip(req)
+	cc.mu.Lock()
+	cc.starting--
+	if cc.pending > 0 {
+		// A request completing frees up a concurrency slot for
+		// a pending request to start.
+		cc.pending--
+	}
+	cc.updateStateLocked()
+	cc.mu.Unlock()
+	return resp, err
 }
 
+// canTakeNewRequest reports whether the conn has an available slot
+// and has not been marked do-not-reuse.
 func (cc *ClientConn) canTakeNewRequest() bool {
-	return false
+	// Read doNotReuse under cc.mu, since setDoNotReuse writes it under cc.mu.
+	cc.mu.Lock()
+	doNotReuse := cc.doNotReuse
+	cc.mu.Unlock()
+	return !doNotReuse && cc.cc.Available() > 0
 }
 
+// close closes the underlying net/http connection and returns its error.
 func (cc *ClientConn) close() error {
-	return nil
+	return cc.cc.Close()
 }
 
+// ping issues a ping on the underlying connection, bounded by ctx.
 func (cc *ClientConn) ping(ctx context.Context) error {
-	return nil
+	// Ask net/http to ping its connection by sending a request with a method of ":ping".
+	_, err := cc.cc.RoundTrip((&http.Request{
+		Method: ":ping",
+	}).WithContext(ctx))
+	return err
 }
 
+// reserveNewRequest reserves a concurrency slot for a later roundTrip call.
+// It reports false if the conn is marked do-not-reuse or the reservation fails.
 func (cc *ClientConn) reserveNewRequest() bool {
-	return false
+	cc.mu.Lock()
+	defer cc.mu.Unlock()
+	if cc.doNotReuse {
+		return false
+	}
+	if err := cc.cc.Reserve(); err != nil {
+		return false
+	}
+	cc.reserved++
+	return true
 }
 
+// setDoNotReuse marks the conn as closing and unusable for new requests.
 func (cc *ClientConn) setDoNotReuse() {
+	cc.mu.Lock()
+	defer cc.mu.Unlock()
+	cc.doNotReuse = true
+	cc.closing = true
 }
 
+// shutdown waits until no requests are in flight or starting, then closes
+// the underlying connection. If ctx is done first, it returns ctx.Err()
+// without closing the connection.
 func (cc *ClientConn) shutdown(ctx context.Context) error {
+	cc.mu.Lock()
+	inFlight := cc.cc.InFlight() + cc.starting
+	if inFlight > 0 && cc.shutdownc == nil {
+		cc.shutdownc = make(chan struct{})
+	}
+	shutdownc := cc.shutdownc
+	cc.mu.Unlock()
+	if shutdownc != nil {
+		// Wait for in-flight requests to finish.
+		select {
+		case <-shutdownc:
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+	cc.cc.Close()
 	return nil
 }
 
+// state refreshes the conn's bookkeeping and returns a snapshot of it.
 func (cc *ClientConn) state() ClientConnState {
-	return ClientConnState{}
+	cc.mu.Lock()
+	defer cc.mu.Unlock()
+	cc.updateStateLocked()
+	// MaxConcurrentStreams is clamped in uint64: the untyped constant
+	// math.MaxUint32 does not fit in an int on 32-bit platforms.
+	return ClientConnState{
+		Closed:               cc.closed,
+		Closing:              cc.closing,
+		StreamsActive:        cc.cc.InFlight() - cc.reserved,
+		StreamsReserved:      cc.reserved,
+		StreamsPending:       cc.pending,
+		MaxConcurrentStreams: uint32(min(uint64(max(cc.maxConcurrent, 0)), math.MaxUint32)),
+		LastIdle:             cc.lastIdle,
+	}
+}
+
+// stateHook is the http.ClientConn's state hook.
+func (cc *ClientConn) stateHook(*http.ClientConn) {
+	cc.mu.Lock()
+	defer cc.mu.Unlock()
+	cc.updateStateLocked()
+}
+
+// updateStateLocked refreshes the closed/closing flags, lastIdle, and
+// maxConcurrent from the net/http conn, and wakes a waiting shutdown once
+// nothing is in flight or starting. The caller must hold cc.mu.
+func (cc *ClientConn) updateStateLocked() {
+	if cc.cc.Err() != nil && !cc.closed {
+		cc.closing = true
+		cc.closed = true
+		if cc.tr.ConnPool != nil {
+			// Do the ConnPool update in another goroutine,
+			// to avoid holding the conn mutex while it runs.
+			go cc.tr.ConnPool.MarkDead(cc)
+		}
+	}
+	if cc.cc.InFlight() == 0 && cc.roundTrips > 0 && cc.starting == 0 {
+		cc.lastIdle = time.Now()
+	}
+	if !cc.closed {
+		// This is slightly racy (a request could start or finish in between
+		// the Available and InFlight calls), but the best we can do given that
+		// the net/http ClientConn API doesn't expose the conn's max concurrency.
+		cc.maxConcurrent = cc.cc.Available() + cc.cc.InFlight()
+	}
+	if cc.shutdownc != nil && cc.cc.InFlight()+cc.starting == 0 {
+		close(cc.shutdownc)
+		cc.shutdownc = nil
+	}
 }
 
+// stopIdleTimer is a no-op in this implementation.
 func (cc *ClientConn) stopIdleTimer() {}
 
+// traceGotConn is (when http2wrap is enabled) only used for tracing
+// connections acquired while using a user-provided ClientConnPool.
 func traceGotConn(req *http.Request, cc *ClientConn, reused bool) {
+	trace := httptrace.ContextClientTrace(req.Context())
+	if trace == nil || trace.GotConn == nil {
+		return
+	}
+	ci := httptrace.GotConnInfo{Conn: cc.tconn}
+	ci.Reused = reused
+	trace.GotConn(ci)
 }