| // Copyright 2013 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package poll |
| |
| import "sync/atomic" |
| |
// fdMutex is a specialized synchronization primitive for FD. It tracks
// the lifetime of an fd and serializes access to the Read, Write and
// Close methods on FD.
type fdMutex struct {
	// state packs the closed flag, the read and write lock bits, the
	// reference count and both waiter counts into a single word that is
	// only accessed through sync/atomic.
	//
	// NOTE(review): state is accessed with 64-bit atomics; keeping it as
	// the first field presumably preserves 64-bit alignment on 32-bit
	// platforms - confirm before reordering fields.
	state uint64

	// rsema and wsema are the semaphores that blocked readers and
	// blocked writers, respectively, wait on.
	rsema, wsema uint32
}
| |
// Bit layout of fdMutex.state, from least to most significant bit:
//
//	bit 0      - whether FD is closed; once set, all subsequent lock operations fail.
//	bit 1      - lock for read operations.
//	bit 2      - lock for write operations.
//	bits 3-22  - total number of references (read+write+misc), 20 bits.
//	bits 23-42 - number of outstanding read waiters, 20 bits.
//	bits 43-62 - number of outstanding write waiters, 20 bits.
const (
	// Single-bit flags, followed by the unit increment of the
	// reference counter.
	mutexClosed = 1 << iota
	mutexRLock
	mutexWLock
	mutexRef

	// Unit increments of the read-waiter and write-waiter counters.
	// Each counter field is 20 bits wide and starts where the previous
	// one ends.
	mutexRWait = mutexRef << 20
	mutexWWait = mutexRWait << 20

	// Masks that select each 20-bit counter field.
	mutexRefMask = mutexRWait - mutexRef
	mutexRMask   = mutexWWait - mutexRWait
	mutexWMask   = mutexWWait<<20 - mutexWWait
)

