sync: add fast paths to WaitGroup
benchmark                            old ns/op    new ns/op    delta
BenchmarkWaitGroupUncontended            93.50        33.60   -64.06%
BenchmarkWaitGroupUncontended-2          44.30        16.90   -61.85%
BenchmarkWaitGroupUncontended-4          21.80         8.47   -61.15%
BenchmarkWaitGroupUncontended-8          12.10         4.86   -59.83%
BenchmarkWaitGroupUncontended-16          7.38         3.35   -54.61%
BenchmarkWaitGroupAddDone                58.40        33.70   -42.29%
BenchmarkWaitGroupAddDone-2             293.00        85.80   -70.72%
BenchmarkWaitGroupAddDone-4             243.00        51.10   -78.97%
BenchmarkWaitGroupAddDone-8             236.00        52.20   -77.88%
BenchmarkWaitGroupAddDone-16            215.00        43.30   -79.86%
BenchmarkWaitGroupAddDoneWork           826.00       794.00    -3.87%
BenchmarkWaitGroupAddDoneWork-2         450.00       424.00    -5.78%
BenchmarkWaitGroupAddDoneWork-4         277.00       220.00   -20.58%
BenchmarkWaitGroupAddDoneWork-8         440.00       116.00   -73.64%
BenchmarkWaitGroupAddDoneWork-16        569.00        66.50   -88.31%
BenchmarkWaitGroupWait                   29.00         8.04   -72.28%
BenchmarkWaitGroupWait-2                 74.10         4.15   -94.40%
BenchmarkWaitGroupWait-4                117.00         2.30   -98.03%
BenchmarkWaitGroupWait-8                111.00         1.31   -98.82%
BenchmarkWaitGroupWait-16               104.00         1.27   -98.78%
BenchmarkWaitGroupWaitWork              802.00       792.00    -1.25%
BenchmarkWaitGroupWaitWork-2            411.00       401.00    -2.43%
BenchmarkWaitGroupWaitWork-4            210.00       199.00    -5.24%
BenchmarkWaitGroupWaitWork-8            206.00       105.00   -49.03%
BenchmarkWaitGroupWaitWork-16           334.00        54.40   -83.71%
R=rsc
CC=golang-dev
https://golang.org/cl/4672050
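
In outline: Add and Done now manipulate the counter with atomic.AddInt32 and
take the mutex only when the counter has reached zero while waiters are
parked, and Wait returns immediately when an atomic load of the counter
observes zero. The two new primitives, atomic.LoadInt32 and atomic.LoadUint32,
are what make those unlocked reads safe. The sketch below is a simplified
model of that pattern, not the code in this CL: miniWG and its channel-based
wakeup, which stands in for runtime.Semrelease, are illustrative only.

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// miniWG models the fast-path structure of the patched WaitGroup.
type miniWG struct {
	m       sync.Mutex
	counter int32
	waiters int32
	sema    chan struct{}
}

func (wg *miniWG) Add(delta int) {
	v := atomic.AddInt32(&wg.counter, int32(delta))
	if v < 0 {
		panic("sync: negative WaitGroup count")
	}
	// Fast path: the counter is still positive, or nobody is waiting,
	// so there is no one to wake and the mutex can be skipped.
	if v > 0 || atomic.LoadInt32(&wg.waiters) == 0 {
		return
	}
	wg.m.Lock()
	if wg.waiters > 0 {
		close(wg.sema) // broadcast: release every registered waiter
		wg.waiters = 0
		wg.sema = nil
	}
	wg.m.Unlock()
}

func (wg *miniWG) Done() { wg.Add(-1) }

func (wg *miniWG) Wait() {
	// Fast path: the counter is already zero; no lock needed.
	if atomic.LoadInt32(&wg.counter) == 0 {
		return
	}
	wg.m.Lock()
	atomic.AddInt32(&wg.waiters, 1)
	// Opposite order to Add: bump waiters, then re-read counter.
	// Either Add's load sees this waiter or this load sees the zero
	// counter, so a wakeup cannot be lost.
	if atomic.LoadInt32(&wg.counter) == 0 {
		atomic.AddInt32(&wg.waiters, -1)
		wg.m.Unlock()
		return
	}
	if wg.sema == nil {
		wg.sema = make(chan struct{})
	}
	s := wg.sema
	wg.m.Unlock()
	<-s // released when Add closes the channel
}

func main() {
	var wg miniWG
	wg.Add(3)
	for i := 0; i < 3; i++ {
		go wg.Done()
	}
	wg.Wait()
	fmt.Println("workers finished")
}
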
diff --git a/src/pkg/sync/atomic/asm_386.s b/src/pkg/sync/atomic/asm_386.s
index a9360ef..914d2fe 100644
--- a/src/pkg/sync/atomic/asm_386.s
+++ b/src/pkg/sync/atomic/asm_386.s
@@ -85,3 +85,12 @@
MOVL BX, retlo+12(FP)
MOVL CX, rethi+16(FP)
RET
+
+TEXT ·LoadInt32(SB),7,$0
+ JMP ·LoadUint32(SB)
+
+TEXT ·LoadUint32(SB),7,$0
+ MOVL addrptr+0(FP), AX
+ MOVL 0(AX), AX
+ MOVL AX, ret+4(FP)
+ RET
diff --git a/src/pkg/sync/atomic/asm_amd64.s b/src/pkg/sync/atomic/asm_amd64.s
index a260902..4282950 100644
--- a/src/pkg/sync/atomic/asm_amd64.s
+++ b/src/pkg/sync/atomic/asm_amd64.s
@@ -57,3 +57,13 @@
ADDQ AX, CX
MOVQ CX, ret+16(FP)
RET
+
+TEXT ·LoadInt32(SB),7,$0
+ JMP ·LoadUint32(SB)
+
+TEXT ·LoadUint32(SB),7,$0
+ MOVQ addrptr+0(FP), AX
+ MOVL 0(AX), AX
+ MOVL AX, ret+8(FP)
+ RET
+
diff --git a/src/pkg/sync/atomic/asm_linux_arm.s b/src/pkg/sync/atomic/asm_linux_arm.s
index 72f8d74..a09e067 100644
--- a/src/pkg/sync/atomic/asm_linux_arm.s
+++ b/src/pkg/sync/atomic/asm_linux_arm.s
@@ -83,3 +83,16 @@
TEXT ·AddUint64(SB),7,$0
B ·armAddUint64(SB)
+
+TEXT ·LoadInt32(SB),7,$0
+ B ·LoadUint32(SB)
+
+TEXT ·LoadUint32(SB),7,$0
+ MOVW addrptr+0(FP), R2
+loadloop1:
+ MOVW 0(R2), R0
+ MOVW R0, R1
+ BL cas<>(SB)
+ BCC loadloop1
+ MOVW R0, val+4(FP)
+ RET
diff --git a/src/pkg/sync/atomic/atomic_test.go b/src/pkg/sync/atomic/atomic_test.go
index 119ad00..2229e58 100644
--- a/src/pkg/sync/atomic/atomic_test.go
+++ b/src/pkg/sync/atomic/atomic_test.go
@@ -308,6 +308,46 @@
}
}
+func TestLoadInt32(t *testing.T) {
+ var x struct {
+ before int32
+ i int32
+ after int32
+ }
+ x.before = magic32
+ x.after = magic32
+ for delta := int32(1); delta+delta > delta; delta += delta {
+ k := LoadInt32(&x.i)
+ if k != x.i {
+ t.Fatalf("delta=%d i=%d k=%d", delta, x.i, k)
+ }
+ x.i += delta
+ }
+ if x.before != magic32 || x.after != magic32 {
+ t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magic32, magic32)
+ }
+}
+
+func TestLoadUint32(t *testing.T) {
+ var x struct {
+ before uint32
+ i uint32
+ after uint32
+ }
+ x.before = magic32
+ x.after = magic32
+ for delta := uint32(1); delta+delta > delta; delta += delta {
+ k := LoadUint32(&x.i)
+ if k != x.i {
+ t.Fatalf("delta=%d i=%d k=%d", delta, x.i, k)
+ }
+ x.i += delta
+ }
+ if x.before != magic32 || x.after != magic32 {
+ t.Fatalf("wrong magic: %#x _ %#x != %#x _ %#x", x.before, x.after, magic32, magic32)
+ }
+}
+
// Tests of correct behavior, with contention.
// (Is the function atomic?)
//
@@ -537,3 +577,65 @@
}
}
}
+
+func hammerLoadInt32(t *testing.T, uval *uint32) {
+ val := (*int32)(unsafe.Pointer(uval))
+ for {
+ v := LoadInt32(val)
+ vlo := v & ((1 << 16) - 1)
+ vhi := v >> 16
+ if vlo != vhi {
+ t.Fatalf("LoadInt32: %#x != %#x", vlo, vhi)
+ }
+ new := v + 1 + 1<<16
+ if vlo == 1e4 {
+ new = 0
+ }
+ if CompareAndSwapInt32(val, v, new) {
+ break
+ }
+ }
+}
+
+func hammerLoadUint32(t *testing.T, val *uint32) {
+ for {
+ v := LoadUint32(val)
+ vlo := v & ((1 << 16) - 1)
+ vhi := v >> 16
+ if vlo != vhi {
+ t.Fatalf("LoadUint32: %#x != %#x", vlo, vhi)
+ }
+ new := v + 1 + 1<<16
+ if vlo == 1e4 {
+ new = 0
+ }
+ if CompareAndSwapUint32(val, v, new) {
+ break
+ }
+ }
+}
+
+func TestHammerLoad(t *testing.T) {
+ tests := [...]func(*testing.T, *uint32){hammerLoadInt32, hammerLoadUint32}
+ n := 100000
+ if testing.Short() {
+ n = 10000
+ }
+ const procs = 8
+ defer runtime.GOMAXPROCS(runtime.GOMAXPROCS(procs))
+ for _, tt := range tests {
+ c := make(chan int)
+ var val uint32
+ for p := 0; p < procs; p++ {
+ go func() {
+ for i := 0; i < n; i++ {
+ tt(t, &val)
+ }
+ c <- 1
+ }()
+ }
+ for p := 0; p < procs; p++ {
+ <-c
+ }
+ }
+}
diff --git a/src/pkg/sync/atomic/doc.go b/src/pkg/sync/atomic/doc.go
index ec5a0d3..b35eb53 100644
--- a/src/pkg/sync/atomic/doc.go
+++ b/src/pkg/sync/atomic/doc.go
@@ -56,6 +56,12 @@
// AddUintptr atomically adds delta to *val and returns the new value.
func AddUintptr(val *uintptr, delta uintptr) (new uintptr)
+// LoadInt32 atomically loads *addr.
+func LoadInt32(addr *int32) (val int32)
+
+// LoadUint32 atomically loads *addr.
+func LoadUint32(addr *uint32) (val uint32)
+
// Helper for ARM. Linker will discard on other systems
func panic64() {
panic("sync/atomic: broken 64-bit atomic operations (buggy QEMU)")
diff --git a/src/pkg/sync/waitgroup.go b/src/pkg/sync/waitgroup.go
index 05478c6..a4c9b7e 100644
--- a/src/pkg/sync/waitgroup.go
+++ b/src/pkg/sync/waitgroup.go
@@ -4,7 +4,10 @@
package sync
-import "runtime"
+import (
+ "runtime"
+ "sync/atomic"
+)
// A WaitGroup waits for a collection of goroutines to finish.
// The main goroutine calls Add to set the number of
@@ -28,8 +31,8 @@
//
type WaitGroup struct {
m Mutex
- counter int
- waiters int
+ counter int32
+ waiters int32
sema *uint32
}
@@ -48,19 +51,19 @@
// Add adds delta, which may be negative, to the WaitGroup counter.
// If the counter becomes zero, all goroutines blocked on Wait() are released.
func (wg *WaitGroup) Add(delta int) {
- wg.m.Lock()
- if delta < -wg.counter {
- wg.m.Unlock()
+ v := atomic.AddInt32(&wg.counter, int32(delta))
+ if v < 0 {
panic("sync: negative WaitGroup count")
}
- wg.counter += delta
- if wg.counter == 0 && wg.waiters > 0 {
- for i := 0; i < wg.waiters; i++ {
- runtime.Semrelease(wg.sema)
- }
- wg.waiters = 0
- wg.sema = nil
+ if v > 0 || atomic.LoadInt32(&wg.waiters) == 0 {
+ return
}
+ wg.m.Lock()
+ for i := int32(0); i < wg.waiters; i++ {
+ runtime.Semrelease(wg.sema)
+ }
+ wg.waiters = 0
+ wg.sema = nil
wg.m.Unlock()
}
@@ -71,12 +74,20 @@
// Wait blocks until the WaitGroup counter is zero.
func (wg *WaitGroup) Wait() {
+ if atomic.LoadInt32(&wg.counter) == 0 {
+ return
+ }
wg.m.Lock()
- if wg.counter == 0 {
+ atomic.AddInt32(&wg.waiters, 1)
+ // This code is racing with the unlocked path in Add above.
+ // The code above modifies counter and then reads waiters.
+ // We must modify waiters and then read counter (the opposite order)
+ // to avoid missing an Add.
+ if atomic.LoadInt32(&wg.counter) == 0 {
+ atomic.AddInt32(&wg.waiters, -1)
wg.m.Unlock()
return
}
- wg.waiters++
if wg.sema == nil {
wg.sema = new(uint32)
}
diff --git a/src/pkg/sync/waitgroup_test.go b/src/pkg/sync/waitgroup_test.go
index fe35732..34430fc 100644
--- a/src/pkg/sync/waitgroup_test.go
+++ b/src/pkg/sync/waitgroup_test.go
@@ -5,7 +5,9 @@
package sync_test
import (
+ "runtime"
. "sync"
+ "sync/atomic"
"testing"
)
@@ -58,3 +60,107 @@
wg.Done()
t.Fatal("Should panic")
}
+
+func BenchmarkWaitGroupUncontended(b *testing.B) {
+ type PaddedWaitGroup struct {
+ WaitGroup
+ pad [128]uint8
+ }
+ const CallsPerSched = 1000
+ procs := runtime.GOMAXPROCS(-1)
+ N := int32(b.N / CallsPerSched)
+ c := make(chan bool, procs)
+ for p := 0; p < procs; p++ {
+ go func() {
+ var wg PaddedWaitGroup
+ for atomic.AddInt32(&N, -1) >= 0 {
+ runtime.Gosched()
+ for g := 0; g < CallsPerSched; g++ {
+ wg.Add(1)
+ wg.Done()
+ wg.Wait()
+ }
+ }
+ c <- true
+ }()
+ }
+ for p := 0; p < procs; p++ {
+ <-c
+ }
+}
+
+func benchmarkWaitGroupAddDone(b *testing.B, localWork int) {
+ const CallsPerSched = 1000
+ procs := runtime.GOMAXPROCS(-1)
+ N := int32(b.N / CallsPerSched)
+ c := make(chan bool, procs)
+ var wg WaitGroup
+ for p := 0; p < procs; p++ {
+ go func() {
+ foo := 0
+ for atomic.AddInt32(&N, -1) >= 0 {
+ runtime.Gosched()
+ for g := 0; g < CallsPerSched; g++ {
+ wg.Add(1)
+ for i := 0; i < localWork; i++ {
+ foo *= 2
+ foo /= 2
+ }
+ wg.Done()
+ }
+ }
+ c <- foo == 42
+ }()
+ }
+ for p := 0; p < procs; p++ {
+ <-c
+ }
+}
+
+func BenchmarkWaitGroupAddDone(b *testing.B) {
+ benchmarkWaitGroupAddDone(b, 0)
+}
+
+func BenchmarkWaitGroupAddDoneWork(b *testing.B) {
+ benchmarkWaitGroupAddDone(b, 100)
+}
+
+func benchmarkWaitGroupWait(b *testing.B, localWork int) {
+ const CallsPerSched = 1000
+ procs := runtime.GOMAXPROCS(-1)
+ N := int32(b.N / CallsPerSched)
+ c := make(chan bool, procs)
+ var wg WaitGroup
+ wg.Add(procs)
+ for p := 0; p < procs; p++ {
+ go wg.Done()
+ }
+ for p := 0; p < procs; p++ {
+ go func() {
+ foo := 0
+ for atomic.AddInt32(&N, -1) >= 0 {
+ runtime.Gosched()
+ for g := 0; g < CallsPerSched; g++ {
+ wg.Wait()
+ for i := 0; i < localWork; i++ {
+ foo *= 2
+ foo /= 2
+ }
+ }
+ }
+ c <- foo == 42
+ }()
+ }
+ for p := 0; p < procs; p++ {
+ <-c
+ }
+}
+
+func BenchmarkWaitGroupWait(b *testing.B) {
+ benchmarkWaitGroupWait(b, 0)
+}
+
+func BenchmarkWaitGroupWaitWork(b *testing.B) {
+ benchmarkWaitGroupWait(b, 100)
+}