blob: 8b69ef7dba18ca16c8509982071a9d9928e2d357 [file] [log] [blame]
// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.21
package quic
import (
"sync/atomic"
"time"
)
// connInflow tracks connection-level flow control for data sent by the peer to us.
//
// There are four byte offsets of significance in the stream of data received from the peer,
// each >= to the previous:
//
// - bytes read by the user
// - bytes received from the peer
// - limit sent to the peer in a MAX_DATA frame
// - potential new limit to sent to the peer
//
// We maintain a flow control window, so as bytes are read by the user
// the potential limit is extended correspondingly.
//
// We keep an atomic counter of bytes read by the user and not yet applied to the
// potential limit (credit). When this count grows large enough, we update the
// new limit to send and mark that we need to send a new MAX_DATA frame.
type connInflow struct {
sent sentVal // set when we need to send a MAX_DATA update to the peer
usedLimit int64 // total bytes sent by the peer, must be less than sentLimit
sentLimit int64 // last MAX_DATA sent to the peer
newLimit int64 // new MAX_DATA to send
credit atomic.Int64 // bytes read but not yet applied to extending the flow-control window
}
func (c *Conn) inflowInit() {
// The initial MAX_DATA limit is sent as a transport parameter.
c.streams.inflow.sentLimit = c.config.maxConnReadBufferSize()
c.streams.inflow.newLimit = c.streams.inflow.sentLimit
}
// handleStreamBytesReadOffLoop records that the user has consumed bytes from a stream.
// We may extend the peer's flow control window.
//
// This is called indirectly by the user, via Read or CloseRead.
func (c *Conn) handleStreamBytesReadOffLoop(n int64) {
if n == 0 {
return
}
if c.shouldUpdateFlowControl(c.streams.inflow.credit.Add(n)) {
// We should send a MAX_DATA update to the peer.
// Record this on the Conn's main loop.
c.sendMsg(func(now time.Time, c *Conn) {
// A MAX_DATA update may have already happened, so check again.
if c.shouldUpdateFlowControl(c.streams.inflow.credit.Load()) {
c.sendMaxDataUpdate()
}
})
}
}
// handleStreamBytesReadOnLoop extends the peer's flow control window after
// data has been discarded due to a RESET_STREAM frame.
//
// This is called on the conn's loop.
func (c *Conn) handleStreamBytesReadOnLoop(n int64) {
if c.shouldUpdateFlowControl(c.streams.inflow.credit.Add(n)) {
c.sendMaxDataUpdate()
}
}
func (c *Conn) sendMaxDataUpdate() {
c.streams.inflow.sent.setUnsent()
// Apply current credit to the limit.
// We don't strictly need to do this here
// since appendMaxDataFrame will do so as well,
// but this avoids redundant trips down this path
// if the MAX_DATA frame doesn't go out right away.
c.streams.inflow.newLimit += c.streams.inflow.credit.Swap(0)
}
func (c *Conn) shouldUpdateFlowControl(credit int64) bool {
return shouldUpdateFlowControl(c.config.maxConnReadBufferSize(), credit)
}
// handleStreamBytesReceived records that the peer has sent us stream data.
func (c *Conn) handleStreamBytesReceived(n int64) error {
c.streams.inflow.usedLimit += n
if c.streams.inflow.usedLimit > c.streams.inflow.sentLimit {
return localTransportError{
code: errFlowControl,
reason: "stream exceeded flow control limit",
}
}
return nil
}
// appendMaxDataFrame appends a MAX_DATA frame to the current packet.
//
// It returns true if no more frames need appending,
// false if it could not fit a frame in the current packet.
func (c *Conn) appendMaxDataFrame(w *packetWriter, pnum packetNumber, pto bool) bool {
if c.streams.inflow.sent.shouldSendPTO(pto) {
// Add any unapplied credit to the new limit now.
c.streams.inflow.newLimit += c.streams.inflow.credit.Swap(0)
if !w.appendMaxDataFrame(c.streams.inflow.newLimit) {
return false
}
c.streams.inflow.sentLimit += c.streams.inflow.newLimit
c.streams.inflow.sent.setSent(pnum)
}
return true
}
// ackOrLossMaxData records the fate of a MAX_DATA frame.
func (c *Conn) ackOrLossMaxData(pnum packetNumber, fate packetFate) {
c.streams.inflow.sent.ackLatestOrLoss(pnum, fate)
}
// connOutflow tracks connection-level flow control for data sent by us to the peer.
type connOutflow struct {
max int64 // largest MAX_DATA received from peer
used int64 // total bytes of STREAM data sent to peer
}
// setMaxData updates the connection-level flow control limit
// with the initial limit conveyed in transport parameters
// or an update from a MAX_DATA frame.
func (f *connOutflow) setMaxData(maxData int64) {
f.max = max(f.max, maxData)
}
// avail returns the number of connection-level flow control bytes available.
func (f *connOutflow) avail() int64 {
return f.max - f.used
}
// consume records consumption of n bytes of flow.
func (f *connOutflow) consume(n int64) {
f.used += n
}