http2: change the pipe and buffer code
Make the pipe code take an interface as the backing store. Now a pipe
is something that's goroutine-safe and does the Cond waits but its underlying data
is now an interface: anything that's a ReaderWriter with a Len method (such as a
*bytes.Buffer), or a fixedBuffer (renamed in this CL from 'buffer').
This opens the ground to having a non-fixed buffer used with pipe.
This also moves the CloseWithError code up into the pipe code, out of
fixedBuffer.
Change-Id: Ia3b853e8aa8920807b705ff4e41bed934a8c67b7
Reviewed-on: https://go-review.googlesource.com/16312
Reviewed-by: Blake Mizerany <blake.mizerany@gmail.com>
diff --git a/http2/buffer.go b/http2/buffer.go
deleted file mode 100644
index 907e298..0000000
--- a/http2/buffer.go
+++ /dev/null
@@ -1,75 +0,0 @@
-// Copyright 2014 The Go Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package http2
-
-import (
- "errors"
-)
-
-// buffer is an io.ReadWriteCloser backed by a fixed size buffer.
-// It never allocates, but moves old data as new data is written.
-type buffer struct {
- buf []byte
- r, w int
- closed bool
- err error // err to return to reader
-}
-
-var (
- errReadEmpty = errors.New("read from empty buffer")
- errWriteClosed = errors.New("write on closed buffer")
- errWriteFull = errors.New("write on full buffer")
-)
-
-// Read copies bytes from the buffer into p.
-// It is an error to read when no data is available.
-func (b *buffer) Read(p []byte) (n int, err error) {
- n = copy(p, b.buf[b.r:b.w])
- b.r += n
- if b.closed && b.r == b.w {
- err = b.err
- } else if b.r == b.w && n == 0 {
- err = errReadEmpty
- }
- return n, err
-}
-
-// Len returns the number of bytes of the unread portion of the buffer.
-func (b *buffer) Len() int {
- return b.w - b.r
-}
-
-// Write copies bytes from p into the buffer.
-// It is an error to write more data than the buffer can hold.
-func (b *buffer) Write(p []byte) (n int, err error) {
- if b.closed {
- return 0, errWriteClosed
- }
-
- // Slide existing data to beginning.
- if b.r > 0 && len(p) > len(b.buf)-b.w {
- copy(b.buf, b.buf[b.r:b.w])
- b.w -= b.r
- b.r = 0
- }
-
- // Write new data.
- n = copy(b.buf[b.w:], p)
- b.w += n
- if n < len(p) {
- err = errWriteFull
- }
- return n, err
-}
-
-// Close marks the buffer as closed. Future calls to Write will
-// return an error. Future calls to Read, once the buffer is
-// empty, will return err.
-func (b *buffer) Close(err error) {
- if !b.closed {
- b.closed = true
- b.err = err
- }
-}
diff --git a/http2/fixed_buffer.go b/http2/fixed_buffer.go
new file mode 100644
index 0000000..47da0f0
--- /dev/null
+++ b/http2/fixed_buffer.go
@@ -0,0 +1,60 @@
+// Copyright 2014 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package http2
+
+import (
+ "errors"
+)
+
+// fixedBuffer is an io.ReadWriter backed by a fixed size buffer.
+// It never allocates, but moves old data as new data is written.
+type fixedBuffer struct {
+ buf []byte
+ r, w int
+}
+
+var (
+ errReadEmpty = errors.New("read from empty fixedBuffer")
+ errWriteFull = errors.New("write on full fixedBuffer")
+)
+
+// Read copies bytes from the buffer into p.
+// It is an error to read when no data is available.
+func (b *fixedBuffer) Read(p []byte) (n int, err error) {
+ if b.r == b.w {
+ return 0, errReadEmpty
+ }
+ n = copy(p, b.buf[b.r:b.w])
+ b.r += n
+ if b.r == b.w {
+ b.r = 0
+ b.w = 0
+ }
+ return n, nil
+}
+
+// Len returns the number of bytes of the unread portion of the buffer.
+func (b *fixedBuffer) Len() int {
+ return b.w - b.r
+}
+
+// Write copies bytes from p into the buffer.
+// It is an error to write more data than the buffer can hold.
+func (b *fixedBuffer) Write(p []byte) (n int, err error) {
+ // Slide existing data to beginning.
+ if b.r > 0 && len(p) > len(b.buf)-b.w {
+ copy(b.buf, b.buf[b.r:b.w])
+ b.w -= b.r
+ b.r = 0
+ }
+
+ // Write new data.
+ n = copy(b.buf[b.w:], p)
+ b.w += n
+ if n < len(p) {
+ err = errWriteFull
+ }
+ return n, err
+}
diff --git a/http2/buffer_test.go b/http2/fixed_buffer_test.go
similarity index 64%
rename from http2/buffer_test.go
rename to http2/fixed_buffer_test.go
index 9d7d98b..f5432f8 100644
--- a/http2/buffer_test.go
+++ b/http2/fixed_buffer_test.go
@@ -5,47 +5,36 @@
package http2
import (
- "io"
"reflect"
"testing"
)
var bufferReadTests = []struct {
- buf buffer
+ buf fixedBuffer
read, wn int
werr error
wp []byte
- wbuf buffer
+ wbuf fixedBuffer
}{
{
- buffer{[]byte{'a', 0}, 0, 1, false, nil},
+ fixedBuffer{[]byte{'a', 0}, 0, 1},
5, 1, nil, []byte{'a'},
- buffer{[]byte{'a', 0}, 1, 1, false, nil},
+ fixedBuffer{[]byte{'a', 0}, 0, 0},
},
{
- buffer{[]byte{'a', 0}, 0, 1, true, io.EOF},
- 5, 1, io.EOF, []byte{'a'},
- buffer{[]byte{'a', 0}, 1, 1, true, io.EOF},
- },
- {
- buffer{[]byte{0, 'a'}, 1, 2, false, nil},
+ fixedBuffer{[]byte{0, 'a'}, 1, 2},
5, 1, nil, []byte{'a'},
- buffer{[]byte{0, 'a'}, 2, 2, false, nil},
+ fixedBuffer{[]byte{0, 'a'}, 0, 0},
},
{
- buffer{[]byte{0, 'a'}, 1, 2, true, io.EOF},
- 5, 1, io.EOF, []byte{'a'},
- buffer{[]byte{0, 'a'}, 2, 2, true, io.EOF},
+ fixedBuffer{[]byte{'a', 'b'}, 0, 2},
+ 1, 1, nil, []byte{'a'},
+ fixedBuffer{[]byte{'a', 'b'}, 1, 2},
},
{
- buffer{[]byte{}, 0, 0, false, nil},
+ fixedBuffer{[]byte{}, 0, 0},
5, 0, errReadEmpty, []byte{},
- buffer{[]byte{}, 0, 0, false, nil},
- },
- {
- buffer{[]byte{}, 0, 0, true, io.EOF},
- 5, 0, io.EOF, []byte{},
- buffer{[]byte{}, 0, 0, true, io.EOF},
+ fixedBuffer{[]byte{}, 0, 0},
},
}
@@ -72,64 +61,50 @@
}
var bufferWriteTests = []struct {
- buf buffer
+ buf fixedBuffer
write, wn int
werr error
- wbuf buffer
+ wbuf fixedBuffer
}{
{
- buf: buffer{
+ buf: fixedBuffer{
buf: []byte{},
},
- wbuf: buffer{
+ wbuf: fixedBuffer{
buf: []byte{},
},
},
{
- buf: buffer{
+ buf: fixedBuffer{
buf: []byte{1, 'a'},
},
write: 1,
wn: 1,
- wbuf: buffer{
+ wbuf: fixedBuffer{
buf: []byte{0, 'a'},
w: 1,
},
},
{
- buf: buffer{
+ buf: fixedBuffer{
buf: []byte{'a', 1},
r: 1,
w: 1,
},
write: 2,
wn: 2,
- wbuf: buffer{
+ wbuf: fixedBuffer{
buf: []byte{0, 0},
w: 2,
},
},
{
- buf: buffer{
- buf: []byte{},
- r: 1,
- closed: true,
- },
- write: 5,
- werr: errWriteClosed,
- wbuf: buffer{
- buf: []byte{},
- r: 1,
- closed: true,
- },
- },
- {
- buf: buffer{
+ buf: fixedBuffer{
buf: []byte{},
},
write: 5,
werr: errWriteFull,
- wbuf: buffer{
+ wbuf: fixedBuffer{
buf: []byte{},
},
},
diff --git a/http2/pipe.go b/http2/pipe.go
index 51699dc..72a1fdc 100644
--- a/http2/pipe.go
+++ b/http2/pipe.go
@@ -5,38 +5,78 @@
package http2
import (
+ "errors"
+ "io"
"sync"
)
+// pipe is a goroutine-safe io.Reader/io.Writer pair. It's like
+// io.Pipe except there are no PipeReader/PipeWriter halves, and the
+// underlying buffer is an interface. (io.Pipe is always unbuffered)
type pipe struct {
- b buffer
- c sync.Cond
- m sync.Mutex
+ mu sync.Mutex
+ c sync.Cond // c.L must point to
+ b pipeBuffer
+ err error // read error once empty. non-nil means closed.
+}
+
+type pipeBuffer interface {
+ Len() int
+ io.Writer
+ io.Reader
}
// Read waits until data is available and copies bytes
// from the buffer into p.
-func (r *pipe) Read(p []byte) (n int, err error) {
- r.c.L.Lock()
- defer r.c.L.Unlock()
- for r.b.Len() == 0 && !r.b.closed {
- r.c.Wait()
+func (p *pipe) Read(d []byte) (n int, err error) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ if p.c.L == nil {
+ p.c.L = &p.mu
}
- return r.b.Read(p)
+ for {
+ if p.b.Len() > 0 {
+ return p.b.Read(d)
+ }
+ if p.err != nil {
+ return 0, p.err
+ }
+ p.c.Wait()
+ }
}
+var errClosedPipeWrite = errors.New("write on closed buffer")
+
// Write copies bytes from p into the buffer and wakes a reader.
// It is an error to write more data than the buffer can hold.
-func (w *pipe) Write(p []byte) (n int, err error) {
- w.c.L.Lock()
- defer w.c.L.Unlock()
- defer w.c.Signal()
- return w.b.Write(p)
+func (p *pipe) Write(d []byte) (n int, err error) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ if p.c.L == nil {
+ p.c.L = &p.mu
+ }
+ defer p.c.Signal()
+ if p.err != nil {
+ return 0, errClosedPipeWrite
+ }
+ return p.b.Write(d)
}
-func (c *pipe) Close(err error) {
- c.c.L.Lock()
- defer c.c.L.Unlock()
- defer c.c.Signal()
- c.b.Close(err)
+// CloseWithError causes Reads to wake up and return the
+// provided err after all data has been read.
+//
+// The error must be non-nil.
+func (p *pipe) CloseWithError(err error) {
+ if err == nil {
+ panic("CloseWithError must be non-nil")
+ }
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ if p.c.L == nil {
+ p.c.L = &p.mu
+ }
+ defer p.c.Signal()
+ if p.err == nil {
+ p.err = err
+ }
}
diff --git a/http2/pipe_test.go b/http2/pipe_test.go
index 5283b66..002ce05 100644
--- a/http2/pipe_test.go
+++ b/http2/pipe_test.go
@@ -5,17 +5,18 @@
package http2
import (
+ "bytes"
"errors"
"testing"
)
func TestPipeClose(t *testing.T) {
var p pipe
- p.c.L = &p.m
+ p.b = new(bytes.Buffer)
a := errors.New("a")
b := errors.New("b")
- p.Close(a)
- p.Close(b)
+ p.CloseWithError(a)
+ p.CloseWithError(b)
_, err := p.Read(make([]byte, 1))
if err != a {
t.Errorf("err = %v want %v", err, a)
diff --git a/http2/server.go b/http2/server.go
index ba408fe..5fb92cd 100644
--- a/http2/server.go
+++ b/http2/server.go
@@ -65,6 +65,7 @@
var (
errClientDisconnected = errors.New("client disconnected")
errClosedBody = errors.New("body closed by handler")
+ errHandlerComplete = errors.New("http2: request body closed due to handler exiting")
errStreamClosed = errors.New("http2: stream closed")
)
@@ -872,7 +873,7 @@
errCancel := StreamError{st.id, ErrCodeCancel}
sc.resetStream(errCancel)
case stateHalfClosedRemote:
- sc.closeStream(st, nil)
+ sc.closeStream(st, errHandlerComplete)
}
}
@@ -1142,7 +1143,7 @@
}
delete(sc.streams, st.id)
if p := st.body; p != nil {
- p.Close(err)
+ p.CloseWithError(err)
}
st.cw.Close() // signals Handler's CloseNotifier, unblocks writes, etc
sc.writeSched.forgetStream(st.id)
@@ -1246,7 +1247,7 @@
// Sender sending more than they'd declared?
if st.declBodyBytes != -1 && st.bodyBytes+int64(len(data)) > st.declBodyBytes {
- st.body.Close(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
+ st.body.CloseWithError(fmt.Errorf("sender tried to send more than declared Content-Length of %d bytes", st.declBodyBytes))
return StreamError{id, ErrCodeStreamClosed}
}
if len(data) > 0 {
@@ -1266,10 +1267,10 @@
}
if f.StreamEnded() {
if st.declBodyBytes != -1 && st.declBodyBytes != st.bodyBytes {
- st.body.Close(fmt.Errorf("request declared a Content-Length of %d but only wrote %d bytes",
+ st.body.CloseWithError(fmt.Errorf("request declared a Content-Length of %d but only wrote %d bytes",
st.declBodyBytes, st.bodyBytes))
} else {
- st.body.Close(io.EOF)
+ st.body.CloseWithError(io.EOF)
}
st.state = stateHalfClosedRemote
}
@@ -1493,9 +1494,8 @@
}
if bodyOpen {
body.pipe = &pipe{
- b: buffer{buf: make([]byte, initialWindowSize)}, // TODO: share/remove XXX
+ b: &fixedBuffer{buf: make([]byte, initialWindowSize)}, // TODO: share/remove XXX
}
- body.pipe.c.L = &body.pipe.m
if vv, ok := rp.header["Content-Length"]; ok {
req.ContentLength, _ = strconv.ParseInt(vv[0], 10, 64)
@@ -1655,7 +1655,7 @@
func (b *requestBody) Close() error {
if b.pipe != nil {
- b.pipe.Close(errClosedBody)
+ b.pipe.CloseWithError(errClosedBody)
}
b.closed = true
return nil