net: add special netFD mutex
The mutex, fdMutex, handles locking and lifetime of sysfd,
and serializes Read and Write methods.
This strips two sync.Mutex.Lock calls, two sync.Mutex.Unlock calls,
one defer, and some miscellaneous overhead from every network operation.
On linux/amd64, Intel E5-2690:
benchmark                            old ns/op    new ns/op    delta
BenchmarkTCP4Persistent                   9595         9454    -1.47%
BenchmarkTCP4Persistent-2                 8978         8772    -2.29%
BenchmarkTCP4ConcurrentReadWrite          4900         4625    -5.61%
BenchmarkTCP4ConcurrentReadWrite-2        2603         2500    -3.96%
In general it strips 70-500 ns from every network operation, depending
on the processor model. On my relatively new E5-2690 that accounts for
~5% of the cost of a network op.
Fixes #6074.
R=golang-dev, bradfitz, alex.brainman, iant, mikioh.mikioh
CC=golang-dev
https://golang.org/cl/12418043
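
To make the intended usage concrete, here is a minimal sketch (not part of the
change itself; it assumes it sits in package net next to fd_unix.go with
"syscall" imported, and the names readSketch/setOptionSketch are illustrative)
of the discipline the new helpers impose: I/O methods hold the read or write
lock for the duration of the call, while misc operations such as setsockopt
only hold a reference.

// Hypothetical read path (omits the pollDesc integration the real Read does):
// readLock is fdmu.RWLock(true), readUnlock is fdmu.RWUnlock(true), and the
// goroutine that drops the last reference after Close runs fd.destroy.
func readSketch(fd *netFD, p []byte) (int, error) {
	if err := fd.readLock(); err != nil { // fails with errClosing after Close
		return 0, err
	}
	defer fd.readUnlock()
	return syscall.Read(fd.sysfd, p)
}

// Hypothetical misc operation: only a reference is needed, no serialization.
func setOptionSketch(fd *netFD, opt, value int) error {
	if err := fd.incref(); err != nil { // fdmu.Incref; fails after Close
		return err
	}
	defer fd.decref() // fdmu.Decref; last reference runs fd.destroy
	return syscall.SetsockoptInt(fd.sysfd, syscall.SOL_SOCKET, opt, value)
}
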
diff --git a/src/pkg/net/fd_mutex.go b/src/pkg/net/fd_mutex.go
new file mode 100644
index 0000000..1caf974
--- /dev/null
+++ b/src/pkg/net/fd_mutex.go
@@ -0,0 +1,184 @@
+// Copyright 2013 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package net
+
+import "sync/atomic"
+
+// fdMutex is a specialized synchronization primitive
+// that manages lifetime of an fd and serializes access
+// to Read and Write methods on netFD.
+type fdMutex struct {
+ state uint64
+ rsema uint32
+ wsema uint32
+}
+
+// fdMutex.state is organized as follows:
+// 1 bit - whether netFD is closed; if set, all subsequent lock operations will fail.
+// 1 bit - lock for read operations.
+// 1 bit - lock for write operations.
+// 20 bits - total number of references (read+write+misc).
+// 20 bits - number of outstanding read waiters.
+// 20 bits - number of outstanding write waiters.
+const (
+ mutexClosed = 1 << 0
+ mutexRLock = 1 << 1
+ mutexWLock = 1 << 2
+ mutexRef = 1 << 3
+ mutexRefMask = (1<<20 - 1) << 3
+ mutexRWait = 1 << 23
+ mutexRMask = (1<<20 - 1) << 23
+ mutexWWait = 1 << 43
+ mutexWMask = (1<<20 - 1) << 43
+)
+
+// Read operations must do RWLock(true)/RWUnlock(true).
+// Write operations must do RWLock(false)/RWUnlock(false).
+// Misc operations must do Incref/Decref. Misc operations include functions like
+// setsockopt and setDeadline. They need to use Incref/Decref to ensure that
+// they operate on the correct fd in the presence of a concurrent Close call
+// (otherwise the fd can be closed out from under them).
+// Close operation must do IncrefAndClose/Decref.
+
+// RWLock/Incref return whether fd is open.
+// RWUnlock/Decref return whether fd is closed and there are no remaining references.
+
+func (mu *fdMutex) Incref() bool {
+ for {
+ old := atomic.LoadUint64(&mu.state)
+ if old&mutexClosed != 0 {
+ return false
+ }
+ new := old + mutexRef
+ if new&mutexRefMask == 0 {
+ panic("net: inconsistent fdMutex")
+ }
+ if atomic.CompareAndSwapUint64(&mu.state, old, new) {
+ return true
+ }
+ }
+}
+
+func (mu *fdMutex) IncrefAndClose() bool {
+ for {
+ old := atomic.LoadUint64(&mu.state)
+ if old&mutexClosed != 0 {
+ return false
+ }
+ // Mark as closed and acquire a reference.
+ new := (old | mutexClosed) + mutexRef
+ if new&mutexRefMask == 0 {
+ panic("net: inconsistent fdMutex")
+ }
+ // Remove all read and write waiters.
+ new &^= mutexRMask | mutexWMask
+ if atomic.CompareAndSwapUint64(&mu.state, old, new) {
+ // Wake all read and write waiters;
+ // they will observe the closed flag after wakeup.
+ for old&mutexRMask != 0 {
+ old -= mutexRWait
+ runtime_Semrelease(&mu.rsema)
+ }
+ for old&mutexWMask != 0 {
+ old -= mutexWWait
+ runtime_Semrelease(&mu.wsema)
+ }
+ return true
+ }
+ }
+}
+
+func (mu *fdMutex) Decref() bool {
+ for {
+ old := atomic.LoadUint64(&mu.state)
+ if old&mutexRefMask == 0 {
+ panic("net: inconsistent fdMutex")
+ }
+ new := old - mutexRef
+ if atomic.CompareAndSwapUint64(&mu.state, old, new) {
+ return new&(mutexClosed|mutexRefMask) == mutexClosed
+ }
+ }
+}
+
+func (mu *fdMutex) RWLock(read bool) bool {
+ var mutexBit, mutexWait, mutexMask uint64
+ var mutexSema *uint32
+ if read {
+ mutexBit = mutexRLock
+ mutexWait = mutexRWait
+ mutexMask = mutexRMask
+ mutexSema = &mu.rsema
+ } else {
+ mutexBit = mutexWLock
+ mutexWait = mutexWWait
+ mutexMask = mutexWMask
+ mutexSema = &mu.wsema
+ }
+ for {
+ old := atomic.LoadUint64(&mu.state)
+ if old&mutexClosed != 0 {
+ return false
+ }
+ var new uint64
+ if old&mutexBit == 0 {
+ // Lock is free, acquire it.
+ new = (old | mutexBit) + mutexRef
+ if new&mutexRefMask == 0 {
+ panic("net: inconsistent fdMutex")
+ }
+ } else {
+ // Wait for lock.
+ new = old + mutexWait
+ if new&mutexMask == 0 {
+ panic("net: inconsistent fdMutex")
+ }
+ }
+ if atomic.CompareAndSwapUint64(&mu.state, old, new) {
+ if old&mutexBit == 0 {
+ return true
+ }
+ runtime_Semacquire(mutexSema)
+ // The signaller has subtracted mutexWait.
+ }
+ }
+}
+
+func (mu *fdMutex) RWUnlock(read bool) bool {
+ var mutexBit, mutexWait, mutexMask uint64
+ var mutexSema *uint32
+ if read {
+ mutexBit = mutexRLock
+ mutexWait = mutexRWait
+ mutexMask = mutexRMask
+ mutexSema = &mu.rsema
+ } else {
+ mutexBit = mutexWLock
+ mutexWait = mutexWWait
+ mutexMask = mutexWMask
+ mutexSema = &mu.wsema
+ }
+ for {
+ old := atomic.LoadUint64(&mu.state)
+ if old&mutexBit == 0 || old&mutexRefMask == 0 {
+ panic("net: inconsistent fdMutex")
+ }
+ // Drop lock, drop reference and wake waiter if present.
+ new := (old &^ mutexBit) - mutexRef
+ if old&mutexMask != 0 {
+ new -= mutexWait
+ }
+ if atomic.CompareAndSwapUint64(&mu.state, old, new) {
+ if old&mutexMask != 0 {
+ runtime_Semrelease(mutexSema)
+ }
+ return new&(mutexClosed|mutexRefMask) == mutexClosed
+ }
+ }
+}
+
+// Implemented in runtime package.
+func runtime_Semacquire(sema *uint32)
+func runtime_Semrelease(sema *uint32)
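
As an aside (not part of the CL), the standalone program below decodes a packed
state word according to the layout documented at the top of fd_mutex.go. The
constants are copies of the ones above; it can help when reasoning about the
CAS loops or the "inconsistent fdMutex" panics.

package main

import "fmt"

// Copies of the fd_mutex.go constants, so this decoder runs standalone.
const (
	mutexClosed  = 1 << 0
	mutexRLock   = 1 << 1
	mutexWLock   = 1 << 2
	mutexRef     = 1 << 3
	mutexRefMask = (1<<20 - 1) << 3
	mutexRWait   = 1 << 23
	mutexRMask   = (1<<20 - 1) << 23
	mutexWWait   = 1 << 43
	mutexWMask   = (1<<20 - 1) << 43
)

// describe unpacks a state word into the six fields of the layout.
func describe(state uint64) string {
	return fmt.Sprintf("closed=%v rlock=%v wlock=%v refs=%d rwaiters=%d wwaiters=%d",
		state&mutexClosed != 0,
		state&mutexRLock != 0,
		state&mutexWLock != 0,
		(state&mutexRefMask)>>3,
		(state&mutexRMask)>>23,
		(state&mutexWMask)>>43)
}

func main() {
	// One read lock held (one reference) plus two goroutines blocked in RWLock(true).
	state := uint64(mutexRLock) | mutexRef | 2*mutexRWait
	fmt.Println(describe(state))
	// Prints: closed=false rlock=true wlock=false refs=1 rwaiters=2 wwaiters=0
}
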
diff --git a/src/pkg/net/fd_mutex_test.go b/src/pkg/net/fd_mutex_test.go
new file mode 100644
index 0000000..8383084
--- /dev/null
+++ b/src/pkg/net/fd_mutex_test.go
@@ -0,0 +1,186 @@
+// Copyright 2013 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package net
+
+import (
+ "math/rand"
+ "runtime"
+ "testing"
+ "time"
+)
+
+func TestMutexLock(t *testing.T) {
+ var mu fdMutex
+
+ if !mu.Incref() {
+ t.Fatal("broken")
+ }
+ if mu.Decref() {
+ t.Fatal("broken")
+ }
+
+ if !mu.RWLock(true) {
+ t.Fatal("broken")
+ }
+ if mu.RWUnlock(true) {
+ t.Fatal("broken")
+ }
+
+ if !mu.RWLock(false) {
+ t.Fatal("broken")
+ }
+ if mu.RWUnlock(false) {
+ t.Fatal("broken")
+ }
+}
+
+func TestMutexClose(t *testing.T) {
+ var mu fdMutex
+ if !mu.IncrefAndClose() {
+ t.Fatal("broken")
+ }
+
+ if mu.Incref() {
+ t.Fatal("broken")
+ }
+ if mu.RWLock(true) {
+ t.Fatal("broken")
+ }
+ if mu.RWLock(false) {
+ t.Fatal("broken")
+ }
+ if mu.IncrefAndClose() {
+ t.Fatal("broken")
+ }
+}
+
+func TestMutexCloseUnblock(t *testing.T) {
+ c := make(chan bool)
+ var mu fdMutex
+ mu.RWLock(true)
+ for i := 0; i < 4; i++ {
+ go func() {
+ if mu.RWLock(true) {
+ t.Fatal("broken")
+ }
+ c <- true
+ }()
+ }
+ // Concurrent goroutines must not be able to read lock the mutex.
+ time.Sleep(time.Millisecond)
+ select {
+ case <-c:
+ t.Fatal("broken")
+ default:
+ }
+ mu.IncrefAndClose() // Must unblock the readers.
+ for i := 0; i < 4; i++ {
+ select {
+ case <-c:
+ case <-time.After(10 * time.Second):
+ t.Fatal("broken")
+ }
+ }
+ if mu.Decref() {
+ t.Fatal("broken")
+ }
+ if !mu.RWUnlock(true) {
+ t.Fatal("broken")
+ }
+}
+
+func TestMutexPanic(t *testing.T) {
+ ensurePanics := func(f func()) {
+ defer func() {
+ if recover() == nil {
+ t.Fatal("does not panic")
+ }
+ }()
+ f()
+ }
+
+ var mu fdMutex
+ ensurePanics(func() { mu.Decref() })
+ ensurePanics(func() { mu.RWUnlock(true) })
+ ensurePanics(func() { mu.RWUnlock(false) })
+
+ ensurePanics(func() { mu.Incref(); mu.Decref(); mu.Decref() })
+ ensurePanics(func() { mu.RWLock(true); mu.RWUnlock(true); mu.RWUnlock(true) })
+ ensurePanics(func() { mu.RWLock(false); mu.RWUnlock(false); mu.RWUnlock(false) })
+
+ // ensure that it's still not broken
+ mu.Incref()
+ mu.Decref()
+ mu.RWLock(true)
+ mu.RWUnlock(true)
+ mu.RWLock(false)
+ mu.RWUnlock(false)
+}
+
+func TestMutexStress(t *testing.T) {
+ P := 8
+ N := int(1e6)
+ if testing.Short() {
+ P = 4
+ N = 1e4
+ }
+ defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(P))
+ done := make(chan bool)
+ var mu fdMutex
+ var readState [2]uint64
+ var writeState [2]uint64
+ for p := 0; p < P; p++ {
+ go func() {
+ r := rand.New(rand.NewSource(rand.Int63()))
+ for i := 0; i < N; i++ {
+ switch r.Intn(3) {
+ case 0:
+ if !mu.Incref() {
+ t.Fatal("broken")
+ }
+ if mu.Decref() {
+ t.Fatal("broken")
+ }
+ case 1:
+ if !mu.RWLock(true) {
+ t.Fatal("broken")
+ }
+ // Ensure that it provides mutual exclusion for readers.
+ if readState[0] != readState[1] {
+ t.Fatal("broken")
+ }
+ readState[0]++
+ readState[1]++
+ if mu.RWUnlock(true) {
+ t.Fatal("broken")
+ }
+ case 2:
+ if !mu.RWLock(false) {
+ t.Fatal("broken")
+ }
+ // Ensure that it provides mutual exclusion for writers.
+ if writeState[0] != writeState[1] {
+ t.Fatal("broken")
+ }
+ writeState[0]++
+ writeState[1]++
+ if mu.RWUnlock(false) {
+ t.Fatal("broken")
+ }
+ }
+ }
+ done <- true
+ }()
+ }
+ for p := 0; p < P; p++ {
+ <-done
+ }
+ if !mu.IncrefAndClose() {
+ t.Fatal("broken")
+ }
+ if !mu.Decref() {
+ t.Fatal("broken")
+ }
+}
diff --git a/src/pkg/net/fd_poll_runtime.go b/src/pkg/net/fd_poll_runtime.go
index 6ae5c60..03474cf 100644
--- a/src/pkg/net/fd_poll_runtime.go
+++ b/src/pkg/net/fd_poll_runtime.go
@@ -132,7 +132,7 @@
if t.IsZero() {
d = 0
}
- if err := fd.incref(false); err != nil {
+ if err := fd.incref(); err != nil {
return err
}
runtime_pollSetDeadline(fd.pd.runtimeCtx, d, mode)
diff --git a/src/pkg/net/fd_unix.go b/src/pkg/net/fd_unix.go
index a2a7714..f704c0a 100644
--- a/src/pkg/net/fd_unix.go
+++ b/src/pkg/net/fd_unix.go
@@ -10,7 +10,6 @@
"io"
"os"
"runtime"
- "sync"
"sync/atomic"
"syscall"
"time"
@@ -18,13 +17,8 @@
// Network file descriptor.
type netFD struct {
- // locking/lifetime of sysfd
- sysmu sync.Mutex
- sysref int
-
- // must lock both sysmu and pollDesc to write
- // can lock either to read
- closing bool
+ // locking/lifetime of sysfd + serialize access to Read and Write methods
+ fdmu fdMutex
// immutable until Close
sysfd int
@@ -35,9 +29,6 @@
laddr Addr
raddr Addr
- // serialize access to Read and Write methods
- rio, wio sync.Mutex
-
// wait server
pd pollDesc
}
@@ -84,8 +75,9 @@
}
func (fd *netFD) connect(la, ra syscall.Sockaddr) error {
- fd.wio.Lock()
- defer fd.wio.Unlock()
+ // Do not need to call fd.writeLock here,
+ // because fd is not yet accessible to user,
+ // so no concurrent operations are possible.
if err := fd.pd.PrepareWrite(); err != nil {
return err
}
@@ -104,44 +96,69 @@
return nil
}
+func (fd *netFD) destroy() {
+ // Poller may want to unregister fd in readiness notification mechanism,
+ // so this must be executed before closesocket.
+ fd.pd.Close()
+ closesocket(fd.sysfd)
+ fd.sysfd = -1
+ runtime.SetFinalizer(fd, nil)
+}
+
// Add a reference to this fd.
-// If closing==true, pollDesc must be locked; mark the fd as closing.
// Returns an error if the fd cannot be used.
-func (fd *netFD) incref(closing bool) error {
- fd.sysmu.Lock()
- if fd.closing {
- fd.sysmu.Unlock()
+func (fd *netFD) incref() error {
+ if !fd.fdmu.Incref() {
return errClosing
}
- fd.sysref++
- if closing {
- fd.closing = true
- }
- fd.sysmu.Unlock()
return nil
}
-// Remove a reference to this FD and close if we've been asked to do so (and
-// there are no references left.
+// Remove a reference to this FD and close if we've been asked to do so
+// (and there are no references left).
func (fd *netFD) decref() {
- fd.sysmu.Lock()
- fd.sysref--
- if fd.closing && fd.sysref == 0 {
- // Poller may want to unregister fd in readiness notification mechanism,
- // so this must be executed before closesocket.
- fd.pd.Close()
- closesocket(fd.sysfd)
- fd.sysfd = -1
- runtime.SetFinalizer(fd, nil)
+ if fd.fdmu.Decref() {
+ fd.destroy()
}
- fd.sysmu.Unlock()
+}
+
+// Add a reference to this fd and lock for reading.
+// Returns an error if the fd cannot be used.
+func (fd *netFD) readLock() error {
+ if !fd.fdmu.RWLock(true) {
+ return errClosing
+ }
+ return nil
+}
+
+// Unlock for reading and remove a reference to this FD.
+func (fd *netFD) readUnlock() {
+ if fd.fdmu.RWUnlock(true) {
+ fd.destroy()
+ }
+}
+
+// Add a reference to this fd and lock for writing.
+// Returns an error if the fd cannot be used.
+func (fd *netFD) writeLock() error {
+ if !fd.fdmu.RWLock(false) {
+ return errClosing
+ }
+ return nil
+}
+
+// Unlock for writing and remove a reference to this FD.
+func (fd *netFD) writeUnlock() {
+ if fd.fdmu.RWUnlock(false) {
+ fd.destroy()
+ }
}
func (fd *netFD) Close() error {
fd.pd.Lock() // needed for both fd.incref(true) and pollDesc.Evict
- if err := fd.incref(true); err != nil {
+ if !fd.fdmu.IncrefAndClose() {
fd.pd.Unlock()
- return err
+ return errClosing
}
// Unblock any I/O. Once it all unblocks and returns,
// so that it cannot be referring to fd.sysfd anymore,
@@ -158,7 +175,7 @@
}
func (fd *netFD) shutdown(how int) error {
- if err := fd.incref(false); err != nil {
+ if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
@@ -178,12 +195,10 @@
}
func (fd *netFD) Read(p []byte) (n int, err error) {
- fd.rio.Lock()
- defer fd.rio.Unlock()
- if err := fd.incref(false); err != nil {
+ if err := fd.readLock(); err != nil {
return 0, err
}
- defer fd.decref()
+ defer fd.readUnlock()
if err := fd.pd.PrepareRead(); err != nil {
return 0, &OpError{"read", fd.net, fd.raddr, err}
}
@@ -207,12 +222,10 @@
}
func (fd *netFD) ReadFrom(p []byte) (n int, sa syscall.Sockaddr, err error) {
- fd.rio.Lock()
- defer fd.rio.Unlock()
- if err := fd.incref(false); err != nil {
+ if err := fd.readLock(); err != nil {
return 0, nil, err
}
- defer fd.decref()
+ defer fd.readUnlock()
if err := fd.pd.PrepareRead(); err != nil {
return 0, nil, &OpError{"read", fd.net, fd.laddr, err}
}
@@ -236,12 +249,10 @@
}
func (fd *netFD) ReadMsg(p []byte, oob []byte) (n, oobn, flags int, sa syscall.Sockaddr, err error) {
- fd.rio.Lock()
- defer fd.rio.Unlock()
- if err := fd.incref(false); err != nil {
+ if err := fd.readLock(); err != nil {
return 0, 0, 0, nil, err
}
- defer fd.decref()
+ defer fd.readUnlock()
if err := fd.pd.PrepareRead(); err != nil {
return 0, 0, 0, nil, &OpError{"read", fd.net, fd.laddr, err}
}
@@ -272,12 +283,10 @@
}
func (fd *netFD) Write(p []byte) (nn int, err error) {
- fd.wio.Lock()
- defer fd.wio.Unlock()
- if err := fd.incref(false); err != nil {
+ if err := fd.writeLock(); err != nil {
return 0, err
}
- defer fd.decref()
+ defer fd.writeUnlock()
if err := fd.pd.PrepareWrite(); err != nil {
return 0, &OpError{"write", fd.net, fd.raddr, err}
}
@@ -311,12 +320,10 @@
}
func (fd *netFD) WriteTo(p []byte, sa syscall.Sockaddr) (n int, err error) {
- fd.wio.Lock()
- defer fd.wio.Unlock()
- if err := fd.incref(false); err != nil {
+ if err := fd.writeLock(); err != nil {
return 0, err
}
- defer fd.decref()
+ defer fd.writeUnlock()
if err := fd.pd.PrepareWrite(); err != nil {
return 0, &OpError{"write", fd.net, fd.raddr, err}
}
@@ -338,12 +345,10 @@
}
func (fd *netFD) WriteMsg(p []byte, oob []byte, sa syscall.Sockaddr) (n int, oobn int, err error) {
- fd.wio.Lock()
- defer fd.wio.Unlock()
- if err := fd.incref(false); err != nil {
+ if err := fd.writeLock(); err != nil {
return 0, 0, err
}
- defer fd.decref()
+ defer fd.writeUnlock()
if err := fd.pd.PrepareWrite(); err != nil {
return 0, 0, &OpError{"write", fd.net, fd.raddr, err}
}
@@ -366,12 +371,10 @@
}
func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (netfd *netFD, err error) {
- fd.rio.Lock()
- defer fd.rio.Unlock()
- if err := fd.incref(false); err != nil {
+ if err := fd.readLock(); err != nil {
return nil, err
}
- defer fd.decref()
+ defer fd.readUnlock()
var s int
var rsa syscall.Sockaddr
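
One consequence of the unix changes above is worth spelling out: Close only
marks the fd closed, evicts pending I/O and drops its own reference; whichever
goroutine drops the last reference (often a Read or Write that was in flight)
is the one that runs destroy. A small sketch of that ordering at the fdMutex
level (again assuming it lives in package net; closeOrderingSketch is an
illustrative name, not part of the CL):

func closeOrderingSketch() {
	var mu fdMutex
	mu.RWLock(true)     // a Read in flight holds the read lock (1 reference)
	mu.IncrefAndClose() // Close: mark closed, wake any waiters (2 references)
	if mu.Decref() {    // Close drops its own reference (1 left)...
		panic("unreachable: the reader still holds a reference")
	}
	if !mu.RWUnlock(true) { // the reader finishes and drops the last reference
		panic("unreachable: closed and unreferenced, so RWUnlock reports true")
	}
	// At this point netFD.readUnlock would have called fd.destroy().
}
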
diff --git a/src/pkg/net/fd_windows.go b/src/pkg/net/fd_windows.go
index ff3966e..ff01902 100644
--- a/src/pkg/net/fd_windows.go
+++ b/src/pkg/net/fd_windows.go
@@ -105,7 +105,6 @@
qty uint32
// fields used only by net package
- mu sync.Mutex
fd *netFD
errc chan error
buf syscall.WSABuf
@@ -246,10 +245,8 @@
// Network file descriptor.
type netFD struct {
- // locking/lifetime of sysfd
- sysmu sync.Mutex
- sysref int
- closing bool
+ // locking/lifetime of sysfd + serialize access to Read and Write methods
+ fdmu fdMutex
// immutable until Close
sysfd syscall.Handle
@@ -313,6 +310,9 @@
}
func (fd *netFD) connect(la, ra syscall.Sockaddr) error {
+ // Do not need to call fd.writeLock here,
+ // because fd is not yet accessible to user,
+ // so no concurrent operations are possible.
if !canUseConnectEx(fd.net) {
return syscall.Connect(fd.sysfd, ra)
}
@@ -332,8 +332,6 @@
}
// Call ConnectEx API.
o := &fd.wop
- o.mu.Lock()
- defer o.mu.Unlock()
o.sa = ra
_, err := iosrv.ExecIO(o, "ConnectEx", func(o *operation) error {
return syscall.ConnectEx(o.fd.sysfd, o.sa, nil, 0, nil, &o.o)
@@ -345,64 +343,80 @@
return syscall.Setsockopt(fd.sysfd, syscall.SOL_SOCKET, syscall.SO_UPDATE_CONNECT_CONTEXT, (*byte)(unsafe.Pointer(&fd.sysfd)), int32(unsafe.Sizeof(fd.sysfd)))
}
+func (fd *netFD) destroy() {
+ if fd.sysfd == syscall.InvalidHandle {
+ return
+ }
+ // Poller may want to unregister fd in readiness notification mechanism,
+ // so this must be executed before closesocket.
+ fd.pd.Close()
+ closesocket(fd.sysfd)
+ fd.sysfd = syscall.InvalidHandle
+ // no need for a finalizer anymore
+ runtime.SetFinalizer(fd, nil)
+}
+
// Add a reference to this fd.
-// If closing==true, mark the fd as closing.
// Returns an error if the fd cannot be used.
-func (fd *netFD) incref(closing bool) error {
- if fd == nil {
+func (fd *netFD) incref() error {
+ if !fd.fdmu.Incref() {
return errClosing
}
- fd.sysmu.Lock()
- if fd.closing {
- fd.sysmu.Unlock()
- return errClosing
- }
- fd.sysref++
- if closing {
- fd.closing = true
- }
- closing = fd.closing
- fd.sysmu.Unlock()
return nil
}
-// Remove a reference to this FD and close if we've been asked to do so (and
-// there are no references left.
+// Remove a reference to this FD and close if we've been asked to do so
+// (and there are no references left).
func (fd *netFD) decref() {
- if fd == nil {
- return
+ if fd.fdmu.Decref() {
+ fd.destroy()
}
- fd.sysmu.Lock()
- fd.sysref--
- if fd.closing && fd.sysref == 0 && fd.sysfd != syscall.InvalidHandle {
- // Poller may want to unregister fd in readiness notification mechanism,
- // so this must be executed before closesocket.
- fd.pd.Close()
- closesocket(fd.sysfd)
- fd.sysfd = syscall.InvalidHandle
- // no need for a finalizer anymore
- runtime.SetFinalizer(fd, nil)
+}
+
+// Add a reference to this fd and lock for reading.
+// Returns an error if the fd cannot be used.
+func (fd *netFD) readLock() error {
+ if !fd.fdmu.RWLock(true) {
+ return errClosing
}
- fd.sysmu.Unlock()
+ return nil
+}
+
+// Unlock for reading and remove a reference to this FD.
+func (fd *netFD) readUnlock() {
+ if fd.fdmu.RWUnlock(true) {
+ fd.destroy()
+ }
+}
+
+// Add a reference to this fd and lock for writing.
+// Returns an error if the fd cannot be used.
+func (fd *netFD) writeLock() error {
+ if !fd.fdmu.RWLock(false) {
+ return errClosing
+ }
+ return nil
+}
+
+// Unlock for writing and remove a reference to this FD.
+func (fd *netFD) writeUnlock() {
+ if fd.fdmu.RWUnlock(false) {
+ fd.destroy()
+ }
}
func (fd *netFD) Close() error {
- if err := fd.incref(true); err != nil {
- return err
+ if !fd.fdmu.IncrefAndClose() {
+ return errClosing
}
- defer fd.decref()
// unblock pending reader and writer
fd.pd.Evict()
- // wait for both reader and writer to exit
- fd.rop.mu.Lock()
- fd.wop.mu.Lock()
- fd.rop.mu.Unlock()
- fd.wop.mu.Unlock()
+ fd.decref()
return nil
}
func (fd *netFD) shutdown(how int) error {
- if err := fd.incref(false); err != nil {
+ if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
@@ -422,13 +436,11 @@
}
func (fd *netFD) Read(buf []byte) (int, error) {
- if err := fd.incref(false); err != nil {
+ if err := fd.readLock(); err != nil {
return 0, err
}
- defer fd.decref()
+ defer fd.readUnlock()
o := &fd.rop
- o.mu.Lock()
- defer o.mu.Unlock()
o.InitBuf(buf)
n, err := iosrv.ExecIO(o, "WSARecv", func(o *operation) error {
return syscall.WSARecv(o.fd.sysfd, &o.buf, 1, &o.qty, &o.flags, &o.o, nil)
@@ -443,13 +455,11 @@
if len(buf) == 0 {
return 0, nil, nil
}
- if err := fd.incref(false); err != nil {
+ if err := fd.readLock(); err != nil {
return 0, nil, err
}
- defer fd.decref()
+ defer fd.readUnlock()
o := &fd.rop
- o.mu.Lock()
- defer o.mu.Unlock()
o.InitBuf(buf)
n, err = iosrv.ExecIO(o, "WSARecvFrom", func(o *operation) error {
if o.rsa == nil {
@@ -466,13 +476,11 @@
}
func (fd *netFD) Write(buf []byte) (int, error) {
- if err := fd.incref(false); err != nil {
+ if err := fd.writeLock(); err != nil {
return 0, err
}
- defer fd.decref()
+ defer fd.writeUnlock()
o := &fd.wop
- o.mu.Lock()
- defer o.mu.Unlock()
o.InitBuf(buf)
return iosrv.ExecIO(o, "WSASend", func(o *operation) error {
return syscall.WSASend(o.fd.sysfd, &o.buf, 1, &o.qty, 0, &o.o, nil)
@@ -483,13 +491,11 @@
if len(buf) == 0 {
return 0, nil
}
- if err := fd.incref(false); err != nil {
+ if err := fd.writeLock(); err != nil {
return 0, err
}
- defer fd.decref()
+ defer fd.writeUnlock()
o := &fd.wop
- o.mu.Lock()
- defer o.mu.Unlock()
o.InitBuf(buf)
o.sa = sa
return iosrv.ExecIO(o, "WSASendto", func(o *operation) error {
@@ -498,10 +504,10 @@
}
func (fd *netFD) accept(toAddr func(syscall.Sockaddr) Addr) (*netFD, error) {
- if err := fd.incref(false); err != nil {
+ if err := fd.readLock(); err != nil {
return nil, err
}
- defer fd.decref()
+ defer fd.readUnlock()
// Get new socket.
s, err := sysSocket(fd.family, fd.sotype, 0)
@@ -522,8 +528,6 @@
// Submit accept request.
o := &fd.rop
- o.mu.Lock()
- defer o.mu.Unlock()
o.handle = s
var rawsa [2]syscall.RawSockaddrAny
o.rsan = int32(unsafe.Sizeof(rawsa[0]))
diff --git a/src/pkg/net/sendfile_freebsd.go b/src/pkg/net/sendfile_freebsd.go
index dc5b767..42fe799 100644
--- a/src/pkg/net/sendfile_freebsd.go
+++ b/src/pkg/net/sendfile_freebsd.go
@@ -58,12 +58,10 @@
return 0, err, false
}
- c.wio.Lock()
- defer c.wio.Unlock()
- if err := c.incref(false); err != nil {
+ if err := c.writeLock(); err != nil {
return 0, err, true
}
- defer c.decref()
+ defer c.writeUnlock()
dst := c.sysfd
src := int(f.Fd())
diff --git a/src/pkg/net/sendfile_linux.go b/src/pkg/net/sendfile_linux.go
index 6f1323b..5e11763 100644
--- a/src/pkg/net/sendfile_linux.go
+++ b/src/pkg/net/sendfile_linux.go
@@ -36,12 +36,10 @@
return 0, nil, false
}
- c.wio.Lock()
- defer c.wio.Unlock()
- if err := c.incref(false); err != nil {
+ if err := c.writeLock(); err != nil {
return 0, err, true
}
- defer c.decref()
+ defer c.writeUnlock()
dst := c.sysfd
src := int(f.Fd())
diff --git a/src/pkg/net/sendfile_windows.go b/src/pkg/net/sendfile_windows.go
index e9b9f91..0107f67 100644
--- a/src/pkg/net/sendfile_windows.go
+++ b/src/pkg/net/sendfile_windows.go
@@ -34,13 +34,12 @@
return 0, nil, false
}
- if err := fd.incref(false); err != nil {
+ if err := fd.writeLock(); err != nil {
return 0, err, true
}
- defer fd.decref()
+ defer fd.writeUnlock()
+
o := &fd.wop
- o.mu.Lock()
- defer o.mu.Unlock()
o.qty = uint32(n)
o.handle = syscall.Handle(f.Fd())
done, err := iosrv.ExecIO(o, "TransmitFile", func(o *operation) error {
diff --git a/src/pkg/net/sockopt_posix.go b/src/pkg/net/sockopt_posix.go
index 886afc2..da2742c 100644
--- a/src/pkg/net/sockopt_posix.go
+++ b/src/pkg/net/sockopt_posix.go
@@ -101,7 +101,7 @@
}
func setReadBuffer(fd *netFD, bytes int) error {
- if err := fd.incref(false); err != nil {
+ if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
@@ -109,7 +109,7 @@
}
func setWriteBuffer(fd *netFD, bytes int) error {
- if err := fd.incref(false); err != nil {
+ if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
@@ -117,7 +117,7 @@
}
func setKeepAlive(fd *netFD, keepalive bool) error {
- if err := fd.incref(false); err != nil {
+ if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
@@ -133,7 +133,7 @@
l.Onoff = 0
l.Linger = 0
}
- if err := fd.incref(false); err != nil {
+ if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
diff --git a/src/pkg/net/sockoptip_bsd.go b/src/pkg/net/sockoptip_bsd.go
index bcae43c..ca080fd 100644
--- a/src/pkg/net/sockoptip_bsd.go
+++ b/src/pkg/net/sockoptip_bsd.go
@@ -18,7 +18,7 @@
}
var a [4]byte
copy(a[:], ip.To4())
- if err := fd.incref(false); err != nil {
+ if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
@@ -26,7 +26,7 @@
}
func setIPv4MulticastLoopback(fd *netFD, v bool) error {
- if err := fd.incref(false); err != nil {
+ if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
diff --git a/src/pkg/net/sockoptip_linux.go b/src/pkg/net/sockoptip_linux.go
index f9cf938..a69b778 100644
--- a/src/pkg/net/sockoptip_linux.go
+++ b/src/pkg/net/sockoptip_linux.go
@@ -15,7 +15,7 @@
v = int32(ifi.Index)
}
mreq := &syscall.IPMreqn{Ifindex: v}
- if err := fd.incref(false); err != nil {
+ if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
@@ -23,7 +23,7 @@
}
func setIPv4MulticastLoopback(fd *netFD, v bool) error {
- if err := fd.incref(false); err != nil {
+ if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
diff --git a/src/pkg/net/sockoptip_posix.go b/src/pkg/net/sockoptip_posix.go
index c82eef0..5c2a587 100644
--- a/src/pkg/net/sockoptip_posix.go
+++ b/src/pkg/net/sockoptip_posix.go
@@ -16,7 +16,7 @@
if err := setIPv4MreqToInterface(mreq, ifi); err != nil {
return err
}
- if err := fd.incref(false); err != nil {
+ if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
@@ -28,7 +28,7 @@
if ifi != nil {
v = ifi.Index
}
- if err := fd.incref(false); err != nil {
+ if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
@@ -36,7 +36,7 @@
}
func setIPv6MulticastLoopback(fd *netFD, v bool) error {
- if err := fd.incref(false); err != nil {
+ if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
@@ -49,7 +49,7 @@
if ifi != nil {
mreq.Interface = uint32(ifi.Index)
}
- if err := fd.incref(false); err != nil {
+ if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
diff --git a/src/pkg/net/sockoptip_windows.go b/src/pkg/net/sockoptip_windows.go
index fbaf0ed..7b11f20 100644
--- a/src/pkg/net/sockoptip_windows.go
+++ b/src/pkg/net/sockoptip_windows.go
@@ -17,7 +17,7 @@
}
var a [4]byte
copy(a[:], ip.To4())
- if err := fd.incref(false); err != nil {
+ if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
@@ -25,7 +25,7 @@
}
func setIPv4MulticastLoopback(fd *netFD, v bool) error {
- if err := fd.incref(false); err != nil {
+ if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
diff --git a/src/pkg/net/tcpsockopt_darwin.go b/src/pkg/net/tcpsockopt_darwin.go
index d052a14..3314084 100644
--- a/src/pkg/net/tcpsockopt_darwin.go
+++ b/src/pkg/net/tcpsockopt_darwin.go
@@ -14,7 +14,7 @@
// Set keep alive period.
func setKeepAlivePeriod(fd *netFD, d time.Duration) error {
- if err := fd.incref(false); err != nil {
+ if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
diff --git a/src/pkg/net/tcpsockopt_openbsd.go b/src/pkg/net/tcpsockopt_openbsd.go
index 306f4e0..3480f93 100644
--- a/src/pkg/net/tcpsockopt_openbsd.go
+++ b/src/pkg/net/tcpsockopt_openbsd.go
@@ -14,7 +14,7 @@
// Set keep alive period.
func setKeepAlivePeriod(fd *netFD, d time.Duration) error {
- if err := fd.incref(false); err != nil {
+ if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
diff --git a/src/pkg/net/tcpsockopt_posix.go b/src/pkg/net/tcpsockopt_posix.go
index afd8064..8b41b21 100644
--- a/src/pkg/net/tcpsockopt_posix.go
+++ b/src/pkg/net/tcpsockopt_posix.go
@@ -12,7 +12,7 @@
)
func setNoDelay(fd *netFD, noDelay bool) error {
- if err := fd.incref(false); err != nil {
+ if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
diff --git a/src/pkg/net/tcpsockopt_unix.go b/src/pkg/net/tcpsockopt_unix.go
index dfc0452..fba2acd 100644
--- a/src/pkg/net/tcpsockopt_unix.go
+++ b/src/pkg/net/tcpsockopt_unix.go
@@ -14,7 +14,7 @@
// Set keep alive period.
func setKeepAlivePeriod(fd *netFD, d time.Duration) error {
- if err := fd.incref(false); err != nil {
+ if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
diff --git a/src/pkg/net/tcpsockopt_windows.go b/src/pkg/net/tcpsockopt_windows.go
index 538366d..0bf4312 100644
--- a/src/pkg/net/tcpsockopt_windows.go
+++ b/src/pkg/net/tcpsockopt_windows.go
@@ -11,7 +11,7 @@
)
func setKeepAlivePeriod(fd *netFD, d time.Duration) error {
- if err := fd.incref(false); err != nil {
+ if err := fd.incref(); err != nil {
return err
}
defer fd.decref()
diff --git a/src/pkg/runtime/mgc0.c b/src/pkg/runtime/mgc0.c
index abf5df1..3c7df99 100644
--- a/src/pkg/runtime/mgc0.c
+++ b/src/pkg/runtime/mgc0.c
@@ -2019,7 +2019,7 @@
if(gcpercent < 0)
return;
- runtime·semacquire(&runtime·worldsema);
+ runtime·semacquire(&runtime·worldsema, false);
if(!force && mstats.heap_alloc < mstats.next_gc) {
// typically threads which lost the race to grab
// worldsema exit here when gc is done.
@@ -2218,7 +2218,7 @@
// because stoptheworld can only be used by
// one goroutine at a time, and there might be
// a pending garbage collection already calling it.
- runtime·semacquire(&runtime·worldsema);
+ runtime·semacquire(&runtime·worldsema, false);
m->gcing = 1;
runtime·stoptheworld();
updatememstats(nil);
diff --git a/src/pkg/runtime/mprof.goc b/src/pkg/runtime/mprof.goc
index 0d89a26..6e51ef3 100644
--- a/src/pkg/runtime/mprof.goc
+++ b/src/pkg/runtime/mprof.goc
@@ -447,7 +447,7 @@
pc = (uintptr)runtime·getcallerpc(&b);
if(all) {
- runtime·semacquire(&runtime·worldsema);
+ runtime·semacquire(&runtime·worldsema, false);
m->gcing = 1;
runtime·stoptheworld();
}
@@ -494,7 +494,7 @@
ok = false;
n = runtime·gcount();
if(n <= b.len) {
- runtime·semacquire(&runtime·worldsema);
+ runtime·semacquire(&runtime·worldsema, false);
m->gcing = 1;
runtime·stoptheworld();
diff --git a/src/pkg/runtime/netpoll.goc b/src/pkg/runtime/netpoll.goc
index ebe6def..ec6a411 100644
--- a/src/pkg/runtime/netpoll.goc
+++ b/src/pkg/runtime/netpoll.goc
@@ -206,6 +206,14 @@
runtime·ready(wg);
}
+func runtime_Semacquire(addr *uint32) {
+ runtime·semacquire(addr, true);
+}
+
+func runtime_Semrelease(addr *uint32) {
+ runtime·semrelease(addr);
+}
+
uintptr
runtime·netpollfd(PollDesc *pd)
{
diff --git a/src/pkg/runtime/proc.c b/src/pkg/runtime/proc.c
index 1c39807..95b39b6 100644
--- a/src/pkg/runtime/proc.c
+++ b/src/pkg/runtime/proc.c
@@ -1836,7 +1836,7 @@
}
runtime·unlock(&runtime·sched);
- runtime·semacquire(&runtime·worldsema);
+ runtime·semacquire(&runtime·worldsema, false);
m->gcing = 1;
runtime·stoptheworld();
newprocs = n;
diff --git a/src/pkg/runtime/race.c b/src/pkg/runtime/race.c
index 557da6f..875375d 100644
--- a/src/pkg/runtime/race.c
+++ b/src/pkg/runtime/race.c
@@ -326,7 +326,7 @@
void
runtime·RaceSemacquire(uint32 *s)
{
- runtime·semacquire(s);
+ runtime·semacquire(s, false);
}
// func RaceSemrelease(s *uint32)
diff --git a/src/pkg/runtime/runtime.h b/src/pkg/runtime/runtime.h
index 7d04a75..a3edb5e 100644
--- a/src/pkg/runtime/runtime.h
+++ b/src/pkg/runtime/runtime.h
@@ -1021,7 +1021,7 @@
bool runtime·isNaN(float64 f);
float64 runtime·ldexp(float64 d, int32 e);
float64 runtime·modf(float64 d, float64 *ip);
-void runtime·semacquire(uint32*);
+void runtime·semacquire(uint32*, bool);
void runtime·semrelease(uint32*);
int32 runtime·gomaxprocsfunc(int32 n);
void runtime·procyield(uint32);
diff --git a/src/pkg/runtime/sema.goc b/src/pkg/runtime/sema.goc
index 4df01fc..05222e2 100644
--- a/src/pkg/runtime/sema.goc
+++ b/src/pkg/runtime/sema.goc
@@ -98,8 +98,8 @@
return 0;
}
-static void
-semacquireimpl(uint32 volatile *addr, int32 profile)
+void
+runtime·semacquire(uint32 volatile *addr, bool profile)
{
Sema s; // Needs to be allocated on stack, otherwise garbage collector could deallocate it
SemaRoot *root;
@@ -145,12 +145,6 @@
}
void
-runtime·semacquire(uint32 volatile *addr)
-{
- semacquireimpl(addr, 0);
-}
-
-void
runtime·semrelease(uint32 volatile *addr)
{
Sema *s;
@@ -189,7 +183,7 @@
}
func runtime_Semacquire(addr *uint32) {
- semacquireimpl(addr, 1);
+ runtime·semacquire(addr, true);
}
func runtime_Semrelease(addr *uint32) {