net: reduce number of memory allocations during IO operations
Embed all data necessary for read/write operations directly into netFD.

benchmark                    old ns/op    new ns/op    delta
BenchmarkTCP4Persistent          27669        23341  -15.64%
BenchmarkTCP4Persistent-2        18173        12558  -30.90%
BenchmarkTCP4Persistent-4        10390         7319  -29.56%

This change will intentionally break all builders to see
how many allocations they do per read/write.
This will be fixed soon afterwards.

R=golang-dev, alex.brainman
CC=golang-dev
https://golang.org/cl/12413043
diff --git a/src/pkg/net/fd_windows.go b/src/pkg/net/fd_windows.go
index f51d161..9ed99ed 100644
--- a/src/pkg/net/fd_windows.go
+++ b/src/pkg/net/fd_windows.go
@@ -67,16 +67,8 @@
 	return dial(net, addr, localAddr, ra, deadline)
 }
 
-// Interface for all IO operations.
-type anOpIface interface {
-	Op() *anOp
-	Name() string
-	Submit() error
-}
-
-// anOp implements functionality common to all IO operations.
-// Its beginning must be the same as runtime.net_anOp. Keep these in sync.
-type anOp struct {
+// operation contains a superset of the data necessary to perform all async IO.
+type operation struct {
 	// Used by IOCP interface, it must be first field
 	// of the struct, as our code rely on it.
 	o syscall.Overlapped
@@ -87,53 +79,34 @@
 	errno      int32
 	qty        uint32
 
-	errnoc chan error
+	// fields used only by net package
+	mu     sync.Mutex
 	fd     *netFD
+	errc   chan error
+	buf    syscall.WSABuf
+	sa     syscall.Sockaddr
+	rsa    *syscall.RawSockaddrAny
+	rsan   int32
+	handle syscall.Handle
+	flags  uint32
 }
 
-func (o *anOp) Init(fd *netFD, mode int32) {
-	o.fd = fd
-	o.mode = mode
-	o.runtimeCtx = fd.pd.runtimeCtx
-	if !canCancelIO {
-		var i int
-		if mode == 'r' {
-			i = 0
-		} else {
-			i = 1
-		}
-		if fd.errnoc[i] == nil {
-			fd.errnoc[i] = make(chan error)
-		}
-		o.errnoc = fd.errnoc[i]
-	}
-}
-
-func (o *anOp) Op() *anOp {
-	return o
-}
-
-// bufOp is used by IO operations that read / write
-// data from / to client buffer.
-type bufOp struct {
-	anOp
-	buf syscall.WSABuf
-}
-
-func (o *bufOp) Init(fd *netFD, buf []byte, mode int32) {
-	o.anOp.Init(fd, mode)
+func (o *operation) InitBuf(buf []byte) {
 	o.buf.Len = uint32(len(buf))
-	if len(buf) == 0 {
-		o.buf.Buf = nil
-	} else {
+	o.buf.Buf = nil
+	if len(buf) != 0 {
 		o.buf.Buf = (*byte)(unsafe.Pointer(&buf[0]))
 	}
 }
 
 // ioSrv executes net IO requests.
 type ioSrv struct {
-	submchan chan anOpIface // submit IO requests
-	canchan  chan anOpIface // cancel IO requests
+	req chan ioSrvReq
+}
+
+type ioSrvReq struct {
+	o      *operation
+	submit func(o *operation) error // if nil, cancel the operation
 }
 
 // ProcessRemoteIO will execute submit IO requests on behalf
@@ -144,36 +117,34 @@
 func (s *ioSrv) ProcessRemoteIO() {
 	runtime.LockOSThread()
 	defer runtime.UnlockOSThread()
-	for {
-		select {
-		case o := <-s.submchan:
-			o.Op().errnoc <- o.Submit()
-		case o := <-s.canchan:
-			o.Op().errnoc <- syscall.CancelIo(syscall.Handle(o.Op().fd.sysfd))
+	for r := range s.req {
+		if r.submit != nil {
+			r.o.errc <- r.submit(r.o)
+		} else {
+			r.o.errc <- syscall.CancelIo(r.o.fd.sysfd)
 		}
 	}
 }
 
-// ExecIO executes a single IO operation oi. It submits and cancels
+// ExecIO executes a single IO operation o. It submits and cancels
 // IO in the current thread for systems where Windows CancelIoEx API
 // is available. Alternatively, it passes the request onto
 // runtime netpoll and waits for completion or cancels request.
-func (s *ioSrv) ExecIO(oi anOpIface) (int, error) {
-	var err error
-	o := oi.Op()
+func (s *ioSrv) ExecIO(o *operation, name string, submit func(o *operation) error) (int, error) {
+	fd := o.fd
 	// Notify runtime netpoll about starting IO.
-	err = o.fd.pd.Prepare(int(o.mode))
+	err := fd.pd.Prepare(int(o.mode))
 	if err != nil {
-		return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err}
+		return 0, &OpError{name, fd.net, fd.laddr, err}
 	}
 	// Start IO.
 	if canCancelIO {
-		err = oi.Submit()
+		err = submit(o)
 	} else {
 		// Send request to a special dedicated thread,
 		// so it can stop the IO with CancelIO later.
-		s.submchan <- oi
-		err = <-o.errnoc
+		s.req <- ioSrvReq{o, submit}
+		err = <-o.errc
 	}
 	switch err {
 	case nil:
@@ -182,15 +153,15 @@
 		// IO started, and we have to wait for its completion.
 		err = nil
 	default:
-		return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err}
+		return 0, &OpError{name, fd.net, fd.laddr, err}
 	}
 	// Wait for our request to complete.
-	err = o.fd.pd.Wait(int(o.mode))
+	err = fd.pd.Wait(int(o.mode))
 	if err == nil {
 		// All is good. Extract our IO results and return.
 		if o.errno != 0 {
 			err = syscall.Errno(o.errno)
-			return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err}
+			return 0, &OpError{name, fd.net, fd.laddr, err}
 		}
 		return int(o.qty), nil
 	}
@@ -204,24 +175,24 @@
 	}
 	// Cancel our request.
 	if canCancelIO {
-		err := syscall.CancelIoEx(syscall.Handle(o.Op().fd.sysfd), &o.o)
+		err := syscall.CancelIoEx(fd.sysfd, &o.o)
 		// Assuming ERROR_NOT_FOUND is returned, if IO is completed.
 		if err != nil && err != syscall.ERROR_NOT_FOUND {
 			// TODO(brainman): maybe do something else, but panic.
 			panic(err)
 		}
 	} else {
-		s.canchan <- oi
-		<-o.errnoc
+		s.req <- ioSrvReq{o, nil}
+		<-o.errc
 	}
 	// Wait for cancellation to complete.
-	o.fd.pd.WaitCanceled(int(o.mode))
+	fd.pd.WaitCanceled(int(o.mode))
 	if o.errno != 0 {
 		err = syscall.Errno(o.errno)
 		if err == syscall.ERROR_OPERATION_ABORTED { // IO Canceled
 			err = netpollErr
 		}
-		return 0, &OpError{oi.Name(), o.fd.net, o.fd.laddr, err}
+		return 0, &OpError{name, fd.net, fd.laddr, err}
 	}
 	// We issued cancellation request. But, it seems, IO operation succeeded
 	// before cancellation request run. We need to treat IO operation as
@@ -238,8 +209,7 @@
 	if !canCancelIO {
 		// Only CancelIo API is available. Lets start special goroutine
 		// locked to an OS thread, that both starts and cancels IO.
-		iosrv.submchan = make(chan anOpIface)
-		iosrv.canchan = make(chan anOpIface)
+		iosrv.req = make(chan ioSrvReq)
 		go iosrv.ProcessRemoteIO()
 	}
 }
@@ -259,30 +229,39 @@
 	net         string
 	laddr       Addr
 	raddr       Addr
-	errnoc      [2]chan error // read/write submit or cancel operation errors
 
-	// serialize access to Read and Write methods
-	rio, wio sync.Mutex
+	rop operation // read operation
+	wop operation // write operation
 
 	// wait server
 	pd pollDesc
 }
 
-func newFD(fd syscall.Handle, family, sotype int, net string) (*netFD, error) {
+func newFD(sysfd syscall.Handle, family, sotype int, net string) (*netFD, error) {
 	if initErr != nil {
 		return nil, initErr
 	}
 	onceStartServer.Do(startServer)
-	netfd := &netFD{
-		sysfd:  fd,
+	fd := &netFD{
+		sysfd:  sysfd,
 		family: family,
 		sotype: sotype,
 		net:    net,
 	}
-	if err := netfd.pd.Init(netfd); err != nil {
+	if err := fd.pd.Init(fd); err != nil {
 		return nil, err
 	}
-	return netfd, nil
+	fd.rop.mode = 'r'
+	fd.wop.mode = 'w'
+	fd.rop.fd = fd
+	fd.wop.fd = fd
+	fd.rop.runtimeCtx = fd.pd.runtimeCtx
+	fd.wop.runtimeCtx = fd.pd.runtimeCtx
+	if !canCancelIO { // errc carries ioSrv replies; both ops need their own channel
+		fd.rop.errc = make(chan error)
+		fd.wop.errc = make(chan error)
+	}
+	return fd, nil
 }
 
 func (fd *netFD) setAddr(laddr, raddr Addr) {
@@ -291,21 +270,6 @@
 	runtime.SetFinalizer(fd, (*netFD).Close)
 }
 
-// Make new connection.
-
-type connectOp struct {
-	anOp
-	ra syscall.Sockaddr
-}
-
-func (o *connectOp) Submit() error {
-	return syscall.ConnectEx(o.fd.sysfd, o.ra, nil, 0, nil, &o.o)
-}
-
-func (o *connectOp) Name() string {
-	return "ConnectEx"
-}
-
 func (fd *netFD) connect(la, ra syscall.Sockaddr) error {
 	if !canUseConnectEx(fd.net) {
 		return syscall.Connect(fd.sysfd, ra)
@@ -325,10 +289,13 @@
 		}
 	}
 	// Call ConnectEx API.
-	var o connectOp
-	o.Init(fd, 'w')
-	o.ra = ra
-	_, err := iosrv.ExecIO(&o)
+	o := &fd.wop
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	o.sa = ra
+	_, err := iosrv.ExecIO(o, "ConnectEx", func(o *operation) error {
+		return syscall.ConnectEx(o.fd.sysfd, o.sa, nil, 0, nil, &o.o)
+	})
 	if err != nil {
 		return err
 	}
@@ -385,10 +352,10 @@
 	// unblock pending reader and writer
 	fd.pd.Evict()
 	// wait for both reader and writer to exit
-	fd.rio.Lock()
-	defer fd.rio.Unlock()
-	fd.wio.Lock()
-	defer fd.wio.Unlock()
+	fd.rop.mu.Lock()
+	fd.wop.mu.Lock()
+	fd.rop.mu.Unlock()
+	fd.wop.mu.Unlock()
 	return nil
 }
 
@@ -412,54 +379,24 @@
 	return fd.shutdown(syscall.SHUT_WR)
 }
 
-// Read from network.
-
-type readOp struct {
-	bufOp
-}
-
-func (o *readOp) Submit() error {
-	var d, f uint32
-	return syscall.WSARecv(syscall.Handle(o.fd.sysfd), &o.buf, 1, &d, &f, &o.o, nil)
-}
-
-func (o *readOp) Name() string {
-	return "WSARecv"
-}
-
 func (fd *netFD) Read(buf []byte) (int, error) {
 	if err := fd.incref(false); err != nil {
 		return 0, err
 	}
 	defer fd.decref()
-	fd.rio.Lock()
-	defer fd.rio.Unlock()
-	var o readOp
-	o.Init(fd, buf, 'r')
-	n, err := iosrv.ExecIO(&o)
+	o := &fd.rop
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	o.InitBuf(buf)
+	n, err := iosrv.ExecIO(o, "WSARecv", func(o *operation) error {
+		return syscall.WSARecv(o.fd.sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil)
+	})
 	if err == nil && n == 0 {
 		err = io.EOF
 	}
 	return n, err
 }
 
-// ReadFrom from network.
-
-type readFromOp struct {
-	bufOp
-	rsa  syscall.RawSockaddrAny
-	rsan int32
-}
-
-func (o *readFromOp) Submit() error {
-	var d, f uint32
-	return syscall.WSARecvFrom(o.fd.sysfd, &o.buf, 1, &d, &f, &o.rsa, &o.rsan, &o.o, nil)
-}
-
-func (o *readFromOp) Name() string {
-	return "WSARecvFrom"
-}
-
 func (fd *netFD) ReadFrom(buf []byte) (n int, sa syscall.Sockaddr, err error) {
 	if len(buf) == 0 {
 		return 0, nil, nil
@@ -468,12 +405,17 @@
 		return 0, nil, err
 	}
 	defer fd.decref()
-	fd.rio.Lock()
-	defer fd.rio.Unlock()
-	var o readFromOp
-	o.Init(fd, buf, 'r')
-	o.rsan = int32(unsafe.Sizeof(o.rsa))
-	n, err = iosrv.ExecIO(&o)
+	o := &fd.rop
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	o.InitBuf(buf)
+	n, err = iosrv.ExecIO(o, "WSARecvFrom", func(o *operation) error {
+		if o.rsa == nil {
+			o.rsa = new(syscall.RawSockaddrAny)
+		}
+		o.rsan = int32(unsafe.Sizeof(*o.rsa))
+		return syscall.WSARecvFrom(o.fd.sysfd, &o.buf, 1, &o.qty, &o.flags, o.rsa, &o.rsan, &o.o, nil)
+	})
 	if err != nil {
 		return 0, nil, err
 	}
@@ -481,47 +423,18 @@
 	return
 }
 
-// Write to network.
-
-type writeOp struct {
-	bufOp
-}
-
-func (o *writeOp) Submit() error {
-	var d uint32
-	return syscall.WSASend(o.fd.sysfd, &o.buf, 1, &d, 0, &o.o, nil)
-}
-
-func (o *writeOp) Name() string {
-	return "WSASend"
-}
-
 func (fd *netFD) Write(buf []byte) (int, error) {
 	if err := fd.incref(false); err != nil {
 		return 0, err
 	}
 	defer fd.decref()
-	fd.wio.Lock()
-	defer fd.wio.Unlock()
-	var o writeOp
-	o.Init(fd, buf, 'w')
-	return iosrv.ExecIO(&o)
-}
-
-// WriteTo to network.
-
-type writeToOp struct {
-	bufOp
-	sa syscall.Sockaddr
-}
-
-func (o *writeToOp) Submit() error {
-	var d uint32
-	return syscall.WSASendto(o.fd.sysfd, &o.buf, 1, &d, 0, o.sa, &o.o, nil)
-}
-
-func (o *writeToOp) Name() string {
-	return "WSASendto"
+	o := &fd.wop
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	o.InitBuf(buf)
+	return iosrv.ExecIO(o, "WSASend", func(o *operation) error {
+		return syscall.WSASend(o.fd.sysfd, &o.buf, 1, &o.qty, 0, &o.o, nil)
+	})
 }
 
 func (fd *netFD) WriteTo(buf []byte, sa syscall.Sockaddr) (int, error) {
@@ -532,31 +445,14 @@
 		return 0, err
 	}
 	defer fd.decref()
-	fd.wio.Lock()
-	defer fd.wio.Unlock()
-	var o writeToOp
-	o.Init(fd, buf, 'w')
+	o := &fd.wop
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	o.InitBuf(buf)
 	o.sa = sa
-	return iosrv.ExecIO(&o)
-}
-
-// Accept new network connections.
-
-type acceptOp struct {
-	anOp
-	newsock syscall.Handle
-	attrs   [2]syscall.RawSockaddrAny // space for local and remote address only
-}
-
-func (o *acceptOp) Submit() error {
-	var d uint32
-	l := uint32(unsafe.Sizeof(o.attrs[0]))
-	return syscall.AcceptEx(o.fd.sysfd, o.newsock,
-		(*byte)(unsafe.Pointer(&o.attrs[0])), 0, l, l, &d, &o.o)
-}
-
-func (o *acceptOp) Name() string {
-	return "AcceptEx"
+	return iosrv.ExecIO(o, "WSASendto", func(o *operation) error {
+		return syscall.WSASendto(o.fd.sysfd, &o.buf, 1, &o.qty, 0, o.sa, &o.o, nil)
+	})
 }
 
 func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) {
@@ -579,12 +475,15 @@
 	}
 
 	// Submit accept request.
