quic: avoid panic when PTO expires and implicitly-created streams exist
The streams map contains nil entries for implicitly-created streams.
(Receiving a packet for stream N implicitly creates all streams of the
same type <N.)
We weren't checking for nil entries when iterating the map on PTO,
resulting in a panic.
Change the map value to be a wrapper type to make it more explicit that
nil entries exist.
Change-Id: I070c6d60631744018a6e6f2645c95a2f3d3d24b6
Reviewed-on: https://go-review.googlesource.com/c/net/+/550798
Reviewed-by: Jonathan Amsterdam <jba@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
diff --git a/internal/quic/conn_streams.go b/internal/quic/conn_streams.go
index dc82f8b..87cfd29 100644
--- a/internal/quic/conn_streams.go
+++ b/internal/quic/conn_streams.go
@@ -14,8 +14,16 @@
)
type streamsState struct {
- queue queue[*Stream] // new, peer-created streams
- streams map[streamID]*Stream
+ queue queue[*Stream] // new, peer-created streams
+
+ // All peer-created streams.
+ //
+ // Implicitly created streams are included as an empty entry in the map.
+ // (For example, if we receive a frame for stream 4, we implicitly create stream 0 and
+ // insert an empty entry for it to the map.)
+ //
+ // The map value is maybeStream rather than *Stream as a reminder that values can be nil.
+ streams map[streamID]maybeStream
// Limits on the number of streams, indexed by streamType.
localLimit [streamTypeCount]localStreamLimits
@@ -37,8 +45,13 @@
queueData streamRing // streams with only flow-controlled frames
}
+// maybeStream is a possibly nil *Stream. See streamsState.streams.
+type maybeStream struct {
+ s *Stream
+}
+
func (c *Conn) streamsInit() {
- c.streams.streams = make(map[streamID]*Stream)
+ c.streams.streams = make(map[streamID]maybeStream)
c.streams.queue = newQueue[*Stream]()
c.streams.localLimit[bidiStream].init()
c.streams.localLimit[uniStream].init()
@@ -52,8 +65,8 @@
c.streams.localLimit[bidiStream].connHasClosed()
c.streams.localLimit[uniStream].connHasClosed()
for _, s := range c.streams.streams {
- if s != nil {
- s.connHasClosed()
+ if s.s != nil {
+ s.s.connHasClosed()
}
}
}
@@ -97,7 +110,7 @@
// Modify c.streams on the conn's loop.
if err := c.runOnLoop(ctx, func(now time.Time, c *Conn) {
- c.streams.streams[s.id] = s
+ c.streams.streams[s.id] = maybeStream{s}
}); err != nil {
return nil, err
}
@@ -119,7 +132,7 @@
// streamForID returns the stream with the given id.
// If the stream does not exist, it returns nil.
func (c *Conn) streamForID(id streamID) *Stream {
- return c.streams.streams[id]
+ return c.streams.streams[id].s
}
// streamForFrame returns the stream with the given id.
@@ -144,9 +157,9 @@
}
}
- s, isOpen := c.streams.streams[id]
- if s != nil {
- return s
+ ms, isOpen := c.streams.streams[id]
+ if ms.s != nil {
+ return ms.s
}
num := id.num()
@@ -183,10 +196,10 @@
// with the same initiator and type and a lower number.
// Add a nil entry to the streams map for each implicitly created stream.
for n := newStreamID(id.initiator(), id.streamType(), prevOpened); n < id; n += 4 {
- c.streams.streams[n] = nil
+ c.streams.streams[n] = maybeStream{}
}
- s = newStream(c, id)
+ s := newStream(c, id)
s.inmaxbuf = c.config.maxStreamReadBufferSize()
s.inwin = c.config.maxStreamReadBufferSize()
if id.streamType() == bidiStream {
@@ -196,7 +209,7 @@
s.inUnlock()
s.outUnlock()
- c.streams.streams[id] = s
+ c.streams.streams[id] = maybeStream{s}
c.streams.queue.put(s)
return s
}
@@ -400,7 +413,11 @@
c.streams.sendMu.Lock()
defer c.streams.sendMu.Unlock()
const pto = true
- for _, s := range c.streams.streams {
+ for _, ms := range c.streams.streams {
+ s := ms.s
+ if s == nil {
+ continue
+ }
const pto = true
s.ingate.lock()
inOK := s.appendInFramesLocked(w, pnum, pto)
diff --git a/internal/quic/conn_streams_test.go b/internal/quic/conn_streams_test.go
index 90f5cb7..fb9af47 100644
--- a/internal/quic/conn_streams_test.go
+++ b/internal/quic/conn_streams_test.go
@@ -522,3 +522,38 @@
t.Errorf("accepted %v streams, want %v", got, want)
}
}
+
+func TestStreamsPTOWithImplicitStream(t *testing.T) {
+ ctx := canceledContext()
+ tc := newTestConn(t, serverSide, permissiveTransportParameters)
+ tc.handshake()
+ tc.ignoreFrame(frameTypeAck)
+
+ // Peer creates stream 1, and implicitly creates stream 0.
+ tc.writeFrames(packetType1RTT, debugFrameStream{
+ id: newStreamID(clientSide, bidiStream, 1),
+ })
+
+ // We accept stream 1 and write data to it.
+ data := []byte("data")
+ s, err := tc.conn.AcceptStream(ctx)
+ if err != nil {
+ t.Fatalf("conn.AcceptStream() = %v, want stream", err)
+ }
+ s.Write(data)
+ s.Flush()
+ tc.wantFrame("data written to stream",
+ packetType1RTT, debugFrameStream{
+ id: newStreamID(clientSide, bidiStream, 1),
+ data: data,
+ })
+
+ // PTO expires, and the data is resent.
+ const pto = true
+ tc.triggerLossOrPTO(packetType1RTT, true)
+ tc.wantFrame("data resent after PTO expires",
+ packetType1RTT, debugFrameStream{
+ id: newStreamID(clientSide, bidiStream, 1),
+ data: data,
+ })
+}