http2: support Request.Cancel in Transport
Tests are in a separate change, part of the net/http package in the
main go repo.
Updates golang/go#13159
Change-Id: I236dea7cd076910e908df7e7160d490da56014c8
Reviewed-on: https://go-review.googlesource.com/17757
Reviewed-by: Ian Lance Taylor <iant@golang.org>
diff --git a/http2/pipe.go b/http2/pipe.go
index 96a3eb8..e30661c 100644
--- a/http2/pipe.go
+++ b/http2/pipe.go
@@ -14,10 +14,11 @@
// io.Pipe except there are no PipeReader/PipeWriter halves, and the
// underlying buffer is an interface. (io.Pipe is always unbuffered)
type pipe struct {
- mu sync.Mutex
- c sync.Cond // c.L must point to
- b pipeBuffer
- err error // read error once empty. non-nil means closed.
+ mu sync.Mutex
+ c sync.Cond // c.L must point to
+ b pipeBuffer
+ err error // read error once empty. non-nil means closed.
+ donec chan struct{} // closed on error
}
type pipeBuffer interface {
@@ -78,6 +79,9 @@
defer p.c.Signal()
if p.err == nil {
p.err = err
+ if p.donec != nil {
+ close(p.donec)
+ }
}
}
@@ -88,3 +92,18 @@
defer p.mu.Unlock()
return p.err
}
+
+// Done returns a channel which is closed if and when this pipe is closed
+// with CloseWithError.
+func (p *pipe) Done() <-chan struct{} {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ if p.donec == nil {
+ p.donec = make(chan struct{})
+ if p.err != nil {
+ // Already hit an error.
+ close(p.donec)
+ }
+ }
+ return p.donec
+}
diff --git a/http2/pipe_test.go b/http2/pipe_test.go
index 002ce05..b35b2df 100644
--- a/http2/pipe_test.go
+++ b/http2/pipe_test.go
@@ -7,6 +7,7 @@
import (
"bytes"
"errors"
+ "io"
"testing"
)
@@ -22,3 +23,30 @@
t.Errorf("err = %v want %v", err, a)
}
}
+
+func TestPipeDoneChan(t *testing.T) {
+ var p pipe
+ done := p.Done()
+ select {
+ case <-done:
+ t.Fatal("done too soon")
+ default:
+ }
+ p.CloseWithError(io.EOF)
+ select {
+ case <-done:
+ default:
+ t.Fatal("should be done")
+ }
+}
+
+func TestPipeDoneChan_ErrFirst(t *testing.T) {
+ var p pipe
+ p.CloseWithError(io.EOF)
+ done := p.Done()
+ select {
+ case <-done:
+ default:
+ t.Fatal("should be done")
+ }
+}
diff --git a/http2/transport.go b/http2/transport.go
index 3327a6d..d9baa66 100644
--- a/http2/transport.go
+++ b/http2/transport.go
@@ -161,6 +161,18 @@
resetErr error // populated before peerReset is closed
}
+// awaitRequestCancel runs in its own goroutine and waits for the user's
+func (cs *clientStream) awaitRequestCancel(cancel <-chan struct{}) {
+ if cancel == nil {
+ return
+ }
+ select {
+ case <-cancel:
+ cs.bufPipe.CloseWithError(errRequestCanceled)
+ case <-cs.bufPipe.Done():
+ }
+}
+
// checkReset reports any error sent in a RST_STREAM frame by the
// server.
func (cs *clientStream) checkReset() error {
@@ -465,6 +477,10 @@
// forget about it.
}
+// errRequestCanceled is a copy of net/http's errRequestCanceled because it's not
+// exported. At least they'll be DeepEqual for h1-vs-h2 comparisons tests.
+var errRequestCanceled = errors.New("net/http: request canceled")
+
func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
cc.mu.Lock()
@@ -522,6 +538,10 @@
cc.fr.WriteContinuation(cs.ID, endHeaders, chunk)
}
}
+ // TODO(bradfitz): this Flush could potentially block (as
+ // could the WriteHeaders call(s) above), which means they
+ // wouldn't respond to Request.Cancel being readable. That's
+ // rare, but this should probably be in a goroutine.
cc.bw.Flush()
werr := cc.werr
cc.wmu.Unlock()
@@ -561,6 +581,9 @@
res.Request = req
res.TLS = cc.tlsState
return res, nil
+ case <-req.Cancel:
+ cs.abortRequestBodyWrite()
+ return nil, errRequestCanceled
case err := <-bodyCopyErrc:
if err != nil {
return nil, err
@@ -935,6 +958,7 @@
cs.bufPipe = pipe{b: buf}
cs.bytesRemain = res.ContentLength
res.Body = transportResponseBody{cs}
+ go cs.awaitRequestCancel(cs.req.Cancel)
if cs.requestedGzip && res.Header.Get("Content-Encoding") == "gzip" {
res.Header.Del("Content-Encoding")