-	fd.rio.Lock()
-	defer fd.rio.Unlock()
-	var o acceptOp
-	o.Init(fd, 'r')
-	o.newsock = s
-	_, err = iosrv.ExecIO(&o)
+	o := &fd.rop
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	o.handle = s
+	var rawsa [2]syscall.RawSockaddrAny
+	o.rsan = int32(unsafe.Sizeof(rawsa[0]))
+	_, err = iosrv.ExecIO(o, "AcceptEx", func(o *operation) error {
+		return syscall.AcceptEx(o.fd.sysfd, o.handle, (*byte)(unsafe.Pointer(&rawsa[0])), 0, uint32(o.rsan), uint32(o.rsan), &o.qty, &o.o)
+	})
 	if err != nil {
 		netfd.Close()
 		return nil, err
@@ -600,9 +499,8 @@
 	// Get local and peer addr out of AcceptEx buffer.
 	var lrsa, rrsa *syscall.RawSockaddrAny
 	var llen, rlen int32
-	l := uint32(unsafe.Sizeof(*lrsa))
-	syscall.GetAcceptExSockaddrs((*byte)(unsafe.Pointer(&o.attrs[0])),
-		0, l, l, &lrsa, &llen, &rrsa, &rlen)
+	syscall.GetAcceptExSockaddrs((*byte)(unsafe.Pointer(&rawsa[0])),
+		0, uint32(o.rsan), uint32(o.rsan), &lrsa, &llen, &rrsa, &rlen)
 	lsa, _ := lrsa.Sockaddr()
 	rsa, _ := rrsa.Sockaddr()
 
diff --git a/src/pkg/net/sendfile_windows.go b/src/pkg/net/sendfile_windows.go
index 5012583..e9b9f91 100644
--- a/src/pkg/net/sendfile_windows.go
+++ b/src/pkg/net/sendfile_windows.go
@@ -10,20 +10,6 @@
 	"syscall"
 )
 
