| // Copyright 2014 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package runtime |
| |
| // This file contains the implementation of Go channels. |
| |
| // Invariants: |
| // At least one of c.sendq and c.recvq is empty, |
| // except for the case of an unbuffered channel with a single goroutine |
| // blocked on it for both sending and receiving using a select statement, |
| // in which case the length of c.sendq and c.recvq is limited only by the |
| // size of the select statement. |
| // |
| // For buffered channels, also: |
| // c.qcount > 0 implies that c.recvq is empty. |
| // c.qcount < c.dataqsiz implies that c.sendq is empty. |
| |
| import ( |
| "internal/abi" |
| "internal/runtime/atomic" |
| "internal/runtime/math" |
| "internal/runtime/sys" |
| "unsafe" |
| ) |
| |
const (
	// maxAlign is the largest element alignment makechan accepts, and the
	// boundary that hchanSize is rounded up to.
	maxAlign = 8
	// hchanSize is unsafe.Sizeof(hchan{}) rounded up to the next multiple of
	// maxAlign: the second term adds (-size mod maxAlign) bytes of padding.
	hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
	// debugChan gates the diagnostic print calls in makechan, chansend
	// and chanrecv.
	debugChan = false
)
| |
// hchan is the runtime representation of a channel. makechan allocates and
// initializes it; all fields are protected by lock.
type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16         // size of one element; makechan rejects sizes >= 1<<16
	closed   uint32         // 0 while the channel is open; closechan sets it to 1
	timer    *timer         // timer feeding this chan
	elemtype *_type         // element type
	sendx    uint           // send index
	recvx    uint           // receive index
	recvq    waitq          // list of recv waiters
	sendq    waitq          // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}
| |
// waitq is a doubly linked list of sudogs blocked on a channel.
// enqueue appends at last and dequeue removes from first, so waiters
// are served in FIFO order.
type waitq struct {
	first *sudog // head of the list; nil when the queue is empty
	last  *sudog // tail of the list; nil when the queue is empty
}
| |
| //go:linkname reflect_makechan reflect.makechan |
| func reflect_makechan(t *chantype, size int) *hchan { |
| return makechan(t, size) |
| } |
| |
| func makechan64(t *chantype, size int64) *hchan { |
| if int64(int(size)) != size { |
| panic(plainError("makechan: size out of range")) |
| } |
| |
| return makechan(t, int(size)) |
| } |
| |
| func makechan(t *chantype, size int) *hchan { |
| elem := t.Elem |
| |
| // compiler checks this but be safe. |
| if elem.Size_ >= 1<<16 { |
| throw("makechan: invalid channel element type") |
| } |
| if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign { |
| throw("makechan: bad alignment") |
| } |
| |
| mem, overflow := math.MulUintptr(elem.Size_, uintptr(size)) |
| if overflow || mem > maxAlloc-hchanSize || size < 0 { |
| panic(plainError("makechan: size out of range")) |
| } |
| |
| // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers. |
| // buf points into the same allocation, elemtype is persistent. |
| // SudoG's are referenced from their owning thread so they can't be collected. |
| // TODO(dvyukov,rlh): Rethink when collector can move allocated objects. |
| var c *hchan |
| switch { |
| case mem == 0: |
| // Queue or element size is zero. |
| c = (*hchan)(mallocgc(hchanSize, nil, true)) |
| // Race detector uses this location for synchronization. |
| c.buf = c.raceaddr() |
| case !elem.Pointers(): |
| // Elements do not contain pointers. |
| // Allocate hchan and buf in one call. |
| c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) |
| c.buf = add(unsafe.Pointer(c), hchanSize) |
| default: |
| // Elements contain pointers. |
| c = new(hchan) |
| c.buf = mallocgc(mem, elem, true) |
| } |
| |
| c.elemsize = uint16(elem.Size_) |
| c.elemtype = elem |
| c.dataqsiz = uint(size) |
| lockInit(&c.lock, lockRankHchan) |
| |
| if debugChan { |
| print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n") |
| } |
| return c |
| } |
| |
| // chanbuf(c, i) is pointer to the i'th slot in the buffer. |
| // |
| // chanbuf should be an internal detail, |
| // but widely used packages access it using linkname. |
| // Notable members of the hall of shame include: |
| // - github.com/fjl/memsize |
| // |
| // Do not remove or change the type signature. |
| // See go.dev/issue/67401. |
| // |
| //go:linkname chanbuf |
| func chanbuf(c *hchan, i uint) unsafe.Pointer { |
| return add(c.buf, uintptr(i)*uintptr(c.elemsize)) |
| } |
| |
| // full reports whether a send on c would block (that is, the channel is full). |
| // It uses a single word-sized read of mutable state, so although |
| // the answer is instantaneously true, the correct answer may have changed |
| // by the time the calling function receives the return value. |
| func full(c *hchan) bool { |
| // c.dataqsiz is immutable (never written after the channel is created) |
| // so it is safe to read at any time during channel operation. |
| if c.dataqsiz == 0 { |
| // Assumes that a pointer read is relaxed-atomic. |
| return c.recvq.first == nil |
| } |
| // Assumes that a uint read is relaxed-atomic. |
| return c.qcount == c.dataqsiz |
| } |
| |
| // entry point for c <- x from compiled code. |
| // |
| //go:nosplit |
| func chansend1(c *hchan, elem unsafe.Pointer) { |
| chansend(c, elem, true, sys.GetCallerPC()) |
| } |
| |
/*
 * generic single channel send/recv
 * If block is false,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */
