blob: 5b035aa3967ad35e632d6acd5548b657eff1bdd5 [file] [log] [blame] [edit]
// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package sync
import (
"internal/race"
"internal/synctest"
"sync/atomic"
"unsafe"
)
// A WaitGroup is a counting semaphore typically used to wait
// for a group of goroutines or tasks to finish.
//
// Typically, a main goroutine will start tasks, each in a new
// goroutine, by calling [WaitGroup.Go] and then wait for all tasks to
// complete by calling [WaitGroup.Wait]. For example:
//
// var wg sync.WaitGroup
// wg.Go(task1)
// wg.Go(task2)
// wg.Wait()
//
// A WaitGroup may also be used for tracking tasks without using Go to
// start new goroutines by using [WaitGroup.Add] and [WaitGroup.Done].
//
// The previous example can be rewritten using explicitly created
// goroutines along with Add and Done:
//
// var wg sync.WaitGroup
// wg.Add(1)
// go func() {
// defer wg.Done()
// task1()
// }()
// wg.Add(1)
// go func() {
// defer wg.Done()
// task2()
// }()
// wg.Wait()
//
// This pattern is common in code that predates [WaitGroup.Go].
//
// A WaitGroup must not be copied after first use.
type WaitGroup struct {
noCopy noCopy
// Bits (high to low):
// bits[0:32] counter
// bits[32] flag: synctest bubble membership
// bits[33:64] wait count
state atomic.Uint64
sema uint32
}
// waitGroupBubbleFlag indicates that a WaitGroup is associated with a synctest bubble.
const waitGroupBubbleFlag = 0x8000_0000
// Add adds delta, which may be negative, to the [WaitGroup] task counter.
// If the counter becomes zero, all goroutines blocked on [WaitGroup.Wait] are released.
// If the counter goes negative, Add panics.
//
// Callers should prefer [WaitGroup.Go].
//
// Note that calls with a positive delta that occur when the counter is zero
// must happen before a Wait. Calls with a negative delta, or calls with a
// positive delta that start when the counter is greater than zero, may happen
// at any time.
// Typically this means the calls to Add should execute before the statement
// creating the goroutine or other event to be waited for.
// If a WaitGroup is reused to wait for several independent sets of events,
// new Add calls must happen after all previous Wait calls have returned.
// See the WaitGroup example.
func (wg *WaitGroup) Add(delta int) {
if race.Enabled {
if delta < 0 {
// Synchronize decrements with Wait.
race.ReleaseMerge(unsafe.Pointer(wg))
}
race.Disable()
defer race.Enable()
}
bubbled := false
if synctest.IsInBubble() {
// If Add is called from within a bubble, then all Add calls must be made
// from the same bubble.
switch synctest.Associate(wg) {
case synctest.Unbubbled:
case synctest.OtherBubble:
// wg is already associated with a different bubble.
fatal("sync: WaitGroup.Add called from multiple synctest bubbles")
case synctest.CurrentBubble:
bubbled = true
state := wg.state.Or(waitGroupBubbleFlag)
if state != 0 && state&waitGroupBubbleFlag == 0 {
// Add has been called from outside this bubble.
fatal("sync: WaitGroup.Add called from inside and outside synctest bubble")
}
}
}
state := wg.state.Add(uint64(delta) << 32)
if state&waitGroupBubbleFlag != 0 && !bubbled {
// Add has been called from within a synctest bubble (and we aren't in one).
fatal("sync: WaitGroup.Add called from inside and outside synctest bubble")
}
v := int32(state >> 32)
w := uint32(state & 0x7fffffff)
if race.Enabled && delta > 0 && v == int32(delta) {
// The first increment must be synchronized with Wait.
// Need to model this as a read, because there can be
// several concurrent wg.counter transitions from 0.
race.Read(unsafe.Pointer(&wg.sema))
}
if v < 0 {
panic("sync: negative WaitGroup counter")
}
if w != 0 && delta > 0 && v == int32(delta) {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
if v > 0 || w == 0 {
return
}
// This goroutine has set counter to 0 when waiters > 0.
// Now there can't be concurrent mutations of state:
// - Adds must not happen concurrently with Wait,
// - Wait does not increment waiters if it sees counter == 0.
// Still do a cheap sanity check to detect WaitGroup misuse.
if wg.state.Load() != state {
panic("sync: WaitGroup misuse: Add called concurrently with Wait")
}
// Reset waiters count to 0.
wg.state.Store(0)
if bubbled {
// Adds must not happen concurrently with wait when counter is 0,
// so we can safely disassociate wg from its current bubble.
synctest.Disassociate(wg)
}
for ; w != 0; w-- {
runtime_Semrelease(&wg.sema, false, 0)
}
}
// Done decrements the [WaitGroup] task counter by one.
// It is equivalent to Add(-1).
//
// Callers should prefer [WaitGroup.Go].
//
// In the terminology of [the Go memory model], a call to Done
// "synchronizes before" the return of any Wait call that it unblocks.
//
// [the Go memory model]: https://go.dev/ref/mem
func (wg *WaitGroup) Done() {
wg.Add(-1)
}
// Wait blocks until the [WaitGroup] task counter is zero.
func (wg *WaitGroup) Wait() {
if race.Enabled {
race.Disable()
}
for {
state := wg.state.Load()
v := int32(state >> 32)
w := uint32(state & 0x7fffffff)
if v == 0 {
// Counter is 0, no need to wait.
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
if w == 0 && state&waitGroupBubbleFlag != 0 && synctest.IsAssociated(wg) {
// Adds must not happen concurrently with wait when counter is 0,
// so we can disassociate wg from its current bubble.
if wg.state.CompareAndSwap(state, 0) {
synctest.Disassociate(wg)
}
}
return
}
// Increment waiters count.
if wg.state.CompareAndSwap(state, state+1) {
if race.Enabled && w == 0 {
// Wait must be synchronized with the first Add.
// Need to model this is as a write to race with the read in Add.
// As a consequence, can do the write only for the first waiter,
// otherwise concurrent Waits will race with each other.
race.Write(unsafe.Pointer(&wg.sema))
}
synctestDurable := false
if state&waitGroupBubbleFlag != 0 && synctest.IsInBubble() {
if race.Enabled {
race.Enable()
}
if synctest.IsAssociated(wg) {
// Add was called within the current bubble,
// so this Wait is durably blocking.
synctestDurable = true
}
if race.Enabled {
race.Disable()
}
}
runtime_SemacquireWaitGroup(&wg.sema, synctestDurable)
if wg.state.Load() != 0 {
panic("sync: WaitGroup is reused before previous Wait has returned")
}
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(wg))
}
return
}
}
}
// Go calls f in a new goroutine and adds that task to the [WaitGroup].
// When f returns, the task is removed from the WaitGroup.
//
// The function f must not panic.
//
// If the WaitGroup is empty, Go must happen before a [WaitGroup.Wait].
// Typically, this simply means Go is called to start tasks before Wait is called.
// If the WaitGroup is not empty, Go may happen at any time.
// This means a goroutine started by Go may itself call Go.
// If a WaitGroup is reused to wait for several independent sets of tasks,
// new Go calls must happen after all previous Wait calls have returned.
//
// In the terminology of [the Go memory model], the return from f
// "synchronizes before" the return of any Wait call that it unblocks.
//
// [the Go memory model]: https://go.dev/ref/mem
func (wg *WaitGroup) Go(f func()) {
wg.Add(1)
go func() {
defer wg.Done()
f()
}()
}