http2: change default frame scheduler to round robin
The priority scheduler allows stream starvation (see golang/go#58804)
and is CPU intensive. In addition, the RFC 7540 prioritization
scheme it implements was deprecated in RFC 9113 and does not appear
to have ever had significant adoption.
Add a simple round-robin scheduler and enable it by default.
For golang/go#58804
Change-Id: I5c5143aa9bc339fc0894f70d773fa7c0d7d87eef
Reviewed-on: https://go-review.googlesource.com/c/net/+/478735
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Bryan Mills <bcmills@google.com>
Run-TryBot: Damien Neil <dneil@google.com>
diff --git a/http2/server.go b/http2/server.go
index 0c5bdee..396d53b 100644
--- a/http2/server.go
+++ b/http2/server.go
@@ -441,7 +441,7 @@
if s.NewWriteScheduler != nil {
sc.writeSched = s.NewWriteScheduler()
} else {
- sc.writeSched = NewPriorityWriteScheduler(nil)
+ sc.writeSched = newRoundRobinWriteScheduler()
}
// These start at the RFC-specified defaults. If there is a higher
diff --git a/http2/writesched.go b/http2/writesched.go
index c7cd001..cc893ad 100644
--- a/http2/writesched.go
+++ b/http2/writesched.go
@@ -184,7 +184,8 @@
// writeQueue is used by implementations of WriteScheduler.
type writeQueue struct {
- s []FrameWriteRequest
+ s []FrameWriteRequest
+ prev, next *writeQueue
}
func (q *writeQueue) empty() bool { return len(q.s) == 0 }
diff --git a/http2/writesched_roundrobin.go b/http2/writesched_roundrobin.go
new file mode 100644
index 0000000..54fe863
--- /dev/null
+++ b/http2/writesched_roundrobin.go
@@ -0,0 +1,119 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package http2
+
+import (
+ "fmt"
+ "math"
+)
+
+type roundRobinWriteScheduler struct {
+ // control contains control frames (SETTINGS, PING, etc.).
+ control writeQueue
+
+ // streams maps stream ID to a queue.
+ streams map[uint32]*writeQueue
+
+ // stream queues are stored in a circular linked list.
+ // head is the next stream to write, or nil if there are no streams open.
+ head *writeQueue
+
+ // pool of empty queues for reuse.
+ queuePool writeQueuePool
+}
+
+// newRoundRobinWriteScheduler constructs a new write scheduler.
+// The round robin scheduler priorizes control frames
+// like SETTINGS and PING over DATA frames.
+// When there are no control frames to send, it performs a round-robin
+// selection from the ready streams.
+func newRoundRobinWriteScheduler() WriteScheduler {
+ ws := &roundRobinWriteScheduler{
+ streams: make(map[uint32]*writeQueue),
+ }
+ return ws
+}
+
+func (ws *roundRobinWriteScheduler) OpenStream(streamID uint32, options OpenStreamOptions) {
+ if ws.streams[streamID] != nil {
+ panic(fmt.Errorf("stream %d already opened", streamID))
+ }
+ q := ws.queuePool.get()
+ ws.streams[streamID] = q
+ if ws.head == nil {
+ ws.head = q
+ q.next = q
+ q.prev = q
+ } else {
+ // Queues are stored in a ring.
+ // Insert the new stream before ws.head, putting it at the end of the list.
+ q.prev = ws.head.prev
+ q.next = ws.head
+ q.prev.next = q
+ q.next.prev = q
+ }
+}
+
+func (ws *roundRobinWriteScheduler) CloseStream(streamID uint32) {
+ q := ws.streams[streamID]
+ if q == nil {
+ return
+ }
+ if q.next == q {
+ // This was the only open stream.
+ ws.head = nil
+ } else {
+ q.prev.next = q.next
+ q.next.prev = q.prev
+ if ws.head == q {
+ ws.head = q.next
+ }
+ }
+ delete(ws.streams, streamID)
+ ws.queuePool.put(q)
+}
+
+func (ws *roundRobinWriteScheduler) AdjustStream(streamID uint32, priority PriorityParam) {}
+
+func (ws *roundRobinWriteScheduler) Push(wr FrameWriteRequest) {
+ if wr.isControl() {
+ ws.control.push(wr)
+ return
+ }
+ q := ws.streams[wr.StreamID()]
+ if q == nil {
+ // This is a closed stream.
+ // wr should not be a HEADERS or DATA frame.
+ // We push the request onto the control queue.
+ if wr.DataSize() > 0 {
+ panic("add DATA on non-open stream")
+ }
+ ws.control.push(wr)
+ return
+ }
+ q.push(wr)
+}
+
+func (ws *roundRobinWriteScheduler) Pop() (FrameWriteRequest, bool) {
+ // Control and RST_STREAM frames first.
+ if !ws.control.empty() {
+ return ws.control.shift(), true
+ }
+ if ws.head == nil {
+ return FrameWriteRequest{}, false
+ }
+ q := ws.head
+ for {
+ if wr, ok := q.consume(math.MaxInt32); ok {
+ ws.head = q.next
+ return wr, true
+ }
+ q = q.next
+ if q == ws.head {
+ break
+ }
+ }
+ return FrameWriteRequest{}, false
+}
diff --git a/http2/writesched_roundrobin_test.go b/http2/writesched_roundrobin_test.go
new file mode 100644
index 0000000..032b2bc
--- /dev/null
+++ b/http2/writesched_roundrobin_test.go
@@ -0,0 +1,65 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package http2
+
+import (
+ "reflect"
+ "testing"
+)
+
+func TestRoundRobinScheduler(t *testing.T) {
+ const maxFrameSize = 16
+ sc := &serverConn{maxFrameSize: maxFrameSize}
+ ws := newRoundRobinWriteScheduler()
+ streams := make([]*stream, 4)
+ for i := range streams {
+ streamID := uint32(i) + 1
+ streams[i] = &stream{
+ id: streamID,
+ sc: sc,
+ }
+ streams[i].flow.add(1 << 20) // arbitrary large value
+ ws.OpenStream(streamID, OpenStreamOptions{})
+ wr := FrameWriteRequest{
+ write: &writeData{
+ streamID: streamID,
+ p: make([]byte, maxFrameSize*(i+1)),
+ endStream: false,
+ },
+ stream: streams[i],
+ }
+ ws.Push(wr)
+ }
+ const controlFrames = 2
+ for i := 0; i < controlFrames; i++ {
+ ws.Push(makeWriteNonStreamRequest())
+ }
+
+ // We should get the control frames first.
+ for i := 0; i < controlFrames; i++ {
+ wr, ok := ws.Pop()
+ if !ok || wr.StreamID() != 0 {
+ t.Fatalf("wr.Pop() = stream %v, %v; want 0, true", wr.StreamID(), ok)
+ }
+ }
+
+ // Each stream should write maxFrameSize bytes until it runs out of data.
+ // Stream 1 has one frame of data, 2 has two frames, etc.
+ want := []uint32{1, 2, 3, 4, 2, 3, 4, 3, 4, 4}
+ var got []uint32
+ for {
+ wr, ok := ws.Pop()
+ if !ok {
+ break
+ }
+ if wr.DataSize() != maxFrameSize {
+ t.Fatalf("wr.Pop() = %v data bytes, want %v", wr.DataSize(), maxFrameSize)
+ }
+ got = append(got, wr.StreamID())
+ }
+ if !reflect.DeepEqual(got, want) {
+ t.Fatalf("popped streams %v, want %v", got, want)
+ }
+}