| // Copyright 2023 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| //go:build go1.21 |
| |
| package quic |
| |
| import ( |
| "context" |
| "sync" |
| "sync/atomic" |
| "time" |
| ) |
| |
| type streamsState struct { |
| queue queue[*Stream] // new, peer-created streams |
| |
| streamsMu sync.Mutex |
| streams map[streamID]*Stream |
| opened [streamTypeCount]int64 // number of streams opened by us |
| |
| // Peer configuration provided in transport parameters. |
| peerInitialMaxStreamDataRemote [streamTypeCount]int64 // streams opened by us |
| peerInitialMaxStreamDataBidiLocal int64 // streams opened by them |
| |
| // Streams with frames to send are stored in a circular linked list. |
| // sendHead is the next stream to write, or nil if there are no streams |
| // with data to send. sendTail is the last stream to write. |
| needSend atomic.Bool |
| sendMu sync.Mutex |
| sendHead *Stream |
| sendTail *Stream |
| } |
| |
| func (c *Conn) streamsInit() { |
| c.streams.streams = make(map[streamID]*Stream) |
| c.streams.queue = newQueue[*Stream]() |
| } |
| |
| // AcceptStream waits for and returns the next stream created by the peer. |
| func (c *Conn) AcceptStream(ctx context.Context) (*Stream, error) { |
| return c.streams.queue.getWithHooks(ctx, c.testHooks) |
| } |
| |
| // NewStream creates a stream. |
| // |
| // If the peer's maximum stream limit for the connection has been reached, |
| // NewStream blocks until the limit is increased or the context expires. |
| func (c *Conn) NewStream(ctx context.Context) (*Stream, error) { |
| return c.newLocalStream(ctx, bidiStream) |
| } |
| |
| // NewSendOnlyStream creates a unidirectional, send-only stream. |
| // |
| // If the peer's maximum stream limit for the connection has been reached, |
| // NewSendOnlyStream blocks until the limit is increased or the context expires. |
| func (c *Conn) NewSendOnlyStream(ctx context.Context) (*Stream, error) { |
| return c.newLocalStream(ctx, uniStream) |
| } |
| |
| func (c *Conn) newLocalStream(ctx context.Context, styp streamType) (*Stream, error) { |
| // TODO: Stream limits. |
| c.streams.streamsMu.Lock() |
| defer c.streams.streamsMu.Unlock() |
| |
| num := c.streams.opened[styp] |
| c.streams.opened[styp]++ |
| |
| s := newStream(c, newStreamID(c.side, styp, num)) |
| s.outmaxbuf = c.config.streamWriteBufferSize() |
| s.outwin = c.streams.peerInitialMaxStreamDataRemote[styp] |
| if styp == bidiStream { |
| s.inmaxbuf = c.config.streamReadBufferSize() |
| s.inwin = c.config.streamReadBufferSize() |
| } |
| s.inUnlock() |
| s.outUnlock() |
| |
| c.streams.streams[s.id] = s |
| return s, nil |
| } |
| |
| // streamFrameType identifies which direction of a stream, |
| // from the local perspective, a frame is associated with. |
| // |
| // For example, STREAM is a recvStream frame, |
| // because it carries data from the peer to us. |
| type streamFrameType uint8 |
| |
| const ( |
| sendStream = streamFrameType(iota) // for example, MAX_DATA |
| recvStream // for example, STREAM_DATA_BLOCKED |
| ) |
| |
| // streamForID returns the stream with the given id. |
| // If the stream does not exist, it returns nil. |
| func (c *Conn) streamForID(id streamID) *Stream { |
| c.streams.streamsMu.Lock() |
| defer c.streams.streamsMu.Unlock() |
| return c.streams.streams[id] |
| } |
| |
| // streamForFrame returns the stream with the given id. |
| // If the stream does not exist, it may be created. |
| // |
| // streamForFrame aborts the connection if the stream id, state, and frame type don't align. |
| // For example, it aborts the connection with a STREAM_STATE error if a MAX_DATA frame |
| // is received for a receive-only stream, or if the peer attempts to create a stream that |
| // should be originated locally. |
| // |
| // streamForFrame returns nil if the stream no longer exists or if an error occurred. |
| func (c *Conn) streamForFrame(now time.Time, id streamID, ftype streamFrameType) *Stream { |
| if id.streamType() == uniStream { |
| if (id.initiator() == c.side) != (ftype == sendStream) { |
| // Received an invalid frame for unidirectional stream. |
| // For example, a RESET_STREAM frame for a send-only stream. |
| c.abort(now, localTransportError(errStreamState)) |
| return nil |
| } |
| } |
| |
| c.streams.streamsMu.Lock() |
| defer c.streams.streamsMu.Unlock() |
| if s := c.streams.streams[id]; s != nil { |
| return s |
| } |
| // TODO: Check for closed streams, once we support closing streams. |
| if id.initiator() == c.side { |
| c.abort(now, localTransportError(errStreamState)) |
| return nil |
| } |
| |
| s := newStream(c, id) |
| s.inmaxbuf = c.config.streamReadBufferSize() |
| s.inwin = c.config.streamReadBufferSize() |
| if id.streamType() == bidiStream { |
| s.outmaxbuf = c.config.streamWriteBufferSize() |
| s.outwin = c.streams.peerInitialMaxStreamDataBidiLocal |
| } |
| s.inUnlock() |
| s.outUnlock() |
| |
| c.streams.streams[id] = s |
| c.streams.queue.put(s) |
| return s |
| } |
| |
| // queueStreamForSend marks a stream as containing frames that need sending. |
| func (c *Conn) queueStreamForSend(s *Stream) { |
| c.streams.sendMu.Lock() |
| defer c.streams.sendMu.Unlock() |
| if s.next != nil { |
| // Already in the queue. |
| return |
| } |
| if c.streams.sendHead == nil { |
| // The queue was empty. |
| c.streams.sendHead = s |
| c.streams.sendTail = s |
| s.next = s |
| } else { |
| // Insert this stream at the end of the queue. |
| c.streams.sendTail.next = s |
| c.streams.sendTail = s |
| s.next = c.streams.sendHead |
| } |
| c.streams.needSend.Store(true) |
| c.wake() |
| } |
| |
| // appendStreamFrames writes stream-related frames to the current packet. |
| // |
| // It returns true if no more frames need appending, |
| // false if not everything fit in the current packet. |
| func (c *Conn) appendStreamFrames(w *packetWriter, pnum packetNumber, pto bool) bool { |
| if pto { |
| return c.appendStreamFramesPTO(w, pnum) |
| } |
| if !c.streams.needSend.Load() { |
| return true |
| } |
| c.streams.sendMu.Lock() |
| defer c.streams.sendMu.Unlock() |
| for { |
| s := c.streams.sendHead |
| const pto = false |
| |
| state := s.state.load() |
| if state&streamInSend != 0 { |
| s.ingate.lock() |
| ok := s.appendInFramesLocked(w, pnum, pto) |
| state = s.inUnlockNoQueue() |
| if !ok { |
| return false |
| } |
| } |
| |
| if state&streamOutSend != 0 { |
| avail := w.avail() |
| s.outgate.lock() |
| ok := s.appendOutFramesLocked(w, pnum, pto) |
| state = s.outUnlockNoQueue() |
| if !ok { |
| // We've sent some data for this stream, but it still has more to send. |
| // If the stream got a reasonable chance to put data in a packet, |
| // advance sendHead to the next stream in line, to avoid starvation. |
| // We'll come back to this stream after going through the others. |
| // |
| // If the packet was already mostly out of space, leave sendHead alone |
| // and come back to this stream again on the next packet. |
| if avail > 512 { |
| c.streams.sendHead = s.next |
| c.streams.sendTail = s |
| } |
| return false |
| } |
| } |
| |
| if state == streamInDone|streamOutDone { |
| // Stream is finished, remove it from the conn. |
| s.state.set(streamConnRemoved, streamConnRemoved) |
| delete(c.streams.streams, s.id) |
| |
| // TODO: Provide the peer with additional stream quota (MAX_STREAMS). |
| } |
| |
| next := s.next |
| s.next = nil |
| if (next == s) != (s == c.streams.sendTail) { |
| panic("BUG: sendable stream list state is inconsistent") |
| } |
| if s == c.streams.sendTail { |
| // This was the last stream. |
| c.streams.sendHead = nil |
| c.streams.sendTail = nil |
| c.streams.needSend.Store(false) |
| return true |
| } |
| // We've sent all data for this stream, so remove it from the list. |
| c.streams.sendTail.next = next |
| c.streams.sendHead = next |
| } |
| } |
| |
| // appendStreamFramesPTO writes stream-related frames to the current packet |
| // for a PTO probe. |
| // |
| // It returns true if no more frames need appending, |
| // false if not everything fit in the current packet. |
| func (c *Conn) appendStreamFramesPTO(w *packetWriter, pnum packetNumber) bool { |
| c.streams.sendMu.Lock() |
| defer c.streams.sendMu.Unlock() |
| for _, s := range c.streams.streams { |
| const pto = true |
| s.ingate.lock() |
| inOK := s.appendInFramesLocked(w, pnum, pto) |
| s.inUnlockNoQueue() |
| if !inOK { |
| return false |
| } |
| s.outgate.lock() |
| outOK := s.appendOutFramesLocked(w, pnum, pto) |
| s.outUnlockNoQueue() |
| if !outOK { |
| return false |
| } |
| } |
| return true |
| } |