Shrink frameWriteMsg, add writeFramer interface, remove empty interfaces.
Just cleanups.
diff --git a/server.go b/server.go
index 82cdb28..27cb220 100644
--- a/server.go
+++ b/server.go
@@ -424,12 +424,12 @@
// At most one goroutine can be running writeFrameAsync at a time per
// serverConn.
func (sc *serverConn) writeFrameAsync(wm frameWriteMsg) {
- err := wm.write(sc, wm.v)
+ err := wm.write.writeFrame(sc)
if ch := wm.done; ch != nil {
select {
case ch <- err:
default:
- panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wm.v))
+ panic(fmt.Sprintf("unbuffered done channel passed in for type %T", wm.write))
}
}
sc.wroteFrameCh <- struct{}{} // tickle frame selection scheduler
@@ -459,8 +459,7 @@
sc.vlogf("HTTP/2 connection from %v on %p", sc.conn.RemoteAddr(), sc.hs)
sc.writeFrame(frameWriteMsg{
- write: writeSettings,
- v: []Setting{
+ write: writeSettings{
{SettingMaxFrameSize, sc.srv.maxReadFrameSize()},
{SettingMaxConcurrentStreams, sc.advMaxStreams},
/* TODO: more actual settings */
@@ -533,10 +532,10 @@
}
}
-// writeData writes the data described in req to stream.id.
+// writeDataFromHandler writes the data described in req to stream.id.
//
// The provided ch is used to avoid allocating new channels for each
-// write operation. It's expected that the caller reuses req and ch
+// write operation. It's expected that the caller reuses writeData and ch
// over time.
//
// The flow control currently happens in the Handler where it waits
@@ -545,14 +544,11 @@
// change when priority is implemented, so the serve goroutine knows
// the total amount of bytes waiting to be sent and can can have more
// scheduling decisions available.
-func (sc *serverConn) writeData(stream *stream, data *dataWriteParams, ch chan error) error {
+func (sc *serverConn) writeDataFromHandler(stream *stream, writeData *writeData, ch chan error) error {
sc.writeFrameFromHandler(frameWriteMsg{
- write: writeDataFrame,
- cost: uint32(len(data.p)),
- stream: stream,
- endStream: data.end,
- v: data,
- done: ch,
+ write: writeData,
+ stream: stream,
+ done: ch,
})
select {
case err := <-ch:
@@ -620,9 +616,9 @@
sc.writingFrame = true
sc.needsFrameFlush = true
- if wm.endStream {
+ if endsStream(wm.write) {
if st == nil {
- panic("nil stream with endStream set")
+ panic("internal error: expecting non-nil stream")
}
switch st.state {
case stateOpen:
@@ -654,8 +650,7 @@
if sc.needToSendGoAway {
sc.needToSendGoAway = false
sc.startFrameWrite(frameWriteMsg{
- write: writeGoAwayFrame,
- v: &goAwayParams{
+ write: &writeGoAway{
maxStreamID: sc.maxStreamID,
code: sc.goAwayCode,
},
@@ -663,7 +658,7 @@
return
}
if sc.writeSched.empty() && sc.needsFrameFlush {
- sc.startFrameWrite(frameWriteMsg{write: flushFrameWriter})
+ sc.startFrameWrite(frameWriteMsg{write: flushFrameWriter{}})
sc.needsFrameFlush = false // after startFrameWrite, since it sets this true
return
}
@@ -673,7 +668,7 @@
}
if sc.needToSendSettingsAck {
sc.needToSendSettingsAck = false
- sc.startFrameWrite(frameWriteMsg{write: writeSettingsAck})
+ sc.startFrameWrite(frameWriteMsg{write: writeSettingsAck{}})
return
}
if sc.writeSched.empty() {
@@ -715,10 +710,7 @@
if !ok {
panic("internal package error; resetStream called on non-existent stream")
}
- sc.writeFrame(frameWriteMsg{
- write: writeRSTStreamFrame,
- v: &se,
- })
+ sc.writeFrame(frameWriteMsg{write: se})
st.sentReset = true
sc.closeStream(st, se)
}
@@ -845,10 +837,7 @@
// PROTOCOL_ERROR."
return ConnectionError(ErrCodeProtocol)
}
- sc.writeFrame(frameWriteMsg{
- write: writePingAck,
- v: f,
- })
+ sc.writeFrame(frameWriteMsg{write: writePingAck{f}})
return nil
}
@@ -1227,23 +1216,12 @@
sc.handler.ServeHTTP(rw, req)
}
-// headerWriteReq is a request to write an HTTP response header from a server Handler.
-type headerWriteReq struct {
- stream *stream
- httpResCode int
- h http.Header // may be nil
- endStream bool
-
- contentType string
- contentLength string
-}
-
// called from handler goroutines.
// h may be nil.
-func (sc *serverConn) writeHeaders(req headerWriteReq, tempCh chan error) {
+func (sc *serverConn) writeHeaders(st *stream, headerData *writeResHeaders, tempCh chan error) {
sc.serveG.checkNotOn() // NOT on
var errc chan error
- if req.h != nil {
+ if headerData.h != nil {
// If there's a header map (which we don't own), so we have to block on
// waiting for this frame to be written, so an http.Flush mid-handler
// writes out the correct value of keys, before a handler later potentially
@@ -1251,11 +1229,9 @@
errc = tempCh
}
sc.writeFrameFromHandler(frameWriteMsg{
- write: writeHeadersFrame,
- v: req,
- stream: req.stream,
- done: errc,
- endStream: req.endStream,
+ write: headerData,
+ stream: st,
+ done: errc,
})
if errc != nil {
select {
@@ -1271,8 +1247,7 @@
// called from handler goroutines.
func (sc *serverConn) write100ContinueHeaders(st *stream) {
sc.writeFrameFromHandler(frameWriteMsg{
- write: write100ContinueHeadersFrame,
- v: st,
+ write: write100ContinueHeadersFrame{st.id},
stream: st,
})
}
@@ -1285,16 +1260,14 @@
const maxUint32 = 2147483647
for n >= maxUint32 {
sc.writeFrameFromHandler(frameWriteMsg{
- write: writeWindowUpdate,
- v: windowUpdateReq{streamID: st.id, n: maxUint32},
+ write: writeWindowUpdate{streamID: st.id, n: maxUint32},
stream: st,
})
n -= maxUint32
}
if n > 0 {
sc.writeFrameFromHandler(frameWriteMsg{
- write: writeWindowUpdate,
- v: windowUpdateReq{streamID: st.id, n: uint32(n)},
+ write: writeWindowUpdate{streamID: st.id, n: uint32(n)},
stream: st,
})
}
@@ -1363,7 +1336,7 @@
wroteHeader bool // WriteHeader called (explicitly or implicitly). Not necessarily sent to user yet.
sentHeader bool // have we sent the header frame?
handlerDone bool // handler has finished
- curWrite dataWriteParams
+ curWrite writeData
frameWriteCh chan error // re-used whenever we need to block on a frame being written
closeNotifierMu sync.Mutex // guards closeNotifierCh
@@ -1373,8 +1346,8 @@
func (rws *responseWriterState) writeData(p []byte, end bool) error {
rws.curWrite.streamID = rws.stream.id
rws.curWrite.p = p
- rws.curWrite.end = end
- return rws.stream.conn.writeData(rws.stream, &rws.curWrite, rws.frameWriteCh)
+ rws.curWrite.endStream = end
+ return rws.stream.conn.writeDataFromHandler(rws.stream, &rws.curWrite, rws.frameWriteCh)
}
type chunkWriter struct{ rws *responseWriterState }
@@ -1401,8 +1374,8 @@
ctype = http.DetectContentType(p)
}
endStream := rws.handlerDone && len(p) == 0
- rws.stream.conn.writeHeaders(headerWriteReq{
- stream: rws.stream,
+ rws.stream.conn.writeHeaders(rws.stream, &writeResHeaders{
+ streamID: rws.stream.id,
httpResCode: rws.status,
h: rws.snapHeader,
endStream: endStream,
diff --git a/write.go b/write.go
index 9711960..8e7c8cf 100644
--- a/write.go
+++ b/write.go
@@ -9,17 +9,23 @@
import (
"bytes"
+ "net/http"
"time"
"github.com/bradfitz/http2/hpack"
)
-// writeContext is the interface needed by the various frame writing
-// functions below. All the functions below are scheduled via the
+// writeFramer is implemented by any type that is used to write frames.
+type writeFramer interface {
+ writeFrame(writeContext) error
+}
+
+// writeContext is the interface needed by the various frame writer
+// types below. All the writeFrame methods below are scheduled via the
// frame writing scheduler (see writeScheduler in writesched.go).
//
// This interface is implemented by *serverConn.
-// TODO: use it from the client code, once it exists.
+// TODO: use it from the client code too, once it exists.
type writeContext interface {
Framer() *Framer
Flush() error
@@ -29,22 +35,36 @@
HeaderEncoder() (*hpack.Encoder, *bytes.Buffer)
}
-func flushFrameWriter(ctx writeContext, _ interface{}) error {
+// endsStream reports whether the given frame writer w will locally
+// close the stream.
+func endsStream(w writeFramer) bool {
+ switch v := w.(type) {
+ case *writeData:
+ return v.endStream
+ case *writeResHeaders:
+ return v.endStream
+ }
+ return false
+}
+
+type flushFrameWriter struct{}
+
+func (flushFrameWriter) writeFrame(ctx writeContext) error {
return ctx.Flush()
}
-func writeSettings(ctx writeContext, v interface{}) error {
- settings := v.([]Setting)
- return ctx.Framer().WriteSettings(settings...)
+type writeSettings []Setting
+
+func (s writeSettings) writeFrame(ctx writeContext) error {
+ return ctx.Framer().WriteSettings([]Setting(s)...)
}
-type goAwayParams struct {
+type writeGoAway struct {
maxStreamID uint32
code ErrCode
}
-func writeGoAwayFrame(ctx writeContext, v interface{}) error {
- p := v.(*goAwayParams)
+func (p *writeGoAway) writeFrame(ctx writeContext) error {
err := ctx.Framer().WriteGoAway(p.maxStreamID, p.code, nil)
if p.code != 0 {
ctx.Flush() // ignore error: we're hanging up on them anyway
@@ -54,32 +74,49 @@
return err
}
-type dataWriteParams struct {
- streamID uint32
- p []byte
- end bool
+type writeData struct {
+ streamID uint32
+ p []byte
+ endStream bool
}
-func writeRSTStreamFrame(ctx writeContext, v interface{}) error {
- se := v.(*StreamError)
+func (w *writeData) writeFrame(ctx writeContext) error {
+ return ctx.Framer().WriteData(w.streamID, w.endStream, w.p)
+}
+
+func (se StreamError) writeFrame(ctx writeContext) error {
return ctx.Framer().WriteRSTStream(se.StreamID, se.Code)
}
-func writePingAck(ctx writeContext, v interface{}) error {
- pf := v.(*PingFrame) // contains the data we need to write back
- return ctx.Framer().WritePing(true, pf.Data)
+type writePingAck struct{ pf *PingFrame }
+
+func (w writePingAck) writeFrame(ctx writeContext) error {
+ return ctx.Framer().WritePing(true, w.pf.Data)
}
-func writeSettingsAck(ctx writeContext, _ interface{}) error {
+type writeSettingsAck struct{}
+
+func (writeSettingsAck) writeFrame(ctx writeContext) error {
return ctx.Framer().WriteSettingsAck()
}
-func writeHeadersFrame(ctx writeContext, v interface{}) error {
- req := v.(headerWriteReq)
+// writeResHeaders is a request to write a HEADERS and 0+ CONTINUATION frames
+// for HTTP response headers from a server handler.
+type writeResHeaders struct {
+ streamID uint32
+ httpResCode int
+ h http.Header // may be nil
+ endStream bool
+
+ contentType string
+ contentLength string
+}
+
+func (w *writeResHeaders) writeFrame(ctx writeContext) error {
enc, buf := ctx.HeaderEncoder()
buf.Reset()
- enc.WriteField(hpack.HeaderField{Name: ":status", Value: httpCodeString(req.httpResCode)})
- for k, vv := range req.h {
+ enc.WriteField(hpack.HeaderField{Name: ":status", Value: httpCodeString(w.httpResCode)})
+ for k, vv := range w.h {
k = lowerHeader(k)
for _, v := range vv {
// TODO: more of "8.1.2.2 Connection-Specific Header Fields"
@@ -89,11 +126,11 @@
enc.WriteField(hpack.HeaderField{Name: k, Value: v})
}
}
- if req.contentType != "" {
- enc.WriteField(hpack.HeaderField{Name: "content-type", Value: req.contentType})
+ if w.contentType != "" {
+ enc.WriteField(hpack.HeaderField{Name: "content-type", Value: w.contentType})
}
- if req.contentLength != "" {
- enc.WriteField(hpack.HeaderField{Name: "content-length", Value: req.contentLength})
+ if w.contentLength != "" {
+ enc.WriteField(hpack.HeaderField{Name: "content-length", Value: w.contentLength})
}
headerBlock := buf.Bytes()
@@ -121,13 +158,13 @@
if first {
first = false
err = ctx.Framer().WriteHeaders(HeadersFrameParam{
- StreamID: req.stream.id,
+ StreamID: w.streamID,
BlockFragment: frag,
- EndStream: req.endStream,
+ EndStream: w.endStream,
EndHeaders: endHeaders,
})
} else {
- err = ctx.Framer().WriteContinuation(req.stream.id, endHeaders, frag)
+ err = ctx.Framer().WriteContinuation(w.streamID, endHeaders, frag)
}
if err != nil {
return err
@@ -136,31 +173,28 @@
return nil
}
-func write100ContinueHeadersFrame(ctx writeContext, v interface{}) error {
- st := v.(*stream)
+type write100ContinueHeadersFrame struct {
+ streamID uint32
+}
+
+func (w write100ContinueHeadersFrame) writeFrame(ctx writeContext) error {
enc, buf := ctx.HeaderEncoder()
buf.Reset()
enc.WriteField(hpack.HeaderField{Name: ":status", Value: "100"})
return ctx.Framer().WriteHeaders(HeadersFrameParam{
- StreamID: st.id,
+ StreamID: w.streamID,
BlockFragment: buf.Bytes(),
EndStream: false,
EndHeaders: true,
})
}
-func writeDataFrame(ctx writeContext, v interface{}) error {
- req := v.(*dataWriteParams)
- return ctx.Framer().WriteData(req.streamID, req.end, req.p)
-}
-
-type windowUpdateReq struct {
+type writeWindowUpdate struct {
streamID uint32
n uint32
}
-func writeWindowUpdate(ctx writeContext, v interface{}) error {
- wu := v.(windowUpdateReq)
+func (wu writeWindowUpdate) writeFrame(ctx writeContext) error {
fr := ctx.Framer()
if err := fr.WriteWindowUpdate(0, wu.n); err != nil {
return err
diff --git a/writesched.go b/writesched.go
index 6365ec0..0393f62 100644
--- a/writesched.go
+++ b/writesched.go
@@ -9,19 +9,12 @@
// frameWriteMsg is a request to write a frame.
type frameWriteMsg struct {
- // write is the function that does the writing, once the
+ // write is the interface value that does the writing, once the
// writeScheduler (below) has decided to select this frame
// to write. The write functions are all defined in write.go.
- write func(ctx writeContext, v interface{}) error
+ write writeFramer
- // v is the argument passed to the write function. See each
- // function in write.go to see which type they should be,
- // depending on what write is.
- v interface{}
-
- cost uint32 // if DATA, number of flow control bytes required
- stream *stream // used for prioritization
- endStream bool // stream is being closed locally
+ stream *stream // used for prioritization. nil for non-stream frames.
// done, if non-nil, must be a buffered channel with space for
// 1 message and is sent the return value from write (or an
@@ -36,6 +29,8 @@
// They're sent before any stream-specific freams.
zero writeQueue
+ // sq contains the stream-specific queues, keyed by stream ID.
+ // when a stream is idle, it's deleted from the map.
sq map[uint32]*writeQueue
}
@@ -123,4 +118,9 @@
return wm
}
-func (q *writeQueue) firstIsNoCost() bool { return q.s[0].cost == 0 }
+func (q *writeQueue) firstIsNoCost() bool {
+ if df, ok := q.s[0].write.(*writeData); ok {
+ return len(df.p) == 0
+ }
+ return true
+}