blob: dfbfc1eb341bbc0b89bd6bc9a2dde7cf6d0de23e [file] [log] [blame]
// Copyright 2025 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package http2
import (
"fmt"
"math"
)
type streamMetadata struct {
location *writeQueue
priority PriorityParam
}
type priorityWriteSchedulerRFC9218 struct {
// control contains control frames (SETTINGS, PING, etc.).
control writeQueue
// heads contain the head of a circular list of streams.
// We put these heads within a nested array that represents urgency and
// incremental, as defined in
// https://www.rfc-editor.org/rfc/rfc9218.html#name-priority-parameters.
// 8 represents u=0 up to u=7, and 2 represents i=false and i=true.
heads [8][2]*writeQueue
// streams contains a mapping between each stream ID and their metadata, so
// we can quickly locate them when needing to, for example, adjust their
// priority.
streams map[uint32]streamMetadata
// queuePool are empty queues for reuse.
queuePool writeQueuePool
// prioritizeIncremental is used to determine whether we should prioritize
// incremental streams or not, when urgency is the same in a given Pop()
// call.
prioritizeIncremental bool
// priorityUpdateBuf is used to buffer the most recent PRIORITY_UPDATE we
// receive per https://www.rfc-editor.org/rfc/rfc9218.html#name-the-priority_update-frame.
priorityUpdateBuf struct {
// streamID being 0 means that the buffer is empty. This is a safe
// assumption as PRIORITY_UPDATE for stream 0 is a PROTOCOL_ERROR.
streamID uint32
priority PriorityParam
}
}
func newPriorityWriteSchedulerRFC9218() WriteScheduler {
ws := &priorityWriteSchedulerRFC9218{
streams: make(map[uint32]streamMetadata),
}
return ws
}
func (ws *priorityWriteSchedulerRFC9218) OpenStream(streamID uint32, opt OpenStreamOptions) {
if ws.streams[streamID].location != nil {
panic(fmt.Errorf("stream %d already opened", streamID))
}
if streamID == ws.priorityUpdateBuf.streamID {
ws.priorityUpdateBuf.streamID = 0
opt.priority = ws.priorityUpdateBuf.priority
}
q := ws.queuePool.get()
ws.streams[streamID] = streamMetadata{
location: q,
priority: opt.priority,
}
u, i := opt.priority.urgency, opt.priority.incremental
if ws.heads[u][i] == nil {
ws.heads[u][i] = q
q.next = q
q.prev = q
} else {
// Queues are stored in a ring.
// Insert the new stream before ws.head, putting it at the end of the list.
q.prev = ws.heads[u][i].prev
q.next = ws.heads[u][i]
q.prev.next = q
q.next.prev = q
}
}
func (ws *priorityWriteSchedulerRFC9218) CloseStream(streamID uint32) {
metadata := ws.streams[streamID]
q, u, i := metadata.location, metadata.priority.urgency, metadata.priority.incremental
if q == nil {
return
}
if q.next == q {
// This was the only open stream.
ws.heads[u][i] = nil
} else {
q.prev.next = q.next
q.next.prev = q.prev
if ws.heads[u][i] == q {
ws.heads[u][i] = q.next
}
}
delete(ws.streams, streamID)
ws.queuePool.put(q)
}
func (ws *priorityWriteSchedulerRFC9218) AdjustStream(streamID uint32, priority PriorityParam) {
metadata := ws.streams[streamID]
q, u, i := metadata.location, metadata.priority.urgency, metadata.priority.incremental
if q == nil {
ws.priorityUpdateBuf.streamID = streamID
ws.priorityUpdateBuf.priority = priority
return
}
// Remove stream from current location.
if q.next == q {
// This was the only open stream.
ws.heads[u][i] = nil
} else {
q.prev.next = q.next
q.next.prev = q.prev
if ws.heads[u][i] == q {
ws.heads[u][i] = q.next
}
}
// Insert stream to the new queue.
u, i = priority.urgency, priority.incremental
if ws.heads[u][i] == nil {
ws.heads[u][i] = q
q.next = q
q.prev = q
} else {
// Queues are stored in a ring.
// Insert the new stream before ws.head, putting it at the end of the list.
q.prev = ws.heads[u][i].prev
q.next = ws.heads[u][i]
q.prev.next = q
q.next.prev = q
}
// Update the metadata.
ws.streams[streamID] = streamMetadata{
location: q,
priority: priority,
}
}
func (ws *priorityWriteSchedulerRFC9218) Push(wr FrameWriteRequest) {
if wr.isControl() {
ws.control.push(wr)
return
}
q := ws.streams[wr.StreamID()].location
if q == nil {
// This is a closed stream.
// wr should not be a HEADERS or DATA frame.
// We push the request onto the control queue.
if wr.DataSize() > 0 {
panic("add DATA on non-open stream")
}
ws.control.push(wr)
return
}
q.push(wr)
}
func (ws *priorityWriteSchedulerRFC9218) Pop() (FrameWriteRequest, bool) {
// Control and RST_STREAM frames first.
if !ws.control.empty() {
return ws.control.shift(), true
}
// On the next Pop(), we want to prioritize incremental if we prioritized
// non-incremental request of the same urgency this time. Vice-versa.
// i.e. when there are incremental and non-incremental requests at the same
// priority, we give 50% of our bandwidth to the incremental ones in
// aggregate and 50% to the first non-incremental one (since
// non-incremental streams do not use round-robin writes).
ws.prioritizeIncremental = !ws.prioritizeIncremental
// Always prioritize lowest u (i.e. highest urgency level).
for u := range ws.heads {
for i := range ws.heads[u] {
// When we want to prioritize incremental, we try to pop i=true
// first before i=false when u is the same.
if ws.prioritizeIncremental {
i = (i + 1) % 2
}
q := ws.heads[u][i]
if q == nil {
continue
}
for {
if wr, ok := q.consume(math.MaxInt32); ok {
if i == 1 {
// For incremental streams, we update head to q.next so
// we can round-robin between multiple streams that can
// immediately benefit from partial writes.
ws.heads[u][i] = q.next
} else {
// For non-incremental streams, we try to finish one to
// completion rather than doing round-robin. However,
// we update head here so that if q.consume() is !ok
// (e.g. the stream has no more frame to consume), head
// is updated to the next q that has frames to consume
// on future iterations. This way, we do not prioritize
// writing to unavailable stream on next Pop() calls,
// preventing head-of-line blocking.
ws.heads[u][i] = q
}
return wr, true
}
q = q.next
if q == ws.heads[u][i] {
break
}
}
}
}
return FrameWriteRequest{}, false
}