cmd/coordinator: add tests for the scheduler, and resulting fixes

The scheduler tests now pass thousands of iterations in race mode.
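
The key fix is in the buildlet handoff. Previously the scheduler sent
a result directly on a per-waiter res channel, which raced with the
waiter abandoning its context. Now the waiter passes a fresh result
channel over an unbuffered wantRes channel, so a buildlet is only
handed to a waiter that is provably still receiving. A rough sketch
of the rendezvous, simplified from the diff below:

	// Waiter side (Scheduler.GetBuildlet):
	ch := make(chan *buildlet.Client)
	select {
	case si.wantRes <- ch: // scheduler is committed to sending a result
		return <-ch, nil
	case <-ctx.Done():
		s.removeWaiter(si)
		return nil, ctx.Err()
	}

	// Scheduler side (matchBuildlet), with a buildlet in hand:
	select {
	case ch := <-waiter.wantRes: // waiter is committed to receiving
		ch <- res.Client
	case <-waiter.ctxDone:
		// Waiter went away; offer the buildlet to another waiter.
	}

The race-mode stress run is an invocation along these lines (flags
illustrative):

	go test -race -run=TestScheduler -count=1000 ./cmd/coordinator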

Updates golang/go#19178

Change-Id: I210277abd084bdfd3c7ada538189722ff9543e3f
Reviewed-on: https://go-review.googlesource.com/c/build/+/207079
Run-TryBot: Brad Fitzpatrick <bradfitz@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Bryan C. Mills <bcmills@google.com>
diff --git a/cmd/coordinator/coordinator.go b/cmd/coordinator/coordinator.go
index b5aafd5..87139e6 100644
--- a/cmd/coordinator/coordinator.go
+++ b/cmd/coordinator/coordinator.go
@@ -1593,6 +1593,9 @@
 	if testPoolHook != nil {
 		return testPoolHook(conf)
 	}
+	if conf == nil {
+		panic("nil conf")
+	}
 	switch {
 	case conf.IsVM():
 		return gcePool
diff --git a/cmd/coordinator/sched.go b/cmd/coordinator/sched.go
index efa3d9b..e1e6edf 100644
--- a/cmd/coordinator/sched.go
+++ b/cmd/coordinator/sched.go
@@ -9,6 +9,7 @@
 
 import (
 	"context"
+	"fmt"
 	"log"
 	"sync"
 	"time"
@@ -26,7 +27,7 @@
 // If false, any GetBuildlet call to the scheduler delegates directly
 // to the BuildletPool's GetBuildlet and we make a bunch of callers
 // fight over a mutex and a random one wins, like we used to do it.
-const useScheduler = false
+var useScheduler = false
 
 // The Scheduler prioritizes access to buildlets. It accepts requests
 // for buildlets, starts the creation of buildlets from BuildletPools,
@@ -79,8 +80,9 @@
 			return
 		}
 		select {
-		case waiter.res <- res.Client:
+		case ch := <-waiter.wantRes:
 			// Normal happy case. Something gets its buildlet.
+			ch <- res.Client
 			return
 		case <-waiter.ctxDone:
 			// Waiter went away in the tiny window between
@@ -138,6 +140,8 @@
 	return createSpan(l, event, optText...)
 }
 
+// getPoolBuildlet is launched as its own goroutine to do a
+// potentially long blocking call to pool.GetBuildlet.
 func (s *Scheduler) getPoolBuildlet(pool BuildletPool, hostType string) {
 	res := getBuildletResult{
 		Pool:     pool,
@@ -145,21 +149,36 @@
 	}
 	ctx := context.Background() // TODO: make these cancelable and cancel unneeded ones earlier?
 	res.Client, res.Err = pool.GetBuildlet(ctx, hostType, stderrLogger{})
+
+	// This is still slightly racy, but probably ok for now.
+	// (We might invoke the schedule method right after
+	// GetBuildlet returns and dial an extra buildlet, but if so
+	// we'll close it without using it.)
+	s.mu.Lock()
+	s.hostsCreating[res.HostType]--
+	s.mu.Unlock()
+
 	s.matchBuildlet(res)
 }
 
-// matchWaiter returns (and removes from the waiting queue) the highest priority SchedItem
+// matchWaiter returns (and removes from the waiting set) the highest priority SchedItem
 // that matches the provided host type.
 func (s *Scheduler) matchWaiter(hostType string) (_ *SchedItem, ok bool) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
+	waiters := s.waiting[hostType]
+
 	var best *SchedItem
-	for si := range s.waiting[hostType] {
+	for si := range waiters {
 		if best == nil || schedLess(si, best) {
 			best = si
 		}
 	}
-	return best, best != nil
+	if best != nil {
+		delete(waiters, best)
+		return best, true
+	}
+	return nil, false
 }
 
 func (s *Scheduler) removeWaiter(si *SchedItem) {
@@ -170,7 +189,7 @@
 	}
 }
 
-func (s *Scheduler) enqueueWaiter(si *SchedItem) {
+func (s *Scheduler) addWaiter(si *SchedItem) {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	if _, ok := s.waiting[si.HostType]; !ok {
@@ -180,6 +199,13 @@
 	s.scheduleLocked()
 }
 
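+// hasWaiter reports whether si is still in the waiting set.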
+func (s *Scheduler) hasWaiter(si *SchedItem) bool {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	return s.waiting[si.HostType][si]
+}
+
 // schedLess reports whether scheduled item ia is "less" (more
 // important) than scheduled item ib.
 func schedLess(ia, ib *SchedItem) bool {
@@ -217,30 +242,44 @@
 	IsTry              bool
 	IsHelper           bool
 
-	// We set in GetBuildlet:
+	// The following unexported fields are set by the Scheduler in
+	// Scheduler.GetBuildlet.
+
 	s           *Scheduler
 	requestTime time.Time
-	tryFor      string // which user. (user with 1 trybot >> user with 50 trybots)
+	commitTime  time.Time // TODO: populate post-submit commit time from maintnerd
+	branch      string    // TODO: populate from maintnerd
+	tryFor      string    // TODO: which user. (user with 1 trybot >> user with 50 trybots)
 	pool        BuildletPool
 	ctxDone     <-chan struct{}
-	// TODO: track the commit time of the BuilderRev, via call to maintnerd probably
-	// commitTime time.Time
 
-	// res is the result channel, containing either a
-	// *buildlet.Client or an error. It is read by GetBuildlet and
-	// written by assignBuildlet.
-	res chan interface{}
-}
-
-func (si *SchedItem) cancel() {
-	si.s.removeWaiter(si)
+	// wantRes is the unbuffered channel that's passed
+	// synchronously from Scheduler.GetBuildlet to
+	// Scheduler.matchBuildlet. Its value is a channel (whose
+	// buffering doesn't matter) on which to pass a
+	// *buildlet.Client just obtained from a BuildletPool. The
+	// contract for using wantRes is that the sender must already
+	// have a result available to send on the inner channel, and
+	// the receiver must still want it (its context hasn't expired).
+	wantRes chan chan<- *buildlet.Client
 }
 
 // GetBuildlet requests a buildlet with the parameters described in si.
 //
 // The provided si must be newly allocated; ownership passes to the scheduler.
 func (s *Scheduler) GetBuildlet(ctx context.Context, lg logger, si *SchedItem) (*buildlet.Client, error) {
-	pool := poolForConf(dashboard.Hosts[si.HostType])
+	// TODO: once we remove the useScheduler var, we can remove
+	// the "lg" logger parameter. We don't need to log anything
+	// during the buildlet creation process anymore because we
+	// don't know which build it'll be for. So all we can say in
+	// the logs is "Asking for a buildlet" and "Got one",
+	// which the caller already does. I think. Verify that.
+
+	hostConf, ok := dashboard.Hosts[si.HostType]
+	if !ok && testPoolHook == nil {
+		return nil, fmt.Errorf("invalid SchedItem.HostType %q", si.HostType)
+	}
+	pool := poolForConf(hostConf)
 
 	if !useScheduler {
 		return pool.GetBuildlet(ctx, si.HostType, lg)
@@ -249,25 +288,19 @@
 	si.pool = pool
 	si.s = s
 	si.requestTime = time.Now()
-	si.res = make(chan interface{}) // NOT buffered
 	si.ctxDone = ctx.Done()
+	si.wantRes = make(chan chan<- *buildlet.Client) // unbuffered
 
-	// TODO: once we remove the useScheduler const, we can
-	// remove the "lg" logger parameter. We don't need to
-	// log anything during the buildlet creation process anymore
-	// because we don't which build it'll be for. So all we can
-	// say in the logs for is "Asking for a buildlet" and "Got
-	// one", which the caller already does. I think. Verify that.
+	s.addWaiter(si)
 
-	s.enqueueWaiter(si)
+	ch := make(chan *buildlet.Client)
 	select {
-	case v := <-si.res:
-		if bc, ok := v.(*buildlet.Client); ok {
-			return bc, nil
-		}
-		return nil, v.(error)
+	case si.wantRes <- ch:
+		// No need to call removeWaiter. If we're here, the
+		// sender has already done so.
+		return <-ch, nil
 	case <-ctx.Done():
-		si.cancel()
+		s.removeWaiter(si)
 		return nil, ctx.Err()
 	}
 }
diff --git a/cmd/coordinator/sched_test.go b/cmd/coordinator/sched_test.go
index dcf7cf8..25f074a 100644
--- a/cmd/coordinator/sched_test.go
+++ b/cmd/coordinator/sched_test.go
@@ -8,8 +8,15 @@
 package main
 
 import (
+	"context"
+	"fmt"
+	"runtime"
 	"testing"
 	"time"
+
+	"golang.org/x/build/buildlet"
+	"golang.org/x/build/cmd/coordinator/spanlog"
+	"golang.org/x/build/dashboard"
 )
 
 func TestSchedLess(t *testing.T) {
@@ -104,5 +111,210 @@
 			t.Errorf("%s: got %v; want %v", tt.name, got, tt.want)
 		}
 	}
+}
 
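+// discardLogger implements the logger interface; it discards all logged events.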
+type discardLogger struct{}
+
+func (discardLogger) LogEventTime(event string, optText ...string) {}
+
+func (discardLogger) CreateSpan(event string, optText ...string) spanlog.Span {
+	return createSpan(discardLogger{}, event, optText...)
+}
+
+// step is a test step for TestScheduler.
+type step func(*testing.T, *Scheduler)
+
+// getBuildletCall represents a call to GetBuildlet.
+type getBuildletCall struct {
+	si        *SchedItem
+	ctx       context.Context
+	ctxCancel context.CancelFunc
+
+	done      chan struct{} // closed when call done
+	gotClient *buildlet.Client
+	gotErr    error
+}
+
+func newGetBuildletCall(si *SchedItem) *getBuildletCall {
+	c := &getBuildletCall{
+		si:   si,
+		done: make(chan struct{}),
+	}
+	c.ctx, c.ctxCancel = context.WithCancel(context.Background())
+	return c
+}
+
+func (c *getBuildletCall) cancel(t *testing.T, s *Scheduler) { c.ctxCancel() }
+
+// start is a step (assignable to type step) that starts a
+// s.GetBuildlet call and waits for it to either succeed or get
+// blocked in the scheduler.
+func (c *getBuildletCall) start(t *testing.T, s *Scheduler) {
+	t.Logf("starting buildlet call for SchedItem=%p", c.si)
+	go func() {
+		c.gotClient, c.gotErr = s.GetBuildlet(c.ctx, discardLogger{}, c.si)
+		close(c.done)
+	}()
+
+	// Wait for si to be enqueued, or this call to be satisfied.
+	if !trueSoon(func() bool {
+		select {
+		case <-c.done:
+			return true
+		default:
+			return s.hasWaiter(c.si)
+		}
+	}) {
+		t.Fatalf("timeout waiting for GetBuildlet call to run to its blocking point")
+	}
+}
+
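+// trueSoon reports whether f returns true within 5 seconds, polling every 5ms.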
+func trueSoon(f func() bool) bool {
+	deadline := time.Now().Add(5 * time.Second)
+	for {
+		if time.Now().After(deadline) {
+			return false
+		}
+		if f() {
+			return true
+		}
+		time.Sleep(5 * time.Millisecond)
+	}
+}
+
+// wantGetBuildlet is a step (assignable to type step) that expects
+// the GetBuildlet call to succeed.
+func (c *getBuildletCall) wantGetBuildlet(t *testing.T, s *Scheduler) {
+	timer := time.NewTimer(5 * time.Second)
+	defer timer.Stop()
+	t.Logf("waiting on sched.getBuildlet(%q) ...", c.si.HostType)
+	select {
+	case <-c.done:
+		t.Logf("got sched.getBuildlet(%q).", c.si.HostType)
+		if c.gotErr != nil {
+			t.Fatalf("GetBuildlet(%q): %v", c.si.HostType, c.gotErr)
+		}
+	case <-timer.C:
+		stack := make([]byte, 1<<20)
+		stack = stack[:runtime.Stack(stack, true)]
+		t.Fatalf("timeout waiting for buildlet of type %q; stacks:\n%s", c.si.HostType, stack)
+	}
+}
+
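+// poolChan is a BuildletPool for tests that serves GetBuildlet results from per-host-type channels.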
+type poolChan map[string]chan interface{} // hostType -> { *buildlet.Client | error}
+
+func (m poolChan) GetBuildlet(ctx context.Context, hostType string, lg logger) (*buildlet.Client, error) {
+	c, ok := m[hostType]
+	if !ok {
+		return nil, fmt.Errorf("pool doesn't support host type %q", hostType)
+	}
+	select {
+	case v := <-c:
+		if c, ok := v.(*buildlet.Client); ok {
+			return c, nil
+		}
+		return nil, v.(error)
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	}
+}
+
+func (poolChan) String() string { return "testing poolChan" }
+
+func TestScheduler(t *testing.T) {
+	defer func(old bool) { useScheduler = old }(useScheduler)
+	useScheduler = true
+	defer func() { testPoolHook = nil }()
+
+	var pool poolChan // initialized per test below
+	// buildletAvailable is a step that adds a buildlet to the pool.
+	buildletAvailable := func(hostType string) step {
+		return func(t *testing.T, s *Scheduler) {
+			bc := buildlet.NewClient("127.0.0.1:9999", buildlet.NoKeyPair) // dummy
+			t.Logf("adding buildlet to pool for %q...", hostType)
+			ch := pool[hostType]
+			ch <- bc
+			t.Logf("added buildlet to pool for %q (ch=%p)", hostType, ch)
+		}
+	}
+
+	tests := []struct {
+		name  string
+		steps func() []step
+	}{
+		{
+			name: "simple-get-before-available",
+			steps: func() []step {
+				si := &SchedItem{HostType: "test-host-foo"}
+				fooGet := newGetBuildletCall(si)
+				return []step{
+					fooGet.start,
+					buildletAvailable("test-host-foo"),
+					fooGet.wantGetBuildlet,
+				}
+			},
+		},
+		{
+			name: "simple-get-already-available",
+			steps: func() []step {
+				si := &SchedItem{HostType: "test-host-foo"}
+				fooGet := newGetBuildletCall(si)
+				return []step{
+					buildletAvailable("test-host-foo"),
+					fooGet.start,
+					fooGet.wantGetBuildlet,
+				}
+			},
+		},
+		{
+			name: "try-bot-trumps-regular", // really that prioritization works at all; TestSchedLess tests actual policy
+			steps: func() []step {
+				tryItem := &SchedItem{HostType: "test-host-foo", IsTry: true}
+				regItem := &SchedItem{HostType: "test-host-foo"}
+				tryGet := newGetBuildletCall(tryItem)
+				regGet := newGetBuildletCall(regItem)
+				return []step{
+					regGet.start,
+					tryGet.start,
+					buildletAvailable("test-host-foo"),
+					tryGet.wantGetBuildlet,
+					buildletAvailable("test-host-foo"),
+					regGet.wantGetBuildlet,
+				}
+			},
+		},
+		{
+			name: "cancel-context-removes-waiter",
+			steps: func() []step {
+				si := &SchedItem{HostType: "test-host-foo"}
+				get := newGetBuildletCall(si)
+				return []step{
+					get.start,
+					get.cancel,
+					func(t *testing.T, s *Scheduler) {
+						if !trueSoon(func() bool { return !s.hasWaiter(si) }) {
+							t.Errorf("still have SchedItem in waiting set")
+						}
+					},
+				}
+			},
+		},
+	}
+	for _, tt := range tests {
+		pool = make(poolChan)
+		pool["test-host-foo"] = make(chan interface{}, 1)
+		pool["test-host-bar"] = make(chan interface{}, 1)
+
+		testPoolHook = func(*dashboard.HostConfig) BuildletPool { return pool }
+		t.Run(tt.name, func(t *testing.T) {
+			s := NewScheduler()
+			for i, st := range tt.steps() {
+				t.Logf("step %v...", i)
+				st(t, s)
+			}
+		})
+	}
 }