internal/socket: reuse closure in Recv/SendMmsg

The closure for the callback to RawConn.Read/Write is responsible for
multiple allocations per call to RecvMmsg and SendMmsg.
The batched read and write are used primarily to avoid per-call
overhead, so any such overhead negates the advantage of using these
functions.

This change introduces a struct type holding all the variables
captured by the closure passed to RawConn.Read/Write. The struct is
reused to amortize the allocations by means of a sync.Pool.
A suitable global sync.Pool instance already existed, for buffers used
to pack mmsg headers.

This change allows to reuse all allocations in WriteBatch. In ReadBatch,
only the returned net.Addr instances still need to be allocated for each
message, which cannot be avoided without fundamental changes to the
package interface.

```
name             old time/op    new time/op    delta
UDP/Batch-1-8      5.34µs ± 1%    5.40µs ± 3%     ~     (p=0.173 n=8+10)
UDP/Batch-2-8      9.74µs ± 1%    9.24µs ± 9%   -5.21%  (p=0.035 n=9+10)
UDP/Batch-4-8      16.2µs ± 4%    16.2µs ± 1%     ~     (p=0.758 n=9+7)
UDP/Batch-8-8      30.0µs ± 4%    30.0µs ± 4%     ~     (p=0.971 n=10+10)
UDP/Batch-16-8     57.3µs ± 3%    60.9µs ±16%   +6.43%  (p=0.031 n=9+9)
UDP/Batch-32-8      115µs ± 5%     119µs ± 6%   +3.15%  (p=0.043 n=10+10)
UDP/Batch-64-8      234µs ±16%     237µs ± 4%     ~     (p=0.173 n=10+8)
UDP/Batch-128-8     447µs ± 4%     470µs ± 7%   +5.22%  (p=0.002 n=10+10)
UDP/Batch-256-8     960µs ±10%     966µs ±19%     ~     (p=0.853 n=10+10)
UDP/Batch-512-8    1.00ms ± 7%    0.99ms ± 7%     ~     (p=0.387 n=9+9)

name             old alloc/op   new alloc/op   delta
UDP/Batch-1-8        232B ± 0%       52B ± 0%  -77.59%  (p=0.000 n=10+10)
UDP/Batch-2-8        280B ± 0%      104B ± 0%  -62.86%  (p=0.000 n=10+10)
UDP/Batch-4-8        384B ± 0%      208B ± 0%  -45.83%  (p=0.000 n=10+10)
UDP/Batch-8-8        592B ± 0%      416B ± 0%  -29.73%  (p=0.000 n=10+10)
UDP/Batch-16-8     1.01kB ± 0%    0.83kB ± 0%  -17.46%  (p=0.000 n=10+10)
UDP/Batch-32-8     1.84kB ± 0%    1.66kB ± 0%   -9.57%  (p=0.002 n=8+10)
UDP/Batch-64-8     3.51kB ± 0%    3.33kB ± 0%   -5.00%  (p=0.000 n=10+8)
UDP/Batch-128-8    6.84kB ± 0%    6.66kB ± 0%   -2.57%  (p=0.001 n=7+7)
UDP/Batch-256-8    13.5kB ± 0%    13.3kB ± 0%   -1.33%  (p=0.000 n=10+10)
UDP/Batch-512-8    14.7kB ± 0%    14.5kB ± 0%   -1.19%  (p=0.000 n=8+8)

name             old allocs/op  new allocs/op  delta
UDP/Batch-1-8        8.00 ± 0%      2.00 ± 0%  -75.00%  (p=0.000 n=10+10)
UDP/Batch-2-8        10.0 ± 0%       4.0 ± 0%  -60.00%  (p=0.000 n=10+10)
UDP/Batch-4-8        14.0 ± 0%       8.0 ± 0%  -42.86%  (p=0.000 n=10+10)
UDP/Batch-8-8        22.0 ± 0%      16.0 ± 0%  -27.27%  (p=0.000 n=10+10)
UDP/Batch-16-8       38.0 ± 0%      32.0 ± 0%  -15.79%  (p=0.000 n=10+10)
UDP/Batch-32-8       70.0 ± 0%      64.0 ± 0%   -8.57%  (p=0.000 n=10+10)
UDP/Batch-64-8        134 ± 0%       128 ± 0%   -4.48%  (p=0.000 n=10+10)
UDP/Batch-128-8       262 ± 0%       256 ± 0%   -2.29%  (p=0.000 n=10+10)
UDP/Batch-256-8       518 ± 0%       512 ± 0%   -1.16%  (p=0.000 n=10+10)
UDP/Batch-512-8       562 ± 0%       556 ± 0%   -1.07%  (p=0.000 n=10+10)
```