// overflowMsg is the panic message used when incrementing the reference
// count or one of the waiter counts would wrap its 20-bit field.
const overflowMsg = "too many concurrent operations on a single file or socket (max 1048575)"
| |
| // Read operations must do rwlock(true)/rwunlock(true). |
| // |
| // Write operations must do rwlock(false)/rwunlock(false). |
| // |
| // Misc operations must do incref/decref. |
| // Misc operations include functions like setsockopt and setDeadline. |
| // They need to use incref/decref to ensure that they operate on the |
| // correct fd in presence of a concurrent close call (otherwise fd can |
| // be closed under their feet). |
| // |
| // Close operations must do increfAndClose/decref. |
| |
| // incref adds a reference to mu. |
| // It reports whether mu is available for reading or writing. |
| func (mu *fdMutex) incref() bool { |
| for { |
| old := atomic.LoadUint64(&mu.state) |
| if old&mutexClosed != 0 { |
| return false |
| } |
| new := old + mutexRef |
| if new&mutexRefMask == 0 { |
| panic(overflowMsg) |
| } |
| if atomic.CompareAndSwapUint64(&mu.state, old, new) { |
| return true |
| } |
| } |
| } |
| |
| // increfAndClose sets the state of mu to closed. |
| // It returns false if the file was already closed. |
| func (mu *fdMutex) increfAndClose() bool { |
| for { |
| old := atomic.LoadUint64(&mu.state) |
| if old&mutexClosed != 0 { |
| return false |
| } |
| // Mark as closed and acquire a reference. |
| new := (old | mutexClosed) + mutexRef |
| if new&mutexRefMask == 0 { |
| panic(overflowMsg) |
| } |
| // Remove all read and write waiters. |
| new &^= mutexRMask | mutexWMask |
| if atomic.CompareAndSwapUint64(&mu.state, old, new) { |
| // Wake all read and write waiters, |
| // they will observe closed flag after wakeup. |
| for old&mutexRMask != 0 { |
| old -= mutexRWait |
| runtime_Semrelease(&mu.rsema) |
| } |
| for old&mutexWMask != 0 { |
| old -= mutexWWait |
| runtime_Semrelease(&mu.wsema) |
| } |
| return true |
| } |
| } |
| } |
| |
| // decref removes a reference from mu. |
| // It reports whether there is no remaining reference. |
| func (mu *fdMutex) decref() bool { |
| for { |
| old := atomic.LoadUint64(&mu.state) |
| if old&mutexRefMask == 0 { |
| panic("inconsistent poll.fdMutex") |
| } |
| new := old - mutexRef |
| if atomic.CompareAndSwapUint64(&mu.state, old, new) { |
| return new&(mutexClosed|mutexRefMask) == mutexClosed |
| } |
| } |
| } |
| |
| // lock adds a reference to mu and locks mu. |
| // It reports whether mu is available for reading or writing. |
| func (mu *fdMutex) rwlock(read bool) bool { |
| var mutexBit, mutexWait, mutexMask uint64 |
| var mutexSema *uint32 |
| if read { |
| mutexBit = mutexRLock |
| mutexWait = mutexRWait |
| mutexMask = mutexRMask |
| mutexSema = &mu.rsema |
| } else { |
| mutexBit = mutexWLock |
| mutexWait = mutexWWait |
| mutexMask = mutexWMask |
| mutexSema = &mu.wsema |
| } |
| for { |
| old := atomic.LoadUint64(&mu.state) |
| if old&mutexClosed != 0 { |
| return false |
| } |
| var new uint64 |
| if old&mutexBit == 0 { |
| // Lock is free, acquire it. |
| new = (old | mutexBit) + mutexRef |
| if new&mutexRefMask == 0 { |
| panic(overflowMsg) |
| } |
| } else { |
| // Wait for lock. |
| new = old + mutexWait |
| if new&mutexMask == 0 { |
| panic(overflowMsg) |
| } |
| } |
| if atomic.CompareAndSwapUint64(&mu.state, old, new) { |
| if old&mutexBit == 0 { |
| return true |
| } |
| runtime_Semacquire(mutexSema) |
| // The signaller has subtracted mutexWait. |
| } |
| } |
| } |
| |
| // unlock removes a reference from mu and unlocks mu. |
| // It reports whether there is no remaining reference. |
| func (mu *fdMutex) rwunlock(read bool) bool { |
| var mutexBit, mutexWait, mutexMask uint64 |
| var mutexSema *uint32 |
| if read { |
| mutexBit = mutexRLock |
| mutexWait = mutexRWait |
| mutexMask = mutexRMask |
| mutexSema = &mu.rsema |
| } else { |
| mutexBit = mutexWLock |
| mutexWait = mutexWWait |
| mutexMask = mutexWMask |
| mutexSema = &mu.wsema |
| } |
| for { |
| old := atomic.LoadUint64(&mu.state) |
| if old&mutexBit == 0 || old&mutexRefMask == 0 { |
| panic("inconsistent poll.fdMutex") |
| } |
| // Drop lock, drop reference and wake read waiter if present. |
| new := (old &^ mutexBit) - mutexRef |
| if old&mutexMask != 0 { |
| new -= mutexWait |
| } |
| if atomic.CompareAndSwapUint64(&mu.state, old, new) { |
| if old&mutexMask != 0 { |
| runtime_Semrelease(mutexSema) |
| } |
| return new&(mutexClosed|mutexRefMask) == mutexClosed |
| } |
| } |
| } |
| |
// runtime_Semacquire and runtime_Semrelease are declared here without
// bodies; they are implemented in the runtime package.
//
// In this file, rwlock calls runtime_Semacquire after registering itself
// as a waiter, and rwunlock and increfAndClose call runtime_Semrelease
// once per waiter they remove from the state word.
func runtime_Semacquire(sema *uint32)
func runtime_Semrelease(sema *uint32)
| |
| // incref adds a reference to fd. |
| // It returns an error when fd cannot be used. |
| func (fd *FD) incref() error { |
| if !fd.fdmu.incref() { |
| return errClosing(fd.isFile) |
| } |
| return nil |
| } |
| |
| // decref removes a reference from fd. |
| // It also closes fd when the state of fd is set to closed and there |
| // is no remaining reference. |
| func (fd *FD) decref() error { |
| if fd.fdmu.decref() { |
| return fd.destroy() |
| } |
| return nil |
| } |
| |
| // readLock adds a reference to fd and locks fd for reading. |
| // It returns an error when fd cannot be used for reading. |
| func (fd *FD) readLock() error { |
| if !fd.fdmu.rwlock(true) { |
| return errClosing(fd.isFile) |
| } |
| return nil |
| } |
| |
| // readUnlock removes a reference from fd and unlocks fd for reading. |
| // It also closes fd when the state of fd is set to closed and there |
| // is no remaining reference. |
| func (fd *FD) readUnlock() { |
| if fd.fdmu.rwunlock(true) { |
| fd.destroy() |
| } |
| } |
| |
| // writeLock adds a reference to fd and locks fd for writing. |
| // It returns an error when fd cannot be used for writing. |
| func (fd *FD) writeLock() error { |
| if !fd.fdmu.rwlock(false) { |
| return errClosing(fd.isFile) |
| } |
| return nil |
| } |
| |
| // writeUnlock removes a reference from fd and unlocks fd for writing. |
| // It also closes fd when the state of fd is set to closed and there |
| // is no remaining reference. |
| func (fd *FD) writeUnlock() { |
| if fd.fdmu.rwunlock(false) { |
| fd.destroy() |
| } |
| } |