http2: buffer the most recently received PRIORITY_UPDATE frame

Per RFC 9218, servers should buffer the most recently received
PRIORITY_UPDATE frame. This CL implements said buffering within the RFC
9218 priority write scheduler.

For golang/go#75500

Change-Id: I259f4f6787053de6388ec513086cfa1b294fa607
Reviewed-on: https://go-review.googlesource.com/c/net/+/728401
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Damien Neil <dneil@google.com>
Reviewed-by: Nicholas Husin <husin@google.com>
diff --git a/http2/writesched_priority_rfc9218.go b/http2/writesched_priority_rfc9218.go
index cb4cadc..dfbfc1e 100644
--- a/http2/writesched_priority_rfc9218.go
+++ b/http2/writesched_priority_rfc9218.go
@@ -37,6 +37,15 @@
 	// incremental streams or not, when urgency is the same in a given Pop()
 	// call.
 	prioritizeIncremental bool
+
+	// priorityUpdateBuf is used to buffer the most recent PRIORITY_UPDATE we
+	// receive per https://www.rfc-editor.org/rfc/rfc9218.html#name-the-priority_update-frame.
+	priorityUpdateBuf struct {
+		// streamID being 0 means that the buffer is empty. This is a safe
+		// assumption as PRIORITY_UPDATE for stream 0 is a PROTOCOL_ERROR.
+		streamID uint32
+		priority PriorityParam
+	}
 }
 
 func newPriorityWriteSchedulerRFC9218() WriteScheduler {
@@ -50,6 +59,10 @@
 	if ws.streams[streamID].location != nil {
 		panic(fmt.Errorf("stream %d already opened", streamID))
 	}
+	if streamID == ws.priorityUpdateBuf.streamID {
+		ws.priorityUpdateBuf.streamID = 0
+		opt.priority = ws.priorityUpdateBuf.priority
+	}
 	q := ws.queuePool.get()
 	ws.streams[streamID] = streamMetadata{
 		location: q,
@@ -95,6 +108,8 @@
 	metadata := ws.streams[streamID]
 	q, u, i := metadata.location, metadata.priority.urgency, metadata.priority.incremental
 	if q == nil {
+		ws.priorityUpdateBuf.streamID = streamID
+		ws.priorityUpdateBuf.priority = priority
 		return
 	}
 
diff --git a/http2/writesched_priority_rfc9218_test.go b/http2/writesched_priority_rfc9218_test.go
index 28a8201..f8fd9c8 100644
--- a/http2/writesched_priority_rfc9218_test.go
+++ b/http2/writesched_priority_rfc9218_test.go
@@ -324,3 +324,66 @@
 		t.Fatalf("popped streams %v, want %v", got, want)
 	}
 }
+
+func TestPrioritySchedulerBuffersPriorityUpdate(t *testing.T) {
+	const maxFrameSize = 16
+	sc := &serverConn{maxFrameSize: maxFrameSize}
+	ws := newPriorityWriteSchedulerRFC9218()
+
+	// Priorities are adjusted for streams that are not open yet.
+	ws.AdjustStream(1, PriorityParam{urgency: 0})
+	ws.AdjustStream(5, PriorityParam{urgency: 0})
+	for _, streamID := range []uint32{1, 3, 5} {
+		stream := &stream{
+			id: streamID,
+			sc: sc,
+		}
+		stream.flow.add(1 << 20) // arbitrary large value
+		ws.OpenStream(streamID, OpenStreamOptions{
+			priority: PriorityParam{
+				urgency:     7,
+				incremental: 1,
+			},
+		})
+		wr := FrameWriteRequest{
+			write: &writeData{
+				streamID:  streamID,
+				p:         make([]byte, maxFrameSize*(3)),
+				endStream: false,
+			},
+			stream: stream,
+		}
+		ws.Push(wr)
+	}
+
+	const controlFrames = 2
+	for range controlFrames {
+		ws.Push(makeWriteNonStreamRequest())
+	}
+
+	// We should get the control frames first.
+	for range controlFrames {
+		wr, ok := ws.Pop()
+		if !ok || wr.StreamID() != 0 {
+			t.Fatalf("wr.Pop() = stream %v, %v; want 0, true", wr.StreamID(), ok)
+		}
+	}
+
+	// The most recent priority adjustment is buffered and applied. Older ones
+	// are ignored.
+	want := []uint32{5, 5, 5, 1, 3, 1, 3, 1, 3}
+	var got []uint32
+	for {
+		wr, ok := ws.Pop()
+		if !ok {
+			break
+		}
+		if wr.DataSize() != maxFrameSize {
+			t.Fatalf("wr.Pop() = %v data bytes, want %v", wr.DataSize(), maxFrameSize)
+		}
+		got = append(got, wr.StreamID())
+	}
+	if !reflect.DeepEqual(got, want) {
+		t.Fatalf("popped streams %v, want %v", got, want)
+	}
+}