// chansend sends the value at ep on channel c. block selects between a
// blocking send and a non-blocking attempt; callerpc is reported to the
// race detector as the PC of the sending code.
//
// It returns true once the value has been handed to a waiting receiver or
// stored in the buffer, and false only when block is false and the send
// could not complete immediately (including when c is nil). A blocking send
// on a nil channel parks forever. Sending on a closed channel panics with
// "send on closed channel".
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceBlockForever, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	//
	// After observing that the channel is not closed, we observe that the channel is
	// not ready for sending. Each of these observations is a single word-sized read
	// (first c.closed and second full()).
	// Because a closed channel cannot transition from 'ready for sending' to
	// 'not ready for sending', even if the channel is closed between the two observations,
	// they imply a moment between the two when the channel was both not yet closed
	// and not ready for sending. We behave as if we observed the channel at that moment,
	// and report that the send cannot proceed.
	//
	// It is okay if the reads are reordered here: if we observe that the channel is not
	// ready for sending and then observe that it is not closed, that implies that the
	// channel wasn't closed during the first observation. However, nothing here
	// guarantees forward progress. We rely on the side effects of lock release in
	// chanrecv() and closechan() to update this thread's view of c.closed and full().
	if !block && c.closed == 0 && full(c) {
		return false
	}

	// t0 stays zero unless block profiling is enabled; a non-zero t0 is what
	// later arms mysg.releasetime.
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep)
		// Advance the send index, wrapping around the circular buffer.
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}

	if !block {
		unlock(&c.lock)
		return false
	}

	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		// -1 marks the sudog so that whoever wakes us records cputicks()
		// in releasetime (see the releasetime != 0 checks in recv and closechan).
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	gp.parkingOnChan.Store(true)
	// chanparkcommit releases c.lock once the park has been committed.
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	gp.activeStackChans = false
	// success is set to true by recv and to false by closechan, so a
	// cleared flag means the wakeup came from a close.
	closed := !mysg.success
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	if closed {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	return true
}
| |
// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked. send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
// skip is forwarded to goready as skip+1.
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if raceenabled {
		if c.dataqsiz == 0 {
			racesync(c, sg)
		} else {
			// Pretend we go through the buffer, even though
			// we copy directly. Note that we need to increment
			// the head/tail locations only when raceenabled.
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
			c.recvx++
			if c.recvx == c.dataqsiz {
				c.recvx = 0
			}
			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
		}
	}
	// A receiver with a nil elem has no destination slot, so the copy
	// is skipped for it.
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	// Release the channel before readying gp: a G must not be readied
	// while the channel lock is held.
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}
| |
| // timerchandrain removes all elements in channel c's buffer. |
| // It reports whether any elements were removed. |
| // Because it is only intended for timers, it does not |
| // handle waiting senders at all (all timer channels |
| // use non-blocking sends to fill the buffer). |
| func timerchandrain(c *hchan) bool { |
| // Note: Cannot use empty(c) because we are called |
| // while holding c.timer.sendLock, and empty(c) will |
| // call c.timer.maybeRunChan, which will deadlock. |
| // We are emptying the channel, so we only care about |
| // the count, not about potentially filling it up. |
| if atomic.Loaduint(&c.qcount) == 0 { |
| return false |
| } |
| lock(&c.lock) |
| any := false |
| for c.qcount > 0 { |
| any = true |
| typedmemclr(c.elemtype, chanbuf(c, c.recvx)) |
| c.recvx++ |
| if c.recvx == c.dataqsiz { |
| c.recvx = 0 |
| } |
| c.qcount-- |
| } |
| unlock(&c.lock) |
| return any |
| } |
| |
| // Sends and receives on unbuffered or empty-buffered channels are the |
| // only operations where one running goroutine writes to the stack of |
| // another running goroutine. The GC assumes that stack writes only |
| // happen when the goroutine is running and are only done by that |
| // goroutine. Using a write barrier is sufficient to make up for |
| // violating that assumption, but the write barrier has to work. |
| // typedmemmove will call bulkBarrierPreWrite, but the target bytes |
| // are not in the heap, so that will not help. We arrange to call |
| // memmove and typeBitsBulkBarrier instead. |
| |
// sendDirect copies one value of type t from src into sg.elem, the
// destination slot recorded in the waiting receiver's sudog. It issues
// typeBitsBulkBarrier for the range before the raw memmove.
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) {
	// src is on our stack, dst is a slot on another stack.

	// Once we read sg.elem out of sg, it will no longer
	// be updated if the destination's stack gets copied (shrunk).
	// So make sure that no preemption points can happen between read & use.
	dst := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
	// No need for cgo write barrier checks because dst is always
	// Go memory.
	memmove(dst, src, t.Size_)
}
| |
// recvDirect copies one value of type t from sg.elem, the source slot
// recorded in the waiting sender's sudog, into dst. It issues
// typeBitsBulkBarrier for the range before the raw memmove.
func recvDirect(t *_type, sg *sudog, dst unsafe.Pointer) {
	// dst is on our stack or the heap, src is on another stack.
	// The channel is locked, so src will not move during this
	// operation.
	src := sg.elem
	typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.Size_)
	memmove(dst, src, t.Size_)
}
| |
| func closechan(c *hchan) { |
| if c == nil { |
| panic(plainError("close of nil channel")) |
| } |
| |
| lock(&c.lock) |
| if c.closed != 0 { |
| unlock(&c.lock) |
| panic(plainError("close of closed channel")) |
| } |
| |
| if raceenabled { |
| callerpc := sys.GetCallerPC() |
| racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan)) |
| racerelease(c.raceaddr()) |
| } |
| |
| c.closed = 1 |
| |
| var glist gList |
| |
| // release all readers |
| for { |
| sg := c.recvq.dequeue() |
| if sg == nil { |
| break |
| } |
| if sg.elem != nil { |
| typedmemclr(c.elemtype, sg.elem) |
| sg.elem = nil |
| } |
| if sg.releasetime != 0 { |
| sg.releasetime = cputicks() |
| } |
| gp := sg.g |
| gp.param = unsafe.Pointer(sg) |
| sg.success = false |
| if raceenabled { |
| raceacquireg(gp, c.raceaddr()) |
| } |
| glist.push(gp) |
| } |
| |
| // release all writers (they will panic) |
| for { |
| sg := c.sendq.dequeue() |
| if sg == nil { |
| break |
| } |
| sg.elem = nil |
| if sg.releasetime != 0 { |
| sg.releasetime = cputicks() |
| } |
| gp := sg.g |
| gp.param = unsafe.Pointer(sg) |
| sg.success = false |
| if raceenabled { |
| raceacquireg(gp, c.raceaddr()) |
| } |
| glist.push(gp) |
| } |
| unlock(&c.lock) |
| |
| // Ready all Gs now that we've dropped the channel lock. |
| for !glist.empty() { |
| gp := glist.pop() |
| gp.schedlink = 0 |
| goready(gp, 3) |
| } |
| } |
| |
| // empty reports whether a read from c would block (that is, the channel is |
| // empty). It is atomically correct and sequentially consistent at the moment |
| // it returns, but since the channel is unlocked, the channel may become |
| // non-empty immediately afterward. |
| func empty(c *hchan) bool { |
| // c.dataqsiz is immutable. |
| if c.dataqsiz == 0 { |
| return atomic.Loadp(unsafe.Pointer(&c.sendq.first)) == nil |
| } |
| // c.timer is also immutable (it is set after make(chan) but before any channel operations). |
| // All timer channels have dataqsiz > 0. |
| if c.timer != nil { |
| c.timer.maybeRunChan() |
| } |
| return atomic.Loaduint(&c.qcount) == 0 |
| } |
| |
| // entry points for <- c from compiled code. |
| // |
| //go:nosplit |
| func chanrecv1(c *hchan, elem unsafe.Pointer) { |
| chanrecv(c, elem, true) |
| } |
| |
| //go:nosplit |
| func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) { |
| _, received = chanrecv(c, elem, true) |
| return |
| } |
| |
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
//
// If c is nil, a non-blocking receive returns (false, false) and a
// blocking receive parks forever.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// raceenabled: don't need to check ep, as it is always on the stack
	// or is new memory allocated by reflect.

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}

	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceBlockForever, 2)
		throw("unreachable")
	}

	if c.timer != nil {
		c.timer.maybeRunChan()
	}

	// Fast path: check for failed non-blocking operation without acquiring the lock.
	if !block && empty(c) {
		// After observing that the channel is not ready for receiving, we observe whether the
		// channel is closed.
		//
		// Reordering of these checks could lead to incorrect behavior when racing with a close.
		// For example, if the channel was open and not empty, was closed, and then drained,
		// reordered reads could incorrectly indicate "open and empty". To prevent reordering,
		// we use atomic loads for both checks, and rely on emptying and closing to happen in
		// separate critical sections under the same lock. This assumption fails when closing
		// an unbuffered channel with a blocked send, but that is an error condition anyway.
		if atomic.Load(&c.closed) == 0 {
			// Because a channel cannot be reopened, the later observation of the channel
			// being not closed implies that it was also not closed at the moment of the
			// first observation. We behave as if we observed the channel at that moment
			// and report that the receive cannot proceed.
			return
		}
		// The channel is irreversibly closed. Re-check whether the channel has any pending data
		// to receive, which could have arrived between the empty and closed checks above.
		// Sequential consistency is also required here, when racing with such a send.
		if empty(c) {
			// The channel is irreversibly closed and empty.
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
	}

	// t0 stays zero unless block profiling is enabled; a non-zero t0 is what
	// later arms mysg.releasetime.
	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	lock(&c.lock)

	if c.closed != 0 {
		if c.qcount == 0 {
			// Closed and drained: report the zero value.
			if raceenabled {
				raceacquire(c.raceaddr())
			}
			unlock(&c.lock)
			if ep != nil {
				typedmemclr(c.elemtype, ep)
			}
			return true, false
		}
		// The channel has been closed, but its buffer still holds data;
		// fall through to the buffered receive below.
	} else {
		// The channel is open: look for a waiting sender.
		if sg := c.sendq.dequeue(); sg != nil {
			// Found a waiting sender. If buffer is size 0, receive value
			// directly from sender. Otherwise, receive from head of queue
			// and add sender's value to the tail of the queue (both map to
			// the same buffer slot because the queue is full).
			recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
			return true, true
		}
	}

	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// Clear the vacated slot, then advance the receive index around
		// the circular buffer.
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}

	if !block {
		unlock(&c.lock)
		return false, false
	}

	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		// -1 marks the sudog so that whoever wakes us records cputicks()
		// in releasetime (see the releasetime != 0 checks in send and closechan).
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg

	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	if c.timer != nil {
		blockTimerChan(c)
	}

	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	gp.parkingOnChan.Store(true)
	// chanparkcommit releases c.lock once the park has been committed.
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	if c.timer != nil {
		unblockTimerChan(c)
	}
	gp.waiting = nil
	gp.activeStackChans = false
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	// success is set to true by send and to false by closechan, so it
	// doubles as the "received" result.
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success
}
| |
// recv processes a receive operation on a full channel c.
// There are 2 parts:
//  1. The value sent by the sender sg is put into the channel
//     and the sender is woken up to go on its merry way.
//  2. The value received by the receiver (the current G) is
//     written to ep.
//
// For synchronous channels, both values are the same.
// For asynchronous channels, the receiver gets its data from
// the channel buffer and the sender's data is put in the
// channel buffer.
// Channel c must be full and locked. recv unlocks c with unlockf.
// sg must already be dequeued from c.
// A non-nil ep must point to the heap or the caller's stack.
// skip is forwarded to goready as skip+1.
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 {
		if raceenabled {
			racesync(c, sg)
		}
		if ep != nil {
			// copy data from sender
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// Queue is full. Take the item at the
		// head of the queue. Make the sender enqueue
		// its item at the tail of the queue. Since the
		// queue is full, those are both the same slot.
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
			racenotify(c, c.recvx, sg)
		}
		// copy data from queue to receiver
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// copy data from sender to queue
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil
	gp := sg.g
	// Release the channel before readying gp: a G must not be readied
	// while the channel lock is held.
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}
| |
// chanparkcommit is the callback that chansend and chanrecv hand to gopark
// together with a pointer to the channel's lock. It marks gp as having
// sudogs that point into its stack, clears gp.parkingOnChan, and then
// releases the channel lock passed as chanLock. It always returns true.
func chanparkcommit(gp *g, chanLock unsafe.Pointer) bool {
	// There are unlocked sudogs that point into gp's stack. Stack
	// copying must lock the channels of those sudogs.
	// Set activeStackChans here instead of before we try parking
	// because we could self-deadlock in stack growth on the
	// channel lock.
	gp.activeStackChans = true
	// Mark that it's safe for stack shrinking to occur now,
	// because any thread acquiring this G's stack for shrinking
	// is guaranteed to observe activeStackChans after this store.
	gp.parkingOnChan.Store(false)
	// Make sure we unlock after setting activeStackChans and
	// unsetting parkingOnChan. The moment we unlock chanLock
	// we risk gp getting readied by a channel operation and
	// so gp could continue running before everything before
	// the unlock is visible (even to gp itself).
	unlock((*mutex)(chanLock))
	return true
}
| |
| // compiler implements |
| // |
| // select { |
| // case c <- v: |
| // ... foo |
| // default: |
| // ... bar |
| // } |
| // |
| // as |
| // |
| // if selectnbsend(c, v) { |
| // ... foo |
| // } else { |
| // ... bar |
| // } |
| func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) { |
| return chansend(c, elem, false, sys.GetCallerPC()) |
| } |
| |
| // compiler implements |
| // |
| // select { |
| // case v, ok = <-c: |
| // ... foo |
| // default: |
| // ... bar |
| // } |
| // |
| // as |
| // |
| // if selected, ok = selectnbrecv(&v, c); selected { |
| // ... foo |
| // } else { |
| // ... bar |
| // } |
| func selectnbrecv(elem unsafe.Pointer, c *hchan) (selected, received bool) { |
| return chanrecv(c, elem, false) |
| } |
| |
| //go:linkname reflect_chansend reflect.chansend0 |
| func reflect_chansend(c *hchan, elem unsafe.Pointer, nb bool) (selected bool) { |
| return chansend(c, elem, !nb, sys.GetCallerPC()) |
| } |
| |
| //go:linkname reflect_chanrecv reflect.chanrecv |
| func reflect_chanrecv(c *hchan, nb bool, elem unsafe.Pointer) (selected bool, received bool) { |
| return chanrecv(c, elem, !nb) |
| } |
| |
| func chanlen(c *hchan) int { |
| if c == nil { |
| return 0 |
| } |
| async := debug.asynctimerchan.Load() != 0 |
| if c.timer != nil && async { |
| c.timer.maybeRunChan() |
| } |
| if c.timer != nil && !async { |
| // timer channels have a buffered implementation |
| // but present to users as unbuffered, so that we can |
| // undo sends without users noticing. |
| return 0 |
| } |
| return int(c.qcount) |
| } |
| |
| func chancap(c *hchan) int { |
| if c == nil { |
| return 0 |
| } |
| if c.timer != nil { |
| async := debug.asynctimerchan.Load() != 0 |
| if async { |
| return int(c.dataqsiz) |
| } |
| // timer channels have a buffered implementation |
| // but present to users as unbuffered, so that we can |
| // undo sends without users noticing. |
| return 0 |
| } |
| return int(c.dataqsiz) |
| } |
| |
| //go:linkname reflect_chanlen reflect.chanlen |
| func reflect_chanlen(c *hchan) int { |
| return chanlen(c) |
| } |
| |
| //go:linkname reflectlite_chanlen internal/reflectlite.chanlen |
| func reflectlite_chanlen(c *hchan) int { |
| return chanlen(c) |
| } |
| |
| //go:linkname reflect_chancap reflect.chancap |
| func reflect_chancap(c *hchan) int { |
| return chancap(c) |
| } |
| |
| //go:linkname reflect_chanclose reflect.chanclose |
| func reflect_chanclose(c *hchan) { |
| closechan(c) |
| } |
| |
| func (q *waitq) enqueue(sgp *sudog) { |
| sgp.next = nil |
| x := q.last |
| if x == nil { |
| sgp.prev = nil |
| q.first = sgp |
| q.last = sgp |
| return |
| } |
| sgp.prev = x |
| x.next = sgp |
| q.last = sgp |
| } |
| |
| func (q *waitq) dequeue() *sudog { |
| for { |
| sgp := q.first |
| if sgp == nil { |
| return nil |
| } |
| y := sgp.next |
| if y == nil { |
| q.first = nil |
| q.last = nil |
| } else { |
| y.prev = nil |
| q.first = y |
| sgp.next = nil // mark as removed (see dequeueSudoG) |
| } |
| |
| // if a goroutine was put on this queue because of a |
| // select, there is a small window between the goroutine |
| // being woken up by a different case and it grabbing the |
| // channel locks. Once it has the lock |
| // it removes itself from the queue, so we won't see it after that. |
| // We use a flag in the G struct to tell us when someone |
| // else has won the race to signal this goroutine but the goroutine |
| // hasn't removed itself from the queue yet. |
| if sgp.isSelect && !sgp.g.selectDone.CompareAndSwap(0, 1) { |
| continue |
| } |
| |
| return sgp |
| } |
| } |
| |
| func (c *hchan) raceaddr() unsafe.Pointer { |
| // Treat read-like and write-like operations on the channel to |
| // happen at this address. Avoid using the address of qcount |
| // or dataqsiz, because the len() and cap() builtins read |
| // those addresses, and we don't want them racing with |
| // operations like close(). |
| return unsafe.Pointer(&c.buf) |
| } |
| |
| func racesync(c *hchan, sg *sudog) { |
| racerelease(chanbuf(c, 0)) |
| raceacquireg(sg.g, chanbuf(c, 0)) |
| racereleaseg(sg.g, chanbuf(c, 0)) |
| raceacquire(chanbuf(c, 0)) |
| } |
| |
| // Notify the race detector of a send or receive involving buffer entry idx |
| // and a channel c or its communicating partner sg. |
| // This function handles the special case of c.elemsize==0. |
| func racenotify(c *hchan, idx uint, sg *sudog) { |
| // We could have passed the unsafe.Pointer corresponding to entry idx |
| // instead of idx itself. However, in a future version of this function, |
| // we can use idx to better handle the case of elemsize==0. |
| // A future improvement to the detector is to call TSan with c and idx: |
| // this way, Go will continue to not allocating buffer entries for channels |
| // of elemsize==0, yet the race detector can be made to handle multiple |
| // sync objects underneath the hood (one sync object per idx) |
| qp := chanbuf(c, idx) |
| // When elemsize==0, we don't allocate a full buffer for the channel. |
| // Instead of individual buffer entries, the race detector uses the |
| // c.buf as the only buffer entry. This simplification prevents us from |
| // following the memory model's happens-before rules (rules that are |
| // implemented in racereleaseacquire). Instead, we accumulate happens-before |
| // information in the synchronization object associated with c.buf. |
| if c.elemsize == 0 { |
| if sg == nil { |
| raceacquire(qp) |
| racerelease(qp) |
| } else { |
| raceacquireg(sg.g, qp) |
| racereleaseg(sg.g, qp) |
| } |
| } else { |
| if sg == nil { |
| racereleaseacquire(qp) |
| } else { |
| racereleaseacquireg(sg.g, qp) |
| } |
| } |
| } |