| // Copyright 2009 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| // Semaphore implementation exposed to Go. |
| // Intended use is provide a sleep and wakeup |
| // primitive that can be used in the contended case |
| // of other synchronization primitives. |
| // Thus it targets the same goal as Linux's futex, |
| // but it has much simpler semantics. |
| // |
| // That is, don't think of these as semaphores. |
| // Think of them as a way to implement sleep and wakeup |
| // such that every sleep is paired with a single wakeup, |
| // even if, due to races, the wakeup happens before the sleep. |
| // |
| // See Mullender and Cox, ``Semaphores in Plan 9,'' |
| // https://swtch.com/semaphore.pdf |
| |
| package runtime |
| |
| import ( |
| "internal/cpu" |
| "runtime/internal/atomic" |
| "unsafe" |
| ) |
| |
| // Asynchronous semaphore for sync.Mutex. |
| |
| // A semaRoot holds a balanced tree of sudog with distinct addresses (s.elem). |
| // Each of those sudog may in turn point (through s.waitlink) to a list |
| // of other sudogs waiting on the same address. |
| // The operations on the inner lists of sudogs with the same address |
| // are all O(1). The scanning of the top-level semaRoot list is O(log n), |
| // where n is the number of distinct addresses with goroutines blocked |
| // on them that hash to the given semaRoot. |
| // See golang.org/issue/17953 for a program that worked badly |
| // before we introduced the second level of list, and |
| // BenchmarkSemTable/OneAddrCollision/* for a benchmark that exercises this. |
| type semaRoot struct { |
| lock mutex |
| treap *sudog // root of balanced tree of unique waiters. |
| nwait uint32 // Number of waiters. Read w/o the lock. |
| } |
| |
| var semtable semTable |
| |
| // Prime to not correlate with any user patterns. |
| const semTabSize = 251 |
| |
| type semTable [semTabSize]struct { |
| root semaRoot |
| pad [cpu.CacheLinePadSize - unsafe.Sizeof(semaRoot{})]byte |
| } |
| |
| func (t *semTable) rootFor(addr *uint32) *semaRoot { |
| return &t[(uintptr(unsafe.Pointer(addr))>>3)%semTabSize].root |
| } |
| |
| //go:linkname sync_runtime_Semacquire sync.runtime_Semacquire |
| func sync_runtime_Semacquire(addr *uint32) { |
| semacquire1(addr, false, semaBlockProfile, 0) |
| } |
| |
| //go:linkname poll_runtime_Semacquire internal/poll.runtime_Semacquire |
| func poll_runtime_Semacquire(addr *uint32) { |
| semacquire1(addr, false, semaBlockProfile, 0) |
| } |
| |
| //go:linkname sync_runtime_Semrelease sync.runtime_Semrelease |
| func sync_runtime_Semrelease(addr *uint32, handoff bool, skipframes int) { |
| semrelease1(addr, handoff, skipframes) |
| } |
| |
| //go:linkname sync_runtime_SemacquireMutex sync.runtime_SemacquireMutex |
| func sync_runtime_SemacquireMutex(addr *uint32, lifo bool, skipframes int) { |
| semacquire1(addr, lifo, semaBlockProfile|semaMutexProfile, skipframes) |
| } |
| |
| //go:linkname poll_runtime_Semrelease internal/poll.runtime_Semrelease |
| func poll_runtime_Semrelease(addr *uint32) { |
| semrelease(addr) |
| } |
| |
| func readyWithTime(s *sudog, traceskip int) { |
| if s.releasetime != 0 { |
| s.releasetime = cputicks() |
| } |
| goready(s.g, traceskip) |
| } |
| |
| type semaProfileFlags int |
| |
| const ( |
| semaBlockProfile semaProfileFlags = 1 << iota |
| semaMutexProfile |
| ) |
| |
| // Called from runtime. |
| func semacquire(addr *uint32) { |
| semacquire1(addr, false, 0, 0) |
| } |
| |
| func semacquire1(addr *uint32, lifo bool, profile semaProfileFlags, skipframes int) { |
| gp := getg() |
| if gp != gp.m.curg { |
| throw("semacquire not on the G stack") |
| } |
| |
| // Easy case. |
| if cansemacquire(addr) { |
| return |
| } |
| |
| // Harder case: |
| // increment waiter count |
| // try cansemacquire one more time, return if succeeded |
| // enqueue itself as a waiter |
| // sleep |
| // (waiter descriptor is dequeued by signaler) |
| s := acquireSudog() |
| root := semtable.rootFor(addr) |
| t0 := int64(0) |
| s.releasetime = 0 |
| s.acquiretime = 0 |
| s.ticket = 0 |
| if profile&semaBlockProfile != 0 && blockprofilerate > 0 { |
| t0 = cputicks() |
| s.releasetime = -1 |
| } |
| if profile&semaMutexProfile != 0 && mutexprofilerate > 0 { |
| if t0 == 0 { |
| t0 = cputicks() |
| } |
| s.acquiretime = t0 |
| } |
| for { |
| lockWithRank(&root.lock, lockRankRoot) |
| // Add ourselves to nwait to disable "easy case" in semrelease. |
| atomic.Xadd(&root.nwait, 1) |
| // Check cansemacquire to avoid missed wakeup. |
| if cansemacquire(addr) { |
| atomic.Xadd(&root.nwait, -1) |
| unlock(&root.lock) |
| break |
| } |
| // Any semrelease after the cansemacquire knows we're waiting |
| // (we set nwait above), so go to sleep. |
| root.queue(addr, s, lifo) |
| goparkunlock(&root.lock, waitReasonSemacquire, traceEvGoBlockSync, 4+skipframes) |
| if s.ticket != 0 || cansemacquire(addr) { |
| break |
| } |
| } |
| if s.releasetime > 0 { |
| blockevent(s.releasetime-t0, 3+skipframes) |
| } |
| releaseSudog(s) |
| } |
| |
| func semrelease(addr *uint32) { |
| semrelease1(addr, false, 0) |
| } |
| |
| func semrelease1(addr *uint32, handoff bool, skipframes int) { |
| root := semtable.rootFor(addr) |
| atomic.Xadd(addr, 1) |
| |
| // Easy case: no waiters? |
| // This check must happen after the xadd, to avoid a missed wakeup |
| // (see loop in semacquire). |
| if atomic.Load(&root.nwait) == 0 { |
| return |
| } |
| |
| // Harder case: search for a waiter and wake it. |
| lockWithRank(&root.lock, lockRankRoot) |
| if atomic.Load(&root.nwait) == 0 { |
| // The count is already consumed by another goroutine, |
| // so no need to wake up another goroutine. |
| unlock(&root.lock) |
| return |
| } |
| s, t0 := root.dequeue(addr) |
| if s != nil { |
| atomic.Xadd(&root.nwait, -1) |
| } |
| unlock(&root.lock) |
| if s != nil { // May be slow or even yield, so unlock first |
| acquiretime := s.acquiretime |
| if acquiretime != 0 { |
| mutexevent(t0-acquiretime, 3+skipframes) |
| } |
| if s.ticket != 0 { |
| throw("corrupted semaphore ticket") |
| } |
| if handoff && cansemacquire(addr) { |
| s.ticket = 1 |
| } |
| readyWithTime(s, 5+skipframes) |
| if s.ticket == 1 && getg().m.locks == 0 { |
| // Direct G handoff |
| // readyWithTime has added the waiter G as runnext in the |
| // current P; we now call the scheduler so that we start running |
| // the waiter G immediately. |
| // Note that waiter inherits our time slice: this is desirable |
| // to avoid having a highly contended semaphore hog the P |
| // indefinitely. goyield is like Gosched, but it emits a |
| // "preempted" trace event instead and, more importantly, puts |
| // the current G on the local runq instead of the global one. |
| // We only do this in the starving regime (handoff=true), as in |
| // the non-starving case it is possible for a different waiter |
| // to acquire the semaphore while we are yielding/scheduling, |
| // and this would be wasteful. We wait instead to enter starving |
| // regime, and then we start to do direct handoffs of ticket and |
| // P. |
| // See issue 33747 for discussion. |
| goyield() |
| } |
| } |
| } |
| |
| func cansemacquire(addr *uint32) bool { |
| for { |
| v := atomic.Load(addr) |
| if v == 0 { |
| return false |
| } |
| if atomic.Cas(addr, v, v-1) { |
| return true |
| } |
| } |
| } |
| |
| // queue adds s to the blocked goroutines in semaRoot. |
| func (root *semaRoot) queue(addr *uint32, s *sudog, lifo bool) { |
| s.g = getg() |
| s.elem = unsafe.Pointer(addr) |
| s.next = nil |
| s.prev = nil |
| |
| var last *sudog |
| pt := &root.treap |
| for t := *pt; t != nil; t = *pt { |
| if t.elem == unsafe.Pointer(addr) { |
| // Already have addr in list. |
| if lifo { |
| // Substitute s in t's place in treap. |
| *pt = s |
| s.ticket = t.ticket |
| s.acquiretime = t.acquiretime |
| s.parent = t.parent |
| s.prev = t.prev |
| s.next = t.next |
| if s.prev != nil { |
| s.prev.parent = s |
| } |
| if s.next != nil { |
| s.next.parent = s |
| } |
| // Add t first in s's wait list. |
| s.waitlink = t |
| s.waittail = t.waittail |
| if s.waittail == nil { |
| s.waittail = t |
| } |
| t.parent = nil |
| t.prev = nil |
| t.next = nil |
| t.waittail = nil |
| } else { |
| // Add s to end of t's wait list. |
| if t.waittail == nil { |
| t.waitlink = s |
| } else { |
| t.waittail.waitlink = s |
| } |
| t.waittail = s |
| s.waitlink = nil |
| } |
| return |
| } |
| last = t |
| if uintptr(unsafe.Pointer(addr)) < uintptr(t.elem) { |
| pt = &t.prev |
| } else { |
| pt = &t.next |
| } |
| } |
| |
| // Add s as new leaf in tree of unique addrs. |
| // The balanced tree is a treap using ticket as the random heap priority. |
| // That is, it is a binary tree ordered according to the elem addresses, |
| // but then among the space of possible binary trees respecting those |
| // addresses, it is kept balanced on average by maintaining a heap ordering |
| // on the ticket: s.ticket <= both s.prev.ticket and s.next.ticket. |
| // https://en.wikipedia.org/wiki/Treap |
| // https://faculty.washington.edu/aragon/pubs/rst89.pdf |
| // |
| // s.ticket compared with zero in couple of places, therefore set lowest bit. |
| // It will not affect treap's quality noticeably. |
| s.ticket = fastrand() | 1 |
| s.parent = last |
| *pt = s |
| |
| // Rotate up into tree according to ticket (priority). |
| for s.parent != nil && s.parent.ticket > s.ticket { |
| if s.parent.prev == s { |
| root.rotateRight(s.parent) |
| } else { |
| if s.parent.next != s { |
| panic("semaRoot queue") |
| } |
| root.rotateLeft(s.parent) |
| } |
| } |
| } |
| |
| // dequeue searches for and finds the first goroutine |
| // in semaRoot blocked on addr. |
| // If the sudog was being profiled, dequeue returns the time |
| // at which it was woken up as now. Otherwise now is 0. |
| func (root *semaRoot) dequeue(addr *uint32) (found *sudog, now int64) { |
| ps := &root.treap |
| s := *ps |
| for ; s != nil; s = *ps { |
| if s.elem == unsafe.Pointer(addr) { |
| goto Found |
| } |
| if uintptr(unsafe.Pointer(addr)) < uintptr(s.elem) { |
| ps = &s.prev |
| } else { |
| ps = &s.next |
| } |
| } |
| return nil, 0 |
| |
| Found: |
| now = int64(0) |
| if s.acquiretime != 0 { |
| now = cputicks() |
| } |
| if t := s.waitlink; t != nil { |
| // Substitute t, also waiting on addr, for s in root tree of unique addrs. |
| *ps = t |
| t.ticket = s.ticket |
| t.parent = s.parent |
| t.prev = s.prev |
| if t.prev != nil { |
| t.prev.parent = t |
| } |
| t.next = s.next |
| if t.next != nil { |
| t.next.parent = t |
| } |
| if t.waitlink != nil { |
| t.waittail = s.waittail |
| } else { |
| t.waittail = nil |
| } |
| t.acquiretime = now |
| s.waitlink = nil |
| s.waittail = nil |
| } else { |
| // Rotate s down to be leaf of tree for removal, respecting priorities. |
| for s.next != nil || s.prev != nil { |
| if s.next == nil || s.prev != nil && s.prev.ticket < s.next.ticket { |
| root.rotateRight(s) |
| } else { |
| root.rotateLeft(s) |
| } |
| } |
| // Remove s, now a leaf. |
| if s.parent != nil { |
| if s.parent.prev == s { |
| s.parent.prev = nil |
| } else { |
| s.parent.next = nil |
| } |
| } else { |
| root.treap = nil |
| } |
| } |
| s.parent = nil |
| s.elem = nil |
| s.next = nil |
| s.prev = nil |
| s.ticket = 0 |
| return s, now |
| } |
| |
| // rotateLeft rotates the tree rooted at node x. |
| // turning (x a (y b c)) into (y (x a b) c). |
| func (root *semaRoot) rotateLeft(x *sudog) { |
| // p -> (x a (y b c)) |
| p := x.parent |
| y := x.next |
| b := y.prev |
| |
| y.prev = x |
| x.parent = y |
| x.next = b |
| if b != nil { |
| b.parent = x |
| } |
| |
| y.parent = p |
| if p == nil { |
| root.treap = y |
| } else if p.prev == x { |
| p.prev = y |
| } else { |
| if p.next != x { |
| throw("semaRoot rotateLeft") |
| } |
| p.next = y |
| } |
| } |
| |
| // rotateRight rotates the tree rooted at node y. |
| // turning (y (x a b) c) into (x a (y b c)). |
| func (root *semaRoot) rotateRight(y *sudog) { |
| // p -> (y (x a b) c) |
| p := y.parent |
| x := y.prev |
| b := x.next |
| |
| x.next = y |
| y.parent = x |
| y.prev = b |
| if b != nil { |
| b.parent = y |
| } |
| |
| x.parent = p |
| if p == nil { |
| root.treap = x |
| } else if p.prev == y { |
| p.prev = x |
| } else { |
| if p.next != y { |
| throw("semaRoot rotateRight") |
| } |
| p.next = x |
| } |
| } |
| |
| // notifyList is a ticket-based notification list used to implement sync.Cond. |
| // |
| // It must be kept in sync with the sync package. |
| type notifyList struct { |
| // wait is the ticket number of the next waiter. It is atomically |
| // incremented outside the lock. |
| wait uint32 |
| |
| // notify is the ticket number of the next waiter to be notified. It can |
| // be read outside the lock, but is only written to with lock held. |
| // |
| // Both wait & notify can wrap around, and such cases will be correctly |
| // handled as long as their "unwrapped" difference is bounded by 2^31. |
| // For this not to be the case, we'd need to have 2^31+ goroutines |
| // blocked on the same condvar, which is currently not possible. |
| notify uint32 |
| |
| // List of parked waiters. |
| lock mutex |
| head *sudog |
| tail *sudog |
| } |
| |
| // less checks if a < b, considering a & b running counts that may overflow the |
| // 32-bit range, and that their "unwrapped" difference is always less than 2^31. |
| func less(a, b uint32) bool { |
| return int32(a-b) < 0 |
| } |
| |
| // notifyListAdd adds the caller to a notify list such that it can receive |
| // notifications. The caller must eventually call notifyListWait to wait for |
| // such a notification, passing the returned ticket number. |
| // |
| //go:linkname notifyListAdd sync.runtime_notifyListAdd |
| func notifyListAdd(l *notifyList) uint32 { |
| // This may be called concurrently, for example, when called from |
| // sync.Cond.Wait while holding a RWMutex in read mode. |
| return atomic.Xadd(&l.wait, 1) - 1 |
| } |
| |
| // notifyListWait waits for a notification. If one has been sent since |
| // notifyListAdd was called, it returns immediately. Otherwise, it blocks. |
| // |
| //go:linkname notifyListWait sync.runtime_notifyListWait |
| func notifyListWait(l *notifyList, t uint32) { |
| lockWithRank(&l.lock, lockRankNotifyList) |
| |
| // Return right away if this ticket has already been notified. |
| if less(t, l.notify) { |
| unlock(&l.lock) |
| return |
| } |
| |
| // Enqueue itself. |
| s := acquireSudog() |
| s.g = getg() |
| s.ticket = t |
| s.releasetime = 0 |
| t0 := int64(0) |
| if blockprofilerate > 0 { |
| t0 = cputicks() |
| s.releasetime = -1 |
| } |
| if l.tail == nil { |
| l.head = s |
| } else { |
| l.tail.next = s |
| } |
| l.tail = s |
| goparkunlock(&l.lock, waitReasonSyncCondWait, traceEvGoBlockCond, 3) |
| if t0 != 0 { |
| blockevent(s.releasetime-t0, 2) |
| } |
| releaseSudog(s) |
| } |
| |
| // notifyListNotifyAll notifies all entries in the list. |
| // |
| //go:linkname notifyListNotifyAll sync.runtime_notifyListNotifyAll |
| func notifyListNotifyAll(l *notifyList) { |
| // Fast-path: if there are no new waiters since the last notification |
| // we don't need to acquire the lock. |
| if atomic.Load(&l.wait) == atomic.Load(&l.notify) { |
| return |
| } |
| |
| // Pull the list out into a local variable, waiters will be readied |
| // outside the lock. |
| lockWithRank(&l.lock, lockRankNotifyList) |
| s := l.head |
| l.head = nil |
| l.tail = nil |
| |
| // Update the next ticket to be notified. We can set it to the current |
| // value of wait because any previous waiters are already in the list |
| // or will notice that they have already been notified when trying to |
| // add themselves to the list. |
| atomic.Store(&l.notify, atomic.Load(&l.wait)) |
| unlock(&l.lock) |
| |
| // Go through the local list and ready all waiters. |
| for s != nil { |
| next := s.next |
| s.next = nil |
| readyWithTime(s, 4) |
| s = next |
| } |
| } |
| |
| // notifyListNotifyOne notifies one entry in the list. |
| // |
| //go:linkname notifyListNotifyOne sync.runtime_notifyListNotifyOne |
| func notifyListNotifyOne(l *notifyList) { |
| // Fast-path: if there are no new waiters since the last notification |
| // we don't need to acquire the lock at all. |
| if atomic.Load(&l.wait) == atomic.Load(&l.notify) { |
| return |
| } |
| |
| lockWithRank(&l.lock, lockRankNotifyList) |
| |
| // Re-check under the lock if we need to do anything. |
| t := l.notify |
| if t == atomic.Load(&l.wait) { |
| unlock(&l.lock) |
| return |
| } |
| |
| // Update the next notify ticket number. |
| atomic.Store(&l.notify, t+1) |
| |
| // Try to find the g that needs to be notified. |
| // If it hasn't made it to the list yet we won't find it, |
| // but it won't park itself once it sees the new notify number. |
| // |
| // This scan looks linear but essentially always stops quickly. |
| // Because g's queue separately from taking numbers, |
| // there may be minor reorderings in the list, but we |
| // expect the g we're looking for to be near the front. |
| // The g has others in front of it on the list only to the |
| // extent that it lost the race, so the iteration will not |
| // be too long. This applies even when the g is missing: |
| // it hasn't yet gotten to sleep and has lost the race to |
| // the (few) other g's that we find on the list. |
| for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next { |
| if s.ticket == t { |
| n := s.next |
| if p != nil { |
| p.next = n |
| } else { |
| l.head = n |
| } |
| if n == nil { |
| l.tail = p |
| } |
| unlock(&l.lock) |
| s.next = nil |
| readyWithTime(s, 4) |
| return |
| } |
| } |
| unlock(&l.lock) |
| } |
| |
| //go:linkname notifyListCheck sync.runtime_notifyListCheck |
| func notifyListCheck(sz uintptr) { |
| if sz != unsafe.Sizeof(notifyList{}) { |
| print("runtime: bad notifyList size - sync=", sz, " runtime=", unsafe.Sizeof(notifyList{}), "\n") |
| throw("bad notifyList size") |
| } |
| } |
| |
| //go:linkname sync_nanotime sync.runtime_nanotime |
| func sync_nanotime() int64 { |
| return nanotime() |
| } |