-type sendfileOp struct {
-	anOp
-	src syscall.Handle // source
-	n   uint32
-}
-
-func (o *sendfileOp) Submit() (err error) {
-	return syscall.TransmitFile(o.fd.sysfd, o.src, o.n, 0, &o.o, nil, syscall.TF_WRITE_BEHIND)
-}
-
-func (o *sendfileOp) Name() string {
-	return "TransmitFile"
-}
-
 // sendFile copies the contents of r to c using the TransmitFile
 // system call to minimize copies.
 //
@@ -33,7 +19,7 @@
 // if handled == false, sendFile performed no work.
 //
 // Note that sendfile for windows does not suppport >2GB file.
-func sendFile(c *netFD, r io.Reader) (written int64, err error, handled bool) {
+func sendFile(fd *netFD, r io.Reader) (written int64, err error, handled bool) {
 	var n int64 = 0 // by default, copy until EOF
 
 	lr, ok := r.(*io.LimitedReader)
@@ -48,18 +34,18 @@
 		return 0, nil, false
 	}
 
-	if err := c.incref(false); err != nil {
+	if err := fd.incref(false); err != nil {
 		return 0, err, true
 	}
-	defer c.decref()
-	c.wio.Lock()
-	defer c.wio.Unlock()
-
-	var o sendfileOp
-	o.Init(c, 'w')
-	o.n = uint32(n)
-	o.src = syscall.Handle(f.Fd())
-	done, err := iosrv.ExecIO(&o)
+	defer fd.decref()
+	o := &fd.wop
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	o.qty = uint32(n)
+	o.handle = syscall.Handle(f.Fd())
+	done, err := iosrv.ExecIO(o, "TransmitFile", func(o *operation) error {
+		return syscall.TransmitFile(o.fd.sysfd, o.handle, o.qty, 0, &o.o, nil, syscall.TF_WRITE_BEHIND)
+	})
 	if err != nil {
 		return 0, err, false
 	}
diff --git a/src/pkg/net/tcp_test.go b/src/pkg/net/tcp_test.go
index f356f92..dedd41d 100644
--- a/src/pkg/net/tcp_test.go
+++ b/src/pkg/net/tcp_test.go
@@ -6,6 +6,7 @@
 
 import (
 	"fmt"
+	"io"
 	"reflect"
 	"runtime"
 	"sync"
@@ -327,3 +328,46 @@
 	ln.Close()
 	wg.Wait()
 }
+
+func TestTCPReadWriteMallocs(t *testing.T) {
+	maxMallocs := 0
+	switch runtime.GOOS {
+	// Add other OSes if you know how many mallocs they do.
+	case "windows":
+		maxMallocs = 0
+	}
+	ln, err := Listen("tcp", "127.0.0.1:0")
+	if err != nil {
+		t.Fatalf("Listen failed: %v", err)
+	}
+	defer ln.Close()
+	var server Conn
+	errc := make(chan error)
+	go func() {
+		var err error
+		server, err = ln.Accept()
+		errc <- err
+	}()
+	client, err := Dial("tcp", ln.Addr().String())
+	if err != nil {
+		t.Fatalf("Dial failed: %v", err)
+	}
+	if err := <-errc; err != nil {
+		t.Fatalf("Accept failed: %v", err)
+	}
+	defer server.Close()
+	var buf [128]byte
+	mallocs := testing.AllocsPerRun(1000, func() {
+		_, err := server.Write(buf[:])
+		if err != nil {
+			t.Fatalf("Write failed: %v", err)
+		}
+		_, err = io.ReadFull(client, buf[:])
+		if err != nil {
+			t.Fatalf("Read failed: %v", err)
+		}
+	})
+	if int(mallocs) > maxMallocs {
+		t.Fatalf("Got %v allocs, want %v", mallocs, maxMallocs)
+	}
+}