net: reduce number of memory allocations during IO operations
Embed all data necessary for read/write operations directly into netFD.
benchmark old ns/op new ns/op delta
BenchmarkTCP4Persistent 27669 23341 -15.64%
BenchmarkTCP4Persistent-2 18173 12558 -30.90%
BenchmarkTCP4Persistent-4 10390 7319 -29.56%
This change will intentionally break all builders to see
how many allocations they do per read/write.
This will be fixed soon afterwards.
R=golang-dev, alex.brainman
CC=golang-dev
https://golang.org/cl/12413043
diff --git a/src/pkg/net/fd_windows.go b/src/pkg/net/fd_windows.go
index f51d161..9ed99ed 100644
--- a/src/pkg/net/fd_windows.go
+++ b/src/pkg/net/fd_windows.go
@@ -67,16 +67,8 @@
return dial(net, addr, localAddr, ra, deadline)
}
-// Interface for all IO operations.
-type anOpIface interface {
- Op() *anOp
- Name() string
- Submit() error
-}
-
-// anOp implements functionality common to all IO operations.
-// Its beginning must be the same as runtime.net_anOp. Keep these in sync.
-type anOp struct {
+// operation contains a superset of the data necessary to perform all async IO.
+type operation struct {
// Used by IOCP interface, it must be first field
// of the struct, as our code rely on it.
o syscall.Overlapped
@@ -87,53 +79,34 @@
errno int32
qty uint32
- errnoc chan error
+ // fields used only by net package
+ mu sync.Mutex
fd *netFD
+ errc chan error
+ buf syscall.WSABuf
+ sa syscall.Sockaddr
+ rsa *syscall.RawSockaddrAny
+ rsan int32
+ handle syscall.Handle
+ flags uint32
}
-func (o *anOp) Init(fd *netFD, mode int32) {
- o.fd = fd
- o.mode = mode
- o.runtimeCtx = fd.pd.runtimeCtx
- if !canCancelIO {
- var i int
- if mode == 'r' {
- i = 0
- } else {
- i = 1
- }
- if fd.errnoc[i] == nil {
- fd.errnoc[i] = make(chan error)
- }
- o.errnoc = fd.errnoc[i]
- }
-}
-
-func (o *anOp) Op() *anOp {
- return o
-}
-
-// bufOp is used by IO operations that read / write
-// data from / to client buffer.
-type bufOp struct {
- anOp
- buf syscall.WSABuf
-}
-
-func (o *bufOp) Init(fd *netFD, buf []byte, mode int32) {
- o.anOp.Init(fd, mode)
+func (o *operation) InitBuf(buf []byte) {
o.buf.Len = uint32(len(buf))
- if len(buf) == 0 {
- o.buf.Buf = nil
- } else {
+ o.buf.Buf = nil
+ if len(buf) != 0 {
o.buf.Buf = (*byte)(unsafe.Pointer(&buf[0]))
}
}
// ioSrv executes net IO requests.
type ioSrv struct {
- submchan chan anOpIface // submit IO requests
- canchan chan anOpIface // cancel IO requests
+ req chan ioSrvReq
+}
+
+type ioSrvReq struct {
+ o *operation
+ submit func(o *operation) error // if nil, cancel the operation
}
// ProcessRemoteIO will execute submit IO requests on behalf
@@ -144,36 +117,34 @@
func (s *ioSrv) ProcessRemoteIO() {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
- for {
- select {
- case o := <-s.submchan:
- o.Op().errnoc <- o.Submit()
- case o := <-s.canchan:
- o.Op().errnoc <- syscall.CancelIo(syscall.Handle(o.Op().fd.sysfd))
+ for r := range s.req {
+ if r.submit != nil {
+ r.o.errc <- r.submit(r.o)
+ } else {
+ r.o.errc <- syscall.CancelIo(r.o.fd.sysfd)
}
}
}
-// ExecIO executes a single IO operation oi. It submits and cancels
+// ExecIO executes a single IO operation o. It submits and cancels
// IO in the current thread for systems where Windows CancelIoEx API
// is available. Alternatively, it passes the request onto
// runtime netpoll and waits for completion or cancels request.
-func (s *ioSrv) ExecIO(oi anOpIface) (int, error) {
- var err error
- o := oi.Op()
+func (s *ioSrv) ExecIO(o *operation, name string, submit func(o *operation) error) (int, error) {
+ fd := o.fd
// Notify runtime netpoll about starting IO.
- err = o.fd.pd.Prepare(int(o.mode))
+ err := fd.pd.Prepare(int(o.mode))
if err != nil {
- return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err}
+ return 0, &OpError{name, fd.net, fd.laddr, err}
}
// Start IO.
if canCancelIO {
- err = oi.Submit()
+ err = submit(o)
} else {
// Send request to a special dedicated thread,
// so it can stop the IO with CancelIO later.
- s.submchan <- oi
- err = <-o.errnoc
+ s.req <- ioSrvReq{o, submit}
+ err = <-o.errc
}
switch err {
case nil:
@@ -182,15 +153,15 @@
// IO started, and we have to wait for its completion.
err = nil
default:
- return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err}
+ return 0, &OpError{name, fd.net, fd.laddr, err}
}
// Wait for our request to complete.
- err = o.fd.pd.Wait(int(o.mode))
+ err = fd.pd.Wait(int(o.mode))
if err == nil {
// All is good. Extract our IO results and return.
if o.errno != 0 {
err = syscall.Errno(o.errno)
- return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err}
+ return 0, &OpError{name, fd.net, fd.laddr, err}
}
return int(o.qty), nil
}
@@ -204,24 +175,24 @@
}
// Cancel our request.
if canCancelIO {
- err := syscall.CancelIoEx(syscall.Handle(o.Op().fd.sysfd), &o.o)
+ err := syscall.CancelIoEx(fd.sysfd, &o.o)
// Assuming ERROR_NOT_FOUND is returned, if IO is completed.
if err != nil && err != syscall.ERROR_NOT_FOUND {
// TODO(brainman): maybe do something else, but panic.
panic(err)
}
} else {
- s.canchan <- oi
- <-o.errnoc
+ s.req <- ioSrvReq{o, nil}
+ <-o.errc
}
// Wait for cancellation to complete.
- o.fd.pd.WaitCanceled(int(o.mode))
+ fd.pd.WaitCanceled(int(o.mode))
if o.errno != 0 {
err = syscall.Errno(o.errno)
if err == syscall.ERROR_OPERATION_ABORTED { // IO Canceled
err = netpollErr
}
- return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err}
+ return 0, &OpError{name, fd.net, fd.laddr, err}
}
// We issued cancellation request. But, it seems, IO operation succeeded
// before cancellation request run. We need to treat IO operation as
@@ -238,8 +209,7 @@
if !canCancelIO {
// Only CancelIo API is available. Lets start special goroutine
// locked to an OS thread, that both starts and cancels IO.
- iosrv.submchan = make(chan anOpIface)
- iosrv.canchan = make(chan anOpIface)
+ iosrv.req = make(chan ioSrvReq)
go iosrv.ProcessRemoteIO()
}
}
@@ -259,30 +229,39 @@
net string
laddr Addr
raddr Addr
- errnoc [2]chan error // read/write submit or cancel operation errors
- // serialize access to Read and Write methods
- rio, wio sync.Mutex
+ rop operation // read operation
+ wop operation // write operation
// wait server
pd pollDesc
}
-func newFD(fd syscall.Handle, family, sotype int, net string) (*netFD, error) {
+func newFD(sysfd syscall.Handle, family, sotype int, net string) (*netFD, error) {
if initErr != nil {
return nil, initErr
}
onceStartServer.Do(startServer)
- netfd := &netFD{
- sysfd: fd,
+ fd := &netFD{
+ sysfd: sysfd,
family: family,
sotype: sotype,
net: net,
}
- if err := netfd.pd.Init(netfd); err != nil {
+ if err := fd.pd.Init(fd); err != nil {
return nil, err
}
- return netfd, nil
+ fd.rop.mode = 'r' // rop is reused by Read, ReadFrom and accept
+ fd.wop.mode = 'w' // wop is reused by Write, WriteTo, connect and sendFile
+ fd.rop.fd = fd
+ fd.wop.fd = fd
+ fd.rop.runtimeCtx = fd.pd.runtimeCtx
+ fd.wop.runtimeCtx = fd.pd.runtimeCtx
+ if !canCancelIO { // IO goes through ioSrv, which replies on each operation's errc
+ fd.rop.errc = make(chan error)
+ fd.wop.errc = make(chan error) // must not stay nil: ExecIO would block forever on it
+ }
+ return fd, nil
}
func (fd *netFD) setAddr(laddr, raddr Addr) {
@@ -291,21 +270,6 @@
runtime.SetFinalizer(fd, (*netFD).Close)
}
-// Make new connection.
-
-type connectOp struct {
- anOp
- ra syscall.Sockaddr
-}
-
-func (o *connectOp) Submit() error {
- return syscall.ConnectEx(o.fd.sysfd, o.ra, nil, 0, nil, &o.o)
-}
-
-func (o *connectOp) Name() string {
- return "ConnectEx"
-}
-
func (fd *netFD) connect(la, ra syscall.Sockaddr) error {
if !canUseConnectEx(fd.net) {
return syscall.Connect(fd.sysfd, ra)
@@ -325,10 +289,13 @@
}
}
// Call ConnectEx API.
- var o connectOp
- o.Init(fd, 'w')
- o.ra = ra
- _, err := iosrv.ExecIO(&o)
+ o := &fd.wop
+ o.mu.Lock()
+ defer o.mu.Unlock()
+ o.sa = ra
+ _, err := iosrv.ExecIO(o, "ConnectEx", func(o *operation) error {
+ return syscall.ConnectEx(o.fd.sysfd, o.sa, nil, 0, nil, &o.o)
+ })
if err != nil {
return err
}
@@ -385,10 +352,10 @@
// unblock pending reader and writer
fd.pd.Evict()
// wait for both reader and writer to exit
- fd.rio.Lock()
- defer fd.rio.Unlock()
- fd.wio.Lock()
- defer fd.wio.Unlock()
+ fd.rop.mu.Lock()
+ fd.wop.mu.Lock()
+ fd.rop.mu.Unlock()
+ fd.wop.mu.Unlock()
return nil
}
@@ -412,54 +379,24 @@
return fd.shutdown(syscall.SHUT_WR)
}
-// Read from network.
-
-type readOp struct {
- bufOp
-}
-
-func (o *readOp) Submit() error {
- var d, f uint32
- return syscall.WSARecv(syscall.Handle(o.fd.sysfd), &o.buf, 1, &d, &f, &o.o, nil)
-}
-
-func (o *readOp) Name() string {
- return "WSARecv"
-}
-
func (fd *netFD) Read(buf []byte) (int, error) {
if err := fd.incref(false); err != nil {
return 0, err
}
defer fd.decref()
- fd.rio.Lock()
- defer fd.rio.Unlock()
- var o readOp
- o.Init(fd, buf, 'r')
- n, err := iosrv.ExecIO(&o)
+ o := &fd.rop
+ o.mu.Lock()
+ defer o.mu.Unlock()
+ o.InitBuf(buf)
+ n, err := iosrv.ExecIO(o, "WSARecv", func(o *operation) error {
+ return syscall.WSARecv(o.fd.sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil)
+ })
if err == nil && n == 0 {
err = io.EOF
}
return n, err
}
-// ReadFrom from network.
-
-type readFromOp struct {
- bufOp
- rsa syscall.RawSockaddrAny
- rsan int32
-}
-
-func (o *readFromOp) Submit() error {
- var d, f uint32
- return syscall.WSARecvFrom(o.fd.sysfd, &o.buf, 1, &d, &f, &o.rsa, &o.rsan, &o.o, nil)
-}
-
-func (o *readFromOp) Name() string {
- return "WSARecvFrom"
-}
-
func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) {
if len(buf) == 0 {
return 0, nil, nil
@@ -468,12 +405,17 @@
return 0, nil, err
}
defer fd.decref()
- fd.rio.Lock()
- defer fd.rio.Unlock()
- var o readFromOp
- o.Init(fd, buf, 'r')
- o.rsan = int32(unsafe.Sizeof(o.rsa))
- n, err = iosrv.ExecIO(&o)
+ o := &fd.rop
+ o.mu.Lock()
+ defer o.mu.Unlock()
+ o.InitBuf(buf)
+ n, err = iosrv.ExecIO(o, "WSARecvFrom", func(o *operation) error {
+ if o.rsa == nil {
+ o.rsa = new(syscall.RawSockaddrAny)
+ }
+ o.rsan = int32(unsafe.Sizeof(*o.rsa))
+ return syscall.WSARecvFrom(o.fd.sysfd, &o.buf, 1, &o.qty, &o.flags, o.rsa, &o.rsan, &o.o, nil)
+ })
if err != nil {
return 0, nil, err
}
@@ -481,47 +423,18 @@
return
}
-// Write to network.
-
-type writeOp struct {
- bufOp
-}
-
-func (o *writeOp) Submit() error {
- var d uint32
- return syscall.WSASend(o.fd.sysfd, &o.buf, 1, &d, 0, &o.o, nil)
-}
-
-func (o *writeOp) Name() string {
- return "WSASend"
-}
-
func (fd *netFD) Write(buf []byte) (int, error) {
if err := fd.incref(false); err != nil {
return 0, err
}
defer fd.decref()
- fd.wio.Lock()
- defer fd.wio.Unlock()
- var o writeOp
- o.Init(fd, buf, 'w')
- return iosrv.ExecIO(&o)
-}
-
-// WriteTo to network.
-
-type writeToOp struct {
- bufOp
- sa syscall.Sockaddr
-}
-
-func (o *writeToOp) Submit() error {
- var d uint32
- return syscall.WSASendto(o.fd.sysfd, &o.buf, 1, &d, 0, o.sa, &o.o, nil)
-}
-
-func (o *writeToOp) Name() string {
- return "WSASendto"
+ o := &fd.wop
+ o.mu.Lock()
+ defer o.mu.Unlock()
+ o.InitBuf(buf)
+ return iosrv.ExecIO(o, "WSASend", func(o *operation) error {
+ return syscall.WSASend(o.fd.sysfd, &o.buf, 1, &o.qty, 0, &o.o, nil)
+ })
}
func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
@@ -532,31 +445,14 @@
return 0, err
}
defer fd.decref()
- fd.wio.Lock()
- defer fd.wio.Unlock()
- var o writeToOp
- o.Init(fd, buf, 'w')
+ o := &fd.wop
+ o.mu.Lock()
+ defer o.mu.Unlock()
+ o.InitBuf(buf)
o.sa = sa
- return iosrv.ExecIO(&o)
-}
-
-// Accept new network connections.
-
-type acceptOp struct {
- anOp
- newsock syscall.Handle
- attrs [2]syscall.RawSockaddrAny // space for local and remote address only
-}
-
-func (o *acceptOp) Submit() error {
- var d uint32
- l := uint32(unsafe.Sizeof(o.attrs[0]))
- return syscall.AcceptEx(o.fd.sysfd, o.newsock,
- (*byte)(unsafe.Pointer(&o.attrs[0])), 0, l, l, &d, &o.o)
-}
-
-func (o *acceptOp) Name() string {
- return "AcceptEx"
+ return iosrv.ExecIO(o, "WSASendto", func(o *operation) error {
+ return syscall.WSASendto(o.fd.sysfd, &o.buf, 1, &o.qty, 0, o.sa, &o.o, nil)
+ })
}
func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) {
@@ -579,12 +475,15 @@
}
// Submit accept request.
- fd.rio.Lock()
- defer fd.rio.Unlock()
- var o acceptOp
- o.Init(fd, 'r')
- o.newsock = s
- _, err = iosrv.ExecIO(&o)
+ o := &fd.rop
+ o.mu.Lock()
+ defer o.mu.Unlock()
+ o.handle = s
+ var rawsa [2]syscall.RawSockaddrAny
+ o.rsan = int32(unsafe.Sizeof(rawsa[0]))
+ _, err = iosrv.ExecIO(o, "AcceptEx", func(o *operation) error {
+ return syscall.AcceptEx(o.fd.sysfd, o.handle, (*byte)(unsafe.Pointer(&rawsa[0])), 0, uint32(o.rsan), uint32(o.rsan), &o.qty, &o.o)
+ })
if err != nil {
netfd.Close()
return nil, err
@@ -600,9 +499,8 @@
// Get local and peer addr out of AcceptEx buffer.
var lrsa, rrsa *syscall.RawSockaddrAny
var llen, rlen int32
- l := uint32(unsafe.Sizeof(*lrsa))
- syscall.GetAcceptExSockaddrs((*byte)(unsafe.Pointer(&o.attrs[0])),
- 0, l, l, &lrsa, &llen, &rrsa, &rlen)
+ syscall.GetAcceptExSockaddrs((*byte)(unsafe.Pointer(&rawsa[0])),
+ 0, uint32(o.rsan), uint32(o.rsan), &lrsa, &llen, &rrsa, &rlen)
lsa, _ := lrsa.Sockaddr()
rsa, _ := rrsa.Sockaddr()
diff --git a/src/pkg/net/sendfile_windows.go b/src/pkg/net/sendfile_windows.go
index 5012583..e9b9f91 100644
--- a/src/pkg/net/sendfile_windows.go
+++ b/src/pkg/net/sendfile_windows.go
@@ -10,20 +10,6 @@
"syscall"
)
-type sendfileOp struct {
- anOp
- src syscall.Handle // source
- n uint32
-}
-
-func (o *sendfileOp) Submit() (err error) {
- return syscall.TransmitFile(o.fd.sysfd, o.src, o.n, 0, &o.o, nil, syscall.TF_WRITE_BEHIND)
-}
-
-func (o *sendfileOp) Name() string {
- return "TransmitFile"
-}
-
// sendFile copies the contents of r to c using the TransmitFile
// system call to minimize copies.
//
@@ -33,7 +19,7 @@
// if handled == false, sendFile performed no work.
//
// Note that sendfile for windows does not suppport >2GB file.
-func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) {
+func sendFile(fd *netFD, r io.Reader) (written int64, err error, handled bool) {
var n int64 = 0 // by default, copy until EOF
lr, ok := r.(*io.LimitedReader)
@@ -48,18 +34,18 @@
return 0, nil, false
}
- if err := c.incref(false); err != nil {
+ if err := fd.incref(false); err != nil {
return 0, err, true
}
- defer c.decref()
- c.wio.Lock()
- defer c.wio.Unlock()
-
- var o sendfileOp
- o.Init(c, 'w')
- o.n = uint32(n)
- o.src = syscall.Handle(f.Fd())
- done, err := iosrv.ExecIO(&o)
+ defer fd.decref()
+ o := &fd.wop
+ o.mu.Lock()
+ defer o.mu.Unlock()
+ o.qty = uint32(n)
+ o.handle = syscall.Handle(f.Fd())
+ done, err := iosrv.ExecIO(o, "TransmitFile", func(o *operation) error {
+ return syscall.TransmitFile(o.fd.sysfd, o.handle, o.qty, 0, &o.o, nil, syscall.TF_WRITE_BEHIND)
+ })
if err != nil {
return 0, err, false
}
diff --git a/src/pkg/net/tcp_test.go b/src/pkg/net/tcp_test.go
index f356f92..dedd41d 100644
--- a/src/pkg/net/tcp_test.go
+++ b/src/pkg/net/tcp_test.go
@@ -6,6 +6,7 @@
import (
"fmt"
+ "io"
"reflect"
"runtime"
"sync"
@@ -327,3 +328,50 @@
ln.Close()
wg.Wait()
}
+
+// TestTCPReadWriteMallocs checks that a steady-state Write/Read round trip
+// over a loopback TCP connection stays within the per-OS allocation budget.
+func TestTCPReadWriteMallocs(t *testing.T) {
+ maxMallocs := 0
+ switch runtime.GOOS {
+ // Add other OSes if you know how many mallocs they do.
+ case "windows":
+ maxMallocs = 0
+ }
+ ln, err := Listen("tcp", "127.0.0.1:0")
+ if err != nil {
+ t.Fatalf("Listen failed: %v", err)
+ }
+ defer ln.Close()
+ var server Conn
+ // Buffered so the accepting goroutine can exit even if Dial fails below.
+ errc := make(chan error, 1)
+ go func() {
+ var err error
+ server, err = ln.Accept()
+ errc <- err
+ }()
+ client, err := Dial("tcp", ln.Addr().String())
+ if err != nil {
+ t.Fatalf("Dial failed: %v", err)
+ }
+ defer client.Close()
+ if err := <-errc; err != nil {
+ t.Fatalf("Accept failed: %v", err)
+ }
+ defer server.Close()
+ var buf [128]byte
+ mallocs := testing.AllocsPerRun(1000, func() {
+ _, err := server.Write(buf[:])
+ if err != nil {
+ t.Fatalf("Write failed: %v", err)
+ }
+ _, err = io.ReadFull(client, buf[:])
+ if err != nil {
+ t.Fatalf("Read failed: %v", err)
+ }
+ })
+ if int(mallocs) > maxMallocs {
+ t.Fatalf("Got %v allocs, want %v", mallocs, maxMallocs)
+ }
+}