http2: add wrapped ClientConn

Implement ClientConn in terms of net/http,
including support for user-provided Transport.ConnPools.

For #78508

Change-Id: I5245620594538ab9befad888823ed40c6a6a6964
Reviewed-on: https://go-review.googlesource.com/c/net/+/771142
LUCI-TryBot-Result: golang-scoped@luci-project-accounts.iam.gserviceaccount.com <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Nicholas Husin <husin@google.com>
Reviewed-by: Nicholas Husin <nsh@golang.org>
Auto-Submit: Damien Neil <dneil@google.com>
diff --git a/http2/transport_api_test.go b/http2/transport_api_test.go
index b146adb..5116e08 100644
--- a/http2/transport_api_test.go
+++ b/http2/transport_api_test.go
@@ -650,6 +650,70 @@
 	})
 }
 
+// TestAPITransportClientConnPending tests the ClientConnState.Pending state.
+func TestAPITransportClientConnPending(t *testing.T) {
+	synctest.Test(t, func(t *testing.T) {
+		tt := newTestTransport(t, roundTripXNetHTTP2, func(tr2 *http2.Transport) {
+			tr2.StrictMaxConcurrentStreams = true
+		})
+
+		nc := tt.li.newConn()
+		cc, err := tt.tr.NewClientConn(nc)
+		if err != nil {
+			t.Fatalf("NewClientConn: %v", err)
+		}
+
+		tc1 := tt.getConn()
+		tc1.wantFrameType(http2.FrameSettings)
+		tc1.wantFrameType(http2.FrameWindowUpdate)
+		tc1.writeSettings(http2.Setting{
+			ID:  http2.SettingMaxConcurrentStreams,
+			Val: 1,
+		})
+		tc1.wantFrameType(http2.FrameSettings) // ACK
+
+		synctest.Wait()
+		wantClientConnState(t, cc.State(), http2.ClientConnState{
+			MaxConcurrentStreams: 1,
+		})
+
+		// Send a request, consuming the concurrency slot.
+		req1, _ := http.NewRequest("GET", "https://dummy.tld/", nil)
+		rt1 := newTestRoundTrip(t, req1, cc.RoundTrip)
+		tc1.wantFrameType(http2.FrameHeaders)
+		wantClientConnState(t, cc.State(), http2.ClientConnState{
+			MaxConcurrentStreams: 1,
+			StreamsActive:        1,
+		})
+
+		// Send another request, which enters the Pending state.
+		req2, _ := http.NewRequest("GET", "https://dummy.tld/", nil)
+		_ = newTestRoundTrip(t, req2, cc.RoundTrip)
+		tc1.wantIdle()
+		wantClientConnState(t, cc.State(), http2.ClientConnState{
+			MaxConcurrentStreams: 1,
+			StreamsActive:        1,
+			StreamsPending:       1,
+		})
+
+		// First request completes, second starts.
+		tc1.writeHeaders(http2.HeadersFrameParam{
+			StreamID:   1,
+			EndHeaders: true,
+			EndStream:  true,
+			BlockFragment: tc1.makeHeaderBlockFragment(
+				":status", "200",
+			),
+		})
+		rt1.wantStatus(200)
+		tc1.wantFrameType(http2.FrameHeaders)
+		wantClientConnState(t, cc.State(), http2.ClientConnState{
+			MaxConcurrentStreams: 1,
+			StreamsActive:        1,
+		})
+	})
+}
+
 // TestAPIClientConnPing tests the ClientConn.Ping method.
 func TestAPIClientConnPing(t *testing.T) {
 	synctest.Test(t, func(t *testing.T) {
diff --git a/http2/transport_wrap.go b/http2/transport_wrap.go
index f669a4c..69496e4 100644
--- a/http2/transport_wrap.go
+++ b/http2/transport_wrap.go
@@ -12,8 +12,10 @@
 	"context"
 	"crypto/tls"
 	"errors"
+	"math"
 	"net"
 	"net/http"
+	"net/http/httptrace"
 	"slices"
 	"sync"
 	"time"
@@ -163,7 +165,7 @@
 	// When the Transport has a user-provided connection pool (unusual, deprecated),
 	// we need to handle picking a connection, retrys, etc.
 	if t.ConnPool != nil {
-		return nil, errors.New("TODO")
+		return t.roundTripViaPool(req, opt, t.ConnPool)
 	}
 
 	// Setting this context key lets net/http know that if it is necessary to dial
@@ -184,47 +186,196 @@
 }
 
 func (t *Transport) newUserClientConn(c net.Conn) (*ClientConn, error) {
-	return nil, errors.New("TODO")
+	// http.Transport's NewClientConn doesn't provide a supported way to create
+	// a connection from a net.Conn. (This might be useful to add in the future?)
+	// We're going to craftily sneak one in via the context key, with the
+	// scheme of "http/2" telling NewClientConn to look for it.
+	ctx := context.WithValue(context.Background(), netConnContextKey{}, c)
+
+	nhcc, err := t.t1.NewClientConn(ctx, "http/2", "")
+	if err != nil {
+		return nil, err
+	}
+	cc := &ClientConn{cc: nhcc, tr: t, tconn: c}
+	nhcc.SetStateHook(cc.stateHook)
+	return cc, nil
 }
 
 // ClientConn is the state of a single HTTP/2 client connection to an
 // HTTP/2 server.
 type ClientConn struct {
+	cc         *http.ClientConn
+	tconn      net.Conn
+	tr         *Transport
+	doNotReuse bool
+
+	mu            sync.Mutex
+	closing       bool
+	closed        bool
+	roundTrips    int
+	reserved      int
+	starting      int
+	pending       int
+	maxConcurrent int
+	lastIdle      time.Time
+	shutdownc     chan struct{}
+
 	atomicReused uint32 // whether conn is being reused; atomic
 }
 
 func (cc *ClientConn) roundTrip(req *http.Request) (*http.Response, error) {
-	return nil, nil
+	err := func() error {
+		cc.mu.Lock()
+		defer cc.mu.Unlock()
+		if cc.doNotReuse {
+			return errClientConnUnusable
+		}
+		cc.roundTrips++
+		if cc.reserved > 0 {
+			// We've already reserved a concurrency slot for this request.
+			cc.reserved--
+		} else if cc.cc.Reserve() != nil {
+			// We don't seem to have an available concurrency slot,
+			// so bump the pending count (requests waiting for a slot).
+			cc.pending++
+		}
+		// ClientConn.Shutdown will not shut down the conn while
+		// cc.starting > 0 or cc.cc.InFlight() > 0.
+		//
+		// The starting state covers the gap between us deciding to
+		// start sending the request, and actually sending it.
+		cc.starting++
+		return nil
+	}()
+	if err != nil {
+		return nil, err
+	}
+	resp, err := cc.cc.RoundTrip(req)
+	cc.mu.Lock()
+	cc.starting--
+	if cc.pending > 0 {
+		// A request completing frees up a concurrency slot for
+		// a pending request to start.
+		cc.pending--
+	}
+	cc.updateStateLocked()
+	cc.mu.Unlock()
+	return resp, err
 }
 
 func (cc *ClientConn) canTakeNewRequest() bool {
-	return false
+	return cc.cc.Available() > 0 && !cc.doNotReuse
 }
 
 func (cc *ClientConn) close() error {
-	return nil
+	return cc.cc.Close()
 }
 
 func (cc *ClientConn) ping(ctx context.Context) error {
-	return nil
+	// Ask net/http to ping its connection by sending a request with a method of ":ping".
+	_, err := cc.cc.RoundTrip((&http.Request{
+		Method: ":ping",
+	}).WithContext(ctx))
+	return err
 }
 
 func (cc *ClientConn) reserveNewRequest() bool {
-	return false
+	cc.mu.Lock()
+	defer cc.mu.Unlock()
+	if cc.doNotReuse {
+		return false
+	}
+	if err := cc.cc.Reserve(); err != nil {
+		return false
+	}
+	cc.reserved++
+	return true
 }
 
 func (cc *ClientConn) setDoNotReuse() {
+	cc.mu.Lock()
+	defer cc.mu.Unlock()
+	cc.doNotReuse = true
+	cc.closing = true
 }
 
 func (cc *ClientConn) shutdown(ctx context.Context) error {
+	cc.mu.Lock()
+	inFlight := cc.cc.InFlight() + cc.starting
+	if inFlight > 0 && cc.shutdownc == nil {
+		cc.shutdownc = make(chan struct{})
+	}
+	shutdownc := cc.shutdownc
+	cc.mu.Unlock()
+	if shutdownc != nil {
+		// Wait for in-flight requests to finish.
+		select {
+		case <-shutdownc:
+		case <-ctx.Done():
+			return ctx.Err()
+		}
+	}
+	cc.cc.Close()
 	return nil
 }
 
 func (cc *ClientConn) state() ClientConnState {
-	return ClientConnState{}
+	cc.mu.Lock()
+	defer cc.mu.Unlock()
+	cc.updateStateLocked()
+	return ClientConnState{
+		Closed:               cc.closed,
+		Closing:              cc.closing,
+		StreamsActive:        cc.cc.InFlight() - cc.reserved,
+		StreamsReserved:      cc.reserved,
+		StreamsPending:       cc.pending,
+		MaxConcurrentStreams: uint32(min(cc.maxConcurrent, math.MaxUint32)),
+		LastIdle:             cc.lastIdle,
+	}
+}
+
+// stateHook is the http.ClientConn's state hook.
+func (cc *ClientConn) stateHook(*http.ClientConn) {
+	cc.mu.Lock()
+	defer cc.mu.Unlock()
+	cc.updateStateLocked()
+}
+
+func (cc *ClientConn) updateStateLocked() {
+	if cc.cc.Err() != nil && !cc.closed {
+		cc.closing = true
+		cc.closed = true
+		if cc.tr.ConnPool != nil {
+			// Do the ConnPool update in another goroutine,
+			// to avoid holding the conn mutex while it runs.
+			go cc.tr.ConnPool.MarkDead(cc)
+		}
+	}
+	if cc.cc.InFlight() == 0 && cc.roundTrips > 0 && cc.starting == 0 {
+		cc.lastIdle = time.Now()
+	}
+	if !cc.closed {
+		// This is slightly racy (a request could start or finish in between
+		// the Available and InFlight calls), but the best we can do given that
+		// the net/http ClientConn API doesn't expose the conn's max concurrency.
+		cc.maxConcurrent = cc.cc.Available() + cc.cc.InFlight()
+	}
+	if cc.shutdownc != nil && cc.cc.InFlight()+cc.starting == 0 {
+		close(cc.shutdownc)
+		cc.shutdownc = nil
+	}
 }
 
 func (cc *ClientConn) stopIdleTimer() {}
 
+// traceGotConn is (when http2wrap is enabled) only used for tracing
+// connections acquired while using a user-provided ClientConnPool.
 func traceGotConn(req *http.Request, cc *ClientConn, reused bool) {
+	trace := httptrace.ContextClientTrace(req.Context())
+	if trace == nil || trace.GotConn == nil {
+		return
+	}
+	ci := httptrace.GotConnInfo{Conn: cc.tconn}
+	ci.Reused = reused
+	trace.GotConn(ci)
 }