quic: avoid deadlock when updating inbound conn-level flow control
handleStreamBytesReadOffLoop sends a message to the conn
indicating that we need to send a MAX_DATA update.
Calling this with a stream's gate locked can lead to a deadlock,
when the conn's loop is processing an inbound frame for the
same stream: The conn can't acquire the stream's ingate, and
the gate won't be unlocked until the conn processes another
event from its queue.
Move the handleStreamBytesReadOffLoop calls out of the gate.
No test in this CL, but a following CL contains a test which
reliably exercises the condition.
For golang/go#58547
Change-Id: Ic98888947f67408a4a1f6f4a3aaf68c3a2fe8e7f
Reviewed-on: https://go-review.googlesource.com/c/net/+/527580
Reviewed-by: Jonathan Amsterdam <jba@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
diff --git a/internal/quic/conn_flow.go b/internal/quic/conn_flow.go
index 265fdaf..cd9a6a9 100644
--- a/internal/quic/conn_flow.go
+++ b/internal/quic/conn_flow.go
@@ -47,6 +47,9 @@
//
// This is called indirectly by the user, via Read or CloseRead.
func (c *Conn) handleStreamBytesReadOffLoop(n int64) {
+ if n == 0 {
+ return
+ }
if c.shouldUpdateFlowControl(c.streams.inflow.credit.Add(n)) {
// We should send a MAX_DATA update to the peer.
// Record this on the Conn's main loop.
diff --git a/internal/quic/conn_flow_test.go b/internal/quic/conn_flow_test.go
index 28559b4..2cd4e62 100644
--- a/internal/quic/conn_flow_test.go
+++ b/internal/quic/conn_flow_test.go
@@ -180,6 +180,7 @@
max: 128 + 32 + 1 + 1 + 1,
})
+ tc.ignoreFrame(frameTypeStopSending)
streams[2].CloseRead()
tc.wantFrame("closed stream triggers another MAX_DATA update",
packetType1RTT, debugFrameMaxData{
diff --git a/internal/quic/stream.go b/internal/quic/stream.go
index 923ff23..9310811 100644
--- a/internal/quic/stream.go
+++ b/internal/quic/stream.go
@@ -181,11 +181,13 @@
if s.IsWriteOnly() {
return 0, errors.New("read from write-only stream")
}
- // Wait until data is available.
if err := s.ingate.waitAndLock(ctx, s.conn.testHooks); err != nil {
return 0, err
}
- defer s.inUnlock()
+ defer func() {
+ s.inUnlock()
+ s.conn.handleStreamBytesReadOffLoop(int64(n)) // must be done with ingate unlocked
+ }()
if s.inresetcode != -1 {
return 0, fmt.Errorf("stream reset by peer: %w", StreamErrorCode(s.inresetcode))
}
@@ -205,7 +207,6 @@
start := s.in.start
end := start + int64(len(b))
s.in.copy(start, b)
- s.conn.handleStreamBytesReadOffLoop(int64(len(b)))
s.in.discardBefore(end)
if s.insize == -1 || s.insize > s.inwin {
if shouldUpdateFlowControl(s.inmaxbuf, s.in.start+s.inmaxbuf-s.inwin) {
@@ -334,7 +335,6 @@
return
}
s.ingate.lock()
- defer s.inUnlock()
if s.inset.isrange(0, s.insize) || s.inresetcode != -1 {
// We've already received all data from the peer,
// so there's no need to send STOP_SENDING.
@@ -343,8 +343,10 @@
} else {
s.inclosed.set()
}
- s.conn.handleStreamBytesReadOffLoop(s.in.end - s.in.start)
+ discarded := s.in.end - s.in.start
s.in.discardBefore(s.in.end)
+ s.inUnlock()
+ s.conn.handleStreamBytesReadOffLoop(discarded) // must be done with ingate unlocked
}
// CloseWrite aborts writes on the stream.