cmd/coordinator: test sharding, better status, logs

Also gomote updates which came about during the process of developing
and debugging this.

Change-Id: Ia53d674118a6b99bcdda7062d3b7161279b6ad52
Reviewed-on: https://go-review.googlesource.com/10463
Reviewed-by: Andrew Gerrand <adg@golang.org>
diff --git a/cmd/coordinator/coordinator.go b/cmd/coordinator/coordinator.go
index 89de7bd..0aac886 100644
--- a/cmd/coordinator/coordinator.go
+++ b/cmd/coordinator/coordinator.go
@@ -25,6 +25,7 @@
 	"os"
 	"path"
 	"runtime"
+	"sort"
 	"strings"
 	"sync"
 	"time"
@@ -40,6 +41,12 @@
 
 var processStartTime = time.Now()
 
+var Version string // set by linker -X
+
+// devPause is a debug option to pause for 5 minutes after the build
+// finishes before destroying buildlets.
+const devPause = false
+
 func init() {
 	// Disabled until we have test sharding. This takes 85+ minutes.
 	// Test sharding is https://github.com/golang/go/issues/10029
@@ -67,7 +74,6 @@
 
 // LOCK ORDER:
 //   statusMu, buildStatus.mu, trySet.mu
-// TODO(bradfitz,adg): rewrite the coordinator
 
 var (
 	startTime = time.Now()
@@ -95,7 +101,7 @@
 		"windows-amd64-gce",
 		"openbsd-386-gce56",
 		"openbsd-amd64-gce56",
-		"plan9-386-gcepartial",
+		"plan9-386",
 		"nacl-386",
 		"nacl-amd64p32",
 		"linux-arm-shard_test",
@@ -251,7 +257,7 @@
 
 func main() {
 	flag.Parse()
-
+	log.Printf("coordinator version %q starting", Version)
 	err := initGCE()
 	if err != nil {
 		if *mode == "" {
@@ -297,6 +303,7 @@
 		go gcePool.cleanUpOldVMs()
 
 		if devCluster {
+			dashboard.BuildletBucket = "dev-go-builder-data"
 			// Only run the linux-amd64 builder in the dev cluster (for now).
 			dashboard.Builders = devClusterBuilders()
 		}
@@ -337,11 +344,11 @@
 	m := map[string]dashboard.BuildConfig{}
 	for _, name := range []string{
 		"linux-amd64",
+		"linux-amd64-race",
 		"windows-amd64-gce",
+		"plan9-386",
 	} {
-		conf := dashboard.Builders[name]
-		conf.SetBuildletBinaryURL(strings.Replace(conf.BuildletBinaryURL(), "go-builder-data", "dev-go-builder-data", 1))
-		m[name] = conf
+		m[name] = dashboard.Builders[name]
 	}
 	return m
 }
@@ -366,6 +373,9 @@
 	if isBuilding(rev) {
 		return false
 	}
+	if devCluster && numCurrentBuilds() != 0 {
+		return false
+	}
 	if dashboard.Builders[rev.name].IsReverse {
 		return reversePool.CanBuild(rev.name)
 	}
@@ -561,6 +571,11 @@
 // for the main dashboard. It does not support gccgo.
 // TODO(bradfitz): it also currently does not support subrepos.
 func findWorkLoop(work chan<- builderRev) {
+	// Useful for debugging a single run:
+	if devCluster && false {
+		work <- builderRev{name: "linux-amd64-race", rev: "54789eff385780c54254f822e09505b6222918e2"}
+		return
+	}
 	ticker := time.NewTicker(15 * time.Second)
 	for {
 		if err := findWork(work); err != nil {
@@ -921,7 +936,23 @@
 }
 
 type eventTimeLogger interface {
-	logEventTime(name string)
+	logEventTime(event string, optText ...string)
+}
+
+var ErrCanceled = errors.New("canceled")
+
+// Cancel is a channel that's closed by the caller when the request is no longer
+// desired. The function receiving a cancel should return ErrCanceled whenever
+// Cancel becomes readable.
+type Cancel <-chan struct{}
+
+func (c Cancel) IsCanceled() bool {
+	select {
+	case <-c:
+		return true
+	default:
+		return false
+	}
 }
 
 type BuildletPool interface {
@@ -933,11 +964,44 @@
 	// anything except for log messages or VM naming.
 	//
 	// Clients must Close when done with the client.
-	GetBuildlet(machineType, rev string, el eventTimeLogger) (*buildlet.Client, error)
+	GetBuildlet(cancel Cancel, machineType, rev string, el eventTimeLogger) (*buildlet.Client, error)
 
 	String() string // TODO(bradfitz): more status stuff
 }
 
+// GetBuildlets creates up to n buildlets and sends them on the returned channel
+// before closing the channel.
+func GetBuildlets(cancel Cancel, pool BuildletPool, n int, machineType, rev string, el eventTimeLogger) <-chan *buildlet.Client {
+	ch := make(chan *buildlet.Client) // NOT buffered
+	var wg sync.WaitGroup
+	wg.Add(n)
+	for i := 0; i < n; i++ {
+		go func() {
+			defer wg.Done()
+			bc, err := pool.GetBuildlet(cancel, machineType, rev, el)
+			if err != nil {
+				if err != ErrCanceled {
+					log.Printf("failed to get a %s buildlet for rev %s: %v", machineType, rev, err)
+				}
+				return
+			}
+			el.logEventTime("helper_ready")
+			select {
+			case ch <- bc:
+			case <-cancel:
+				el.logEventTime("helper_killed_before_use")
+				bc.Close()
+				return
+			}
+		}()
+	}
+	go func() {
+		wg.Wait()
+		close(ch)
+	}()
+	return ch
+}
+
 func poolForConf(conf dashboard.BuildConfig) (BuildletPool, error) {
 	if conf.VMImage != "" {
 		return gcePool, nil
@@ -979,21 +1043,107 @@
 	}()
 }
 
-func (st *buildStatus) build() (retErr error) {
-	buildletType := st.conf.BuildletType
-	if buildletType == "" {
-		buildletType = st.conf.Name
+func (st *buildStatus) buildletType() string {
+	if v := st.conf.BuildletType; v != "" {
+		return v
 	}
+	return st.conf.Name
+}
+
+func (st *buildStatus) buildletPool() (BuildletPool, error) {
+	buildletType := st.buildletType()
 	bconf, ok := dashboard.Builders[buildletType]
 	if !ok {
-		return fmt.Errorf("invalid BuildletType %q for %q", buildletType, st.conf.Name)
+		return nil, fmt.Errorf("invalid BuildletType %q for %q", buildletType, st.conf.Name)
 	}
-	pool, err := poolForConf(bconf)
+	return poolForConf(bconf)
+}
+
+func (st *buildStatus) expectedMakeBashDuration() time.Duration {
+	// TODO: base this on historical measurements, instead of statically configured.
+	// TODO: move this to dashboard/builders.go? But once we base this on historical
+	// measurements, it'll need GCE services (bigtable/bigquery?), so it's probably
+	// better in this file.
+	goos, goarch := st.conf.GOOS(), st.conf.GOARCH()
+
+	if goos == "plan9" {
+		return 2500 * time.Millisecond
+	}
+	if goos == "linux" {
+		if goarch == "arm" {
+			return 4 * time.Minute
+		}
+		return 1000 * time.Millisecond
+	}
+	if goos == "windows" {
+		return 1000 * time.Millisecond
+	}
+
+	return 1500 * time.Millisecond
+}
+
+func (st *buildStatus) expectedBuildletStartDuration() time.Duration {
+	// Estimated delay before a buildlet from this build's pool is usable (0 if none expected).
+	// TODO: move this to dashboard/builders.go? But once we base this on historical measurements,
+	// it'll need GCE services (bigtable/bigquery?), so it's probably better in this file.
+	pool, _ := st.buildletPool() // error ignored: a nil pool matches no case below and yields 0
+	switch pool.(type) {
+	case *gceBuildletPool:
+		return time.Minute
+	case *reverseBuildletPool:
+		goos, arch := st.conf.GOOS(), st.conf.GOARCH()
+		if goos == "darwin" {
+			if arch == "arm" || arch == "arm64" { // either ARM flavor; "&&" here could never be true
+				// iOS; idle or it's not.
+				return 0
+			}
+			if arch == "amd64" || arch == "386" {
+				// TODO: return 1 * time.Minute (VMware boot of hermetic OS X) once we're using VMware.
+				return 0
+			}
+		}
+		if goos == "linux" && arch == "arm" {
+			// Scaleway. Ready or not.
+			return 0
+		}
+	}
+	return 0
+}
+
+// getHelpersReadySoon waits a bit (as a function of the build
+// configuration) and starts getting the buildlets for test sharding
+// ready, such that they're ready when make.bash is done. But we don't
+// want to start too early, lest we waste idle resources during make.bash.
+func (st *buildStatus) getHelpersReadySoon() {
+	if st.conf.NumTestHelpers == 0 {
+		return
+	}
+	time.AfterFunc(st.expectedMakeBashDuration()-st.expectedBuildletStartDuration(),
+		func() {
+			st.logEventTime("starting_helpers")
+			st.getHelpers() // and ignore the result.
+		})
+}
+
+// getHelpers returns a channel of buildlet test helpers, with an item
+// sent as they become available. The channel is closed at the end.
+func (st *buildStatus) getHelpers() <-chan *buildlet.Client {
+	st.onceInitHelpers.Do(st.onceInitHelpersFunc)
+	return st.helpers
+}
+
+func (st *buildStatus) onceInitHelpersFunc() {
+	pool, _ := st.buildletPool() // won't return an error since we called it already
+	st.helpers = GetBuildlets(st.donec, pool, st.conf.NumTestHelpers, st.buildletType(), st.rev, st)
+}
+
+func (st *buildStatus) build() (retErr error) {
+	pool, err := st.buildletPool()
 	if err != nil {
 		return err
 	}
 	st.logEventTime("get_buildlet")
-	bc, err := pool.GetBuildlet(buildletType, st.rev, st)
+	bc, err := pool.GetBuildlet(nil, st.buildletType(), st.rev, st)
 	if err != nil {
 		return fmt.Errorf("failed to get a buildlet: %v", err)
 	}
@@ -1002,7 +1152,7 @@
 	st.bc = bc
 	st.mu.Unlock()
 
-	st.logEventTime("got_buildlet")
+	st.logEventTime("got_buildlet", bc.IPPort())
 	goodRes := func(res *http.Response, err error, what string) bool {
 		if err != nil {
 			retErr = fmt.Errorf("%s: %v", what, err)
@@ -1063,13 +1213,14 @@
 	var lastScript string
 	var remoteErr error
 	if st.conf.SplitMakeRun() {
+		st.getHelpersReadySoon()
 		makeScript := st.conf.MakeScript()
 		lastScript = makeScript
+		maket0 := time.Now()
 		remoteErr, err = bc.Exec(path.Join("go", makeScript), buildlet.ExecOpts{
 			Output: st,
 			OnStartExec: func() {
-				st.logEventTime("running_exec")
-				st.logEventTime("make_exec")
+				st.logEventTime("running_exec", makeScript)
 			},
 			ExtraEnv: st.conf.Env(),
 			Debug:    true,
@@ -1078,7 +1229,7 @@
 		if err != nil {
 			return err
 		}
-		st.logEventTime("make_done")
+		st.logEventTime("exec_done", fmt.Sprintf("%s in %v", makeScript, time.Since(maket0)))
 
 		if remoteErr == nil {
 			if err := st.cleanForSnapshot(); err != nil {
@@ -1089,40 +1240,23 @@
 				return fmt.Errorf("writeSnapshot: %v", err)
 			}
 
-			tests, err := st.distTestList()
+			lastScript = "runTests"
+			remoteErr, err = st.runTests(st.getHelpers())
 			if err != nil {
-				return fmt.Errorf("distTestList: %v", err)
+				return fmt.Errorf("runTests: %v", err)
 			}
-			fmt.Fprintf(st, "Number of dist tests to run: %d\n", len(tests))
-
-			runScript := st.conf.RunScript()
-			lastScript = runScript
-			remoteErr, err = bc.Exec(path.Join("go", runScript), buildlet.ExecOpts{
-				Output:      st,
-				OnStartExec: func() { st.logEventTime("run_exec") },
-				ExtraEnv:    st.conf.Env(),
-				// all.X sources make.X which adds $GOROOT/bin to $PATH,
-				// so run.X expects to find the go binary in $PATH.
-				Path:  []string{"$WORKDIR/go/bin", "$PATH"},
-				Debug: true,
-				Args:  st.conf.RunScriptArgs(),
-			})
-			if err != nil {
-				return err
-			}
-			st.logEventTime("run_done")
 		}
 	} else {
 		// Old way.
 		// TOOD(bradfitz,adg): delete this block when all builders
 		// can split make & run (and then delete the SplitMakeRun method)
+		st.logEventTime("legacy_all_path")
 		allScript := st.conf.AllScript()
 		lastScript = allScript
 		remoteErr, err = bc.Exec(path.Join("go", allScript), buildlet.ExecOpts{
 			Output: st,
 			OnStartExec: func() {
 				st.logEventTime("running_exec")
-				st.logEventTime("all_exec")
 			},
 			ExtraEnv: st.conf.Env(),
 			Debug:    true,
@@ -1131,9 +1265,17 @@
 		if err != nil {
 			return err
 		}
-		st.logEventTime("all_done")
 	}
-	st.logEventTime("done")
+	doneMsg := "all tests passed"
+	if remoteErr != nil {
+		doneMsg = "with test failures"
+	}
+	st.logEventTime("done", doneMsg) // "done" is a magic value
+
+	if devPause {
+		st.logEventTime("DEV_MAIN_SLEEP")
+		time.Sleep(5 * time.Minute)
+	}
 
 	if st.trySet == nil {
 		var buildLog string
@@ -1165,6 +1307,14 @@
 	return st.bc.RemoveAll(cleanForSnapshotFiles...)
 }
 
+func (st *buildStatus) snapshotObjectName() string {
+	return fmt.Sprintf("%v/%v/%v.tar.gz", "go", st.name, st.rev)
+}
+
+func (st *buildStatus) snapshotURL() string {
+	return fmt.Sprintf("https://storage.googleapis.com/%s/%s", snapBucket(), st.snapshotObjectName())
+}
+
 func (st *buildStatus) writeSnapshot() error {
 	st.logEventTime("write_snapshot")
 	defer st.logEventTime("write_snapshot_done")
@@ -1175,8 +1325,7 @@
 	}
 	defer tgz.Close()
 
-	objName := fmt.Sprintf("%v/%v/%v.tar.gz", "go", st.name, st.rev)
-	wr := storage.NewWriter(serviceCtx, snapBucket(), objName)
+	wr := storage.NewWriter(serviceCtx, snapBucket(), st.snapshotObjectName())
 	wr.ContentType = "application/octet-stream"
 	wr.ACL = append(wr.ACL, storage.ACLRule{Entity: storage.AllUsers, Role: storage.RoleReader})
 	if _, err := io.Copy(wr, tgz); err != nil {
@@ -1205,9 +1354,622 @@
 	return strings.Fields(buf.String()), nil
 }
 
+func (st *buildStatus) newTestSet(names []string) *testSet {
+	set := &testSet{
+		st:     st,
+		retryc: make(chan *testItem, len(names)),
+	}
+	for _, name := range names {
+		set.items = append(set.items, &testItem{
+			set:      set,
+			name:     name,
+			duration: testDuration(name),
+			take:     make(chan token, 1),
+			done:     make(chan token),
+		})
+	}
+	return set
+}
+
+func partitionGoTests(tests []string) (sets [][]string) {
+	var srcTests []string
+	var cmdTests []string
+	for _, name := range tests {
+		if strings.HasPrefix(name, "go_test:cmd/") {
+			cmdTests = append(cmdTests, name)
+		} else if strings.HasPrefix(name, "go_test:") {
+			srcTests = append(srcTests, name)
+		}
+	}
+	sort.Strings(srcTests)
+	sort.Strings(cmdTests)
+	goTests := append(srcTests, cmdTests...)
+
+	const sizeThres = 10 * time.Second
+
+	var curSet []string
+	var curDur time.Duration
+
+	flush := func() {
+		if len(curSet) > 0 {
+			sets = append(sets, curSet)
+			curSet = nil
+			curDur = 0
+		}
+	}
+	for _, name := range goTests {
+		d := testDuration(name) - minGoTestSpeed // subtract 'go' tool overhead
+		if curDur+d > sizeThres {
+			flush() // no-op if empty
+		}
+		curSet = append(curSet, name)
+		curDur += d
+	}
+
+	flush()
+	return
+}
+
+var minGoTestSpeed = (func() time.Duration {
+	var min Seconds
+	for name, secs := range fixedTestDuration {
+		if !strings.HasPrefix(name, "go_test:") {
+			continue
+		}
+		if min == 0 || secs < min {
+			min = secs
+		}
+	}
+	return min.Duration()
+})()
+
+type Seconds float64
+
+func (s Seconds) Duration() time.Duration {
+	return time.Duration(float64(s) * float64(time.Second))
+}
+
+// fixedTestDuration is each dist test's measured run time, in seconds, on
+// Linux/amd64 (once on 2015-05-28), each run by itself. There seems to be a 0.6s+
+// overhead from the go tool which goes away if they're combined.
+var fixedTestDuration = map[string]Seconds{
+	"go_test:archive/tar":                    1.30,
+	"go_test:archive/zip":                    1.68,
+	"go_test:bufio":                          1.61,
+	"go_test:bytes":                          1.50,
+	"go_test:compress/bzip2":                 0.82,
+	"go_test:compress/flate":                 1.73,
+	"go_test:compress/gzip":                  0.82,
+	"go_test:compress/lzw":                   0.86,
+	"go_test:compress/zlib":                  1.78,
+	"go_test:container/heap":                 0.69,
+	"go_test:container/list":                 0.72,
+	"go_test:container/ring":                 0.64,
+	"go_test:crypto/aes":                     0.79,
+	"go_test:crypto/cipher":                  0.96,
+	"go_test:crypto/des":                     0.96,
+	"go_test:crypto/dsa":                     0.75,
+	"go_test:crypto/ecdsa":                   0.86,
+	"go_test:crypto/elliptic":                1.06,
+	"go_test:crypto/hmac":                    0.67,
+	"go_test:crypto/md5":                     0.77,
+	"go_test:crypto/rand":                    0.89,
+	"go_test:crypto/rc4":                     0.71,
+	"go_test:crypto/rsa":                     1.17,
+	"go_test:crypto/sha1":                    0.75,
+	"go_test:crypto/sha256":                  0.68,
+	"go_test:crypto/sha512":                  0.67,
+	"go_test:crypto/subtle":                  0.56,
+	"go_test:crypto/tls":                     3.29,
+	"go_test:crypto/x509":                    2.81,
+	"go_test:database/sql":                   1.75,
+	"go_test:database/sql/driver":            0.64,
+	"go_test:debug/dwarf":                    0.77,
+	"go_test:debug/elf":                      1.41,
+	"go_test:debug/gosym":                    1.45,
+	"go_test:debug/macho":                    0.97,
+	"go_test:debug/pe":                       0.79,
+	"go_test:debug/plan9obj":                 0.73,
+	"go_test:encoding/ascii85":               0.64,
+	"go_test:encoding/asn1":                  1.16,
+	"go_test:encoding/base32":                0.79,
+	"go_test:encoding/base64":                0.82,
+	"go_test:encoding/binary":                0.96,
+	"go_test:encoding/csv":                   0.67,
+	"go_test:encoding/gob":                   2.70,
+	"go_test:encoding/hex":                   0.66,
+	"go_test:encoding/json":                  2.20,
+	"go_test:errors":                         0.54,
+	"go_test:expvar":                         1.36,
+	"go_test:flag":                           0.92,
+	"go_test:fmt":                            2.02,
+	"go_test:go/ast":                         1.44,
+	"go_test:go/build":                       1.42,
+	"go_test:go/constant":                    0.92,
+	"go_test:go/doc":                         1.51,
+	"go_test:go/format":                      0.73,
+	"go_test:go/internal/gcimporter":         1.30,
+	"go_test:go/parser":                      1.30,
+	"go_test:go/printer":                     1.61,
+	"go_test:go/scanner":                     0.89,
+	"go_test:go/token":                       0.92,
+	"go_test:go/types":                       5.24,
+	"go_test:hash/adler32":                   0.62,
+	"go_test:hash/crc32":                     0.68,
+	"go_test:hash/crc64":                     0.55,
+	"go_test:hash/fnv":                       0.66,
+	"go_test:html":                           0.74,
+	"go_test:html/template":                  1.93,
+	"go_test:image":                          1.42,
+	"go_test:image/color":                    0.77,
+	"go_test:image/draw":                     1.32,
+	"go_test:image/gif":                      1.15,
+	"go_test:image/jpeg":                     1.32,
+	"go_test:image/png":                      1.23,
+	"go_test:index/suffixarray":              0.79,
+	"go_test:internal/singleflight":          0.66,
+	"go_test:io":                             0.97,
+	"go_test:io/ioutil":                      0.73,
+	"go_test:log":                            0.72,
+	"go_test:log/syslog":                     2.93,
+	"go_test:math":                           1.59,
+	"go_test:math/big":                       3.75,
+	"go_test:math/cmplx":                     0.81,
+	"go_test:math/rand":                      1.15,
+	"go_test:mime":                           1.01,
+	"go_test:mime/multipart":                 1.51,
+	"go_test:mime/quotedprintable":           0.95,
+	"go_test:net":                            6.71,
+	"go_test:net/http":                       9.41,
+	"go_test:net/http/cgi":                   2.00,
+	"go_test:net/http/cookiejar":             1.51,
+	"go_test:net/http/fcgi":                  1.43,
+	"go_test:net/http/httptest":              1.36,
+	"go_test:net/http/httputil":              1.54,
+	"go_test:net/http/internal":              0.68,
+	"go_test:net/internal/socktest":          0.58,
+	"go_test:net/mail":                       0.92,
+	"go_test:net/rpc":                        1.95,
+	"go_test:net/rpc/jsonrpc":                1.50,
+	"go_test:net/smtp":                       1.43,
+	"go_test:net/textproto":                  1.01,
+	"go_test:net/url":                        1.45,
+	"go_test:os":                             1.88,
+	"go_test:os/exec":                        2.13,
+	"go_test:os/signal":                      4.22,
+	"go_test:os/user":                        0.93,
+	"go_test:path":                           0.68,
+	"go_test:path/filepath":                  1.14,
+	"go_test:reflect":                        3.42,
+	"go_test:regexp":                         1.65,
+	"go_test:regexp/syntax":                  1.40,
+	"go_test:runtime":                        21.02,
+	"go_test:runtime/debug":                  0.79,
+	"go_test:runtime/pprof":                  8.01,
+	"go_test:sort":                           0.96,
+	"go_test:strconv":                        1.60,
+	"go_test:strings":                        1.51,
+	"go_test:sync":                           1.05,
+	"go_test:sync/atomic":                    1.13,
+	"go_test:syscall":                        1.69,
+	"go_test:testing":                        3.70,
+	"go_test:testing/quick":                  0.74,
+	"go_test:text/scanner":                   0.79,
+	"go_test:text/tabwriter":                 0.71,
+	"go_test:text/template":                  1.65,
+	"go_test:text/template/parse":            1.25,
+	"go_test:time":                           4.20,
+	"go_test:unicode":                        0.68,
+	"go_test:unicode/utf16":                  0.77,
+	"go_test:unicode/utf8":                   0.71,
+	"go_test:cmd/addr2line":                  1.73,
+	"go_test:cmd/api":                        1.33,
+	"go_test:cmd/asm/internal/asm":           1.24,
+	"go_test:cmd/asm/internal/lex":           0.91,
+	"go_test:cmd/compile/internal/big":       5.26,
+	"go_test:cmd/cover":                      3.32,
+	"go_test:cmd/fix":                        1.26,
+	"go_test:cmd/go":                         3.63,
+	"go_test:cmd/gofmt":                      1.06,
+	"go_test:cmd/internal/goobj":             0.65,
+	"go_test:cmd/internal/obj":               1.16,
+	"go_test:cmd/internal/obj/x86":           1.04,
+	"go_test:cmd/internal/rsc.io/arm/armasm": 1.92,
+	"go_test:cmd/internal/rsc.io/x86/x86asm": 2.22,
+	"go_test:cmd/newlink":                    1.48,
+	"go_test:cmd/nm":                         1.84,
+	"go_test:cmd/objdump":                    3.60,
+	"go_test:cmd/pack":                       2.64,
+	"go_test:cmd/pprof/internal/profile":     1.29,
+	"runtime:cpu124":                         44.78,
+	"sync_cpu":                               1.01,
+	"cgo_stdio":                              1.53,
+	"cgo_life":                               1.56,
+	"cgo_test":                               45.60,
+	"race":                                   42.55,
+	"testgodefs":                             2.37,
+	"testso":                                 2.72,
+	"testcarchive":                           11.11,
+	"testcshared":                            15.80,
+	"testshared":                             7.13,
+	"testasan":                               2.56,
+	"cgo_errors":                             7.03,
+	"testsigfwd":                             2.74,
+	"doc_progs":                              5.38,
+	"wiki":                                   3.56,
+	"shootout":                               11.34,
+	"bench_go1":                              3.72,
+	"test":                                   45, // old, but valid for a couple weeks from 2015-06-04
+	"test:0_5":                               10,
+	"test:1_5":                               10,
+	"test:2_5":                               10,
+	"test:3_5":                               10,
+	"test:4_5":                               10,
+	"codewalk":                               2.42,
+	"api":                                    7.38,
+}
+
+// testDuration predicts how long the dist test 'name' will take.
+// It's only a scheduling guess.
+func testDuration(name string) time.Duration {
+	if secs, ok := fixedTestDuration[name]; ok {
+		return secs.Duration()
+	}
+	return minGoTestSpeed * 2
+}
+
+// runTests is only called for builders which support a split make/run
+// (should be everything, at least soon). Currently (2015-05-27) iOS
+// and Android and Nacl may not. Untested.
+func (st *buildStatus) runTests(helpers <-chan *buildlet.Client) (remoteErr, err error) {
+	testNames, err := st.distTestList()
+	if err != nil {
+		return nil, fmt.Errorf("distTestList: %v", err)
+	}
+	set := st.newTestSet(testNames)
+	st.logEventTime("starting_tests", fmt.Sprintf("%d tests", len(set.items)))
+	startTime := time.Now()
+
+	// We use our original buildlet to run the tests in order, to
+	// make the streaming somewhat smooth and not incredibly
+	// lumpy.  The rest of the buildlets run the largest tests
+	// first (critical path scheduling).
+	go func() {
+		goroot := "" // no need to override; main buildlet's GOROOT is baked into binaries
+		for tis := range set.itemsInOrder() {
+			st.runTestsOnBuildlet(st.bc, tis, goroot)
+		}
+	}()
+	helperWork := set.itemsBiggestFirst()
+	go func() {
+		for helper := range helpers {
+			go func(bc *buildlet.Client) {
+				defer st.logEventTime("closed_helper", bc.IPPort())
+				defer bc.Close()
+				if devPause {
+					defer time.Sleep(5 * time.Minute)
+					defer st.logEventTime("DEV_HELPER_SLEEP", bc.IPPort())
+				}
+				st.logEventTime("got_helper", bc.IPPort())
+				if err := bc.PutTarFromURL(st.snapshotURL(), "go"); err != nil {
+					log.Printf("failed to extract snapshot for helper %s: %v", bc.IPPort(), err)
+					return
+				}
+				workDir, err := bc.WorkDir()
+				if err != nil {
+					log.Printf("error discovering workdir for helper %s: %v", bc.IPPort(), err)
+					return
+				}
+				goroot := st.conf.FilePathJoin(workDir, "go")
+				st.logEventTime("setup_helper", bc.IPPort())
+				for tis := range helperWork {
+					st.runTestsOnBuildlet(bc, tis, goroot)
+				}
+			}(helper)
+		}
+	}()
+
+	var lastBanner string
+	var serialDuration time.Duration
+	for _, ti := range set.items {
+		<-ti.done // wait for success
+
+		serialDuration += ti.execDuration
+		if len(ti.output) > 0 {
+			banner, out := parseOutputAndBanner(ti.output)
+			if banner != lastBanner {
+				lastBanner = banner
+				fmt.Fprintf(st, "\n##### %s\n", banner)
+			}
+			if devCluster {
+				out = bytes.TrimSuffix(out, nl)
+				st.Write(out)
+				fmt.Fprintf(st, " (shard %s; par=%d)\n", ti.shardIPPort, ti.groupSize)
+			} else {
+				st.Write(out)
+			}
+		}
+
+		if ti.remoteErr != nil {
+			set.cancelAll()
+			return fmt.Errorf("dist test failed: %s: %v", ti.name, ti.remoteErr), nil
+		}
+	}
+	shardedDuration := time.Since(startTime)
+	st.logEventTime("tests_complete", fmt.Sprintf("took %v; aggregate %v; saved %v", shardedDuration, serialDuration, serialDuration-shardedDuration))
+	fmt.Fprintf(st, "\nAll tests passed.\n")
+	return nil, nil
+}
+
+const (
+	banner       = "XXXBANNERXXX:" // flag passed to dist
+	bannerPrefix = "\n" + banner   // with the newline added by dist
+)
+
+var bannerPrefixBytes = []byte(bannerPrefix)
+
+func parseOutputAndBanner(b []byte) (banner string, out []byte) {
+	if bytes.HasPrefix(b, bannerPrefixBytes) {
+		b = b[len(bannerPrefixBytes):]
+		nl := bytes.IndexByte(b, '\n')
+		if nl != -1 {
+			banner = string(b[:nl])
+			b = b[nl+1:]
+		}
+	}
+	return banner, b
+}
+
+// maxTestExecErrors is the number of test execution failures at which
+// we give up and stop trying and instead permanently fail the test.
+// Note that this is not related to whether the test failed remotely,
+// but whether we were unable to start or complete watching it run
+// (a communication error).
+const maxTestExecErrors = 3
+
+// runTestsOnBuildlet runs tis on bc, using the optional goroot environment variable.
+func (st *buildStatus) runTestsOnBuildlet(bc *buildlet.Client, tis []*testItem, goroot string) {
+	names := make([]string, len(tis))
+	for i, ti := range tis {
+		names[i] = ti.name
+		if i > 0 && !strings.HasPrefix(ti.name, "go_test:") {
+			panic("only go_test:* tests may be merged")
+		}
+	}
+	which := fmt.Sprintf("%s: %v", bc.IPPort(), names)
+	st.logEventTime("start_tests", which)
+
+	// TODO(bradfitz,adg): a few weeks after
+	// https://go-review.googlesource.com/10688 is submitted,
+	// around Jun 18th 2015, remove this innerRx stuff and just
+	// pass a list of test names to dist instead. We don't want to
+	// do it right away, so people don't have to rebase their CLs
+	// to avoid trybot failures.
+	var innerRx string
+	if len(tis) > 1 {
+		innerRx = "(" + strings.Join(names, "|") + ")"
+	} else {
+		innerRx = names[0]
+	}
+
+	var buf bytes.Buffer
+	t0 := time.Now()
+	remoteErr, err := bc.Exec(path.Join("go", "bin", "go"), buildlet.ExecOpts{
+		// We set Dir to "." instead of the default ("go/bin") so when the dist tests
+		// try to run os/exec.Command("go", "test", ...), the LookPath of "go" doesn't
+		// return "./go.exe" (which exists in the current directory: "go/bin") and then
+		// fail when dist tries to run the binary in dir "$GOROOT/src", since
+		// "$GOROOT/src" + "./go.exe" doesn't exist. Perhaps LookPath should return
+		// an absolute path.
+		Dir:      ".",
+		Output:   &buf, // see "maybe stream lines" TODO below
+		ExtraEnv: append(st.conf.Env(), "GOROOT="+goroot),
+		Path:     []string{"$WORKDIR/go/bin", "$PATH"},
+		Args:     []string{"tool", "dist", "test", "--no-rebuild", "--banner=" + banner, "--run=^" + innerRx + "$"},
+	})
+	summary := "ok"
+	if err != nil {
+		summary = "commErr=" + err.Error()
+	} else if remoteErr != nil {
+		summary = "test failed remotely"
+	}
+	execDuration := time.Since(t0)
+	st.logEventTime("end_tests", fmt.Sprintf("%s; %s after %v", which, summary, execDuration))
+	if err != nil {
+		for _, ti := range tis {
+			ti.numFail++
+			st.logf("Execution error running %s on %s: %v (numFails = %d)", ti.name, bc, err, ti.numFail)
+			if ti.numFail >= maxTestExecErrors {
+				msg := fmt.Sprintf("Failed to schedule %q test after %d tries.\n", ti.name, maxTestExecErrors)
+				ti.output = []byte(msg)
+				ti.remoteErr = errors.New(msg)
+				close(ti.done)
+			} else {
+				ti.retry()
+			}
+		}
+		return
+	}
+
+	out := buf.Bytes()
+	out = bytes.Replace(out, []byte("\nALL TESTS PASSED (some were excluded)\n"), nil, 1)
+	out = bytes.Replace(out, []byte("\nALL TESTS PASSED\n"), nil, 1)
+
+	for _, ti := range tis {
+		ti.output = out
+		ti.remoteErr = remoteErr
+		ti.execDuration = execDuration
+		ti.groupSize = len(tis)
+		ti.shardIPPort = bc.IPPort()
+		close(ti.done)
+
+		// After the first one, make the rest succeed with no output.
+		// TODO: maybe stream lines (set Output to a line-reading
+		// Writer instead of &buf). for now we just wait for them in
+		// ~10 second batches.  Doesn't look as smooth on the output,
+		// though.
+		out = nil
+		remoteErr = nil
+		execDuration = 0
+	}
+}
+
+type testSet struct {
+	st    *buildStatus
+	items []*testItem
+
+	// retryc communicates failures to watch a test. The channel is
+	// never closed. Sends should also select on reading st.donec
+	// to see if the things have stopped early due to another test
+	// failing and aborting the build.
+	retryc chan *testItem
+}
+
+// cancelAll cancels all pending tests.
+func (s *testSet) cancelAll() {
+	for _, ti := range s.items {
+		ti.tryTake() // ignore return value
+	}
+}
+
+// itemsInOrder returns a channel of items mostly in their original order.
+// The exception is that an item which fails to execute may happen later
+// in a different order.
+// Each item sent in the channel has been taken. (ti.tryTake returned true)
+// The returned channel is closed when no items remain.
+func (s *testSet) itemsInOrder() <-chan []*testItem {
+	return s.itemChan(s.items)
+}
+
+func (s *testSet) itemsBiggestFirst() <-chan []*testItem {
+	items := append([]*testItem(nil), s.items...)
+	sort.Sort(sort.Reverse(byTestDuration(items)))
+	return s.itemChan(items)
+}
+
+// itemChan returns a channel which yields the provided items, usually
+// in the same order as the given items, but grouped with other tests they
+// should be run with. (only stdlib tests are grouped)
+func (s *testSet) itemChan(items []*testItem) <-chan []*testItem {
+	names := make([]string, len(items))
+	namedItem := map[string]*testItem{}
+	for i, ti := range items {
+		names[i] = ti.name
+		namedItem[ti.name] = ti
+	}
+	stdSets := partitionGoTests(names)
+	setForTest := map[string][]*testItem{}
+	for _, set := range stdSets {
+		tis := make([]*testItem, len(set))
+		for i, name := range set {
+			tis[i] = namedItem[name]
+			setForTest[name] = tis
+		}
+	}
+
+	ch := make(chan []*testItem)
+	go func() {
+		defer close(ch)
+		for _, ti := range items {
+			if !ti.tryTake() {
+				continue
+			}
+			send := []*testItem{ti}
+			for _, other := range setForTest[ti.name] {
+				if other != ti && other.tryTake() {
+					send = append(send, other)
+				}
+			}
+			select {
+			case ch <- send:
+			case <-s.st.donec:
+				return
+			}
+		}
+		for {
+			select {
+			case ti := <-s.retryc:
+				if ti.tryTake() {
+					select {
+					case ch <- []*testItem{ti}:
+					case <-s.st.donec:
+						return
+					}
+				}
+			case <-s.st.donec:
+				return
+			}
+		}
+	}()
+	return ch
+}
+
+type testItem struct {
+	set      *testSet
+	name     string        // "go_test:sort"
+	duration time.Duration // optional approximate size
+
+	take chan token // buffered size 1: sending takes ownership of rest of fields:
+
+	done    chan token // closed when done; guards output & failed
+	numFail int        // how many times it's failed to execute
+
+	// groupSize is the number of tests which were run together
+	// along with this one with "go dist test".
+	// This is 1 for non-std/cmd tests, and usually >1 for std/cmd tests.
+	groupSize   int
+	shardIPPort string // buildlet's IPPort, for debugging
+
+	// the following are only set for the first item in a group:
+	output       []byte
+	remoteErr    error         // real test failure (not a communications failure)
+	execDuration time.Duration // actual time
+}
+
+func (ti *testItem) tryTake() bool {
+	select {
+	case ti.take <- token{}:
+		return true
+	default:
+		return false
+	}
+}
+
+func (ti *testItem) isDone() bool {
+	select {
+	case <-ti.done:
+		return true
+	default:
+		return false
+	}
+}
+
+// retry reschedules the test to run again, if a machine died before
+// or during execution, so its results aren't yet known.
+// The caller must own the 'take' semaphore.
+func (ti *testItem) retry() {
+	// release it to make it available for somebody else to try later:
+	<-ti.take
+
+	// Enqueue this test to retry, unless the build is
+	// only proceeding to the first failure and it's
+	// already failed.
+	select {
+	case ti.set.retryc <- ti:
+	case <-ti.set.st.donec:
+	}
+}
+
+type byTestDuration []*testItem
+
+func (s byTestDuration) Len() int           { return len(s) }
+func (s byTestDuration) Less(i, j int) bool { return s[i].duration < s[j].duration }
+func (s byTestDuration) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
+
 type eventAndTime struct {
-	evt string
-	t   time.Time
+	t    time.Time
+	evt  string
+	text string
 }
 
 // buildStatus is the status of a build.
@@ -1219,6 +1981,9 @@
 	trySet    *trySet       // or nil
 	donec     chan struct{} // closed when done
 
+	onceInitHelpers sync.Once // guards call of onceInitHelpersFunc, to init:
+	helpers         <-chan *buildlet.Client
+
 	mu        sync.Mutex       // guards following
 	bc        *buildlet.Client // nil initially, until pool returns one
 	done      time.Time        // finished running
@@ -1245,10 +2010,25 @@
 
 func (st *buildStatus) isRunningLocked() bool { return st.done.IsZero() }
 
-func (st *buildStatus) logEventTime(event string) {
+// logf logs a formatted message prefixed with the build's name and rev.
+func (st *buildStatus) logf(format string, args ...interface{}) {
+	log.Printf("[build %s %s]: %s", st.name, st.rev, fmt.Sprintf(format, args...))
+}
+
+// logEventTime appends event, stamped with the current time, to st.events.
+// At most one optText (free-form detail for the event) is allowed; more panics.
+func (st *buildStatus) logEventTime(event string, optText ...string) {
 	st.mu.Lock()
 	defer st.mu.Unlock()
-	st.events = append(st.events, eventAndTime{event, time.Now()})
+	var detail string
+	switch len(optText) {
+	case 0: // no detail text given
+	case 1:
+		detail = optText[0]
+	default:
+		panic("usage")
+	}
+	st.events = append(st.events, eventAndTime{t: time.Now(), evt: event, text: detail})
 }
 
 func (st *buildStatus) hasEvent(event string) bool {
@@ -1305,18 +2085,27 @@
 }
 
 // st.mu must be held.
-func (st *buildStatus) writeEventsLocked(w io.Writer, html bool) {
+// writeEventsLocked writes one line per event to w: time since the previous event,
+// RFC 3339 timestamp, name and text (HTML markup in htmlMode), then a final "(now)" line.
+func (st *buildStatus) writeEventsLocked(w io.Writer, htmlMode bool) {
 	for i, evt := range st.events {
 		var elapsed string
 		if i != 0 {
 			elapsed = fmt.Sprintf("+%0.1fs", evt.t.Sub(st.events[i-1].t).Seconds())
 		}
-		msg := evt.evt
-		if msg == "running_exec" && html {
-			msg = fmt.Sprintf("<a href='%s'>%s</a>", st.logsURL(), msg)
+		e, text := evt.evt, evt.text
+		if htmlMode {
+			if e == "running_exec" {
+				e = fmt.Sprintf("<a href='%s'>%s</a>", st.logsURL(), e)
+			}
+			e = "<b>" + e + "</b>"
+			text = "<i>" + html.EscapeString(text) + "</i>"
 		}
-		fmt.Fprintf(w, " %7s %v %s\n", elapsed, evt.t.Format(time.RFC3339), msg)
+		fmt.Fprintf(w, " %7s %v %s %s\n", elapsed, evt.t.Format(time.RFC3339), e, text)
 	}
+	if n := len(st.events); n > 0 { // no events: a zero "last" time would print a bogus, saturated duration
+		fmt.Fprintf(w, " %7s (now)\n", fmt.Sprintf("+%0.1fs", time.Since(st.events[n-1].t).Seconds()))
+	}
 }
 
 func (st *buildStatus) logs() string {
@@ -1424,3 +2213,5 @@
 	check(zw.Close())
 	return bytes.NewReader(buf.Bytes())
 }
+
+var nl = []byte("\n") // a single newline, as a byte slice