blob: 9e11cb3e12cac91d8f9b64e8814197dd763e75ee [file] [log] [blame]
// Copyright 2012 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Parallel for algorithm.
package runtime
import (
"runtime/internal/atomic"
"runtime/internal/sys"
)
// A parfor holds state for the parallel for operation.
//
// A descriptor is allocated once (parforalloc), reinitialized per
// operation (parforsetup), and then driven by nthr threads each
// calling parfordo.
type parfor struct {
	body   func(*parfor, uint32) // executed for each element
	done   uint32                // number of idle threads
	nthr   uint32                // total number of threads
	thrseq uint32                // thread id sequencer
	cnt    uint32                // iteration space [0, cnt)
	wait   bool                  // if true, wait while all threads finish processing,
	// otherwise parfor may return while other threads are still working
	thr []parforthread // thread descriptors

	// stats, aggregated from the per-thread counters at the end of parfordo
	nsteal     uint64 // number of successful steals
	nstealcnt  uint64 // total iterations obtained by stealing
	nprocyield uint64 // procyield calls made while backing off
	nosyield   uint64 // osyield calls made while backing off
	nsleep     uint64 // usleep calls made while backing off
}
// A parforthread holds state for a single thread in the parallel for.
type parforthread struct {
	// the thread's iteration space [32lsb, 32msb)
	pos uint64

	// stats, flushed into the parfor totals when the thread exits parfordo
	nsteal     uint64 // successful steals by this thread
	nstealcnt  uint64 // iterations this thread obtained by stealing
	nprocyield uint64 // procyield calls during backoff
	nosyield   uint64 // osyield calls during backoff
	nsleep     uint64 // usleep calls during backoff
	// pad each descriptor out to a cache line, so that concurrent
	// updates to different threads' pos words don't share a line
	pad [sys.CacheLineSize]byte
}
// parforalloc returns a parfor descriptor able to coordinate up to
// nthrmax threads. The descriptor must be initialized with parforsetup
// before use.
func parforalloc(nthrmax uint32) *parfor {
	desc := new(parfor)
	desc.thr = make([]parforthread, nthrmax)
	return desc
}
// parforsetup initializes desc for a parallel for operation with nthr
// threads executing n jobs.
//
// On return the nthr threads are each expected to call parfordo(desc)
// to run the operation. During those calls, for each i in [0, n), one
// thread will be used to invoke body(desc, i).
// If wait is true, no parfordo will return until all work has been completed.
// If wait is false, parfordo may return when there is a small amount
// of work left, under the assumption that another thread has that
// work well in hand.
func parforsetup(desc *parfor, nthr, n uint32, wait bool, body func(*parfor, uint32)) {
	invalid := desc == nil || nthr == 0 || nthr > uint32(len(desc.thr)) || body == nil
	if invalid {
		print("desc=", desc, " nthr=", nthr, " count=", n, " body=", body, "\n")
		throw("parfor: invalid args")
	}

	desc.body = body
	desc.wait = wait
	desc.cnt = n
	desc.nthr = nthr
	desc.thrseq = 0
	desc.done = 0

	// Clear accumulated statistics from any previous operation.
	desc.nsteal = 0
	desc.nstealcnt = 0
	desc.nprocyield = 0
	desc.nosyield = 0
	desc.nsleep = 0

	// Give thread t the initial slice [t*n/nthr, (t+1)*n/nthr) of the
	// iteration space, packed into its pos word as low|high<<32.
	for t := range desc.thr {
		lo := uint64(n) * uint64(t) / uint64(nthr)
		hi := uint64(n) * uint64(t+1) / uint64(nthr)
		desc.thr[t].pos = uint64(uint32(lo)) | uint64(uint32(hi))<<32
	}
}
// parfordo runs the parallel for operation described by desc on the
// calling thread. Each of the desc.nthr participating threads calls
// parfordo once; together they cover the iteration space [0, desc.cnt),
// invoking desc.body exactly once per index, and rebalance via work
// stealing when a thread runs out of its local range.
func parfordo(desc *parfor) {
	// Obtain 0-based thread index.
	tid := atomic.Xadd(&desc.thrseq, 1) - 1
	if tid >= desc.nthr {
		print("tid=", tid, " nthr=", desc.nthr, "\n")
		throw("parfor: invalid tid")
	}

	// If single-threaded, just execute the for serially.
	body := desc.body
	if desc.nthr == 1 {
		for i := uint32(0); i < desc.cnt; i++ {
			body(desc, i)
		}
		return
	}

	me := &desc.thr[tid]
	mypos := &me.pos
	for {
		for {
			// While there is local work,
			// bump low index and execute the iteration.
			// pos packs the half-open range: low 32 bits are the next
			// index (post-increment, hence the -1), high 32 bits the limit.
			pos := atomic.Xadd64(mypos, 1)
			begin := uint32(pos) - 1
			end := uint32(pos >> 32)
			if begin < end {
				body(desc, begin)
				continue
			}
			break
		}

		// Out of work, need to steal something.
		idle := false
		for try := uint32(0); ; try++ {
			// If we don't see any work for long enough,
			// increment the done counter...
			if try > desc.nthr*4 && !idle {
				idle = true
				atomic.Xadd(&desc.done, 1)
			}

			// ...if all threads have incremented the counter,
			// we are done. extra accounts for this thread when it
			// hasn't bumped done itself yet.
			extra := uint32(0)
			if !idle {
				extra = 1
			}
			if desc.done+extra == desc.nthr {
				if !idle {
					atomic.Xadd(&desc.done, 1)
				}
				goto exit
			}

			// Choose a random victim for stealing (any thread but us).
			var begin, end uint32
			victim := fastrand1() % (desc.nthr - 1)
			if victim >= tid {
				victim++
			}
			victimpos := &desc.thr[victim].pos
			for {
				// See if it has any work.
				pos := atomic.Load64(victimpos)
				begin = uint32(pos)
				end = uint32(pos >> 32)
				if begin+1 >= end {
					// Victim has at most one iteration left; leave it
					// and signal "nothing stolen" via begin == end == 0.
					end = 0
					begin = end
					break
				}
				if idle {
					// We found work, so we are no longer idle.
					atomic.Xadd(&desc.done, -1)
					idle = false
				}
				// Try to CAS away the upper half of the victim's range.
				begin2 := begin + (end-begin)/2
				newpos := uint64(begin) | uint64(begin2)<<32
				if atomic.Cas64(victimpos, pos, newpos) {
					begin = begin2
					break
				}
				// CAS failed (victim's pos changed); reload and retry.
			}
			if begin < end {
				// Has successfully stolen some work.
				if idle {
					throw("parfor: should not be idle")
				}
				atomic.Store64(mypos, uint64(begin)|uint64(end)<<32)
				me.nsteal++
				me.nstealcnt += uint64(end) - uint64(begin)
				break
			}

			// Backoff, escalating with the number of failed attempts.
			if try < desc.nthr {
				// nothing
			} else if try < 4*desc.nthr {
				me.nprocyield++
				procyield(20)
			} else if !desc.wait {
				// If a caller asked not to wait for the others, exit now
				// (assume that most work is already done at this point).
				if !idle {
					atomic.Xadd(&desc.done, 1)
				}
				goto exit
			} else if try < 6*desc.nthr {
				me.nosyield++
				osyield()
			} else {
				me.nsleep++
				usleep(1)
			}
		}
	}

exit:
	// Fold this thread's statistics into the shared totals and reset
	// the local counters for the next operation.
	atomic.Xadd64(&desc.nsteal, int64(me.nsteal))
	atomic.Xadd64(&desc.nstealcnt, int64(me.nstealcnt))
	atomic.Xadd64(&desc.nprocyield, int64(me.nprocyield))
	atomic.Xadd64(&desc.nosyield, int64(me.nosyield))
	atomic.Xadd64(&desc.nsleep, int64(me.nsleep))
	me.nsteal = 0
	me.nstealcnt = 0
	me.nprocyield = 0
	me.nosyield = 0
	me.nsleep = 0
}