http2: fix Transport race sending RST_STREAM while reading DATA on cancels

If the client canceled a request, the Transport would then send a
RST_STREAM, but also would close the Response.Body's pipe. Meanwhile,
the server's DATA response could already be on the wire in
flight. We'd then read it, attempt to write its bytes to the buffer,
find it already closed, bubble up that error, and ultimately close the
whole TCP connection (breaking all other open streams).

So, don't do that.

Keep track of which connections we've sent RST_STREAM to, and ignore
DATA frames on those streams.

Updates golang/go#16974 (fixes after bundle to std)

Change-Id: Ic29a3aefff5241146cd2ca80aafa35fc4fb18b6e
Reviewed-on: https://go-review.googlesource.com/32571
Reviewed-by: Tom Bergan <tombergan@google.com>
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
diff --git a/http2/transport.go b/http2/transport.go
index 129c8e0..8f5f844 100644
--- a/http2/transport.go
+++ b/http2/transport.go
@@ -199,6 +199,7 @@
 	bytesRemain int64 // -1 means unknown; owned by transportResponseBody.Read
 	readErr     error // sticky read error; owned by transportResponseBody.Read
 	stopReqBody error // if non-nil, stop writing req body; guarded by cc.mu
+	didReset    bool  // whether we sent a RST_STREAM to the server; guarded by cc.mu
 
 	peerReset chan struct{} // closed on peer reset
 	resetErr  error         // populated before peerReset is closed
@@ -226,15 +227,26 @@
 	}
 	select {
 	case <-req.Cancel:
+		cs.cancelStream()
 		cs.bufPipe.CloseWithError(errRequestCanceled)
-		cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
 	case <-ctx.Done():
+		cs.cancelStream()
 		cs.bufPipe.CloseWithError(ctx.Err())
-		cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
 	case <-cs.done:
 	}
 }
 
+func (cs *clientStream) cancelStream() {
+	cs.cc.mu.Lock()
+	didReset := cs.didReset
+	cs.didReset = true
+	cs.cc.mu.Unlock()
+
+	if !didReset {
+		cs.cc.writeStreamReset(cs.ID, ErrCodeCancel, nil)
+	}
+}
+
 // checkResetOrDone reports any error sent in a RST_STREAM frame by the
 // server, or errStreamClosed if the stream is complete.
 func (cs *clientStream) checkResetOrDone() error {
@@ -1666,9 +1678,10 @@
 			cc.bw.Flush()
 			cc.wmu.Unlock()
 		}
+		didReset := cs.didReset
 		cc.mu.Unlock()
 
-		if len(data) > 0 {
+		if len(data) > 0 && !didReset {
 			if _, err := cs.bufPipe.Write(data); err != nil {
 				rl.endStreamError(cs, err)
 				return err
diff --git a/http2/transport_test.go b/http2/transport_test.go
index 2006a3d..f9287e5 100644
--- a/http2/transport_test.go
+++ b/http2/transport_test.go
@@ -2690,3 +2690,58 @@
 		t.Fatal(err)
 	}
 }
+
+// Issue 16974: if the server sent a DATA frame after the user
+// canceled the Transport's Request, the Transport previously wrote to a
+// closed pipe, got an error, and ended up closing the whole TCP
+// connection.
+func TestTransportCancelDataResponseRace(t *testing.T) {
+	cancel := make(chan struct{})
+	clientGotError := make(chan bool, 1)
+
+	const msg = "Hello."
+	st := newServerTester(t, func(w http.ResponseWriter, r *http.Request) {
+		if strings.Contains(r.URL.Path, "/hello") {
+			time.Sleep(50 * time.Millisecond)
+			io.WriteString(w, msg)
+			return
+		}
+		for i := 0; i < 50; i++ {
+			io.WriteString(w, "Some data.")
+			w.(http.Flusher).Flush()
+			if i == 2 {
+				close(cancel)
+				<-clientGotError
+			}
+			time.Sleep(10 * time.Millisecond)
+		}
+	}, optOnlyServer)
+	defer st.Close()
+
+	tr := &Transport{TLSClientConfig: tlsConfigInsecure}
+	defer tr.CloseIdleConnections()
+
+	c := &http.Client{Transport: tr}
+	req, _ := http.NewRequest("GET", st.ts.URL, nil)
+	req.Cancel = cancel
+	res, err := c.Do(req)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if _, err = io.Copy(ioutil.Discard, res.Body); err == nil {
+		t.Fatal("unexpected success")
+	}
+	clientGotError <- true
+
+	res, err = c.Get(st.ts.URL + "/hello")
+	if err != nil {
+		t.Fatal(err)
+	}
+	slurp, err := ioutil.ReadAll(res.Body)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if string(slurp) != msg {
+		t.Errorf("Got = %q; want %q", slurp, msg)
+	}
+}