cmd/coordinator: finish the scheduler code, at least mostly
Optimizations and tuning remain, but this should be tons better than
what we had before (random assignment of buildlets to waiters).
Updates golang/go#19178
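
To make the new policy concrete: below is a condensed, standalone
sketch of the ordering that schedLess (cmd/coordinator/sched.go, in
the diff below) implements. It is illustrative only, not the
coordinator's actual code; the field names match the diff.

    package main

    import (
        "fmt"
        "time"
    )

    // SchedItem is a trimmed copy of the type added in sched.go,
    // kept only to illustrate the policy.
    type SchedItem struct {
        IsGomote    bool
        IsTry       bool
        requestTime time.Time
    }

    // schedLess mirrors the new priority policy: gomote first, then
    // trybots, both FIFO among themselves; post-submit builds LIFO.
    func schedLess(a, b *SchedItem) bool {
        if a.IsGomote != b.IsGomote {
            return a.IsGomote
        }
        if a.IsTry != b.IsTry {
            return a.IsTry
        }
        if a.IsGomote || a.IsTry {
            return a.requestTime.Before(b.requestTime) // FIFO
        }
        return b.requestTime.Before(a.requestTime) // LIFO: newest first
    }

    func main() {
        t1, t2 := time.Unix(1, 0), time.Unix(2, 0)
        gomote := &SchedItem{IsGomote: true, requestTime: t2}
        try := &SchedItem{IsTry: true, requestTime: t1}
        // true: the gomote request wins even though it asked later.
        fmt.Println(schedLess(gomote, try))
    }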
Change-Id: Idb483a4c4209a012814322cc8b37b966ee4681de
Reviewed-on: https://go-review.googlesource.com/c/build/+/205078
Reviewed-by: Bryan C. Mills <bcmills@google.com>
diff --git a/cmd/coordinator/coordinator.go b/cmd/coordinator/coordinator.go
index 227493e..8c615f1 100644
--- a/cmd/coordinator/coordinator.go
+++ b/cmd/coordinator/coordinator.go
@@ -325,12 +325,9 @@
}
}()
- workc := make(chan buildgo.BuilderRev)
-
if *mode == "dev" {
// TODO(crawshaw): do more in dev mode
gcePool.SetEnabled(*devEnableGCE)
- http.HandleFunc("/dosomework/", handleDoSomeWork(workc))
} else {
go gcePool.cleanUpOldVMs()
if kubeErr == nil {
@@ -342,7 +339,7 @@
}
go listenAndServeInternalModuleProxy()
- go findWorkLoop(workc)
+ go findWorkLoop()
go findTryWorkLoop()
go reportMetrics(context.Background())
// TODO(cmang): gccgo will need its own findWorkLoop
@@ -351,22 +348,38 @@
go listenAndServeTLS()
go listenAndServeSSH() // ssh proxy to remote buildlets; remote.go
- for {
- work := <-workc
- if !mayBuildRev(work) {
- if inStaging {
- if _, ok := dashboard.Builders[work.Name]; ok && logCantBuildStaging.Allow() {
- log.Printf("may not build %v; skipping", work)
- }
+ select {}
+}
+
+// ignoreAllNewWork, when true, prevents addWork from doing anything.
+// It's sometimes set in staging mode when people are debugging
+// certain paths.
+var ignoreAllNewWork bool
+
+// addWorkTestHook is optionally set by tests.
+var addWorkTestHook func(work buildgo.BuilderRev)
+
+func addWork(work buildgo.BuilderRev) {
+ if f := addWorkTestHook; f != nil {
+ f(work)
+ return
+ }
+ if ignoreAllNewWork || isBuilding(work) {
+ return
+ }
+ if !mayBuildRev(work) {
+ if inStaging {
+ if _, ok := dashboard.Builders[work.Name]; ok && logCantBuildStaging.Allow() {
+ log.Printf("may not build %v; skipping", work)
}
- continue
}
- st, err := newBuild(work)
- if err != nil {
- log.Printf("Bad build work params %v: %v", work, err)
- } else {
- st.start()
- }
+ return
+ }
+ st, err := newBuild(work)
+ if err != nil {
+ log.Printf("Bad build work params %v: %v", work, err)
+ } else {
+ st.start()
}
}
@@ -811,19 +824,21 @@
w.(http.Flusher).Flush()
}
-// findWorkLoop polls https://build.golang.org/?mode=json looking for new work
-// for the main dashboard. It does not support gccgo.
-func findWorkLoop(work chan<- buildgo.BuilderRev) {
+// findWorkLoop polls https://build.golang.org/?mode=json looking for
+// new post-submit work for the main dashboard. It does not support
+// gccgo. This is separate from trybots, whose work is populated by
+// findTryWorkLoop.
+func findWorkLoop() {
// Useful for debugging a single run:
if inStaging && false {
const debugSubrepo = false
if debugSubrepo {
- work <- buildgo.BuilderRev{
+ addWork(buildgo.BuilderRev{
Name: "linux-arm",
Rev: "c9778ec302b2e0e0d6027e1e0fca892e428d9657",
SubName: "tools",
SubRev: "ac303766f5f240c1796eeea3dc9bf34f1261aa35",
- }
+ })
}
const debugArm = false
if debugArm {
@@ -832,31 +847,29 @@
time.Sleep(time.Second)
}
log.Printf("ARM machine(s) registered.")
- work <- buildgo.BuilderRev{Name: "linux-arm", Rev: "3129c67db76bc8ee13a1edc38a6c25f9eddcbc6c"}
+ addWork(buildgo.BuilderRev{Name: "linux-arm", Rev: "3129c67db76bc8ee13a1edc38a6c25f9eddcbc6c"})
} else {
- work <- buildgo.BuilderRev{Name: "linux-amd64", Rev: "9b16b9c7f95562bb290f5015324a345be855894d"}
- work <- buildgo.BuilderRev{Name: "linux-amd64-sid", Rev: "9b16b9c7f95562bb290f5015324a345be855894d"}
- work <- buildgo.BuilderRev{Name: "linux-amd64-clang", Rev: "9b16b9c7f95562bb290f5015324a345be855894d"}
+ addWork(buildgo.BuilderRev{Name: "linux-amd64", Rev: "9b16b9c7f95562bb290f5015324a345be855894d"})
+ addWork(buildgo.BuilderRev{Name: "linux-amd64-sid", Rev: "9b16b9c7f95562bb290f5015324a345be855894d"})
+ addWork(buildgo.BuilderRev{Name: "linux-amd64-clang", Rev: "9b16b9c7f95562bb290f5015324a345be855894d"})
}
-
- // Still run findWork but ignore what it does.
- ignore := make(chan buildgo.BuilderRev)
- go func() {
- for range ignore {
- }
- }()
- work = ignore
+ ignoreAllNewWork = true
}
+ // TODO: remove this hard-coded 15 second ticker and instead
+ // do some new streaming gRPC call to maintnerd to subscribe
+ // to new commits.
ticker := time.NewTicker(15 * time.Second)
for {
- if err := findWork(work); err != nil {
+ if err := findWork(); err != nil {
log.Printf("failed to find new work: %v", err)
}
<-ticker.C
}
}
-func findWork(work chan<- buildgo.BuilderRev) error {
+// findWork polls the https://build.golang.org/ dashboard once to find
+// post-submit work to do. It's called in a loop by findWorkLoop.
+func findWork() error {
var bs types.BuildStatus
if err := dash("GET", "", url.Values{"mode": {"json"}}, nil, &bs); err != nil {
return err
@@ -881,6 +894,7 @@
// 15 seconds, but they should be skewed toward new work.
// This depends on the build dashboard sending back the list
// of empty slots newest first (matching the order on the main screen).
+ // TODO: delete this code when the scheduler is on by default.
sent := map[string]bool{}
	var goRevisions []string // revisions of repo "go" on branch "master"
@@ -951,12 +965,18 @@
}
}
- // The !sent[builder] here is a clumsy attempt at priority scheduling
- // and probably should be replaced at some point with a better solution.
- // See golang.org/issue/19178 and the long comment above.
- if !isBuilding(rev) && !sent[builder] {
- sent[builder] = true
- work <- rev
+ if useScheduler {
+ addWork(rev)
+ } else {
+ // The !sent[builder] here is a clumsy attempt at priority scheduling
+ // and probably should be replaced at some point with a better solution.
+ // See golang.org/issue/19178 and the long comment above.
+ // TODO: delete all this code and the sent map above when the
+ // useScheduler const is removed.
+ if !sent[builder] {
+ sent[builder] = true
+ addWork(rev)
+ }
}
}
}
@@ -969,10 +989,7 @@
continue
}
for _, rev := range goRevisions {
- br := buildgo.BuilderRev{Name: b, Rev: rev}
- if !isBuilding(br) {
- work <- br
- }
+ addWork(buildgo.BuilderRev{Name: b, Rev: rev})
}
}
return nil
@@ -1530,18 +1547,12 @@
// and highPriorityOpt.
GetBuildlet(ctx context.Context, hostType string, lg logger) (*buildlet.Client, error)
- // HasCapacity reports whether the buildlet pool has
- // quota/capacity to create a buildlet of the provided host
- // type. This should return as fast as possible and err on
- // the side of returning false.
- HasCapacity(hostType string) bool
-
String() string // TODO(bradfitz): more status stuff
}
-// GetBuildlets creates up to n buildlets and sends them on the returned channel
+// getBuildlets creates up to n buildlets and sends them on the returned channel
// before closing the channel.
-func GetBuildlets(ctx context.Context, pool BuildletPool, n int, hostType string, lg logger) <-chan *buildlet.Client {
+func getBuildlets(ctx context.Context, n int, schedTmpl *SchedItem, lg logger) <-chan *buildlet.Client {
ch := make(chan *buildlet.Client) // NOT buffered
var wg sync.WaitGroup
wg.Add(n)
@@ -1549,11 +1560,13 @@
go func(i int) {
defer wg.Done()
sp := lg.CreateSpan("get_helper", fmt.Sprintf("helper %d/%d", i+1, n))
- bc, err := pool.GetBuildlet(ctx, hostType, lg)
+ schedItem := *schedTmpl // copy; GetBuildlet takes ownership
+ schedItem.IsHelper = i > 0
+ bc, err := sched.GetBuildlet(ctx, lg, &schedItem)
sp.Done(err)
if err != nil {
if err != context.Canceled {
- log.Printf("failed to get a %s buildlet: %v", hostType, err)
+ log.Printf("failed to get a %s buildlet: %v", schedItem.HostType, err)
}
return
}
@@ -1574,9 +1587,9 @@
return ch
}
-var testPoolHook func(*dashboard.BuildConfig) BuildletPool
+var testPoolHook func(*dashboard.HostConfig) BuildletPool
-func poolForConf(conf *dashboard.BuildConfig) BuildletPool {
+func poolForConf(conf *dashboard.HostConfig) BuildletPool {
if testPoolHook != nil {
return testPoolHook(conf)
}
@@ -1589,10 +1602,10 @@
} else {
return kubePool
}
- case conf.IsReverse():
+ case conf.IsReverse:
return reversePool
default:
- panic(fmt.Sprintf("no buildlet pool for builder type %q", conf.Name))
+ panic(fmt.Sprintf("no buildlet pool for host type %q", conf.HostType))
}
}
@@ -1641,7 +1654,7 @@
}
func (st *buildStatus) buildletPool() BuildletPool {
- return poolForConf(st.conf)
+ return poolForConf(st.conf.HostConfig())
}
// parentRev returns the parent of this build's commit (but only if this build comes from a trySet).
@@ -1721,8 +1734,12 @@
}
func (st *buildStatus) onceInitHelpersFunc() {
- pool := st.buildletPool()
- st.helpers = GetBuildlets(st.ctx, pool, st.conf.NumTestHelpers(st.isTry()), st.conf.HostType, st)
+ schedTmpl := &SchedItem{
+ BuilderRev: st.BuilderRev,
+ HostType: st.conf.HostType,
+ IsTry: st.isTry(),
+ }
+ st.helpers = getBuildlets(st.ctx, st.conf.NumTestHelpers(st.isTry()), schedTmpl, st)
}
// useSnapshot reports whether this type of build uses a snapshot of
@@ -1835,11 +1852,9 @@
}
sp = st.CreateSpan("get_buildlet")
- pool := st.buildletPool()
bc, err := sched.GetBuildlet(st.ctx, st, &SchedItem{
HostType: st.conf.HostType,
IsTry: st.trySet != nil,
- Pool: pool,
BuilderRev: st.BuilderRev,
})
sp.Done(err)
@@ -1980,7 +1995,7 @@
// Log whether we used COS, so we can do queries to analyze
// Kubernetes vs COS performance for containers.
- if st.conf.IsContainer() && poolForConf(st.conf) == gcePool {
+ if st.conf.IsContainer() && poolForConf(st.conf.HostConfig()) == gcePool {
rec.ContainerHost = "cos"
}
@@ -2099,7 +2114,6 @@
kubeBC, err := sched.GetBuildlet(ctx, st, &SchedItem{
HostType: config.CompileHostType,
IsTry: st.trySet != nil,
- Pool: kubePool,
BuilderRev: st.BuilderRev,
})
sp.Done(err)
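
The coordinator.go changes above replace the old work channel with
addWork, which drops revisions already being built and supports a
test hook. A minimal self-contained sketch of that dispatch pattern
follows; BuilderRev here is a stub for buildgo.BuilderRev and the
building map is a stand-in for isBuilding, so this is illustrative
only.

    package main

    import "fmt"

    // BuilderRev is a simplified stand-in for buildgo.BuilderRev.
    type BuilderRev struct{ Name, Rev string }

    // addWorkTestHook mirrors the hook the coordinator now uses so
    // tests can observe scheduling decisions without real builds.
    var addWorkTestHook func(BuilderRev)

    var building = map[BuilderRev]bool{} // stand-in for isBuilding

    func addWork(w BuilderRev) {
        if f := addWorkTestHook; f != nil {
            f(w)
            return
        }
        if building[w] {
            return // already building this rev; drop the duplicate
        }
        building[w] = true
        fmt.Printf("starting build %v\n", w)
    }

    func main() {
        addWork(BuilderRev{"linux-amd64", "abc123"})
        addWork(BuilderRev{"linux-amd64", "abc123"}) // deduplicated

        // With the hook installed, nothing builds; the test records.
        addWorkTestHook = func(w BuilderRev) { fmt.Printf("test saw %v\n", w) }
        addWork(BuilderRev{"linux-arm", "def456"})
    }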
diff --git a/cmd/coordinator/coordinator_test.go b/cmd/coordinator/coordinator_test.go
index b0b44d9..abd7cfc 100644
--- a/cmd/coordinator/coordinator_test.go
+++ b/cmd/coordinator/coordinator_test.go
@@ -189,16 +189,14 @@
return false
}
- c := make(chan buildgo.BuilderRev, 1000)
- go func() {
- defer close(c)
- err := findWork(c)
- if err != nil {
- t.Error(err)
- }
- }()
- for br := range c {
- t.Logf("Got: %v", br)
+ addWorkTestHook = func(work buildgo.BuilderRev) {
+ t.Logf("Got: %v", work)
+ }
+ defer func() { addWorkTestHook = nil }()
+
+ err := findWork()
+ if err != nil {
+ t.Error(err)
}
}
diff --git a/cmd/coordinator/gce.go b/cmd/coordinator/gce.go
index e4e702d..5922543 100644
--- a/cmd/coordinator/gce.go
+++ b/cmd/coordinator/gce.go
@@ -423,17 +423,6 @@
}
}
-func (p *gceBuildletPool) HasCapacity(hostType string) bool {
- hconf, ok := dashboard.Hosts[hostType]
- if !ok {
- return false
- }
- numCPU := hconf.GCENumCPU()
- p.mu.Lock()
- defer p.mu.Unlock()
- return p.haveQuotaLocked(numCPU)
-}
-
// haveQuotaLocked reports whether the current GCE quota permits
// starting numCPU more CPUs.
//
diff --git a/cmd/coordinator/kube.go b/cmd/coordinator/kube.go
index 8ec51a8..9e55837 100644
--- a/cmd/coordinator/kube.go
+++ b/cmd/coordinator/kube.go
@@ -209,16 +209,6 @@
}
-func (p *kubeBuildletPool) HasCapacity(hostType string) bool {
- // TODO: implement. But for now we don't care because we only
- // use the kubePool for the cross-compiled builds and we have
- // very few hostTypes for those, and only one (ARM) that's
- // used day-to-day. So it's okay if we lie here and always try
- // to create buildlets. The scheduler will still give created
- // buildlets to the highest priority waiter.
- return true
-}
-
func (p *kubeBuildletPool) GetBuildlet(ctx context.Context, hostType string, lg logger) (*buildlet.Client, error) {
hconf, ok := dashboard.Hosts[hostType]
if !ok || !hconf.IsContainer() {
diff --git a/cmd/coordinator/remote.go b/cmd/coordinator/remote.go
index bfc3c8e..c1f30c6 100644
--- a/cmd/coordinator/remote.go
+++ b/cmd/coordinator/remote.go
@@ -139,7 +139,6 @@
return
}
user, _, _ := r.BasicAuth()
- pool := poolForConf(bconf)
var closeNotify <-chan bool
if cn, ok := w.(http.CloseNotifier); ok {
@@ -160,13 +159,17 @@
resc := make(chan *buildlet.Client)
errc := make(chan error)
go func() {
- bc, err := pool.GetBuildlet(ctx, bconf.HostType, loggerFunc(func(event string, optText ...string) {
+ lgFunc := loggerFunc(func(event string, optText ...string) {
var extra string
if len(optText) > 0 {
extra = " " + optText[0]
}
log.Printf("creating buildlet %s for %s: %s%s", bconf.HostType, user, event, extra)
- }))
+ })
+ bc, err := sched.GetBuildlet(ctx, lgFunc, &SchedItem{
+ HostType: bconf.HostType,
+ IsGomote: true,
+ })
if bc != nil {
resc <- bc
return
diff --git a/cmd/coordinator/remote_test.go b/cmd/coordinator/remote_test.go
index 7b9e07b..d35614e 100644
--- a/cmd/coordinator/remote_test.go
+++ b/cmd/coordinator/remote_test.go
@@ -122,7 +122,7 @@
log.SetOutput(tlogger{t})
defer log.SetOutput(os.Stderr)
addBuilder(buildName)
- testPoolHook = func(_ *dashboard.BuildConfig) BuildletPool { return testPool }
+ testPoolHook = func(_ *dashboard.HostConfig) BuildletPool { return testPool }
defer func() {
removeBuilder(buildName)
testPoolHook = nil
diff --git a/cmd/coordinator/reverse.go b/cmd/coordinator/reverse.go
index b06d377..e776bb8 100644
--- a/cmd/coordinator/reverse.go
+++ b/cmd/coordinator/reverse.go
@@ -312,21 +312,6 @@
p.waiters[hostType] += delta
}
-func (p *reverseBuildletPool) HasCapacity(hostType string) bool {
- p.mu.Lock()
- defer p.mu.Unlock()
- for _, b := range p.buildlets {
- if b.hostType != hostType {
- continue
- }
- if b.inUse {
- continue
- }
- return true
- }
- return false
-}
-
func (p *reverseBuildletPool) GetBuildlet(ctx context.Context, hostType string, lg logger) (*buildlet.Client, error) {
p.updateWaiterCounter(hostType, 1)
defer p.updateWaiterCounter(hostType, -1)
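
The HasCapacity removals above (gce.go, kube.go, reverse.go) work
because the scheduler no longer asks each pool whether it has room:
it counts its own in-flight GetBuildlet calls per host type and only
tops up the shortfall, as scheduleLocked does in sched.go below. A
toy version of that accounting, with illustrative host type names:

    package main

    import "fmt"

    // need mirrors scheduleLocked's arithmetic: per host type, start
    // only as many pool calls as there are waiters not already
    // covered by an in-flight GetBuildlet call.
    func need(waiting, creating map[string]int) map[string]int {
        n := make(map[string]int)
        for hostType, w := range waiting {
            if d := w - creating[hostType]; d > 0 {
                n[hostType] = d
            }
        }
        return n
    }

    func main() {
        waiting := map[string]int{"host-linux-amd64": 5, "host-openbsd-amd64": 1}
        creating := map[string]int{"host-linux-amd64": 3, "host-openbsd-amd64": 1}
        fmt.Println(need(waiting, creating)) // map[host-linux-amd64:2]
    }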
diff --git a/cmd/coordinator/sched.go b/cmd/coordinator/sched.go
index 62d09f7..efa3d9b 100644
--- a/cmd/coordinator/sched.go
+++ b/cmd/coordinator/sched.go
@@ -9,11 +9,13 @@
import (
"context"
- "sort"
+ "log"
"sync"
"time"
"golang.org/x/build/buildlet"
+ "golang.org/x/build/cmd/coordinator/spanlog"
+ "golang.org/x/build/dashboard"
"golang.org/x/build/internal/buildgo"
)
@@ -30,209 +32,242 @@
// for buildlets, starts the creation of buildlets from BuildletPools,
// and prioritizes which callers gets them first when they're ready.
type Scheduler struct {
- mu sync.Mutex
- paused bool
- waiting []*SchedWaiter // index 0 is highest priority
+ // mu guards waiting and hostsCreating.
+ mu sync.Mutex
- readyc chan ReadyBuildlet
+ // waiting contains the set of callers waiting for a buildlet,
+ // keyed by the host type they're waiting for.
+ waiting map[string]map[*SchedItem]bool // hostType -> item -> true
- launching map[*SchedWaiter]bool
+ // hostsCreating is the number of GetBuildlet calls currently in flight
+ // to each hostType's respective buildlet pool.
+ hostsCreating map[string]int // hostType -> count
}
-// A ReadyBuildlet is a buildlet that was just created and is up and
+// A getBuildletResult is a buildlet that was just created and is up and
// is ready to be assigned to a caller based on priority.
-type ReadyBuildlet struct {
+type getBuildletResult struct {
Pool BuildletPool
HostType string
- Client *buildlet.Client
+
+ // One of Client or Err gets set:
+ Client *buildlet.Client
+ Err error
}
// NewScheduler returns a new scheduler.
func NewScheduler() *Scheduler {
s := &Scheduler{
- readyc: make(chan ReadyBuildlet, 8),
- }
- if useScheduler {
- go s.assignLoop()
+ hostsCreating: make(map[string]int),
+ waiting: make(map[string]map[*SchedItem]bool),
}
return s
}
-// assignLoop waits for the successful creation of buildlets and
-// assigns them the highest priority waiter.
-//
-// TODO: probably also need to deal with buildlet creation failures to
-// at least re-nudge the scheduler to kick off new buildlet creations
-// if still necessary.
-func (s *Scheduler) assignLoop() {
+// matchBuildlet matches up a successful getBuildletResult to the
+// highest priority waiter, or closes it if there is none.
+func (s *Scheduler) matchBuildlet(res getBuildletResult) {
+ if res.Err != nil {
+ go s.schedule()
+ return
+ }
for {
- rb := <-s.readyc
- bestWaiter, ok := s.matchWaiter(rb)
+ waiter, ok := s.matchWaiter(res.HostType)
if !ok {
- go rb.Client.Close()
- continue
+ log.Printf("sched: no waiter for buildlet of type %q; closing", res.HostType)
+ go res.Client.Close()
+ return
}
select {
- case bestWaiter.Res <- rb.Client:
+ case waiter.res <- res.Client:
// Normal happy case. Something gets its buildlet.
- default:
- // Wait went away. (context timeout?)
- go rb.Client.Close()
+ return
+ case <-waiter.ctxDone:
+ // Waiter went away in the tiny window between
+ // matchWaiter returning it and here. This
+ // should happen super rarely, so log it to verify that.
+ log.Printf("sched: waiter of type %T went away; trying to match next", res.HostType)
}
}
}
-// pause pauses the scheduler.
-func (s *Scheduler) pause(v bool) {
- if !useScheduler {
- return
- }
- s.mu.Lock()
- s.paused = true
- s.mu.Unlock()
-}
-
-// unpause unpauses the scheduler and runs schedule.
-func (s *Scheduler) unpause() {
- if !useScheduler {
- return
- }
- s.mu.Lock()
- s.paused = false
- s.mu.Unlock()
- s.schedule()
-}
-
// schedule starts creating buildlets if there's demand.
//
-// It acquires s.mu so should run as quickly as possible.
+// It acquires s.mu.
func (s *Scheduler) schedule() {
s.mu.Lock()
defer s.mu.Unlock()
- if s.paused {
- return
- }
- poolExhausted := map[BuildletPool]bool{}
- for _, sw := range s.waiting {
- si := sw.si
- if poolExhausted[si.Pool] || !si.Pool.HasCapacity(si.HostType) {
- poolExhausted[si.Pool] = true
+ s.scheduleLocked()
+}
+
+// scheduleLocked starts creating buildlets if there's demand.
+//
+// It requires that s.mu be held.
+func (s *Scheduler) scheduleLocked() {
+ for hostType, waiting := range s.waiting {
+ need := len(waiting) - s.hostsCreating[hostType]
+ if need <= 0 {
continue
}
- // ... TODO kick things off, using a goroutine per
- // slow buildlet creation call. If the creation fails,
- // the goroutine can call back into the scheduler to
- // inform it of that.
- }
-}
-
-// matchWaiter returns (and removes from the waiting queue) the highest priority SchedWaiter
-// that matches the provided ReadyBuildlet.
-func (s *Scheduler) matchWaiter(rb ReadyBuildlet) (sw *SchedWaiter, ok bool) {
- s.mu.Lock()
- defer s.mu.Unlock()
- for i, sw := range s.waiting {
- si := sw.si
- if si.Pool == rb.Pool && si.HostType == rb.HostType {
- copy(s.waiting[i:], s.waiting[i+1:])
- s.waiting[len(s.waiting)-1] = nil
- s.waiting = s.waiting[:len(s.waiting)-1]
- return sw, true
+ pool := poolForConf(dashboard.Hosts[hostType])
+ // TODO: recognize certain pools, like the reverse pool,
+ // that have finite capacity and will just queue up
+ // GetBuildlet calls anyway; for those, avoid the extra
+ // goroutines here and instead cap the number of
+ // outstanding GetBuildlet calls. But even with thousands
+ // of outstanding builds, that's only a small constant
+ // memory savings, so for now just do the simpler thing.
+ for i := 0; i < need; i++ {
+ s.hostsCreating[hostType]++
+ go s.getPoolBuildlet(pool, hostType)
}
}
- return nil, false
}
-func (s *Scheduler) removeWaiter(remove *SchedWaiter) {
+type stderrLogger struct{}
+
+func (stderrLogger) LogEventTime(event string, optText ...string) {
+ if len(optText) == 0 {
+ log.Printf("sched.getbuildlet: %v", event)
+ } else {
+ log.Printf("sched.getbuildlet: %v, %v", event, optText[0])
+ }
+}
+
+func (l stderrLogger) CreateSpan(event string, optText ...string) spanlog.Span {
+ return createSpan(l, event, optText...)
+}
+
+func (s *Scheduler) getPoolBuildlet(pool BuildletPool, hostType string) {
+ res := getBuildletResult{
+ Pool: pool,
+ HostType: hostType,
+ }
+ ctx := context.Background() // TODO: make these cancelable and cancel unneeded ones earlier?
+ res.Client, res.Err = pool.GetBuildlet(ctx, hostType, stderrLogger{})
+ s.mu.Lock()
+ s.hostsCreating[hostType]-- // this call is no longer in flight
+ s.mu.Unlock()
+ s.matchBuildlet(res)
+}
+
+// matchWaiter returns (and removes from the waiting queue) the highest priority SchedItem
+// that matches the provided host type.
+func (s *Scheduler) matchWaiter(hostType string) (_ *SchedItem, ok bool) {
s.mu.Lock()
defer s.mu.Unlock()
- newWaiting := s.waiting[:0]
- for _, sw := range s.waiting {
- if sw != remove {
- newWaiting = append(newWaiting, sw)
+ var best *SchedItem
+ for si := range s.waiting[hostType] {
+ if best == nil || schedLess(si, best) {
+ best = si
}
}
- s.waiting = newWaiting
+ if best != nil {
+ delete(s.waiting[hostType], best)
+ }
+ return best, best != nil
}
-func (s *Scheduler) enqueueWaiter(si *SchedItem) *SchedWaiter {
- defer s.schedule()
-
- w := &SchedWaiter{
- s: s,
- si: si,
- Res: make(chan interface{}), // NOT buffered
- }
-
+func (s *Scheduler) removeWaiter(si *SchedItem) {
s.mu.Lock()
defer s.mu.Unlock()
- s.waiting = append(s.waiting, w)
- sort.Slice(s.waiting, func(i, j int) bool {
- ia, ib := s.waiting[i].si, s.waiting[j].si
- return schedLess(ia, ib)
- })
- return w
+ if m := s.waiting[si.HostType]; m != nil {
+ delete(m, si)
+ }
+}
+
+func (s *Scheduler) enqueueWaiter(si *SchedItem) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if _, ok := s.waiting[si.HostType]; !ok {
+ s.waiting[si.HostType] = make(map[*SchedItem]bool)
+ }
+ s.waiting[si.HostType][si] = true
+ s.scheduleLocked()
}
// schedLess reports whether scheduled item ia is "less" (more
// important) than scheduled item ib.
func schedLess(ia, ib *SchedItem) bool {
- // TryBots are more important.
+ // TODO: flesh out this policy more. For now this is much
+ // better than the old random policy.
+ // For example, consider IsHelper? Figure out a policy.
+
+ // Gomote is most important, then TryBots; both are FIFO
+ // among themselves. Post-submit builds are LIFO.
+ if ia.IsGomote != ib.IsGomote {
+ return ia.IsGomote
+ }
if ia.IsTry != ib.IsTry {
return ia.IsTry
}
- return ia.commitTime.Before(ib.commitTime)
+ // Gomote and TryBots are FIFO.
+ if ia.IsGomote || ia.IsTry {
+ // TODO: if IsTry, consider how many TryBot requests
+ // are outstanding per user. The scheduler should
+ // round-robin between CL authors, rather than use
+ // time. But time works for now.
+ return ia.requestTime.Before(ib.requestTime)
+ }
+ // Post-submit builds are LIFO.
+ return ib.requestTime.Before(ia.requestTime)
}
+// SchedItem is a specification of a requested buildlet in its
+// exported fields, and internal scheduler state used while waiting
+// for that buildlet.
type SchedItem struct {
- buildgo.BuilderRev
- Pool BuildletPool
- HostType string
- IsTry bool
+ buildgo.BuilderRev // not set for gomote
+ HostType string
+ IsGomote bool
+ IsTry bool
+ IsHelper bool
// We set in GetBuildlet:
- commitTime time.Time
- tryFor string // which user. (user with 1 trybot >> user with 50 trybots)
-}
+ s *Scheduler
+ requestTime time.Time
+ tryFor string // which user. (user with 1 trybot >> user with 50 trybots)
+ pool BuildletPool
+ ctxDone <-chan struct{}
+ // TODO: track the commit time of the BuilderRev, probably via a call to maintnerd
+ // commitTime time.Time
-type SchedWaiter struct {
- s *Scheduler
- si *SchedItem
-
- // Res is the result channel, containing either a
+ // res is the result channel, containing either a
// *buildlet.Client or an error. It is read by GetBuildlet and
	// written by matchBuildlet.
- Res chan interface{}
+ res chan interface{}
}
-func (sw *SchedWaiter) cancel() {
- sw.s.removeWaiter(sw)
+func (si *SchedItem) cancel() {
+ si.s.removeWaiter(si)
}
// GetBuildlet requests a buildlet with the parameters described in si.
+//
+// The provided si must be newly allocated; ownership passes to the scheduler.
func (s *Scheduler) GetBuildlet(ctx context.Context, lg logger, si *SchedItem) (*buildlet.Client, error) {
+ pool := poolForConf(dashboard.Hosts[si.HostType])
+
if !useScheduler {
- return si.Pool.GetBuildlet(ctx, si.HostType, lg)
+ return pool.GetBuildlet(ctx, si.HostType, lg)
}
+ si.pool = pool
+ si.s = s
+ si.requestTime = time.Now()
+ si.res = make(chan interface{}) // NOT buffered
+ si.ctxDone = ctx.Done()
+
// TODO: once we remove the useScheduler const, we can
- // probably remove the "lg" logger parameter. We don't need to
+ // remove the "lg" logger parameter. We don't need to
// log anything during the buildlet creation process anymore
	// because we don't know which build it'll be for. So all we can
	// say in the logs is "Asking for a buildlet" and "Got
// one", which the caller already does. I think. Verify that.
- // TODO: populate si unexported fields
-
- sw := s.enqueueWaiter(si)
+ s.enqueueWaiter(si)
select {
- case v := <-sw.Res:
+ case v := <-si.res:
if bc, ok := v.(*buildlet.Client); ok {
return bc, nil
}
return nil, v.(error)
case <-ctx.Done():
- sw.cancel()
+ si.cancel()
return nil, ctx.Err()
}
}
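
GetBuildlet above parks each caller on an unbuffered result channel
and races it against ctx.Done, while matchBuildlet uses the waiter's
ctxDone to avoid stranding a buildlet on a departed waiter. A
self-contained sketch of that rendezvous, with a string standing in
for *buildlet.Client:

    package main

    import (
        "context"
        "fmt"
    )

    // wait mirrors the select at the bottom of Scheduler.GetBuildlet:
    // the caller blocks on an unbuffered result channel (carrying a
    // string here instead of a *buildlet.Client) or on its context.
    func wait(ctx context.Context, res chan interface{}) (string, error) {
        select {
        case v := <-res:
            if s, ok := v.(string); ok {
                return s, nil
            }
            return "", v.(error)
        case <-ctx.Done():
            // The real code also calls removeWaiter here so the
            // scheduler forgets the departed caller.
            return "", ctx.Err()
        }
    }

    func main() {
        res := make(chan interface{}) // NOT buffered, as in the diff
        go func() { res <- "buildlet-1" }()
        got, err := wait(context.Background(), res)
        fmt.Println(got, err) // buildlet-1 <nil>

        ctx, cancel := context.WithCancel(context.Background())
        cancel()
        _, err = wait(ctx, make(chan interface{}))
        fmt.Println(err) // context canceled
    }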
diff --git a/cmd/coordinator/sched_test.go b/cmd/coordinator/sched_test.go
new file mode 100644
index 0000000..dcf7cf8
--- /dev/null
+++ b/cmd/coordinator/sched_test.go
@@ -0,0 +1,108 @@
+// Copyright 2019 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// +build go1.13
+// +build linux darwin
+
+package main
+
+import (
+ "testing"
+ "time"
+)
+
+func TestSchedLess(t *testing.T) {
+ t1, t2 := time.Unix(1, 0), time.Unix(2, 0)
+ tests := []struct {
+ name string
+ a, b *SchedItem
+ want bool
+ }{
+ {
+ name: "gomote over reg",
+ a: &SchedItem{
+ IsGomote: true,
+ requestTime: t2,
+ },
+ b: &SchedItem{
+ requestTime: t1,
+ },
+ want: true,
+ },
+ {
+ name: "gomote over try",
+ a: &SchedItem{
+ IsGomote: true,
+ requestTime: t2,
+ },
+ b: &SchedItem{
+ IsTry: true,
+ requestTime: t1,
+ },
+ want: true,
+ },
+ {
+ name: "try over reg",
+ a: &SchedItem{
+ IsTry: true,
+ requestTime: t2,
+ },
+ b: &SchedItem{
+ requestTime: t1,
+ },
+ want: true,
+ },
+ {
+ name: "try FIFO, less",
+ a: &SchedItem{
+ IsTry: true,
+ requestTime: t1,
+ },
+ b: &SchedItem{
+ IsTry: true,
+ requestTime: t2,
+ },
+ want: true,
+ },
+ {
+ name: "try FIFO, greater",
+ a: &SchedItem{
+ IsTry: true,
+ requestTime: t2,
+ },
+ b: &SchedItem{
+ IsTry: true,
+ requestTime: t1,
+ },
+ want: false,
+ },
+ {
+ name: "reg LIFO, less",
+ a: &SchedItem{
+ requestTime: t2,
+ },
+ b: &SchedItem{
+ requestTime: t1,
+ },
+ want: true,
+ },
+ {
+ name: "reg LIFO, greater",
+ a: &SchedItem{
+ requestTime: t1,
+ },
+ b: &SchedItem{
+ requestTime: t2,
+ },
+ want: false,
+ },
+ }
+ for _, tt := range tests {
+ got := schedLess(tt.a, tt.b)
+ if got != tt.want {
+ t.Errorf("%s: got %v; want %v", tt.name, got, tt.want)
+ }
+ }
+
+}
diff --git a/dashboard/builders.go b/dashboard/builders.go
index c68ebed..8e9178c 100644
--- a/dashboard/builders.go
+++ b/dashboard/builders.go
@@ -844,7 +844,7 @@
if c.FlakyNet {
env = append(env, "GO_BUILDER_FLAKY_NET=1")
}
- env = append(env, c.hostConf().env...)
+ env = append(env, c.HostConfig().env...)
return append(env, c.env...)
}
@@ -900,12 +900,12 @@
return true
}
-func (c *BuildConfig) IsReverse() bool { return c.hostConf().IsReverse }
+func (c *BuildConfig) IsReverse() bool { return c.HostConfig().IsReverse }
-func (c *BuildConfig) IsContainer() bool { return c.hostConf().IsContainer() }
+func (c *BuildConfig) IsContainer() bool { return c.HostConfig().IsContainer() }
func (c *HostConfig) IsContainer() bool { return c.ContainerImage != "" }
-func (c *BuildConfig) IsVM() bool { return c.hostConf().IsVM() }
+func (c *BuildConfig) IsVM() bool { return c.HostConfig().IsVM() }
func (c *HostConfig) IsVM() bool { return c.VMImage != "" }
func (c *BuildConfig) GOOS() string { return c.Name[:strings.Index(c.Name, "-")] }
@@ -955,7 +955,7 @@
// timeoutScale returns this builder's GO_TEST_TIMEOUT_SCALE value, or 1.
func (c *BuildConfig) timeoutScale() int {
const pfx = "GO_TEST_TIMEOUT_SCALE="
- for _, env := range [][]string{c.env, c.hostConf().env} {
+ for _, env := range [][]string{c.env, c.HostConfig().env} {
for _, kv := range env {
if strings.HasPrefix(kv, pfx) {
if n, err := strconv.Atoi(kv[len(pfx):]); err == nil && n > 0 {
@@ -967,7 +967,8 @@
return 1
}
-func (c *BuildConfig) hostConf() *HostConfig {
+// HostConfig returns the host configuration of c.
+func (c *BuildConfig) HostConfig() *HostConfig {
if c.testHostConf != nil {
return c.testHostConf
}
@@ -980,7 +981,7 @@
// GoBootstrapURL returns the URL of a built Go 1.4+ tar.gz for the
// build configuration type c, or empty string if there isn't one.
func (c *BuildConfig) GoBootstrapURL(e *buildenv.Environment) string {
- return strings.Replace(c.hostConf().goBootstrapURLTmpl, "$BUCKET", e.BuildletBucket, 1)
+ return strings.Replace(c.HostConfig().goBootstrapURLTmpl, "$BUCKET", e.BuildletBucket, 1)
}
// BuildletBinaryURL returns the public URL of this builder's buildlet.
@@ -1240,7 +1241,7 @@
// ShortOwner returns a short human-readable owner.
func (c BuildConfig) ShortOwner() string {
- owner := c.hostConf().Owner
+ owner := c.HostConfig().Owner
if owner == "" {
return "go-dev"
}
@@ -1249,7 +1250,7 @@
// OwnerGithub returns the Github handle of the owner.
func (c BuildConfig) OwnerGithub() string {
- return c.hostConf().OwnerGithub
+ return c.HostConfig().OwnerGithub
}
// PoolName returns a short summary of the builder's host type for the