net: refactoring in preparation for integrated network poller
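Introduce the pollDesc struct, which splits netFD into fd-related
and poller-related parts. As part of the split, pollServer.AddFD now
returns the underlying poller error unwrapped instead of wrapping it
in an *OpError.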
R=golang-dev, bradfitz, iant
CC=golang-dev
https://golang.org/cl/7762044
diff --git a/src/pkg/net/fd_poll_unix.go b/src/pkg/net/fd_poll_unix.go
index 93b64bb..7f7f764 100644
--- a/src/pkg/net/fd_poll_unix.go
+++ b/src/pkg/net/fd_poll_unix.go
@@ -29,10 +29,25 @@
pr, pw *os.File
poll *pollster // low-level OS hooks
sync.Mutex // controls pending and deadline
- pending map[int]*netFD
+ pending map[int]*pollDesc
deadline int64 // next deadline (nsec since 1970)
}
+// A pollDesc contains netFD state related to pollServer.
+type pollDesc struct {
+ // immutable after Init()
+ pollServer *pollServer
+ sysfd int
+ cr, cw chan error
+
+ // mutable, protected by pollServer mutex
+ closing bool
+ ncr, ncw int
+
+ // mutable, safe for concurrent access
+ rdeadline, wdeadline deadline
+}
+
func newPollServer() (s *pollServer, err error) {
s = new(pollServer)
if s.pr, s.pw, err = os.Pipe(); err != nil {
@@ -51,7 +66,7 @@
s.poll.Close()
goto Error
}
- s.pending = make(map[int]*netFD)
+ s.pending = make(map[int]*pollDesc)
go s.Run()
return s, nil
@@ -67,10 +82,10 @@
return nil, err
}
-func (s *pollServer) AddFD(fd *netFD, mode int) error {
+func (s *pollServer) AddFD(pd *pollDesc, mode int) error {
s.Lock()
- intfd := fd.sysfd
- if intfd < 0 || fd.closing {
+ intfd := pd.sysfd
+ if intfd < 0 || pd.closing {
// fd closed underfoot
s.Unlock()
return errClosing
@@ -79,14 +94,14 @@
var t int64
key := intfd << 1
if mode == 'r' {
- fd.ncr++
- t = fd.rdeadline.value()
+ pd.ncr++
+ t = pd.rdeadline.value()
} else {
- fd.ncw++
+ pd.ncw++
key++
- t = fd.wdeadline.value()
+ t = pd.wdeadline.value()
}
- s.pending[key] = fd
+ s.pending[key] = pd
doWakeup := false
if t > 0 && (s.deadline == 0 || t < s.deadline) {
s.deadline = t
@@ -96,7 +111,7 @@
wake, err := s.poll.AddFD(intfd, mode, false)
s.Unlock()
if err != nil {
- return &OpError{"addfd", fd.net, fd.laddr, err}
+ return err
}
if wake || doWakeup {
s.Wakeup()
@@ -104,25 +119,26 @@
return nil
}
-// Evict evicts fd from the pending list, unblocking
-// any I/O running on fd. The caller must have locked
+// Evict evicts pd from the pending list, unblocking
+// any I/O running on pd. The caller must have locked
// pollserver.
// Return value is whether the pollServer should be woken up.
-func (s *pollServer) Evict(fd *netFD) bool {
+func (s *pollServer) Evict(pd *pollDesc) bool {
+ pd.closing = true
doWakeup := false
- if s.pending[fd.sysfd<<1] == fd {
- s.WakeFD(fd, 'r', errClosing)
- if s.poll.DelFD(fd.sysfd, 'r') {
+ if s.pending[pd.sysfd<<1] == pd {
+ s.WakeFD(pd, 'r', errClosing)
+ if s.poll.DelFD(pd.sysfd, 'r') {
doWakeup = true
}
- delete(s.pending, fd.sysfd<<1)
+ delete(s.pending, pd.sysfd<<1)
}
- if s.pending[fd.sysfd<<1|1] == fd {
- s.WakeFD(fd, 'w', errClosing)
- if s.poll.DelFD(fd.sysfd, 'w') {
+ if s.pending[pd.sysfd<<1|1] == pd {
+ s.WakeFD(pd, 'w', errClosing)
+ if s.poll.DelFD(pd.sysfd, 'w') {
doWakeup = true
}
- delete(s.pending, fd.sysfd<<1|1)
+ delete(s.pending, pd.sysfd<<1|1)
}
return doWakeup
}
@@ -131,7 +147,7 @@
func (s *pollServer) Wakeup() { s.pw.Write(wakeupbuf[0:]) }
-func (s *pollServer) LookupFD(fd int, mode int) *netFD {
+func (s *pollServer) LookupFD(fd int, mode int) *pollDesc {
key := fd << 1
if mode == 'w' {
key++
@@ -144,16 +160,16 @@
return netfd
}
-func (s *pollServer) WakeFD(fd *netFD, mode int, err error) {
+func (s *pollServer) WakeFD(pd *pollDesc, mode int, err error) {
if mode == 'r' {
- for fd.ncr > 0 {
- fd.ncr--
- fd.cr <- err
+ for pd.ncr > 0 {
+ pd.ncr--
+ pd.cr <- err
}
} else {
- for fd.ncw > 0 {
- fd.ncw--
- fd.cw <- err
+ for pd.ncw > 0 {
+ pd.ncw--
+ pd.cw <- err
}
}
}
@@ -164,7 +180,7 @@
// probably with a heap indexed by wakeup time.
var nextDeadline int64
- for key, fd := range s.pending {
+ for key, pd := range s.pending {
var t int64
var mode int
if key&1 == 0 {
@@ -173,15 +189,15 @@
mode = 'w'
}
if mode == 'r' {
- t = fd.rdeadline.value()
+ t = pd.rdeadline.value()
} else {
- t = fd.wdeadline.value()
+ t = pd.wdeadline.value()
}
if t > 0 {
if t <= now {
delete(s.pending, key)
- s.poll.DelFD(fd.sysfd, mode)
- s.WakeFD(fd, mode, errTimeout)
+ s.poll.DelFD(pd.sysfd, mode)
+ s.WakeFD(pd, mode, errTimeout)
} else if nextDeadline == 0 || t < nextDeadline {
nextDeadline = t
}
@@ -220,48 +236,67 @@
s.pr.Read(scratch[0:])
s.CheckDeadlines()
} else {
- netfd := s.LookupFD(fd, mode)
- if netfd == nil {
+ pd := s.LookupFD(fd, mode)
+ if pd == nil {
// This can happen because the WaitFD runs without
// holding s's lock, so there might be a pending wakeup
// for an fd that has been evicted. No harm done.
continue
}
- s.WakeFD(netfd, mode, nil)
+ s.WakeFD(pd, mode, nil)
}
}
}
-func (s *pollServer) PrepareRead(fd *netFD) error {
- if fd.rdeadline.expired() {
+func (pd *pollDesc) Close() {
+}
+
+func (pd *pollDesc) Lock() {
+ pd.pollServer.Lock()
+}
+
+func (pd *pollDesc) Unlock() {
+ pd.pollServer.Unlock()
+}
+
+func (pd *pollDesc) Wakeup() {
+ pd.pollServer.Wakeup()
+}
+
+func (pd *pollDesc) PrepareRead() error {
+ if pd.rdeadline.expired() {
return errTimeout
}
return nil
}
-func (s *pollServer) PrepareWrite(fd *netFD) error {
- if fd.wdeadline.expired() {
+func (pd *pollDesc) PrepareWrite() error {
+ if pd.wdeadline.expired() {
return errTimeout
}
return nil
}
-func (s *pollServer) WaitRead(fd *netFD) error {
- err := s.AddFD(fd, 'r')
+func (pd *pollDesc) WaitRead() error {
+ err := pd.pollServer.AddFD(pd, 'r')
if err == nil {
- err = <-fd.cr
+ err = <-pd.cr
}
return err
}
-func (s *pollServer) WaitWrite(fd *netFD) error {
- err := s.AddFD(fd, 'w')
+func (pd *pollDesc) WaitWrite() error {
+ err := pd.pollServer.AddFD(pd, 'w')
if err == nil {
- err = <-fd.cw
+ err = <-pd.cw
}
return err
}
+func (pd *pollDesc) Evict() bool {
+ return pd.pollServer.Evict(pd)
+}
+
// Spread network FDs over several pollServers.
var pollMaxN int
@@ -292,31 +327,29 @@
pollservers[k] = p
}
-func pollServerInit(fd *netFD) error {
+func (pd *pollDesc) Init(fd *netFD) error {
pollN := runtime.GOMAXPROCS(0)
if pollN > pollMaxN {
pollN = pollMaxN
}
k := fd.sysfd % pollN
startServersOnce[k]()
- fd.pollServer = pollservers[k]
- fd.cr = make(chan error, 1)
- fd.cw = make(chan error, 1)
+ pd.sysfd = fd.sysfd
+ pd.pollServer = pollservers[k]
+ pd.cr = make(chan error, 1)
+ pd.cw = make(chan error, 1)
return nil
}
-func (s *pollServer) Close(fd *netFD) {
-}
-
// TODO(dfc) these unused error returns could be removed
func setReadDeadline(fd *netFD, t time.Time) error {
- fd.rdeadline.setTime(t)
+ fd.pd.rdeadline.setTime(t)
return nil
}
func setWriteDeadline(fd *netFD, t time.Time) error {
- fd.wdeadline.setTime(t)
+ fd.pd.wdeadline.setTime(t)
return nil
}
diff --git a/src/pkg/net/fd_unix.go b/src/pkg/net/fd_unix.go
index 51269d8..5621927d 100644
--- a/src/pkg/net/fd_unix.go
+++ b/src/pkg/net/fd_unix.go
@@ -20,7 +20,7 @@
sysmu sync.Mutex
sysref int
- // must lock both sysmu and pollserver to write
+ // must lock both sysmu and pollDesc to write
// can lock either to read
closing bool
@@ -30,8 +30,6 @@
sotype int
isConnected bool
sysfile *os.File
- cr chan error
- cw chan error
net string
laddr Addr
raddr Addr
@@ -39,14 +37,8 @@
// serialize access to Read and Write methods
rio, wio sync.Mutex
- // read and write deadlines
- rdeadline, wdeadline deadline
-
- // owned by fd wait server
- ncr, ncw int
-
// wait server
- pollServer *pollServer
+ pd pollDesc
}
func dialTimeout(net, addr string, timeout time.Duration) (Conn, error) {
@@ -65,7 +57,7 @@
sotype: sotype,
net: net,
}
- if err := pollServerInit(netfd); err != nil {
+ if err := netfd.pd.Init(netfd); err != nil {
return nil, err
}
return netfd, nil
@@ -91,12 +83,12 @@
func (fd *netFD) connect(ra syscall.Sockaddr) error {
fd.wio.Lock()
defer fd.wio.Unlock()
- if err := fd.pollServer.PrepareWrite(fd); err != nil {
+ if err := fd.pd.PrepareWrite(); err != nil {
return err
}
err := syscall.Connect(fd.sysfd, ra)
if err == syscall.EINPROGRESS {
- if err = fd.pollServer.WaitWrite(fd); err != nil {
+ if err = fd.pd.WaitWrite(); err != nil {
return err
}
var e int
@@ -112,7 +104,7 @@
}
// Add a reference to this fd.
-// If closing==true, pollserver must be locked; mark the fd as closing.
+// If closing==true, pollDesc must be locked; mark the fd as closing.
// Returns an error if the fd cannot be used.
func (fd *netFD) incref(closing bool) error {
fd.sysmu.Lock()
@@ -135,7 +127,7 @@
fd.sysref--
if fd.closing && fd.sysref == 0 && fd.sysfile != nil {
fd.sysfile.Close()
- fd.pollServer.Close(fd)
+ fd.pd.Close()
fd.sysfile = nil
fd.sysfd = -1
}
@@ -143,21 +135,21 @@
}
func (fd *netFD) Close() error {
- fd.pollServer.Lock() // needed for both fd.incref(true) and pollserver.Evict
+ fd.pd.Lock() // needed for both fd.incref(true) and pollDesc.Evict
if err := fd.incref(true); err != nil {
- fd.pollServer.Unlock()
+ fd.pd.Unlock()
return err
}
// Unblock any I/O. Once it all unblocks and returns,
// so that it cannot be referring to fd.sysfd anymore,
// the final decref will close fd.sysfd. This should happen
// fairly quickly, since all the I/O is non-blocking, and any
- // attempts to block in the pollserver will return errClosing.
- doWakeup := fd.pollServer.Evict(fd)
- fd.pollServer.Unlock()
+ // attempts to block in the pollDesc will return errClosing.
+ doWakeup := fd.pd.Evict()
+ fd.pd.Unlock()
fd.decref()
if doWakeup {
- fd.pollServer.Wakeup()
+ fd.pd.Wakeup()
}
return nil
}
@@ -189,7 +181,7 @@
return 0, err
}
defer fd.decref()
- if err := fd.pollServer.PrepareRead(fd); err != nil {
+ if err := fd.pd.PrepareRead(); err != nil {
return 0, &OpError{"read", fd.net, fd.raddr, err}
}
for {
@@ -197,7 +189,7 @@
if err != nil {
n = 0
if err == syscall.EAGAIN {
- if err = fd.pollServer.WaitRead(fd); err == nil {
+ if err = fd.pd.WaitRead(); err == nil {
continue
}
}
@@ -218,7 +210,7 @@
return 0, nil, err
}
defer fd.decref()
- if err := fd.pollServer.PrepareRead(fd); err != nil {
+ if err := fd.pd.PrepareRead(); err != nil {
return 0, nil, &OpError{"read", fd.net, fd.laddr, err}
}
for {
@@ -226,7 +218,7 @@
if err != nil {
n = 0
if err == syscall.EAGAIN {
- if err = fd.pollServer.WaitRead(fd); err == nil {
+ if err = fd.pd.WaitRead(); err == nil {
continue
}
}
@@ -247,7 +239,7 @@
return 0, 0, 0, nil, err
}
defer fd.decref()
- if err := fd.pollServer.PrepareRead(fd); err != nil {
+ if err := fd.pd.PrepareRead(); err != nil {
return 0, 0, 0, nil, &OpError{"read", fd.net, fd.laddr, err}
}
for {
@@ -255,7 +247,7 @@
if err != nil {
// TODO(dfc) should n and oobn be set to 0
if err == syscall.EAGAIN {
- if err = fd.pollServer.WaitRead(fd); err == nil {
+ if err = fd.pd.WaitRead(); err == nil {
continue
}
}
@@ -283,7 +275,7 @@
return 0, err
}
defer fd.decref()
- if err := fd.pollServer.PrepareWrite(fd); err != nil {
+ if err := fd.pd.PrepareWrite(); err != nil {
return 0, &OpError{"write", fd.net, fd.raddr, err}
}
for {
@@ -296,7 +288,7 @@
break
}
if err == syscall.EAGAIN {
- if err = fd.pollServer.WaitWrite(fd); err == nil {
+ if err = fd.pd.WaitWrite(); err == nil {
continue
}
}
@@ -322,13 +314,13 @@
return 0, err
}
defer fd.decref()
- if err := fd.pollServer.PrepareWrite(fd); err != nil {
+ if err := fd.pd.PrepareWrite(); err != nil {
return 0, &OpError{"write", fd.net, fd.raddr, err}
}
for {
err = syscall.Sendto(fd.sysfd, p, 0, sa)
if err == syscall.EAGAIN {
- if err = fd.pollServer.WaitWrite(fd); err == nil {
+ if err = fd.pd.WaitWrite(); err == nil {
continue
}
}
@@ -349,13 +341,13 @@
return 0, 0, err
}
defer fd.decref()
- if err := fd.pollServer.PrepareWrite(fd); err != nil {
+ if err := fd.pd.PrepareWrite(); err != nil {
return 0, 0, &OpError{"write", fd.net, fd.raddr, err}
}
for {
err = syscall.Sendmsg(fd.sysfd, p, oob, sa, 0)
if err == syscall.EAGAIN {
- if err = fd.pollServer.WaitWrite(fd); err == nil {
+ if err = fd.pd.WaitWrite(); err == nil {
continue
}
}
@@ -380,14 +372,14 @@
var s int
var rsa syscall.Sockaddr
- if err = fd.pollServer.PrepareRead(fd); err != nil {
+ if err = fd.pd.PrepareRead(); err != nil {
return nil, &OpError{"accept", fd.net, fd.laddr, err}
}
for {
s, rsa, err = accept(fd.sysfd)
if err != nil {
if err == syscall.EAGAIN {
- if err = fd.pollServer.WaitRead(fd); err == nil {
+ if err = fd.pd.WaitRead(); err == nil {
continue
}
} else if err == syscall.ECONNABORTED {
diff --git a/src/pkg/net/sendfile_freebsd.go b/src/pkg/net/sendfile_freebsd.go
index 8008bc3..dc5b767 100644
--- a/src/pkg/net/sendfile_freebsd.go
+++ b/src/pkg/net/sendfile_freebsd.go
@@ -83,7 +83,7 @@
break
}
if err1 == syscall.EAGAIN {
- if err1 = c.pollServer.WaitWrite(c); err1 == nil {
+ if err1 = c.pd.WaitWrite(); err1 == nil {
continue
}
}
diff --git a/src/pkg/net/sendfile_linux.go b/src/pkg/net/sendfile_linux.go
index 3357e65..6f1323b 100644
--- a/src/pkg/net/sendfile_linux.go
+++ b/src/pkg/net/sendfile_linux.go
@@ -59,7 +59,7 @@
break
}
if err1 == syscall.EAGAIN {
- if err1 = c.pollServer.WaitWrite(c); err1 == nil {
+ if err1 = c.pd.WaitWrite(); err1 == nil {
continue
}
}