| // Copyright 2011 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package sync |
| |
| import ( |
| "internal/race" |
| "internal/synctest" |
| "sync/atomic" |
| "unsafe" |
| ) |
| |
| // A WaitGroup is a counting semaphore typically used to wait |
| // for a group of goroutines or tasks to finish. |
| // |
| // Typically, a main goroutine will start tasks, each in a new |
| // goroutine, by calling [WaitGroup.Go] and then wait for all tasks to |
| // complete by calling [WaitGroup.Wait]. For example: |
| // |
| // var wg sync.WaitGroup |
| // wg.Go(task1) |
| // wg.Go(task2) |
| // wg.Wait() |
| // |
| // A WaitGroup may also be used for tracking tasks without using Go to |
| // start new goroutines by using [WaitGroup.Add] and [WaitGroup.Done]. |
| // |
| // The previous example can be rewritten using explicitly created |
| // goroutines along with Add and Done: |
| // |
| // var wg sync.WaitGroup |
| // wg.Add(1) |
| // go func() { |
| // defer wg.Done() |
| // task1() |
| // }() |
| // wg.Add(1) |
| // go func() { |
| // defer wg.Done() |
| // task2() |
| // }() |
| // wg.Wait() |
| // |
| // This pattern is common in code that predates [WaitGroup.Go]. |
| // |
| // A WaitGroup must not be copied after first use. |
| type WaitGroup struct { |
| noCopy noCopy |
| |
| // Bits (high to low): |
| // bits[0:32] counter |
| // bits[32] flag: synctest bubble membership |
| // bits[33:64] wait count |
| state atomic.Uint64 |
| sema uint32 |
| } |
| |
| // waitGroupBubbleFlag indicates that a WaitGroup is associated with a synctest bubble. |
| const waitGroupBubbleFlag = 0x8000_0000 |
| |
| // Add adds delta, which may be negative, to the [WaitGroup] task counter. |
| // If the counter becomes zero, all goroutines blocked on [WaitGroup.Wait] are released. |
| // If the counter goes negative, Add panics. |
| // |
| // Callers should prefer [WaitGroup.Go]. |
| // |
| // Note that calls with a positive delta that occur when the counter is zero |
| // must happen before a Wait. Calls with a negative delta, or calls with a |
| // positive delta that start when the counter is greater than zero, may happen |
| // at any time. |
| // Typically this means the calls to Add should execute before the statement |
| // creating the goroutine or other event to be waited for. |
| // If a WaitGroup is reused to wait for several independent sets of events, |
| // new Add calls must happen after all previous Wait calls have returned. |
| // See the WaitGroup example. |
| func (wg *WaitGroup) Add(delta int) { |
| if race.Enabled { |
| if delta < 0 { |
| // Synchronize decrements with Wait. |
| race.ReleaseMerge(unsafe.Pointer(wg)) |
| } |
| race.Disable() |
| defer race.Enable() |
| } |
| bubbled := false |
| if synctest.IsInBubble() { |
| // If Add is called from within a bubble, then all Add calls must be made |
| // from the same bubble. |
| switch synctest.Associate(wg) { |
| case synctest.Unbubbled: |
| case synctest.OtherBubble: |
| // wg is already associated with a different bubble. |
| fatal("sync: WaitGroup.Add called from multiple synctest bubbles") |
| case synctest.CurrentBubble: |
| bubbled = true |
| state := wg.state.Or(waitGroupBubbleFlag) |
| if state != 0 && state&waitGroupBubbleFlag == 0 { |
| // Add has been called from outside this bubble. |
| fatal("sync: WaitGroup.Add called from inside and outside synctest bubble") |
| } |
| } |
| } |
| state := wg.state.Add(uint64(delta) << 32) |
| if state&waitGroupBubbleFlag != 0 && !bubbled { |
| // Add has been called from within a synctest bubble (and we aren't in one). |
| fatal("sync: WaitGroup.Add called from inside and outside synctest bubble") |
| } |
| v := int32(state >> 32) |
| w := uint32(state & 0x7fffffff) |
| if race.Enabled && delta > 0 && v == int32(delta) { |
| // The first increment must be synchronized with Wait. |
| // Need to model this as a read, because there can be |
| // several concurrent wg.counter transitions from 0. |
| race.Read(unsafe.Pointer(&wg.sema)) |
| } |
| if v < 0 { |
| panic("sync: negative WaitGroup counter") |
| } |
| if w != 0 && delta > 0 && v == int32(delta) { |
| panic("sync: WaitGroup misuse: Add called concurrently with Wait") |
| } |
| if v > 0 || w == 0 { |
| return |
| } |
| // This goroutine has set counter to 0 when waiters > 0. |
| // Now there can't be concurrent mutations of state: |
| // - Adds must not happen concurrently with Wait, |
| // - Wait does not increment waiters if it sees counter == 0. |
| // Still do a cheap sanity check to detect WaitGroup misuse. |
| if wg.state.Load() != state { |
| panic("sync: WaitGroup misuse: Add called concurrently with Wait") |
| } |
| // Reset waiters count to 0. |
| wg.state.Store(0) |
| if bubbled { |
| // Adds must not happen concurrently with wait when counter is 0, |
| // so we can safely disassociate wg from its current bubble. |
| synctest.Disassociate(wg) |
| } |
| for ; w != 0; w-- { |
| runtime_Semrelease(&wg.sema, false, 0) |
| } |
| } |
| |
| // Done decrements the [WaitGroup] task counter by one. |
| // It is equivalent to Add(-1). |
| // |
| // Callers should prefer [WaitGroup.Go]. |
| // |
| // In the terminology of [the Go memory model], a call to Done |
| // "synchronizes before" the return of any Wait call that it unblocks. |
| // |
| // [the Go memory model]: https://go.dev/ref/mem |
| func (wg *WaitGroup) Done() { |
| wg.Add(-1) |
| } |
| |
| // Wait blocks until the [WaitGroup] task counter is zero. |
| func (wg *WaitGroup) Wait() { |
| if race.Enabled { |
| race.Disable() |
| } |
| for { |
| state := wg.state.Load() |
| v := int32(state >> 32) |
| w := uint32(state & 0x7fffffff) |
| if v == 0 { |
| // Counter is 0, no need to wait. |
| if race.Enabled { |
| race.Enable() |
| race.Acquire(unsafe.Pointer(wg)) |
| } |
| if w == 0 && state&waitGroupBubbleFlag != 0 && synctest.IsAssociated(wg) { |
| // Adds must not happen concurrently with wait when counter is 0, |
| // so we can disassociate wg from its current bubble. |
| if wg.state.CompareAndSwap(state, 0) { |
| synctest.Disassociate(wg) |
| } |
| } |
| return |
| } |
| // Increment waiters count. |
| if wg.state.CompareAndSwap(state, state+1) { |
| if race.Enabled && w == 0 { |
| // Wait must be synchronized with the first Add. |
| // Need to model this is as a write to race with the read in Add. |
| // As a consequence, can do the write only for the first waiter, |
| // otherwise concurrent Waits will race with each other. |
| race.Write(unsafe.Pointer(&wg.sema)) |
| } |
| synctestDurable := false |
| if state&waitGroupBubbleFlag != 0 && synctest.IsInBubble() { |
| if race.Enabled { |
| race.Enable() |
| } |
| if synctest.IsAssociated(wg) { |
| // Add was called within the current bubble, |
| // so this Wait is durably blocking. |
| synctestDurable = true |
| } |
| if race.Enabled { |
| race.Disable() |
| } |
| } |
| runtime_SemacquireWaitGroup(&wg.sema, synctestDurable) |
| if wg.state.Load() != 0 { |
| panic("sync: WaitGroup is reused before previous Wait has returned") |
| } |
| if race.Enabled { |
| race.Enable() |
| race.Acquire(unsafe.Pointer(wg)) |
| } |
| return |
| } |
| } |
| } |
| |
| // Go calls f in a new goroutine and adds that task to the [WaitGroup]. |
| // When f returns, the task is removed from the WaitGroup. |
| // |
| // The function f must not panic. |
| // |
| // If the WaitGroup is empty, Go must happen before a [WaitGroup.Wait]. |
| // Typically, this simply means Go is called to start tasks before Wait is called. |
| // If the WaitGroup is not empty, Go may happen at any time. |
| // This means a goroutine started by Go may itself call Go. |
| // If a WaitGroup is reused to wait for several independent sets of tasks, |
| // new Go calls must happen after all previous Wait calls have returned. |
| // |
| // In the terminology of [the Go memory model], the return from f |
| // "synchronizes before" the return of any Wait call that it unblocks. |
| // |
| // [the Go memory model]: https://go.dev/ref/mem |
| func (wg *WaitGroup) Go(f func()) { |
| wg.Add(1) |
| go func() { |
| defer wg.Done() |
| f() |
| }() |
| } |