| // Copyright 2014 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| // See https://code.google.com/p/go/source/browse/CONTRIBUTORS |
| // Licensed under the same terms as Go itself: |
| // https://code.google.com/p/go/source/browse/LICENSE |
| |
| package http2 |
| |
| // frameWriteMsg is a request to write a frame. |
| type frameWriteMsg struct { |
| // write is the interface value that does the writing, once the |
| // writeScheduler (below) has decided to select this frame |
| // to write. The write functions are all defined in write.go. |
| write writeFramer |
| |
| stream *stream // used for prioritization. nil for non-stream frames. |
| |
| // done, if non-nil, must be a buffered channel with space for |
| // 1 message and is sent the return value from write (or an |
| // earlier error) when the frame has been written. |
| done chan error |
| } |
| |
| // writeScheduler tracks pending frames to write, priorities, and decides |
| // the next one to use. It is not thread-safe. |
| type writeScheduler struct { |
| // zero are frames not associated with a specific stream. |
| // They're sent before any stream-specific freams. |
| zero writeQueue |
| |
| // maxFrameSize is the maximum size of a DATA frame |
| // we'll write. |
| maxFrameSize uint32 |
| |
| // sq contains the stream-specific queues, keyed by stream ID. |
| // when a stream is idle, it's deleted from the map. |
| sq map[uint32]*writeQueue |
| } |
| |
| func (ws *writeScheduler) empty() bool { return ws.zero.empty() && len(ws.sq) == 0 } |
| |
| func (ws *writeScheduler) add(wm frameWriteMsg) { |
| st := wm.stream |
| if st == nil { |
| ws.zero.push(wm) |
| } else { |
| ws.streamQueue(st.id).push(wm) |
| } |
| } |
| |
| func (ws *writeScheduler) streamQueue(streamID uint32) *writeQueue { |
| if q, ok := ws.sq[streamID]; ok { |
| return q |
| } |
| if ws.sq == nil { |
| ws.sq = make(map[uint32]*writeQueue) |
| } |
| q := new(writeQueue) |
| ws.sq[streamID] = q |
| return q |
| } |
| |
| // take returns the most important frame to write and removes it from the scheduler. |
| // It is illegal to call this if the scheduler is empty or if there are no connection-level |
| // flow control bytes available. |
| func (ws *writeScheduler) take() (wm frameWriteMsg, ok bool) { |
| // If there any frames not associated with streams, prefer those first. |
| // These are usually SETTINGS, etc. |
| if !ws.zero.empty() { |
| return ws.zero.shift(), true |
| } |
| if len(ws.sq) == 0 { |
| return |
| } |
| |
| // Next, prioritize frames on streams that aren't DATA frames (no cost). |
| for id, q := range ws.sq { |
| if q.firstIsNoCost() { |
| return ws.takeFrom(id, q) |
| } |
| } |
| |
| // Now, all that remains are DATA frames. So pick the best one. |
| // TODO: do that. For now, pick a random one. |
| for id, q := range ws.sq { |
| if wm, ok := ws.takeFrom(id, q); ok { |
| return wm, true |
| } |
| } |
| return |
| } |
| |
| func (ws *writeScheduler) takeFrom(id uint32, q *writeQueue) (wm frameWriteMsg, ok bool) { |
| wm = q.head() |
| // If the first item in this queue costs flow control tokens |
| // and we don't have enough, write as much as we can. |
| if wd, ok := wm.write.(*writeData); ok { |
| allowed := wm.stream.flow.available() // max we can write |
| if allowed == 0 { |
| // No quota available. Caller can try the next stream. |
| return wm, false |
| } |
| if int32(ws.maxFrameSize) < allowed { |
| allowed = int32(ws.maxFrameSize) |
| } |
| if allowed == 0 { |
| panic("internal error: ws.maxFrameSize not initialized or invalid") |
| } |
| // TODO: further restrict the allowed size, because even if |
| // the peer says it's okay to write 16MB data frames, we might |
| // want to write smaller ones to properly weight competing |
| // streams' priorities. |
| if len(wd.p) > int(allowed) { |
| wm.stream.flow.take(allowed) |
| chunk := wd.p[:allowed] |
| wd.p = wd.p[allowed:] |
| // Make up a new write message of a valid size, rather |
| // than shifting one off the queue. |
| return frameWriteMsg{ |
| stream: wm.stream, |
| write: &writeData{ |
| streamID: wd.streamID, |
| p: chunk, |
| // even if the original was true, there are bytes |
| // remaining because len(wd.p) > allowed, so we |
| // know endStream is false: |
| endStream: false, |
| }, |
| // completeness. our caller is blocking on the final |
| // DATA frame, not these intermediates: |
| done: nil, |
| }, true |
| } |
| } |
| |
| q.shift() |
| if q.empty() { |
| // TODO: reclaim its slice and use it for future allocations |
| // in the writeScheduler.streamQueue method above when making |
| // the writeQueue. |
| delete(ws.sq, id) |
| } |
| return wm, true |
| } |
| |
| type writeQueue struct { |
| s []frameWriteMsg |
| } |
| |
| func (q *writeQueue) empty() bool { return len(q.s) == 0 } |
| |
| func (q *writeQueue) push(wm frameWriteMsg) { |
| q.s = append(q.s, wm) |
| } |
| |
| // head returns the next item that would be removed by shift. |
| func (q *writeQueue) head() frameWriteMsg { |
| if len(q.s) == 0 { |
| panic("invalid use of queue") |
| } |
| return q.s[0] |
| } |
| |
| func (q *writeQueue) shift() frameWriteMsg { |
| if len(q.s) == 0 { |
| panic("invalid use of queue") |
| } |
| wm := q.s[0] |
| // TODO: less copy-happy queue. |
| copy(q.s, q.s[1:]) |
| q.s[len(q.s)-1] = frameWriteMsg{} |
| q.s = q.s[:len(q.s)-1] |
| return wm |
| } |
| |
| func (q *writeQueue) firstIsNoCost() bool { |
| if df, ok := q.s[0].write.(*writeData); ok { |
| return len(df.p) == 0 |
| } |
| return true |
| } |