http2: support net/http.Transport.NewClientConn

Permit net/http to create new HTTP/2 client connections.

We do this by adding a NewClientConn method to the type the HTTP/2 client
registers with net/http.Transport.RegisterProtocol, which creates a
persistent connection from a net.Conn.

No tests in this CL. Tests will be in net/http, and will cover
both the HTTP/1 and HTTP/2 paths for NewClientConn.

For golang/go#75772

Change-Id: Ib1a06b4d13fdd6008e5db9a090c6e9632029a2a4
Reviewed-on: https://go-review.googlesource.com/c/net/+/722200
Reviewed-by: Nicholas Husin <husin@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Nicholas Husin <nsh@golang.org>
diff --git a/http2/clientconn_test.go b/http2/clientconn_test.go
index a9200a0..365e3b9 100644
--- a/http2/clientconn_test.go
+++ b/http2/clientconn_test.go
@@ -170,7 +170,7 @@
 
 	tt := newTestTransport(t, opts...)
 	const singleUse = false
-	_, err := tt.tr.newClientConn(nil, singleUse)
+	_, err := tt.tr.newClientConn(nil, singleUse, nil)
 	if err != nil {
 		t.Fatalf("newClientConn: %v", err)
 	}
diff --git a/http2/transport.go b/http2/transport.go
index 1965913..f1f0359 100644
--- a/http2/transport.go
+++ b/http2/transport.go
@@ -381,6 +381,12 @@
 	// Lock reqmu BEFORE mu or wmu.
 	reqHeaderMu chan struct{}
 
+	// internalStateHook reports state changes back to the net/http.ClientConn.
+	// Note that this is different from the user state hook registered by
+	// net/http.ClientConn.SetStateHook: The internal hook calls ClientConn,
+	// which calls the user hook.
+	internalStateHook func()
+
 	// wmu is held while writing.
 	// Acquire BEFORE mu when holding both, to avoid blocking mu on network writes.
 	// Only acquire both at the same time when changing peer settings.
@@ -710,7 +716,7 @@
 
 func (t *Transport) dialClientConn(ctx context.Context, addr string, singleUse bool) (*ClientConn, error) {
 	if t.transportTestHooks != nil {
-		return t.newClientConn(nil, singleUse)
+		return t.newClientConn(nil, singleUse, nil)
 	}
 	host, _, err := net.SplitHostPort(addr)
 	if err != nil {
@@ -720,7 +726,7 @@
 	if err != nil {
 		return nil, err
 	}
-	return t.newClientConn(tconn, singleUse)
+	return t.newClientConn(tconn, singleUse, nil)
 }
 
 func (t *Transport) newTLSConfig(host string) *tls.Config {
@@ -772,10 +778,10 @@
 }
 
 func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) {
-	return t.newClientConn(c, t.disableKeepAlives())
+	return t.newClientConn(c, t.disableKeepAlives(), nil)
 }
 
-func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, error) {
+func (t *Transport) newClientConn(c net.Conn, singleUse bool, internalStateHook func()) (*ClientConn, error) {
 	conf := configFromTransport(t)
 	cc := &ClientConn{
 		t:                           t,
@@ -797,6 +803,7 @@
 		pings:                       make(map[[8]byte]chan struct{}),
 		reqHeaderMu:                 make(chan struct{}, 1),
 		lastActive:                  time.Now(),
+		internalStateHook:           internalStateHook,
 	}
 	if t.transportTestHooks != nil {
 		t.transportTestHooks.newclientconn(cc)
@@ -1037,10 +1044,7 @@
 		maxConcurrentOkay = cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams)
 	}
 
-	st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay &&
-		!cc.doNotReuse &&
-		int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
-		!cc.tooIdleLocked()
+	st.canTakeNewRequest = maxConcurrentOkay && cc.isUsableLocked()
 
 	// If this connection has never been used for a request and is closed,
 	// then let it take a request (which will fail).
@@ -1056,6 +1060,31 @@
 	return
 }
 
+func (cc *ClientConn) isUsableLocked() bool {
+	return cc.goAway == nil &&
+		!cc.closed &&
+		!cc.closing &&
+		!cc.doNotReuse &&
+		int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
+		!cc.tooIdleLocked()
+}
+
+// canReserveLocked reports whether a net/http.ClientConn can reserve a slot on this conn.
+//
+// This follows slightly different rules than clientConnIdleState.canTakeNewRequest.
+// We only permit reservations up to the conn's concurrency limit.
+// This differs from ClientConn.ReserveNewRequest, which permits reservations
+// past the limit when StrictMaxConcurrentStreams is set.
+func (cc *ClientConn) canReserveLocked() bool {
+	if cc.currentRequestCountLocked() >= int(cc.maxConcurrentStreams) {
+		return false
+	}
+	if !cc.isUsableLocked() {
+		return false
+	}
+	return true
+}
+
 // currentRequestCountLocked reports the number of concurrency slots currently in use,
 // including active streams, reserved slots, and reset streams waiting for acknowledgement.
 func (cc *ClientConn) currentRequestCountLocked() int {
@@ -1067,6 +1096,14 @@
 	return st.canTakeNewRequest
 }
 
