sync: scalable Pool
Introduce fixed-size P-local caches.
When local caches overflow/underflow a batch of items
is transferred to/from global mutex-protected cache.
benchmark old ns/op new ns/op delta
BenchmarkPool 50554 22423 -55.65%
BenchmarkPool-4 400359 5904 -98.53%
BenchmarkPool-16 403311 1598 -99.60%
BenchmarkPool-32 367310 1526 -99.58%
BenchmarkPoolOverlflow 5214 3633 -30.32%
BenchmarkPoolOverlflow-4 42663 9539 -77.64%
BenchmarkPoolOverlflow-8 46919 11385 -75.73%
BenchmarkPoolOverlflow-16 39454 13048 -66.93%
BenchmarkSprintfEmpty 84 63 -25.68%
BenchmarkSprintfEmpty-2 371 32 -91.13%
BenchmarkSprintfEmpty-4 465 22 -95.25%
BenchmarkSprintfEmpty-8 565 12 -97.77%
BenchmarkSprintfEmpty-16 498 5 -98.87%
BenchmarkSprintfEmpty-32 492 4 -99.04%
BenchmarkSprintfString 259 229 -11.58%
BenchmarkSprintfString-2 574 144 -74.91%
BenchmarkSprintfString-4 651 77 -88.05%
BenchmarkSprintfString-8 868 47 -94.48%
BenchmarkSprintfString-16 825 33 -95.96%
BenchmarkSprintfString-32 825 30 -96.28%
BenchmarkSprintfInt 213 188 -11.74%
BenchmarkSprintfInt-2 448 138 -69.20%
BenchmarkSprintfInt-4 624 52 -91.63%
BenchmarkSprintfInt-8 691 31 -95.43%
BenchmarkSprintfInt-16 724 18 -97.46%
BenchmarkSprintfInt-32 718 16 -97.70%
BenchmarkSprintfIntInt 311 282 -9.32%
BenchmarkSprintfIntInt-2 333 145 -56.46%
BenchmarkSprintfIntInt-4 642 110 -82.87%
BenchmarkSprintfIntInt-8 832 42 -94.90%
BenchmarkSprintfIntInt-16 817 24 -97.00%
BenchmarkSprintfIntInt-32 805 22 -97.17%
BenchmarkSprintfPrefixedInt 309 269 -12.94%
BenchmarkSprintfPrefixedInt-2 245 168 -31.43%
BenchmarkSprintfPrefixedInt-4 598 99 -83.36%
BenchmarkSprintfPrefixedInt-8 770 67 -91.23%
BenchmarkSprintfPrefixedInt-16 829 54 -93.49%
BenchmarkSprintfPrefixedInt-32 824 50 -93.83%
BenchmarkSprintfFloat 418 398 -4.78%
BenchmarkSprintfFloat-2 295 203 -31.19%
BenchmarkSprintfFloat-4 585 128 -78.12%
BenchmarkSprintfFloat-8 873 60 -93.13%
BenchmarkSprintfFloat-16 884 33 -96.24%
BenchmarkSprintfFloat-32 881 29 -96.62%
BenchmarkManyArgs 1097 1069 -2.55%
BenchmarkManyArgs-2 705 567 -19.57%
BenchmarkManyArgs-4 792 319 -59.72%
BenchmarkManyArgs-8 963 172 -82.14%
BenchmarkManyArgs-16 1115 103 -90.76%
BenchmarkManyArgs-32 1133 90 -92.03%
LGTM=rsc
R=golang-codereviews, bradfitz, minux.ma, gobot, rsc
CC=golang-codereviews
https://golang.org/cl/46010043
diff --git a/src/pkg/go/build/deps_test.go b/src/pkg/go/build/deps_test.go
index dd068d4..ab56b65 100644
--- a/src/pkg/go/build/deps_test.go
+++ b/src/pkg/go/build/deps_test.go
@@ -29,7 +29,7 @@
"errors": {},
"io": {"errors", "sync"},
"runtime": {"unsafe"},
- "sync": {"sync/atomic", "unsafe"},
+ "sync": {"runtime", "sync/atomic", "unsafe"},
"sync/atomic": {"unsafe"},
"unsafe": {},
diff --git a/src/pkg/runtime/mgc0.c b/src/pkg/runtime/mgc0.c
index aa93bfb..8b6eeab 100644
--- a/src/pkg/runtime/mgc0.c
+++ b/src/pkg/runtime/mgc0.c
@@ -68,15 +68,19 @@
{
void **pool, **next;
P *p, **pp;
+ uintptr off;
int32 i;
// clear sync.Pool's
for(pool = pools.head; pool != nil; pool = next) {
next = pool[0];
pool[0] = nil; // next
- pool[1] = nil; // slice
- pool[2] = nil;
- pool[3] = nil;
+ pool[1] = nil; // local
+ pool[2] = nil; // localSize
+ off = (uintptr)pool[3] / sizeof(void*);
+ pool[off+0] = nil; // global slice
+ pool[off+1] = nil;
+ pool[off+2] = nil;
}
pools.head = nil;
diff --git a/src/pkg/runtime/proc.c b/src/pkg/runtime/proc.c
index 24feda4..afe71ef 100644
--- a/src/pkg/runtime/proc.c
+++ b/src/pkg/runtime/proc.c
@@ -3046,3 +3046,23 @@
}
return 0;
}
+
+// func runtime_procPin() int
+void
+sync·runtime_procPin(intgo p)
+{
+ M *mp;
+
+ mp = m;
+ // Disable preemption.
+ mp->locks++;
+ p = mp->p->id;
+ FLUSH(&p);
+}
+
+// func runtime_procUnpin()
+void
+sync·runtime_procUnpin(void)
+{
+ m->locks--;
+}
diff --git a/src/pkg/sync/pool.go b/src/pkg/sync/pool.go
index 9eb07c3..1a38887 100644
--- a/src/pkg/sync/pool.go
+++ b/src/pkg/sync/pool.go
@@ -4,6 +4,18 @@
package sync
+import (
+ "runtime"
+ "sync/atomic"
+ "unsafe"
+)
+
+const (
+ cacheLineSize = 128
+ poolLocalSize = 2 * cacheLineSize
+ poolLocalCap = poolLocalSize/unsafe.Sizeof(*(*interface{})(nil)) - 1
+)
+
// A Pool is a set of temporary objects that may be individually saved
// and retrieved.
//
@@ -26,29 +38,52 @@
//
// This is an experimental type and might not be released.
type Pool struct {
- next *Pool // for use by runtime. must be first.
- list []interface{} // offset known to runtime
- mu Mutex // guards list
+ // The following fields are known to runtime.
+ next *Pool // for use by runtime
+ local *poolLocal // local fixed-size per-P pool, actually an array
+ localSize uintptr // size of the local array
+ globalOffset uintptr // offset of global
+ // The rest is not known to runtime.
// New optionally specifies a function to generate
// a value when Get would otherwise return nil.
// It may not be changed concurrently with calls to Get.
New func() interface{}
+
+ pad [cacheLineSize]byte
+ // Read-mostly date above this point, mutable data follows.
+ mu Mutex
+ global []interface{} // global fallback pool
}
-func runtime_registerPool(*Pool)
+// Local per-P Pool appendix.
+type poolLocal struct {
+ tail int
+ unused int
+ buf [poolLocalCap]interface{}
+}
+
+func init() {
+ var v poolLocal
+ if unsafe.Sizeof(v) != poolLocalSize {
+ panic("sync: incorrect pool size")
+ }
+}
// Put adds x to the pool.
func (p *Pool) Put(x interface{}) {
if x == nil {
return
}
- p.mu.Lock()
- if p.list == nil {
- runtime_registerPool(p)
+ l := p.pin()
+ t := l.tail
+ if t < int(poolLocalCap) {
+ l.buf[t] = x
+ l.tail = t + 1
+ runtime_procUnpin()
+ return
}
- p.list = append(p.list, x)
- p.mu.Unlock()
+ p.putSlow(l, x)
}
// Get selects an arbitrary item from the Pool, removes it from the
@@ -60,16 +95,116 @@
// If Get would otherwise return nil and p.New is non-nil, Get returns
// the result of calling p.New.
func (p *Pool) Get() interface{} {
- p.mu.Lock()
- var x interface{}
- if n := len(p.list); n > 0 {
- x = p.list[n-1]
- p.list[n-1] = nil // Just to be safe
- p.list = p.list[:n-1]
+ l := p.pin()
+ t := l.tail
+ if t > 0 {
+ t -= 1
+ x := l.buf[t]
+ l.tail = t
+ runtime_procUnpin()
+ return x
}
+ return p.getSlow()
+}
+
+func (p *Pool) putSlow(l *poolLocal, x interface{}) {
+ // Grab half of items from local pool and put to global pool.
+ // Can not lock the mutex while pinned.
+ const N = int(poolLocalCap/2 + 1)
+ var buf [N]interface{}
+ buf[0] = x
+ for i := 1; i < N; i++ {
+ l.tail--
+ buf[i] = l.buf[l.tail]
+ }
+ runtime_procUnpin()
+
+ p.mu.Lock()
+ p.global = append(p.global, buf[:]...)
p.mu.Unlock()
+}
+
+func (p *Pool) getSlow() (x interface{}) {
+ // Grab a batch of items from global pool and put to local pool.
+ // Can not lock the mutex while pinned.
+ runtime_procUnpin()
+ p.mu.Lock()
+ pid := runtime_procPin()
+ s := p.localSize
+ l := p.local
+ if uintptr(pid) < s {
+ l = indexLocal(l, pid)
+ // Get the item to return.
+ last := len(p.global) - 1
+ if last >= 0 {
+ x = p.global[last]
+ p.global = p.global[:last]
+ }
+ // Try to refill local pool, we may have been rescheduled to another P.
+ if last > 0 && l.tail == 0 {
+ n := int(poolLocalCap / 2)
+ gl := len(p.global)
+ if n > gl {
+ n = gl
+ }
+ copy(l.buf[:], p.global[gl-n:])
+ p.global = p.global[:gl-n]
+ l.tail = n
+ }
+ }
+ runtime_procUnpin()
+ p.mu.Unlock()
+
if x == nil && p.New != nil {
x = p.New()
}
- return x
+ return
}
+
+// pin pins current goroutine to P, disables preemption and returns poolLocal pool for the P.
+// Caller must call runtime_procUnpin() when done with the pool.
+func (p *Pool) pin() *poolLocal {
+ pid := runtime_procPin()
+ // In pinSlow we store to localSize and then to local, here we load in opposite order.
+ // Since we've disabled preemption, GC can not happen in between.
+ // Thus here we must observe local at least as large localSize.
+ // We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
+ s := atomic.LoadUintptr(&p.localSize) // load-acquire
+ l := p.local // load-consume
+ if uintptr(pid) < s {
+ return indexLocal(l, pid)
+ }
+ return p.pinSlow()
+}
+
+func (p *Pool) pinSlow() *poolLocal {
+ // Retry under the mutex.
+ runtime_procUnpin()
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ pid := runtime_procPin()
+ s := p.localSize
+ l := p.local
+ if uintptr(pid) < s {
+ return indexLocal(l, pid)
+ }
+ if p.local == nil {
+ p.globalOffset = unsafe.Offsetof(p.global)
+ runtime_registerPool(p)
+ }
+ // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
+ size := runtime.GOMAXPROCS(0)
+ local := make([]poolLocal, size)
+ atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(&p.local)), unsafe.Pointer(&local[0])) // store-release
+ atomic.StoreUintptr(&p.localSize, uintptr(size)) // store-release
+ return &local[pid]
+}
+
+func indexLocal(l *poolLocal, i int) *poolLocal {
+ return (*poolLocal)(unsafe.Pointer(uintptr(unsafe.Pointer(l)) + unsafe.Sizeof(*l)*uintptr(i))) // uh...
+}
+
+// Implemented in runtime.
+func runtime_registerPool(*Pool)
+func runtime_procPin() int
+func runtime_procUnpin()
diff --git a/src/pkg/sync/pool_test.go b/src/pkg/sync/pool_test.go
index e4aeda4..3bf5131 100644
--- a/src/pkg/sync/pool_test.go
+++ b/src/pkg/sync/pool_test.go
@@ -11,7 +11,6 @@
"sync/atomic"
"testing"
"time"
- "unsafe"
)
func TestPool(t *testing.T) {
@@ -125,28 +124,41 @@
}
func BenchmarkPool(b *testing.B) {
- procs := runtime.GOMAXPROCS(-1)
- var dec func() bool
- if unsafe.Sizeof(b.N) == 8 {
- n := int64(b.N)
- dec = func() bool {
- return atomic.AddInt64(&n, -1) >= 0
- }
- } else {
- n := int32(b.N)
- dec = func() bool {
- return atomic.AddInt32(&n, -1) >= 0
- }
- }
var p Pool
var wg WaitGroup
- for i := 0; i < procs; i++ {
+ n0 := uintptr(b.N)
+ n := n0
+ for i := 0; i < runtime.GOMAXPROCS(0); i++ {
wg.Add(1)
go func() {
defer wg.Done()
- for dec() {
- p.Put(1)
- p.Get()
+ for atomic.AddUintptr(&n, ^uintptr(0)) < n0 {
+ for b := 0; b < 100; b++ {
+ p.Put(1)
+ p.Get()
+ }
+ }
+ }()
+ }
+ wg.Wait()
+}
+
+func BenchmarkPoolOverlflow(b *testing.B) {
+ var p Pool
+ var wg WaitGroup
+ n0 := uintptr(b.N)
+ n := n0
+ for i := 0; i < runtime.GOMAXPROCS(0); i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ for atomic.AddUintptr(&n, ^uintptr(0)) < n0 {
+ for b := 0; b < 100; b++ {
+ p.Put(1)
+ }
+ for b := 0; b < 100; b++ {
+ p.Get()
+ }
}
}()
}