http2: support Request.Cancel in Transport

Tests are in a separate change, part of the net/http package in the
main go repo.

Updates golang/go#13159

Change-Id: I236dea7cd076910e908df7e7160d490da56014c8
Reviewed-on: https://go-review.googlesource.com/17757
Reviewed-by: Ian Lance Taylor <iant@golang.org>
diff --git a/http2/pipe.go b/http2/pipe.go
index 96a3eb8..e30661c 100644
--- a/http2/pipe.go
+++ b/http2/pipe.go
@@ -14,10 +14,11 @@
 // io.Pipe except there are no PipeReader/PipeWriter halves, and the
 // underlying buffer is an interface. (io.Pipe is always unbuffered)
 type pipe struct {
-	mu  sync.Mutex
-	c   sync.Cond // c.L must point to
-	b   pipeBuffer
-	err error // read error once empty. non-nil means closed.
+	mu    sync.Mutex
+	c     sync.Cond // c.L must point to
+	b     pipeBuffer
+	err   error         // read error once empty. non-nil means closed.
+	donec chan struct{} // closed on error
 }
 
 type pipeBuffer interface {
@@ -78,6 +79,9 @@
 	defer p.c.Signal()
 	if p.err == nil {
 		p.err = err
+		if p.donec != nil {
+			close(p.donec)
+		}
 	}
 }
 
@@ -88,3 +92,18 @@
 	defer p.mu.Unlock()
 	return p.err
 }
+
+// Done returns a channel which is closed if and when this pipe is closed
+// with CloseWithError.
+func (p *pipe) Done() <-chan struct{} {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	if p.donec == nil {
+		p.donec = make(chan struct{})
+		if p.err != nil {
+			// Already hit an error.
+			close(p.donec)
+		}
+	}
+	return p.donec
+}
diff --git a/http2/pipe_test.go b/http2/pipe_test.go
index 002ce05..b35b2df 100644
--- a/http2/pipe_test.go
+++ b/http2/pipe_test.go
@@ -7,6 +7,7 @@
 import (
 	"bytes"
 	"errors"
+	"io"
 	"testing"
 )
 
@@ -22,3 +23,30 @@
 		t.Errorf("err = %v want %v", err, a)
 	}
 }
+
+func TestPipeDoneChan(t *testing.T) {
+	var p pipe
+	done := p.Done()
+	select {
+	case <-done:
+		t.Fatal("done too soon")
+	default:
+	}
+	p.CloseWithError(io.EOF)
+	select {
+	case <-done:
+	default:
+		t.Fatal("should be done")
+	}
+}
+
+func TestPipeDoneChan_ErrFirst(t *testing.T) {
+	var p pipe
+	p.CloseWithError(io.EOF)
+	done := p.Done()
+	select {
+	case <-done:
+	default:
+		t.Fatal("should be done")
+	}
+}
diff --git a/http2/transport.go b/http2/transport.go
index 3327a6d..d9baa66 100644
--- a/http2/transport.go
+++ b/http2/transport.go
@@ -161,6 +161,18 @@
 	resetErr  error         // populated before peerReset is closed
 }
 
+// awaitRequestCancel runs in its own goroutine and waits for the user's
+func (cs *clientStream) awaitRequestCancel(cancel <-chan struct{}) {
+	if cancel == nil {
+		return
+	}
+	select {
+	case <-cancel:
+		cs.bufPipe.CloseWithError(errRequestCanceled)
+	case <-cs.bufPipe.Done():
+	}
+}
+
 // checkReset reports any error sent in a RST_STREAM frame by the
 // server.
 func (cs *clientStream) checkReset() error {
@@ -465,6 +477,10 @@
 	// forget about it.
 }
 
+// errRequestCanceled is a copy of net/http's errRequestCanceled because it's not
+// exported. At least they'll be DeepEqual for h1-vs-h2 comparisons tests.
+var errRequestCanceled = errors.New("net/http: request canceled")
+
 func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
 	cc.mu.Lock()
 
@@ -522,6 +538,10 @@
 			cc.fr.WriteContinuation(cs.ID, endHeaders, chunk)
 		}
 	}
+	// TODO(bradfitz): this Flush could potentially block (as
+	// could the WriteHeaders call(s) above), which means they
+	// wouldn't respond to Request.Cancel being readable. That's
+	// rare, but this should probably be in a goroutine.
 	cc.bw.Flush()
 	werr := cc.werr
 	cc.wmu.Unlock()
@@ -561,6 +581,9 @@
 			res.Request = req
 			res.TLS = cc.tlsState
 			return res, nil
+		case <-req.Cancel:
+			cs.abortRequestBodyWrite()
+			return nil, errRequestCanceled
 		case err := <-bodyCopyErrc:
 			if err != nil {
 				return nil, err
@@ -935,6 +958,7 @@
 		cs.bufPipe = pipe{b: buf}
 		cs.bytesRemain = res.ContentLength
 		res.Body = transportResponseBody{cs}
+		go cs.awaitRequestCancel(cs.req.Cancel)
 
 		if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" {
 			res.Header.Del("Content-Encoding")