+// availableLocked reports the number of concurrency slots available.
+func (cc *ClientConn) availableLocked() int {
+	if !cc.canTakeNewRequestLocked() {
+		return 0
+	}
+	return max(0, int(cc.maxConcurrentStreams)-cc.currentRequestCountLocked())
+}
+
 // tooIdleLocked reports whether this connection has been been sitting idle
 // for too much wall time.
 func (cc *ClientConn) tooIdleLocked() bool {
@@ -1091,6 +1128,7 @@
 	t := time.AfterFunc(250*time.Millisecond, cc.forceCloseConn)
 	defer t.Stop()
 	cc.tconn.Close()
+	cc.maybeCallStateHook()
 }
 
 // A tls.Conn.Close can hang for a long time if the peer is unresponsive.
@@ -1693,6 +1731,7 @@
 	}
 
 	close(cs.donec)
+	cc.maybeCallStateHook()
 }
 
 // awaitOpenSlotForStreamLocked waits until len(streams) < maxConcurrentStreams.
@@ -2795,6 +2834,7 @@
 
 func (rl *clientConnReadLoop) processSettingsNoWrite(f *SettingsFrame) error {
 	cc := rl.cc
+	defer cc.maybeCallStateHook()
 	cc.mu.Lock()
 	defer cc.mu.Unlock()
 
@@ -2975,6 +3015,7 @@
 func (rl *clientConnReadLoop) processPing(f *PingFrame) error {
 	if f.IsAck() {
 		cc := rl.cc
+		defer cc.maybeCallStateHook()
 		cc.mu.Lock()
 		defer cc.mu.Unlock()
 		// If ack, notify listener if any
@@ -3198,9 +3239,13 @@
 }
 
 // noDialH2RoundTripper is a RoundTripper which only tries to complete the request
-// if there's already has a cached connection to the host.
+// if there's already a cached connection to the host.
 // (The field is exported so it can be accessed via reflect from net/http; tested
 // by TestNoDialH2RoundTripperType)
+//
+// A noDialH2RoundTripper is registered with http1.Transport.RegisterProtocol,
+// and the http1.Transport can use type assertions to call non-RoundTrip methods on it.
+// This lets us expose, for example, NewClientConn to net/http.
 type noDialH2RoundTripper struct{ *Transport }
 
 func (rt noDialH2RoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
@@ -3211,6 +3256,85 @@
 	return res, err
 }
 
+func (rt noDialH2RoundTripper) NewClientConn(conn net.Conn, internalStateHook func()) (http.RoundTripper, error) {
+	tr := rt.Transport
+	cc, err := tr.newClientConn(conn, tr.disableKeepAlives(), internalStateHook)
+	if err != nil {
+		return nil, err
+	}
+
+	// RoundTrip should block when the conn is at its concurrency limit,
+	// not return an error. Setting strictMaxConcurrentStreams enables this.
+	cc.strictMaxConcurrentStreams = true
+
+	return netHTTPClientConn{cc}, nil
+}
+
+// netHTTPClientConn wraps ClientConn and implements the interface net/http expects from
+// the RoundTripper returned by NewClientConn.
+type netHTTPClientConn struct {
+	cc *ClientConn
+}
+
+func (cc netHTTPClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
+	return cc.cc.RoundTrip(req)
+}
+
+func (cc netHTTPClientConn) Close() error {
+	return cc.cc.Close()
+}
+
+func (cc netHTTPClientConn) Err() error {
+	cc.cc.mu.Lock()
+	defer cc.cc.mu.Unlock()
+	if cc.cc.closed {
+		return errors.New("connection closed")
+	}
+	return nil
+}
+
+func (cc netHTTPClientConn) Reserve() error {
+	defer cc.cc.maybeCallStateHook()
+	cc.cc.mu.Lock()
+	defer cc.cc.mu.Unlock()
+	if !cc.cc.canReserveLocked() {
+		return errors.New("connection is unavailable")
+	}
+	cc.cc.streamsReserved++
+	return nil
+}
+
+func (cc netHTTPClientConn) Release() {
+	defer cc.cc.maybeCallStateHook()
+	cc.cc.mu.Lock()
+	defer cc.cc.mu.Unlock()
+	// We don't complain if streamsReserved is 0.
+	//
+	// This is consistent with RoundTrip: both Release and RoundTrip will
+	// consume a reservation iff one exists.
+	if cc.cc.streamsReserved > 0 {
+		cc.cc.streamsReserved--
+	}
+}
+
+func (cc netHTTPClientConn) Available() int {
+	cc.cc.mu.Lock()
+	defer cc.cc.mu.Unlock()
+	return cc.cc.availableLocked()
+}
+
+func (cc netHTTPClientConn) InFlight() int {
+	cc.cc.mu.Lock()
+	defer cc.cc.mu.Unlock()
+	return cc.cc.currentRequestCountLocked()
+}
+
+func (cc *ClientConn) maybeCallStateHook() {
+	if cc.internalStateHook != nil {
+		cc.internalStateHook()
+	}
+}
+
 func (t *Transport) idleConnTimeout() time.Duration {
 	// to keep things backwards compatible, we use non-zero values of
 	// IdleConnTimeout, followed by using the IdleConnTimeout on the underlying