http2: buffer the most recently received PRIORITY_UPDATE frame
Per RFC 9218, servers should buffer the most recently received
PRIORITY_UPDATE frame that references a stream which is not open yet, and
apply it once that stream is opened. This CL implements said buffering
within the RFC 9218 priority write scheduler.
For golang/go#75500
Change-Id: I259f4f6787053de6388ec513086cfa1b294fa607
Reviewed-on: https://go-review.googlesource.com/c/net/+/728401
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Damien Neil <dneil@google.com>
Reviewed-by: Nicholas Husin <husin@google.com>
diff --git a/http2/writesched_priority_rfc9218.go b/http2/writesched_priority_rfc9218.go
index cb4cadc..dfbfc1e 100644
--- a/http2/writesched_priority_rfc9218.go
+++ b/http2/writesched_priority_rfc9218.go
@@ -37,6 +37,15 @@
// incremental streams or not, when urgency is the same in a given Pop()
// call.
prioritizeIncremental bool
+
+ // priorityUpdateBuf is used to buffer the most recent PRIORITY_UPDATE we
+ // receive per https://www.rfc-editor.org/rfc/rfc9218.html#name-the-priority_update-frame.
+ priorityUpdateBuf struct {
+ // streamID being 0 means that the buffer is empty. This is a safe
+ // assumption as PRIORITY_UPDATE for stream 0 is a PROTOCOL_ERROR.
+ streamID uint32
+ priority PriorityParam
+ }
}
func newPriorityWriteSchedulerRFC9218() WriteScheduler {
@@ -50,6 +59,10 @@
if ws.streams[streamID].location != nil {
panic(fmt.Errorf("stream %d already opened", streamID))
}
+ if streamID == ws.priorityUpdateBuf.streamID {
+ ws.priorityUpdateBuf.streamID = 0
+ opt.priority = ws.priorityUpdateBuf.priority
+ }
q := ws.queuePool.get()
ws.streams[streamID] = streamMetadata{
location: q,
@@ -95,6 +108,8 @@
metadata := ws.streams[streamID]
q, u, i := metadata.location, metadata.priority.urgency, metadata.priority.incremental
if q == nil {
+ ws.priorityUpdateBuf.streamID = streamID
+ ws.priorityUpdateBuf.priority = priority
return
}
diff --git a/http2/writesched_priority_rfc9218_test.go b/http2/writesched_priority_rfc9218_test.go
index 28a8201..f8fd9c8 100644
--- a/http2/writesched_priority_rfc9218_test.go
+++ b/http2/writesched_priority_rfc9218_test.go
@@ -324,3 +324,66 @@
t.Fatalf("popped streams %v, want %v", got, want)
}
}
+
+func TestPrioritySchedulerBuffersPriorityUpdate(t *testing.T) {
+ const maxFrameSize = 16
+ sc := &serverConn{maxFrameSize: maxFrameSize}
+ ws := newPriorityWriteSchedulerRFC9218()
+
+ // Adjust priorities before any stream is open; the later call (stream 5) replaces the earlier one.
+ ws.AdjustStream(1, PriorityParam{urgency: 0})
+ ws.AdjustStream(5, PriorityParam{urgency: 0})
+ for _, streamID := range []uint32{1, 3, 5} {
+ stream := &stream{
+ id: streamID,
+ sc: sc,
+ }
+ stream.flow.add(1 << 20) // arbitrary large value
+ ws.OpenStream(streamID, OpenStreamOptions{
+ priority: PriorityParam{
+ urgency: 7,
+ incremental: 1,
+ },
+ })
+ wr := FrameWriteRequest{
+ write: &writeData{
+ streamID: streamID,
+ p: make([]byte, maxFrameSize*(3)),
+ endStream: false,
+ },
+ stream: stream,
+ }
+ ws.Push(wr)
+ }
+
+ const controlFrames = 2
+ for range controlFrames {
+ ws.Push(makeWriteNonStreamRequest())
+ }
+
+ // Control frames (stream ID 0) should be popped before any stream data.
+ for range controlFrames {
+ wr, ok := ws.Pop()
+ if !ok || wr.StreamID() != 0 {
+ t.Fatalf("wr.Pop() = stream %v, %v; want 0, true", wr.StreamID(), ok)
+ }
+ }
+
+ // Stream 5 takes the buffered urgency 0 when opened and is expected to drain first. The older
+ // update for stream 1 was overwritten, so streams 1 and 3 keep the priority they opened with.
+ want := []uint32{5, 5, 5, 1, 3, 1, 3, 1, 3}
+ var got []uint32
+ for {
+ wr, ok := ws.Pop()
+ if !ok {
+ break
+ }
+ if wr.DataSize() != maxFrameSize {
+ t.Fatalf("wr.Pop() = %v data bytes, want %v", wr.DataSize(), maxFrameSize)
+ }
+ got = append(got, wr.StreamID())
+ }
+ if !reflect.DeepEqual(got, want) {
+ t.Fatalf("popped streams %v, want %v", got, want)
+ }
+}