Contributes to golang/go#26838

Change-Id: I16ecfc38dbb5a4d9b1ceacd1dd99fda38f346807
GitHub-Last-Rev: d1dda931f61bd08cab782fa50406574d5e227154
GitHub-Pull-Request: golang/net#126
Reviewed-on: https://go-review.googlesource.com/c/net/+/380934
Run-TryBot: Ian Lance Taylor <iant@golang.org>
TryBot-Result: Gopher Robot <gobot@golang.org>
Reviewed-by: Ian Lance Taylor <iant@golang.org>
Trust: Michael Knyszek <mknyszek@google.com>
diff --git a/internal/socket/mmsghdr_unix.go b/internal/socket/mmsghdr_unix.go
index 40ebeda..0bfcf7a 100644
--- a/internal/socket/mmsghdr_unix.go
+++ b/internal/socket/mmsghdr_unix.go
@@ -9,7 +9,9 @@
 
 import (
 	"net"
+	"os"
 	"sync"
+	"syscall"
 )
 
 type mmsghdrs []mmsghdr
@@ -93,22 +95,86 @@
 	return hs
 }
 
-var defaultMmsghdrsPool = mmsghdrsPool{
+// syscaller is a helper to invoke recvmmsg and sendmmsg via the RawConn.Read/Write interface.
+// It is reusable, to amortize the overhead of allocating a closure for the function passed to
+// RawConn.Read/Write.
+type syscaller struct {
+	n     int
+	operr error
+	hs    mmsghdrs
+	flags int
+
+	boundRecvmmsgF func(uintptr) bool
+	boundSendmmsgF func(uintptr) bool
+}
+
+func (r *syscaller) init() {
+	r.boundRecvmmsgF = r.recvmmsgF
+	r.boundSendmmsgF = r.sendmmsgF
+}
+
+func (r *syscaller) recvmmsg(c syscall.RawConn, hs mmsghdrs, flags int) (int, error) {
+	r.n = 0
+	r.operr = nil
+	r.hs = hs
+	r.flags = flags
+	if err := c.Read(r.boundRecvmmsgF); err != nil {
+		return r.n, err
+	}
+	if r.operr != nil {
+		return r.n, os.NewSyscallError("recvmmsg", r.operr)
+	}
+	return r.n, nil
+}
+
+func (r *syscaller) recvmmsgF(s uintptr) bool {
+	r.n, r.operr = recvmmsg(s, r.hs, r.flags)
+	return ioComplete(r.flags, r.operr)
+}
+
+func (r *syscaller) sendmmsg(c syscall.RawConn, hs mmsghdrs, flags int) (int, error) {
+	r.n = 0
+	r.operr = nil
+	r.hs = hs
+	r.flags = flags
+	if err := c.Write(r.boundSendmmsgF); err != nil {
+		return r.n, err
+	}
+	if r.operr != nil {
+		return r.n, os.NewSyscallError("sendmmsg", r.operr)
+	}
+	return r.n, nil
+}
+
+func (r *syscaller) sendmmsgF(s uintptr) bool {
+	r.n, r.operr = sendmmsg(s, r.hs, r.flags)
+	return ioComplete(r.flags, r.operr)
+}
+
+// mmsgTmps holds reusable temporary helpers for recvmmsg and sendmmsg.
+type mmsgTmps struct {
+	packer    mmsghdrsPacker
+	syscaller syscaller
+}
+
+var defaultMmsgTmpsPool = mmsgTmpsPool{
 	p: sync.Pool{
 		New: func() interface{} {
-			return new(mmsghdrsPacker)
+			tmps := new(mmsgTmps)
+			tmps.syscaller.init()
+			return tmps
 		},
 	},
 }
 
-type mmsghdrsPool struct {
+type mmsgTmpsPool struct {
 	p sync.Pool
 }
 
-func (p *mmsghdrsPool) Get() *mmsghdrsPacker {
-	return p.p.Get().(*mmsghdrsPacker)
+func (p *mmsgTmpsPool) Get() *mmsgTmps {
+	return p.p.Get().(*mmsgTmps)
 }
 
-func (p *mmsghdrsPool) Put(packer *mmsghdrsPacker) {
-	p.p.Put(packer)
+func (p *mmsgTmpsPool) Put(tmps *mmsgTmps) {
+	p.p.Put(tmps)
 }
diff --git a/internal/socket/rawconn_mmsg.go b/internal/socket/rawconn_mmsg.go
index 3fcb51b..8f79b38 100644
--- a/internal/socket/rawconn_mmsg.go
+++ b/internal/socket/rawconn_mmsg.go
@@ -9,32 +9,23 @@
 
 import (
 	"net"
-	"os"
 )
 
 func (c *Conn) recvMsgs(ms []Message, flags int) (int, error) {
 	for i := range ms {
 		ms[i].raceWrite()
 	}
-	packer := defaultMmsghdrsPool.Get()
-	defer defaultMmsghdrsPool.Put(packer)
+	tmps := defaultMmsgTmpsPool.Get()
+	defer defaultMmsgTmpsPool.Put(tmps)
 	var parseFn func([]byte, string) (net.Addr, error)
 	if c.network != "tcp" {
 		parseFn = parseInetAddr
 	}
-	hs := packer.pack(ms, parseFn, nil)
-	var operr error
-	var n int
-	fn := func(s uintptr) bool {
-		n, operr = recvmmsg(s, hs, flags)
-		return ioComplete(flags, operr)
-	}
-	if err := c.c.Read(fn); err != nil {
+	hs := tmps.packer.pack(ms, parseFn, nil)
+	n, err := tmps.syscaller.recvmmsg(c.c, hs, flags)
+	if err != nil {
 		return n, err
 	}
-	if operr != nil {
-		return n, os.NewSyscallError("recvmmsg", operr)
-	}
 	if err := hs[:n].unpack(ms[:n], parseFn, c.network); err != nil {
 		return n, err
 	}
@@ -45,25 +36,17 @@
 	for i := range ms {
 		ms[i].raceRead()
 	}
-	packer := defaultMmsghdrsPool.Get()
-	defer defaultMmsghdrsPool.Put(packer)
+	tmps := defaultMmsgTmpsPool.Get()
+	defer defaultMmsgTmpsPool.Put(tmps)
 	var marshalFn func(net.Addr, []byte) int
 	if c.network != "tcp" {
 		marshalFn = marshalInetAddr
 	}
-	hs := packer.pack(ms, nil, marshalFn)
-	var operr error
-	var n int
-	fn := func(s uintptr) bool {
-		n, operr = sendmmsg(s, hs, flags)
-		return ioComplete(flags, operr)
-	}
-	if err := c.c.Write(fn); err != nil {
+	hs := tmps.packer.pack(ms, nil, marshalFn)
+	n, err := tmps.syscaller.sendmmsg(c.c, hs, flags)
+	if err != nil {
 		return n, err
 	}
-	if operr != nil {
-		return n, os.NewSyscallError("sendmmsg", operr)
-	}
 	if err := hs[:n].unpack(ms[:n], nil, ""); err != nil {
 		return n, err
 	}