// blob: 4c64dca393fe0eec79f4631646c576d64fe98620 [file] [log] [blame] [edit]
// Copyright 2011 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package sync
import (
"sync/atomic"
"unsafe"
)
// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
// goroutines to wait for. Then each of the goroutines
// runs and calls Done when finished. At the same time,
// Wait can be used to block until all goroutines have finished.
type WaitGroup struct {
	// m is held by the slow paths of Add and Wait while they
	// register, release, or reset waiters and the semaphore.
	m Mutex
	// counter is the number of outstanding units of work.
	// Add updates it atomically; Wait reads it atomically.
	counter int32
	// waiters is the number of goroutines registered in Wait.
	// It is changed only with m held, but Add's fast path reads
	// it atomically without the lock.
	waiters int32
	// sema is the semaphore the current set of waiters blocks on.
	// Wait allocates it on demand and Add sets it back to nil
	// after releasing the waiters, so each release cycle gets a
	// fresh semaphore.
	sema *uint32
}
// WaitGroup creates a new semaphore each time the old semaphore
// is released. This is to avoid the following race:
//
// G1: Add(1)
// G1: go G2()
// G1: Wait() // Context switch after Unlock() and before Semacquire().
// G2: Done() // Release semaphore: sema == 1, waiters == 0. G1 doesn't run yet.
// G3: Wait() // Finds counter == 0, waiters == 0, doesn't block.
// G3: Add(1) // Makes counter == 1, waiters == 0.
// G3: go G4()
// G3: Wait() // G1 still hasn't run, G3 finds sema == 1, unblocked! Bug.
// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait are released.
// If the counter goes negative, Add panics.
//
// Note that calls with positive delta must happen before the call to Wait,
// or else Wait may wait for too small a group. Typically this means the calls
// to Add should execute before the statement creating the goroutine or
// other event to be waited for. See the WaitGroup example.
//
// delta is converted to int32 before being applied to the counter.
func (wg *WaitGroup) Add(delta int) {
	if raceenabled {
		_ = wg.m.state // trigger nil deref early
		if delta < 0 {
			// Synchronize decrements with Wait.
			raceReleaseMerge(unsafe.Pointer(wg))
		}
		// Race instrumentation stays off for the rest of the call;
		// the deferred raceEnable also covers the panic below.
		raceDisable()
		defer raceEnable()
	}
	v := atomic.AddInt32(&wg.counter, int32(delta))
	if raceenabled && delta > 0 && v == int32(delta) {
		// The first increment must be synchronized with Wait.
		// Need to model this as a read, because there can be
		// several concurrent wg.counter transitions from 0.
		raceRead(unsafe.Pointer(&wg.sema))
	}
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}
	// Fast path: work is still outstanding, or nobody is waiting,
	// so there is nothing to release.
	if v > 0 || atomic.LoadInt32(&wg.waiters) == 0 {
		return
	}
	wg.m.Lock()
	// Re-check under the lock: the counter may have been raised
	// again since the atomic add above.
	if atomic.LoadInt32(&wg.counter) == 0 {
		// waiters is only modified with wg.m held, so one snapshot
		// is a stable bound for the release loop.
		n := atomic.LoadInt32(&wg.waiters)
		for i := int32(0); i < n; i++ {
			runtime_Semrelease(wg.sema)
		}
		// Store atomically: the fast path above loads waiters
		// without holding wg.m, so a plain write here would be
		// an unsynchronized access to the same word.
		atomic.StoreInt32(&wg.waiters, 0)
		// Drop the semaphore so the next Wait allocates a fresh one.
		wg.sema = nil
	}
	wg.m.Unlock()
}
// Done decrements the WaitGroup counter by one.
// It is shorthand for Add(-1), so it panics in the same way
// if the counter goes negative.
func (wg *WaitGroup) Done() {
	wg.Add(-1)
}
// Wait blocks until the WaitGroup counter is zero.
// If the counter is already zero, Wait returns immediately.
func (wg *WaitGroup) Wait() {
	if raceenabled {
		_ = wg.m.state // trigger nil deref early
		// Race instrumentation is switched off here and switched
		// back on explicitly on every return path below.
		raceDisable()
	}
	// Fast path: nothing outstanding, so return without taking the lock.
	if atomic.LoadInt32(&wg.counter) == 0 {
		if raceenabled {
			raceEnable()
			raceAcquire(unsafe.Pointer(wg))
		}
		return
	}
	wg.m.Lock()
	// Register as a waiter; w is the waiter count including this goroutine.
	w := atomic.AddInt32(&wg.waiters, 1)
	// This code is racing with the unlocked path in Add above.
	// The code above modifies counter and then reads waiters.
	// We must modify waiters and then read counter (the opposite order)
	// to avoid missing an Add.
	if atomic.LoadInt32(&wg.counter) == 0 {
		// The counter reached zero after the fast-path check:
		// undo the registration and return without blocking.
		atomic.AddInt32(&wg.waiters, -1)
		if raceenabled {
			raceEnable()
			raceAcquire(unsafe.Pointer(wg))
			raceDisable()
		}
		wg.m.Unlock()
		if raceenabled {
			raceEnable()
		}
		return
	}
	if raceenabled && w == 1 {
		// Wait must be synchronized with the first Add.
		// Need to model this as a write to race with the read in Add.
		// As a consequence, can do the write only for the first waiter,
		// otherwise concurrent Waits will race with each other.
		raceWrite(unsafe.Pointer(&wg.sema))
	}
	// Allocate the semaphore on demand; Add sets wg.sema back to nil
	// after it has released the waiters.
	if wg.sema == nil {
		wg.sema = new(uint32)
	}
	// Take a local copy before unlocking, because Add may set
	// wg.sema to nil once the lock is dropped.
	s := wg.sema
	wg.m.Unlock()
	runtime_Semacquire(s)
	if raceenabled {
		raceEnable()
		raceAcquire(unsafe.Pointer(wg))
	}
}