| // Copyright 2011 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package sync |
| |
| import ( |
| "internal/race" |
| "sync/atomic" |
| "unsafe" |
| ) |
| |
// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
//
// A WaitGroup must not be copied after first use.
type WaitGroup struct {
	// noCopy is declared elsewhere in the package.
	// NOTE(review): presumably a marker that lets vet's copy check flag
	// copies of a WaitGroup — confirm against its declaration.
	noCopy noCopy

	// 64-bit value: high 32 bits are counter, low 32 bits are waiter count.
	// 64-bit atomic operations require 64-bit alignment, but 32-bit
	// compilers do not ensure it. So we allocate 12 bytes and then use
	// the aligned 8 bytes in them as state.
	//
	// Never access state1 directly: the state method selects either
	// bytes 0-7 or bytes 4-11, depending on which of the two starts on
	// an 8-byte boundary for this particular WaitGroup.
	state1 [12]byte

	// sema is the semaphore that Wait blocks on and that Add releases,
	// once per waiter, when the counter drops to zero. Its address is
	// also passed to the race detector to model the ordering between
	// the first Add and the first Wait.
	sema uint32
}
| |
| func (wg *WaitGroup) state() *uint64 { |
| if uintptr(unsafe.Pointer(&wg.state1))%8 == 0 { |
| return (*uint64)(unsafe.Pointer(&wg.state1)) |
| } else { |
| return (*uint64)(unsafe.Pointer(&wg.state1[4])) |
| } |
| } |
| |
// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
// If the counter goes negative, Add panics.
//
// Note that calls with a positive delta that occur when the counter is zero
// must happen before a Wait. Calls with a negative delta, or calls with a
// positive delta that start when the counter is greater than zero, may happen
// at any time.
// Typically this means the calls to Add should execute before the statement
// creating the goroutine or other event to be waited for.
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
// See the WaitGroup example.
//
// Add also panics when it detects that it has been called concurrently
// with Wait in violation of the rules above.
func (wg *WaitGroup) Add(delta int) {
	statep := wg.state()
	if race.Enabled {
		_ = *statep // trigger nil deref early
		if delta < 0 {
			// Synchronize decrements with Wait.
			race.ReleaseMerge(unsafe.Pointer(wg))
		}
		// NOTE(review): presumably suspends race-detector tracking for the
		// rest of this call so the state manipulation below is not reported;
		// the deferred Enable runs on every exit, including the panics.
		race.Disable()
		defer race.Enable()
	}
	// Atomically add delta to the counter in the high 32 bits of the state
	// word. The shifted operand has zero low bits, so the waiter count in
	// the low 32 bits is left unchanged.
	state := atomic.AddUint64(statep, uint64(delta)<<32)
	v := int32(state >> 32) // counter value after this Add
	w := uint32(state)      // waiter count observed by this Add
	if race.Enabled {
		// v == delta means the counter was zero before this Add.
		if delta > 0 && v == int32(delta) {
			// The first increment must be synchronized with Wait.
			// Need to model this as a read, because there can be
			// several concurrent wg.counter transitions from 0.
			race.Read(unsafe.Pointer(&wg.sema))
		}
	}
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
	// Waiters are already registered, yet this positive Add just moved the
	// counter off zero: Add was called concurrently with Wait.
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// Nothing to wake up: either the counter is still positive or no
	// goroutine is waiting.
	if v > 0 || w == 0 {
		return
	}
	// This goroutine has set counter to 0 when waiters > 0.
	// Now there can't be concurrent mutations of state:
	// - Adds must not happen concurrently with Wait,
	// - Wait does not increment waiters if it sees counter == 0.
	// Still do a cheap sanity check to detect WaitGroup misuse.
	if *statep != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}
	// Reset waiters count to 0.
	// The counter half is already zero here, so this clears the whole word
	// before any waiter is woken; Wait checks for exactly that on wake-up.
	*statep = 0
	// Release the semaphore once for every waiter that was registered.
	for ; w != 0; w-- {
		runtime_Semrelease(&wg.sema, false)
	}
}
| |
| // Done decrements the WaitGroup counter. |
| func (wg *WaitGroup) Done() { |
| wg.Add(-1) |
| } |
| |
// Wait blocks until the WaitGroup counter is zero.
//
// If the counter is already zero, Wait returns immediately. Otherwise it
// registers itself as a waiter and sleeps on the semaphore until Add
// brings the counter to zero. Wait panics if, on waking, it finds that the
// WaitGroup has been reused before this call returned.
func (wg *WaitGroup) Wait() {
	statep := wg.state()
	if race.Enabled {
		_ = *statep // trigger nil deref early
		race.Disable()
	}
	// Retry until either the counter is observed as zero or this goroutine
	// manages to register itself as a waiter.
	for {
		state := atomic.LoadUint64(statep)
		v := int32(state >> 32) // current counter
		w := uint32(state)      // waiters registered before this call
		if v == 0 {
			// Counter is 0, no need to wait.
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			return
		}
		// Increment waiters count.
		// The waiter count is the low 32 bits, so state+1 bumps it by one.
		// A failed CAS means the word changed since the load above; loop
		// and re-read it.
		if atomic.CompareAndSwapUint64(statep, state, state+1) {
			if race.Enabled && w == 0 {
				// Wait must be synchronized with the first Add.
				// Need to model this as a write to race with the read in Add.
				// As a consequence, can do the write only for the first waiter,
				// otherwise concurrent Waits will race with each other.
				race.Write(unsafe.Pointer(&wg.sema))
			}
			// Sleep until Add releases the semaphore for this waiter.
			runtime_Semacquire(&wg.sema)
			// Add zeroes the whole state word before releasing waiters, so
			// a non-zero value here means the WaitGroup was modified again
			// before this Wait got to return.
			// NOTE(review): this panic path exits without calling
			// race.Enable, unlike the two return paths.
			if *statep != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			return
		}
	}
}