blob: 87cfd297edc96205c3b889c58c47744e5b4718f2 [file] [log] [blame]
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.21
package quic
import (
"context"
"sync"
"sync/atomic"
"time"
)
type streamsState struct {
queue queue[*Stream] // new, peer-created streams
// All peer-created streams.
//
// Implicitly created streams are included as an empty entry in the map.
// (For example, if we receive a frame for stream 4, we implicitly create stream 0 and
// insert an empty entry for it to the map.)
//
// The map value is maybeStream rather than *Stream as a reminder that values can be nil.
streams map[streamID]maybeStream
// Limits on the number of streams, indexed by streamType.
localLimit [streamTypeCount]localStreamLimits
remoteLimit [streamTypeCount]remoteStreamLimits
// Peer configuration provided in transport parameters.
peerInitialMaxStreamDataRemote [streamTypeCount]int64 // streams opened by us
peerInitialMaxStreamDataBidiLocal int64 // streams opened by them
// Connection-level flow control.
inflow connInflow
outflow connOutflow
// Streams with frames to send are stored in one of two circular linked lists,
// depending on whether they require connection-level flow control.
needSend atomic.Bool
sendMu sync.Mutex
queueMeta streamRing // streams with any non-flow-controlled frames
queueData streamRing // streams with only flow-controlled frames
}
// maybeStream is a possibly nil *Stream. See streamsState.streams.
type maybeStream struct {
s *Stream
}
func (c *Conn) streamsInit() {
c.streams.streams = make(map[streamID]maybeStream)
c.streams.queue = newQueue[*Stream]()
c.streams.localLimit[bidiStream].init()
c.streams.localLimit[uniStream].init()
c.streams.remoteLimit[bidiStream].init(c.config.maxBidiRemoteStreams())
c.streams.remoteLimit[uniStream].init(c.config.maxUniRemoteStreams())
c.inflowInit()
}
func (c *Conn) streamsCleanup() {
c.streams.queue.close(errConnClosed)
c.streams.localLimit[bidiStream].connHasClosed()
c.streams.localLimit[uniStream].connHasClosed()
for _, s := range c.streams.streams {
if s.s != nil {
s.s.connHasClosed()
}
}
}
// AcceptStream waits for and returns the next stream created by the peer.
func (c *Conn) AcceptStream(ctx context.Context) (*Stream, error) {
return c.streams.queue.get(ctx, c.testHooks)
}
// NewStream creates a stream.
//
// If the peer's maximum stream limit for the connection has been reached,
// NewStream blocks until the limit is increased or the context expires.
func (c *Conn) NewStream(ctx context.Context) (*Stream, error) {
return c.newLocalStream(ctx, bidiStream)
}
// NewSendOnlyStream creates a unidirectional, send-only stream.
//
// If the peer's maximum stream limit for the connection has been reached,
// NewSendOnlyStream blocks until the limit is increased or the context expires.
func (c *Conn) NewSendOnlyStream(ctx context.Context) (*Stream, error) {
return c.newLocalStream(ctx, uniStream)
}
func (c *Conn) newLocalStream(ctx context.Context, styp streamType) (*Stream, error) {
num, err := c.streams.localLimit[styp].open(ctx, c)
if err != nil {
return nil, err
}
s := newStream(c, newStreamID(c.side, styp, num))
s.outmaxbuf = c.config.maxStreamWriteBufferSize()
s.outwin = c.streams.peerInitialMaxStreamDataRemote[styp]
if styp == bidiStream {
s.inmaxbuf = c.config.maxStreamReadBufferSize()
s.inwin = c.config.maxStreamReadBufferSize()
}
s.inUnlock()
s.outUnlock()
// Modify c.streams on the conn's loop.
if err := c.runOnLoop(ctx, func(now time.Time, c *Conn) {
c.streams.streams[s.id] = maybeStream{s}
}); err != nil {
return nil, err
}
return s, nil
}
// streamFrameType identifies which direction of a stream,
// from the local perspective, a frame is associated with.
//
// For example, STREAM is a recvStream frame,
// because it carries data from the peer to us.
type streamFrameType uint8
const (
sendStream = streamFrameType(iota) // for example, MAX_DATA
recvStream // for example, STREAM_DATA_BLOCKED
)
// streamForID returns the stream with the given id.
// If the stream does not exist, it returns nil.
func (c *Conn) streamForID(id streamID) *Stream {
return c.streams.streams[id].s
}
// streamForFrame returns the stream with the given id.
// If the stream does not exist, it may be created.
//
// streamForFrame aborts the connection if the stream id, state, and frame type don't align.
// For example, it aborts the connection with a STREAM_STATE error if a MAX_DATA frame
// is received for a receive-only stream, or if the peer attempts to create a stream that
// should be originated locally.
//
// streamForFrame returns nil if the stream no longer exists or if an error occurred.
func (c *Conn) streamForFrame(now time.Time, id streamID, ftype streamFrameType) *Stream {
if id.streamType() == uniStream {
if (id.initiator() == c.side) != (ftype == sendStream) {
// Received an invalid frame for unidirectional stream.
// For example, a RESET_STREAM frame for a send-only stream.
c.abort(now, localTransportError{
code: errStreamState,
reason: "invalid frame for unidirectional stream",
})
return nil
}
}
ms, isOpen := c.streams.streams[id]
if ms.s != nil {
return ms.s
}
num := id.num()
styp := id.streamType()
if id.initiator() == c.side {
if num < c.streams.localLimit[styp].opened {
// This stream was created by us, and has been closed.
return nil
}
// Received a frame for a stream that should be originated by us,
// but which we never created.
c.abort(now, localTransportError{
code: errStreamState,
reason: "received frame for unknown stream",
})
return nil
} else {
// if isOpen, this is a stream that was implicitly opened by a
// previous frame for a larger-numbered stream, but we haven't
// actually created it yet.
if !isOpen && num < c.streams.remoteLimit[styp].opened {
// This stream was created by the peer, and has been closed.
return nil
}
}
prevOpened := c.streams.remoteLimit[styp].opened
if err := c.streams.remoteLimit[styp].open(id); err != nil {
c.abort(now, err)
return nil
}
// Receiving a frame for a stream implicitly creates all streams
// with the same initiator and type and a lower number.
// Add a nil entry to the streams map for each implicitly created stream.
for n := newStreamID(id.initiator(), id.streamType(), prevOpened); n < id; n += 4 {
c.streams.streams[n] = maybeStream{}
}
s := newStream(c, id)
s.inmaxbuf = c.config.maxStreamReadBufferSize()
s.inwin = c.config.maxStreamReadBufferSize()
if id.streamType() == bidiStream {
s.outmaxbuf = c.config.maxStreamWriteBufferSize()
s.outwin = c.streams.peerInitialMaxStreamDataBidiLocal
}
s.inUnlock()
s.outUnlock()
c.streams.streams[id] = maybeStream{s}
c.streams.queue.put(s)
return s
}
// maybeQueueStreamForSend marks a stream as containing frames that need sending.
func (c *Conn) maybeQueueStreamForSend(s *Stream, state streamState) {
if state.wantQueue() == state.inQueue() {
return // already on the right queue
}
c.streams.sendMu.Lock()
defer c.streams.sendMu.Unlock()
state = s.state.load() // may have changed while waiting
c.queueStreamForSendLocked(s, state)
c.streams.needSend.Store(true)
c.wake()
}
// queueStreamForSendLocked moves a stream to the correct send queue,
// or removes it from all queues.
//
// state is the last known stream state.
func (c *Conn) queueStreamForSendLocked(s *Stream, state streamState) {
for {
wantQueue := state.wantQueue()
inQueue := state.inQueue()
if inQueue == wantQueue {
return // already on the right queue
}
switch inQueue {
case metaQueue:
c.streams.queueMeta.remove(s)
case dataQueue:
c.streams.queueData.remove(s)
}
switch wantQueue {
case metaQueue:
c.streams.queueMeta.append(s)
state = s.state.set(streamQueueMeta, streamQueueMeta|streamQueueData)
case dataQueue:
c.streams.queueData.append(s)
state = s.state.set(streamQueueData, streamQueueMeta|streamQueueData)
case noQueue:
state = s.state.set(0, streamQueueMeta|streamQueueData)
}
// If the stream state changed while we were moving the stream,
// we might now be on the wrong queue.
//
// For example:
// - stream has data to send: streamOutSendData|streamQueueData
// - appendStreamFrames sends all the data: streamQueueData
// - concurrently, more data is written: streamOutSendData|streamQueueData
// - appendStreamFrames calls us with the last state it observed
// (streamQueueData).
// - We remove the stream from the queue and observe the updated state:
// streamOutSendData
// - We realize that the stream needs to go back on the data queue.
//
// Go back around the loop to confirm we're on the correct queue.
}
}
// appendStreamFrames writes stream-related frames to the current packet.
//
// It returns true if no more frames need appending,
// false if not everything fit in the current packet.
func (c *Conn) appendStreamFrames(w *packetWriter, pnum packetNumber, pto bool) bool {
// MAX_DATA
if !c.appendMaxDataFrame(w, pnum, pto) {
return false
}
// MAX_STREAM_DATA
if !c.streams.remoteLimit[uniStream].appendFrame(w, uniStream, pnum, pto) {
return false
}
if !c.streams.remoteLimit[bidiStream].appendFrame(w, bidiStream, pnum, pto) {
return false
}
if pto {
return c.appendStreamFramesPTO(w, pnum)
}
if !c.streams.needSend.Load() {
return true
}
c.streams.sendMu.Lock()
defer c.streams.sendMu.Unlock()
// queueMeta contains streams with non-flow-controlled frames to send.
for c.streams.queueMeta.head != nil {
s := c.streams.queueMeta.head
state := s.state.load()
if state&(streamQueueMeta|streamConnRemoved) != streamQueueMeta {
panic("BUG: queueMeta stream is not streamQueueMeta")
}
if state&streamInSendMeta != 0 {
s.ingate.lock()
ok := s.appendInFramesLocked(w, pnum, pto)
state = s.inUnlockNoQueue()
if !ok {
return false
}
if state&streamInSendMeta != 0 {
panic("BUG: streamInSendMeta set after successfully appending frames")
}
}
if state&streamOutSendMeta != 0 {
s.outgate.lock()
// This might also append flow-controlled frames if we have any
// and available conn-level quota. That's fine.
ok := s.appendOutFramesLocked(w, pnum, pto)
state = s.outUnlockNoQueue()
// We're checking both ok and state, because appendOutFramesLocked
// might have filled up the packet with flow-controlled data.
// If so, we want to move the stream to queueData for any remaining frames.
if !ok && state&streamOutSendMeta != 0 {
return false
}
if state&streamOutSendMeta != 0 {
panic("BUG: streamOutSendMeta set after successfully appending frames")
}
}
// We've sent all frames for this stream, so remove it from the send queue.
c.streams.queueMeta.remove(s)
if state&(streamInDone|streamOutDone) == streamInDone|streamOutDone {
// Stream is finished, remove it from the conn.
state = s.state.set(streamConnRemoved, streamQueueMeta|streamConnRemoved)
delete(c.streams.streams, s.id)
// Record finalization of remote streams, to know when
// to extend the peer's stream limit.
if s.id.initiator() != c.side {
c.streams.remoteLimit[s.id.streamType()].close()
}
} else {
state = s.state.set(0, streamQueueMeta|streamConnRemoved)
}
// The stream may have flow-controlled data to send,
// or something might have added non-flow-controlled frames after we
// unlocked the stream.
// If so, put the stream back on a queue.
c.queueStreamForSendLocked(s, state)
}
// queueData contains streams with flow-controlled frames.
for c.streams.queueData.head != nil {
avail := c.streams.outflow.avail()
if avail == 0 {
break // no flow control quota available
}
s := c.streams.queueData.head
s.outgate.lock()
ok := s.appendOutFramesLocked(w, pnum, pto)
state := s.outUnlockNoQueue()
if !ok {
// We've sent some data for this stream, but it still has more to send.
// If the stream got a reasonable chance to put data in a packet,
// advance sendHead to the next stream in line, to avoid starvation.
// We'll come back to this stream after going through the others.
//
// If the packet was already mostly out of space, leave sendHead alone
// and come back to this stream again on the next packet.
if avail > 512 {
c.streams.queueData.head = s.next
}
return false
}
if state&streamQueueData == 0 {
panic("BUG: queueData stream is not streamQueueData")
}
if state&streamOutSendData != 0 {
// We must have run out of connection-level flow control:
// appendOutFramesLocked says it wrote all it can, but there's
// still data to send.
//
// Advance sendHead to the next stream in line to avoid starvation.
if c.streams.outflow.avail() != 0 {
panic("BUG: streamOutSendData set and flow control available after send")
}
c.streams.queueData.head = s.next
return true
}
c.streams.queueData.remove(s)
state = s.state.set(0, streamQueueData)
c.queueStreamForSendLocked(s, state)
}
if c.streams.queueMeta.head == nil && c.streams.queueData.head == nil {
c.streams.needSend.Store(false)
}
return true
}
// appendStreamFramesPTO writes stream-related frames to the current packet
// for a PTO probe.
//
// It returns true if no more frames need appending,
// false if not everything fit in the current packet.
func (c *Conn) appendStreamFramesPTO(w *packetWriter, pnum packetNumber) bool {
c.streams.sendMu.Lock()
defer c.streams.sendMu.Unlock()
const pto = true
for _, ms := range c.streams.streams {
s := ms.s
if s == nil {
continue
}
const pto = true
s.ingate.lock()
inOK := s.appendInFramesLocked(w, pnum, pto)
s.inUnlockNoQueue()
if !inOK {
return false
}
s.outgate.lock()
outOK := s.appendOutFramesLocked(w, pnum, pto)
s.outUnlockNoQueue()
if !outOK {
return false
}
}
return true
}
// A streamRing is a circular linked list of streams.
type streamRing struct {
head *Stream
}
// remove removes s from the ring.
// s must be on the ring.
func (r *streamRing) remove(s *Stream) {
if s.next == s {
r.head = nil // s was the last stream in the ring
} else {
s.prev.next = s.next
s.next.prev = s.prev
if r.head == s {
r.head = s.next
}
}
}
// append places s at the last position in the ring.
// s must not be attached to any ring.
func (r *streamRing) append(s *Stream) {
if r.head == nil {
r.head = s
s.next = s
s.prev = s
} else {
s.prev = r.head.prev
s.next = r.head
s.prev.next = s
s.next.prev = s
}
}