cmd/coordinator: test sharding, better status, logs
Also includes gomote updates that came about while developing and
debugging this change.
Change-Id: Ia53d674118a6b99bcdda7062d3b7161279b6ad52
Reviewed-on: https://go-review.googlesource.com/10463
Reviewed-by: Andrew Gerrand <adg@golang.org>
diff --git a/cmd/coordinator/coordinator.go b/cmd/coordinator/coordinator.go
index 89de7bd..0aac886 100644
--- a/cmd/coordinator/coordinator.go
+++ b/cmd/coordinator/coordinator.go
@@ -25,6 +25,7 @@
"os"
"path"
"runtime"
+ "sort"
"strings"
"sync"
"time"
@@ -40,6 +41,12 @@
var processStartTime = time.Now()
+var Version string // set by linker -X
+
+// devPause is a debug option to pause for 5 minutes after the build
+// finishes before destroying buildlets.
+const devPause = false
+
func init() {
// Disabled until we have test sharding. This takes 85+ minutes.
// Test sharding is https://github.com/golang/go/issues/10029
@@ -67,7 +74,6 @@
// LOCK ORDER:
// statusMu, buildStatus.mu, trySet.mu
-// TODO(bradfitz,adg): rewrite the coordinator
var (
startTime = time.Now()
@@ -95,7 +101,7 @@
"windows-amd64-gce",
"openbsd-386-gce56",
"openbsd-amd64-gce56",
- "plan9-386-gcepartial",
+ "plan9-386",
"nacl-386",
"nacl-amd64p32",
"linux-arm-shard_test",
@@ -251,7 +257,7 @@
func main() {
flag.Parse()
-
+ log.Printf("coordinator version %q starting", Version)
err := initGCE()
if err != nil {
if *mode == "" {
@@ -297,6 +303,7 @@
go gcePool.cleanUpOldVMs()
if devCluster {
+ dashboard.BuildletBucket = "dev-go-builder-data"
// Only run the linux-amd64 builder in the dev cluster (for now).
dashboard.Builders = devClusterBuilders()
}
@@ -337,11 +344,11 @@
m := map[string]dashboard.BuildConfig{}
for _, name := range []string{
"linux-amd64",
+ "linux-amd64-race",
"windows-amd64-gce",
+ "plan9-386",
} {
- conf := dashboard.Builders[name]
- conf.SetBuildletBinaryURL(strings.Replace(conf.BuildletBinaryURL(), "go-builder-data", "dev-go-builder-data", 1))
- m[name] = conf
+ m[name] = dashboard.Builders[name]
}
return m
}
@@ -366,6 +373,9 @@
if isBuilding(rev) {
return false
}
+ if devCluster && numCurrentBuilds() != 0 {
+ return false
+ }
if dashboard.Builders[rev.name].IsReverse {
return reversePool.CanBuild(rev.name)
}
@@ -561,6 +571,11 @@
// for the main dashboard. It does not support gccgo.
// TODO(bradfitz): it also currently does not support subrepos.
func findWorkLoop(work chan<- builderRev) {
+ // Useful for debugging a single run:
+ if devCluster && false {
+ work <- builderRev{name: "linux-amd64-race", rev: "54789eff385780c54254f822e09505b6222918e2"}
+ return
+ }
ticker := time.NewTicker(15 * time.Second)
for {
if err := findWork(work); err != nil {
@@ -921,7 +936,23 @@
}
type eventTimeLogger interface {
- logEventTime(name string)
+ logEventTime(event string, optText ...string)
+}
+
+var ErrCanceled = errors.New("canceled")
+
+// Cancel is a channel that's closed by the caller when the request is no longer
+// desired. The function receiving a cancel should return ErrCanceled whenever
+// Cancel becomes readable.
+type Cancel <-chan struct{}
+
+func (c Cancel) IsCanceled() bool {
+ select {
+ case <-c:
+ return true
+ default:
+ return false
+ }
}
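+
+// A pool's GetBuildlet might honor Cancel like so (an illustrative
+// sketch only; readyc is a hypothetical "buildlet is ready" channel):
+//
+//	select {
+//	case <-cancel:
+//		return nil, ErrCanceled
+//	case bc := <-readyc:
+//		return bc, nil
+//	}
+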
type BuildletPool interface {
@@ -933,11 +964,44 @@
// anything except for log messages or VM naming.
//
// Clients must Close when done with the client.
- GetBuildlet(machineType, rev string, el eventTimeLogger) (*buildlet.Client, error)
+ GetBuildlet(cancel Cancel, machineType, rev string, el eventTimeLogger) (*buildlet.Client, error)
String() string // TODO(bradfitz): more status stuff
}
+// GetBuildlets creates up to n buildlets and sends them on the returned channel
+// before closing the channel.
+func GetBuildlets(cancel Cancel, pool BuildletPool, n int, machineType, rev string, el eventTimeLogger) <-chan *buildlet.Client {
+ ch := make(chan *buildlet.Client) // NOT buffered
+ var wg sync.WaitGroup
+ wg.Add(n)
+ for i := 0; i < n; i++ {
+ go func() {
+ defer wg.Done()
+ bc, err := pool.GetBuildlet(cancel, machineType, rev, el)
+ if err != nil {
+ if err != ErrCanceled {
+ log.Printf("failed to get a %s buildlet for rev %s: %v", machineType, rev, err)
+ }
+ return
+ }
+ el.logEventTime("helper_ready")
+ select {
+ case ch <- bc:
+ case <-cancel:
+ el.logEventTime("helper_killed_before_use")
+ bc.Close()
+ return
+ }
+ }()
+ }
+ go func() {
+ wg.Wait()
+ close(ch)
+ }()
+ return ch
+}
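+
+// A caller might drain the returned channel like so (sketch only;
+// runHelper is a hypothetical consumer):
+//
+//	for bc := range GetBuildlets(cancel, pool, 4, "linux-amd64", rev, el) {
+//		go runHelper(bc)
+//	}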
+
func poolForConf(conf dashboard.BuildConfig) (BuildletPool, error) {
if conf.VMImage != "" {
return gcePool, nil
@@ -979,21 +1043,107 @@
}()
}
-func (st *buildStatus) build() (retErr error) {
- buildletType := st.conf.BuildletType
- if buildletType == "" {
- buildletType = st.conf.Name
+func (st *buildStatus) buildletType() string {
+ if v := st.conf.BuildletType; v != "" {
+ return v
}
+ return st.conf.Name
+}
+
+func (st *buildStatus) buildletPool() (BuildletPool, error) {
+ buildletType := st.buildletType()
bconf, ok := dashboard.Builders[buildletType]
if !ok {
- return fmt.Errorf("invalid BuildletType %q for %q", buildletType, st.conf.Name)
+ return nil, fmt.Errorf("invalid BuildletType %q for %q", buildletType, st.conf.Name)
}
- pool, err := poolForConf(bconf)
+ return poolForConf(bconf)
+}
+
+func (st *buildStatus) expectedMakeBashDuration() time.Duration {
+ // TODO: base this on historical measurements, rather than static configuration.
+ // TODO: move this to dashboard/builders.go? But once we base this on
+ // historical measurements, it'll need GCE services (bigtable/bigquery?),
+ // so it's probably better in this file.
+ goos, goarch := st.conf.GOOS(), st.conf.GOARCH()
+
+ if goos == "plan9" {
+ return 2500 * time.Millisecond
+ }
+ if goos == "linux" {
+ if goarch == "arm" {
+ return 4 * time.Minute
+ }
+ return 1000 * time.Millisecond
+ }
+ if goos == "windows" {
+ return 1000 * time.Millisecond
+ }
+
+ return 1500 * time.Millisecond
+}
+
+func (st *buildStatus) expectedBuildletStartDuration() time.Duration {
+ // TODO: move this to dashboard/builders.go? But once we base this on
+ // historical measurements, it'll need GCE services (bigtable/bigquery?),
+ // so it's probably better in this file.
+ pool, _ := st.buildletPool()
+ switch pool.(type) {
+ case *gceBuildletPool:
+ return time.Minute
+ case *reverseBuildletPool:
+ goos, arch := st.conf.GOOS(), st.conf.GOARCH()
+ if goos == "darwin" {
+ if arch == "arm" && arch == "arm64" {
+ // iOS; idle or it's not.
+ return 0
+ }
+ if arch == "amd64" || arch == "386" {
+ // TODO: once we're using VMware, return 1 * time.Minute for
+ // the VMware boot of hermetic OS X. For now:
+ return 0
+ }
+ }
+ if goos == "linux" && arch == "arm" {
+ // Scaleway. Ready or not.
+ return 0
+ }
+ }
+ return 0
+}
+
+// getHelpersReadySoon waits a bit (as a function of the build
+// configuration) and starts getting the buildlets for test sharding
+// ready, such that they're ready when make.bash is done. But we don't
+// want to start too early, lest we waste idle resources during make.bash.
+func (st *buildStatus) getHelpersReadySoon() {
+ if st.conf.NumTestHelpers == 0 {
+ return
+ }
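+ // Note: if the expected buildlet start time exceeds the expected
+ // make.bash time, the computed delay is negative and time.AfterFunc
+ // runs the function immediately.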
+ time.AfterFunc(st.expectedMakeBashDuration()-st.expectedBuildletStartDuration(),
+ func() {
+ st.logEventTime("starting_helpers")
+ st.getHelpers() // and ignore the result.
+ })
+}
+
+// getHelpers returns a channel of buildlet test helpers, sending each
+// helper as it becomes available. The channel is closed at the end.
+func (st *buildStatus) getHelpers() <-chan *buildlet.Client {
+ st.onceInitHelpers.Do(st.onceInitHelpersFunc)
+ return st.helpers
+}
+
+func (st *buildStatus) onceInitHelpersFunc() {
+ pool, _ := st.buildletPool() // won't return an error since we called it already
+ st.helpers = GetBuildlets(st.donec, pool, st.conf.NumTestHelpers, st.buildletType(), st.rev, st)
+}
+
+func (st *buildStatus) build() (retErr error) {
+ pool, err := st.buildletPool()
if err != nil {
return err
}
st.logEventTime("get_buildlet")
- bc, err := pool.GetBuildlet(buildletType, st.rev, st)
+ bc, err := pool.GetBuildlet(nil, st.buildletType(), st.rev, st)
if err != nil {
return fmt.Errorf("failed to get a buildlet: %v", err)
}
@@ -1002,7 +1152,7 @@
st.bc = bc
st.mu.Unlock()
- st.logEventTime("got_buildlet")
+ st.logEventTime("got_buildlet", bc.IPPort())
goodRes := func(res *http.Response, err error, what string) bool {
if err != nil {
retErr = fmt.Errorf("%s: %v", what, err)
@@ -1063,13 +1213,14 @@
var lastScript string
var remoteErr error
if st.conf.SplitMakeRun() {
+ st.getHelpersReadySoon()
makeScript := st.conf.MakeScript()
lastScript = makeScript
+ maket0 := time.Now()
remoteErr, err = bc.Exec(path.Join("go", makeScript), buildlet.ExecOpts{
Output: st,
OnStartExec: func() {
- st.logEventTime("running_exec")
- st.logEventTime("make_exec")
+ st.logEventTime("running_exec", makeScript)
},
ExtraEnv: st.conf.Env(),
Debug: true,
@@ -1078,7 +1229,7 @@
if err != nil {
return err
}
- st.logEventTime("make_done")
+ st.logEventTime("exec_done", fmt.Sprintf("%s in %v", makeScript, time.Since(maket0)))
if remoteErr == nil {
if err := st.cleanForSnapshot(); err != nil {
@@ -1089,40 +1240,23 @@
return fmt.Errorf("writeSnapshot: %v", err)
}
- tests, err := st.distTestList()
+ lastScript = "runTests"
+ remoteErr, err = st.runTests(st.getHelpers())
if err != nil {
- return fmt.Errorf("distTestList: %v", err)
+ return fmt.Errorf("runTests: %v", err)
}
- fmt.Fprintf(st, "Number of dist tests to run: %d\n", len(tests))
-
- runScript := st.conf.RunScript()
- lastScript = runScript
- remoteErr, err = bc.Exec(path.Join("go", runScript), buildlet.ExecOpts{
- Output: st,
- OnStartExec: func() { st.logEventTime("run_exec") },
- ExtraEnv: st.conf.Env(),
- // all.X sources make.X which adds $GOROOT/bin to $PATH,
- // so run.X expects to find the go binary in $PATH.
- Path: []string{"$WORKDIR/go/bin", "$PATH"},
- Debug: true,
- Args: st.conf.RunScriptArgs(),
- })
- if err != nil {
- return err
- }
- st.logEventTime("run_done")
}
} else {
// Old way.
// TODO(bradfitz,adg): delete this block when all builders
// can split make & run (and then delete the SplitMakeRun method)
+ st.logEventTime("legacy_all_path")
allScript := st.conf.AllScript()
lastScript = allScript
remoteErr, err = bc.Exec(path.Join("go", allScript), buildlet.ExecOpts{
Output: st,
OnStartExec: func() {
st.logEventTime("running_exec")
- st.logEventTime("all_exec")
},
ExtraEnv: st.conf.Env(),
Debug: true,
@@ -1131,9 +1265,17 @@
if err != nil {
return err
}
- st.logEventTime("all_done")
}
- st.logEventTime("done")
+ doneMsg := "all tests passed"
+ if remoteErr != nil {
+ doneMsg = "with test failures"
+ }
+ st.logEventTime("done", doneMsg) // "done" is a magic value
+
+ if devPause {
+ st.logEventTime("DEV_MAIN_SLEEP")
+ time.Sleep(5 * time.Minute)
+ }
if st.trySet == nil {
var buildLog string
@@ -1165,6 +1307,14 @@
return st.bc.RemoveAll(cleanForSnapshotFiles...)
}
+func (st *buildStatus) snapshotObjectName() string {
+ return fmt.Sprintf("%v/%v/%v.tar.gz", "go", st.name, st.rev)
+}
+
+func (st *buildStatus) snapshotURL() string {
+ return fmt.Sprintf("https://storage.googleapis.com/%s/%s", snapBucket(), st.snapshotObjectName())
+}
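+
+// For example, a build of "linux-amd64" at rev "abc123" (a placeholder)
+// is stored as object "go/linux-amd64/abc123.tar.gz" in the snapshot
+// bucket and served at the public URL above.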
+
func (st *buildStatus) writeSnapshot() error {
st.logEventTime("write_snapshot")
defer st.logEventTime("write_snapshot_done")
@@ -1175,8 +1325,7 @@
}
defer tgz.Close()
- objName := fmt.Sprintf("%v/%v/%v.tar.gz", "go", st.name, st.rev)
- wr := storage.NewWriter(serviceCtx, snapBucket(), objName)
+ wr := storage.NewWriter(serviceCtx, snapBucket(), st.snapshotObjectName())
wr.ContentType = "application/octet-stream"
wr.ACL = append(wr.ACL, storage.ACLRule{Entity: storage.AllUsers, Role: storage.RoleReader})
if _, err := io.Copy(wr, tgz); err != nil {
@@ -1205,9 +1354,622 @@
return strings.Fields(buf.String()), nil
}
+func (st *buildStatus) newTestSet(names []string) *testSet {
+ set := &testSet{
+ st: st,
+ retryc: make(chan *testItem, len(names)),
+ }
+ for _, name := range names {
+ set.items = append(set.items, &testItem{
+ set: set,
+ name: name,
+ duration: testDuration(name),
+ take: make(chan token, 1),
+ done: make(chan token),
+ })
+ }
+ return set
+}
+
+func partitionGoTests(tests []string) (sets [][]string) {
+ var srcTests []string
+ var cmdTests []string
+ for _, name := range tests {
+ if strings.HasPrefix(name, "go_test:cmd/") {
+ cmdTests = append(cmdTests, name)
+ } else if strings.HasPrefix(name, "go_test:") {
+ srcTests = append(srcTests, name)
+ }
+ }
+ sort.Strings(srcTests)
+ sort.Strings(cmdTests)
+ goTests := append(srcTests, cmdTests...)
+
+ const sizeThres = 10 * time.Second
+
+ var curSet []string
+ var curDur time.Duration
+
+ flush := func() {
+ if len(curSet) > 0 {
+ sets = append(sets, curSet)
+ curSet = nil
+ curDur = 0
+ }
+ }
+ for _, name := range goTests {
+ d := testDuration(name) - minGoTestSpeed // subtract 'go' tool overhead
+ if curDur+d > sizeThres {
+ flush() // no-op if empty
+ }
+ curSet = append(curSet, name)
+ curDur += d
+ }
+
+ flush()
+ return
+}
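+
+// For example, with the durations in fixedTestDuration below, small
+// adjacent packages such as go_test:archive/tar (1.30s) and
+// go_test:archive/zip (1.68s) accumulate into one roughly 10-second
+// bucket, while go_test:runtime (21.02s) exceeds the threshold on its
+// own and lands in a bucket by itself.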
+
+var minGoTestSpeed = (func() time.Duration {
+ var min Seconds
+ for name, secs := range fixedTestDuration {
+ if !strings.HasPrefix(name, "go_test:") {
+ continue
+ }
+ if min == 0 || secs < min {
+ min = secs
+ }
+ }
+ return min.Duration()
+})()
+
+type Seconds float64
+
+func (s Seconds) Duration() time.Duration {
+ return time.Duration(float64(s) * float64(time.Second))
+}
+
+// Durations in seconds, as measured on linux/amd64 (once, on
+// 2015-05-28), with each test run by itself. There seems to be a
+// 0.6s+ overhead from the go tool which goes away if they're combined.
+var fixedTestDuration = map[string]Seconds{
+ "go_test:archive/tar": 1.30,
+ "go_test:archive/zip": 1.68,
+ "go_test:bufio": 1.61,
+ "go_test:bytes": 1.50,
+ "go_test:compress/bzip2": 0.82,
+ "go_test:compress/flate": 1.73,
+ "go_test:compress/gzip": 0.82,
+ "go_test:compress/lzw": 0.86,
+ "go_test:compress/zlib": 1.78,
+ "go_test:container/heap": 0.69,
+ "go_test:container/list": 0.72,
+ "go_test:container/ring": 0.64,
+ "go_test:crypto/aes": 0.79,
+ "go_test:crypto/cipher": 0.96,
+ "go_test:crypto/des": 0.96,
+ "go_test:crypto/dsa": 0.75,
+ "go_test:crypto/ecdsa": 0.86,
+ "go_test:crypto/elliptic": 1.06,
+ "go_test:crypto/hmac": 0.67,
+ "go_test:crypto/md5": 0.77,
+ "go_test:crypto/rand": 0.89,
+ "go_test:crypto/rc4": 0.71,
+ "go_test:crypto/rsa": 1.17,
+ "go_test:crypto/sha1": 0.75,
+ "go_test:crypto/sha256": 0.68,
+ "go_test:crypto/sha512": 0.67,
+ "go_test:crypto/subtle": 0.56,
+ "go_test:crypto/tls": 3.29,
+ "go_test:crypto/x509": 2.81,
+ "go_test:database/sql": 1.75,
+ "go_test:database/sql/driver": 0.64,
+ "go_test:debug/dwarf": 0.77,
+ "go_test:debug/elf": 1.41,
+ "go_test:debug/gosym": 1.45,
+ "go_test:debug/macho": 0.97,
+ "go_test:debug/pe": 0.79,
+ "go_test:debug/plan9obj": 0.73,
+ "go_test:encoding/ascii85": 0.64,
+ "go_test:encoding/asn1": 1.16,
+ "go_test:encoding/base32": 0.79,
+ "go_test:encoding/base64": 0.82,
+ "go_test:encoding/binary": 0.96,
+ "go_test:encoding/csv": 0.67,
+ "go_test:encoding/gob": 2.70,
+ "go_test:encoding/hex": 0.66,
+ "go_test:encoding/json": 2.20,
+ "test:errors": 0.54,
+ "go_test:expvar": 1.36,
+ "go_test:flag": 0.92,
+ "go_test:fmt": 2.02,
+ "go_test:go/ast": 1.44,
+ "go_test:go/build": 1.42,
+ "go_test:go/constant": 0.92,
+ "go_test:go/doc": 1.51,
+ "go_test:go/format": 0.73,
+ "go_test:go/internal/gcimporter": 1.30,
+ "go_test:go/parser": 1.30,
+ "go_test:go/printer": 1.61,
+ "go_test:go/scanner": 0.89,
+ "go_test:go/token": 0.92,
+ "go_test:go/types": 5.24,
+ "go_test:hash/adler32": 0.62,
+ "go_test:hash/crc32": 0.68,
+ "go_test:hash/crc64": 0.55,
+ "go_test:hash/fnv": 0.66,
+ "go_test:html": 0.74,
+ "go_test:html/template": 1.93,
+ "go_test:image": 1.42,
+ "go_test:image/color": 0.77,
+ "go_test:image/draw": 1.32,
+ "go_test:image/gif": 1.15,
+ "go_test:image/jpeg": 1.32,
+ "go_test:image/png": 1.23,
+ "go_test:index/suffixarray": 0.79,
+ "go_test:internal/singleflight": 0.66,
+ "go_test:io": 0.97,
+ "go_test:io/ioutil": 0.73,
+ "go_test:log": 0.72,
+ "go_test:log/syslog": 2.93,
+ "go_test:math": 1.59,
+ "go_test:math/big": 3.75,
+ "go_test:math/cmplx": 0.81,
+ "go_test:math/rand": 1.15,
+ "go_test:mime": 1.01,
+ "go_test:mime/multipart": 1.51,
+ "go_test:mime/quotedprintable": 0.95,
+ "go_test:net": 6.71,
+ "go_test:net/http": 9.41,
+ "go_test:net/http/cgi": 2.00,
+ "go_test:net/http/cookiejar": 1.51,
+ "go_test:net/http/fcgi": 1.43,
+ "go_test:net/http/httptest": 1.36,
+ "go_test:net/http/httputil": 1.54,
+ "go_test:net/http/internal": 0.68,
+ "go_test:net/internal/socktest": 0.58,
+ "go_test:net/mail": 0.92,
+ "go_test:net/rpc": 1.95,
+ "go_test:net/rpc/jsonrpc": 1.50,
+ "go_test:net/smtp": 1.43,
+ "go_test:net/textproto": 1.01,
+ "go_test:net/url": 1.45,
+ "go_test:os": 1.88,
+ "go_test:os/exec": 2.13,
+ "go_test:os/signal": 4.22,
+ "go_test:os/user": 0.93,
+ "go_test:path": 0.68,
+ "go_test:path/filepath": 1.14,
+ "go_test:reflect": 3.42,
+ "go_test:regexp": 1.65,
+ "go_test:regexp/syntax": 1.40,
+ "go_test:runtime": 21.02,
+ "go_test:runtime/debug": 0.79,
+ "go_test:runtime/pprof": 8.01,
+ "go_test:sort": 0.96,
+ "go_test:strconv": 1.60,
+ "go_test:strings": 1.51,
+ "go_test:sync": 1.05,
+ "go_test:sync/atomic": 1.13,
+ "go_test:syscall": 1.69,
+ "go_test:testing": 3.70,
+ "go_test:testing/quick": 0.74,
+ "go_test:text/scanner": 0.79,
+ "go_test:text/tabwriter": 0.71,
+ "go_test:text/template": 1.65,
+ "go_test:text/template/parse": 1.25,
+ "go_test:time": 4.20,
+ "go_test:unicode": 0.68,
+ "go_test:unicode/utf16": 0.77,
+ "go_test:unicode/utf8": 0.71,
+ "go_test:cmd/addr2line": 1.73,
+ "go_test:cmd/api": 1.33,
+ "go_test:cmd/asm/internal/asm": 1.24,
+ "go_test:cmd/asm/internal/lex": 0.91,
+ "go_test:cmd/compile/internal/big": 5.26,
+ "go_test:cmd/cover": 3.32,
+ "go_test:cmd/fix": 1.26,
+ "go_test:cmd/go": 3.63,
+ "go_test:cmd/gofmt": 1.06,
+ "go_test:cmd/internal/goobj": 0.65,
+ "go_test:cmd/internal/obj": 1.16,
+ "go_test:cmd/internal/obj/x86": 1.04,
+ "go_test:cmd/internal/rsc.io/arm/armasm": 1.92,
+ "go_test:cmd/internal/rsc.io/x86/x86asm": 2.22,
+ "go_test:cmd/newlink": 1.48,
+ "go_test:cmd/nm": 1.84,
+ "go_test:cmd/objdump": 3.60,
+ "go_test:cmd/pack": 2.64,
+ "go_test:cmd/pprof/internal/profile": 1.29,
+ "runtime:cpu124": 44.78,
+ "sync_cpu": 1.01,
+ "cgo_stdio": 1.53,
+ "cgo_life": 1.56,
+ "cgo_test": 45.60,
+ "race": 42.55,
+ "testgodefs": 2.37,
+ "testso": 2.72,
+ "testcarchive": 11.11,
+ "testcshared": 15.80,
+ "testshared": 7.13,
+ "testasan": 2.56,
+ "cgo_errors": 7.03,
+ "testsigfwd": 2.74,
+ "doc_progs": 5.38,
+ "wiki": 3.56,
+ "shootout": 11.34,
+ "bench_go1": 3.72,
+ "test": 45, // old, but valid for a couple weeks from 2015-06-04
+ "test:0_5": 10,
+ "test:1_5": 10,
+ "test:2_5": 10,
+ "test:3_5": 10,
+ "test:4_5": 10,
+ "codewalk": 2.42,
+ "api": 7.38,
+}
+
+// testDuration predicts how long the dist test 'name' will take.
+// It's only a scheduling guess.
+func testDuration(name string) time.Duration {
+ if secs, ok := fixedTestDuration[name]; ok {
+ return secs.Duration()
+ }
+ return minGoTestSpeed * 2
+}
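+
+// For example, testDuration("go_test:fmt") yields 2.02s from the table
+// above, while an unlisted name such as "go_test:some/new/pkg" falls
+// back to twice the smallest known go_test duration.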
+
+// runTests is only called for builders which support a split make/run
+// (should be everything, at least soon). Currently (2015-05-27) iOS,
+// Android, and NaCl may not. Untested.
+func (st *buildStatus) runTests(helpers <-chan *buildlet.Client) (remoteErr, err error) {
+ testNames, err := st.distTestList()
+ if err != nil {
+ return nil, fmt.Errorf("distTestList: %v", err)
+ }
+ set := st.newTestSet(testNames)
+ st.logEventTime("starting_tests", fmt.Sprintf("%d tests", len(set.items)))
+ startTime := time.Now()
+
+ // We use our original buildlet to run the tests in order, to
+ // make the streaming somewhat smooth and not incredibly
+ // lumpy. The rest of the buildlets run the largest tests
+ // first (critical path scheduling).
+ go func() {
+ goroot := "" // no need to override; main buildlet's GOROOT is baked into binaries
+ for tis := range set.itemsInOrder() {
+ st.runTestsOnBuildlet(st.bc, tis, goroot)
+ }
+ }()
+ helperWork := set.itemsBiggestFirst()
+ go func() {
+ for helper := range helpers {
+ go func(bc *buildlet.Client) {
+ defer st.logEventTime("closed_helper", bc.IPPort())
+ defer bc.Close()
+ if devPause {
+ defer time.Sleep(5 * time.Minute)
+ defer st.logEventTime("DEV_HELPER_SLEEP", bc.IPPort())
+ }
+ st.logEventTime("got_helper", bc.IPPort())
+ if err := bc.PutTarFromURL(st.snapshotURL(), "go"); err != nil {
+ log.Printf("failed to extract snapshot for helper %s: %v", bc.IPPort(), err)
+ return
+ }
+ workDir, err := bc.WorkDir()
+ if err != nil {
+ log.Printf("error discovering workdir for helper %s: %v", bc.IPPort(), err)
+ return
+ }
+ goroot := st.conf.FilePathJoin(workDir, "go")
+ st.logEventTime("setup_helper", bc.IPPort())
+ for tis := range helperWork {
+ st.runTestsOnBuildlet(bc, tis, goroot)
+ }
+ }(helper)
+ }
+ }()
+
+ var lastBanner string
+ var serialDuration time.Duration
+ for _, ti := range set.items {
+ <-ti.done // wait for success
+
+ serialDuration += ti.execDuration
+ if len(ti.output) > 0 {
+ banner, out := parseOutputAndBanner(ti.output)
+ if banner != lastBanner {
+ lastBanner = banner
+ fmt.Fprintf(st, "\n##### %s\n", banner)
+ }
+ if devCluster {
+ out = bytes.TrimSuffix(out, nl)
+ st.Write(out)
+ fmt.Fprintf(st, " (shard %s; par=%d)\n", ti.shardIPPort, ti.groupSize)
+ } else {
+ st.Write(out)
+ }
+ }
+
+ if ti.remoteErr != nil {
+ set.cancelAll()
+ return fmt.Errorf("dist test failed: %s: %v", ti.name, ti.remoteErr), nil
+ }
+ }
+ shardedDuration := time.Since(startTime)
+ st.logEventTime("tests_complete", fmt.Sprintf("took %v; aggregate %v; saved %v", shardedDuration, serialDuration, serialDuration-shardedDuration))
+ fmt.Fprintf(st, "\nAll tests passed.\n")
+ return nil, nil
+}
+
+const (
+ banner = "XXXBANNERXXX:" // flag passed to dist
+ bannerPrefix = "\n" + banner // with the newline added by dist
+)
+
+var bannerPrefixBytes = []byte(bannerPrefix)
+
+func parseOutputAndBanner(b []byte) (banner string, out []byte) {
+ if bytes.HasPrefix(b, bannerPrefixBytes) {
+ b = b[len(bannerPrefixBytes):]
+ nl := bytes.IndexByte(b, '\n')
+ if nl != -1 {
+ banner = string(b[:nl])
+ b = b[nl+1:]
+ }
+ }
+ return banner, b
+}
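+
+// For example, assuming dist emits banners of the form
+// "\nXXXBANNERXXX:Testing packages.\n", parseOutputAndBanner of
+//
+//	"\nXXXBANNERXXX:Testing packages.\nok  \tarchive/tar\n"
+//
+// returns banner "Testing packages." and out beginning "ok  \tarchive/tar\n".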
+
+// maxTestExecErrors is the number of test execution failures at which
+// we give up and stop retrying, and instead permanently fail the test.
+// Note that this is not about whether the test failed remotely, but
+// whether we were unable to start or complete watching it run
+// (a communication error).
+const maxTestExecErrors = 3
+
+// runTestsOnBuildlet runs tis on bc, using the optional goroot environment variable.
+func (st *buildStatus) runTestsOnBuildlet(bc *buildlet.Client, tis []*testItem, goroot string) {
+ names := make([]string, len(tis))
+ for i, ti := range tis {
+ names[i] = ti.name
+ if i > 0 && !strings.HasPrefix(ti.name, "go_test:") {
+ panic("only go_test:* tests may be merged")
+ }
+ }
+ which := fmt.Sprintf("%s: %v", bc.IPPort(), names)
+ st.logEventTime("start_tests", which)
+
+ // TODO(bradfitz,adg): a few weeks after
+ // https://go-review.googlesource.com/10688 is submitted,
+ // around Jun 18th 2015, remove this innerRx stuff and just
+ // pass a list of test names to dist instead. We don't want to
+ // do it right away, so people don't have to rebase their CLs
+ // to avoid trybot failures.
+ var innerRx string
+ if len(tis) > 1 {
+ innerRx = "(" + strings.Join(names, "|") + ")"
+ } else {
+ innerRx = names[0]
+ }
+
+ var buf bytes.Buffer
+ t0 := time.Now()
+ remoteErr, err := bc.Exec(path.Join("go", "bin", "go"), buildlet.ExecOpts{
+ // We set Dir to "." instead of the default ("go/bin") so when the dist tests
+ // try to run os/exec.Command("go", "test", ...), the LookPath of "go" doesn't
+ // return "./go.exe" (which exists in the current directory: "go/bin") and then
+ // fail when dist tries to run the binary in dir "$GOROOT/src", since
+ // "$GOROOT/src" + "./go.exe" doesn't exist. Perhaps LookPath should return
+ // an absolute path.
+ Dir: ".",
+ Output: &buf, // see "maybe stream lines" TODO below
+ ExtraEnv: append(st.conf.Env(), "GOROOT="+goroot),
+ Path: []string{"$WORKDIR/go/bin", "$PATH"},
+ Args: []string{"tool", "dist", "test", "--no-rebuild", "--banner=" + banner, "--run=^" + innerRx + "$"},
+ })
+ summary := "ok"
+ if err != nil {
+ summary = "commErr=" + err.Error()
+ } else if remoteErr != nil {
+ summary = "test failed remotely"
+ }
+ execDuration := time.Since(t0)
+ st.logEventTime("end_tests", fmt.Sprintf("%s; %s after %v", which, summary, execDuration))
+ if err != nil {
+ for _, ti := range tis {
+ ti.numFail++
+ st.logf("Execution error running %s on %s: %v (numFails = %d)", ti.name, bc, err, ti.numFail)
+ if ti.numFail >= maxTestExecErrors {
+ msg := fmt.Sprintf("Failed to schedule %q test after %d tries.\n", ti.name, maxTestExecErrors)
+ ti.output = []byte(msg)
+ ti.remoteErr = errors.New(msg)
+ close(ti.done)
+ } else {
+ ti.retry()
+ }
+ }
+ return
+ }
+
+ out := buf.Bytes()
+ out = bytes.Replace(out, []byte("\nALL TESTS PASSED (some were excluded)\n"), nil, 1)
+ out = bytes.Replace(out, []byte("\nALL TESTS PASSED\n"), nil, 1)
+
+ for _, ti := range tis {
+ ti.output = out
+ ti.remoteErr = remoteErr
+ ti.execDuration = execDuration
+ ti.groupSize = len(tis)
+ ti.shardIPPort = bc.IPPort()
+ close(ti.done)
+
+ // After the first one, make the rest succeed with no output.
+ // TODO: maybe stream lines (set Output to a line-reading
+ // Writer instead of &buf). for now we just wait for them in
+ // ~10 second batches. Doesn't look as smooth on the output,
+ // though.
+ out = nil
+ remoteErr = nil
+ execDuration = 0
+ }
+}
+
+type testSet struct {
+ st *buildStatus
+ items []*testItem
+
+ // retryc communicates failures to watch a test. The channel is
+ // never closed. Senders should also select on reading st.donec,
+ // in case the build has stopped early because another test
+ // failed and aborted the build.
+ retryc chan *testItem
+}
+
+// cancelAll cancels all pending tests.
+func (s *testSet) cancelAll() {
+ for _, ti := range s.items {
+ ti.tryTake() // ignore return value
+ }
+}
+
+// itemsInOrder returns a channel of items mostly in their original order.
+// The exception is that an item which fails to execute may be resent
+// later, out of order. Each item sent on the channel has been taken
+// (its tryTake returned true). The returned channel is closed when no
+// items remain.
+func (s *testSet) itemsInOrder() <-chan []*testItem {
+ return s.itemChan(s.items)
+}
+
+func (s *testSet) itemsBiggestFirst() <-chan []*testItem {
+ items := append([]*testItem(nil), s.items...)
+ sort.Sort(sort.Reverse(byTestDuration(items)))
+ return s.itemChan(items)
+}
+
+// itemChan returns a channel which yields the provided items, usually
+// in the given order, but grouped with other tests they should be run
+// alongside. (Only stdlib tests are grouped.)
+func (s *testSet) itemChan(items []*testItem) <-chan []*testItem {
+ names := make([]string, len(items))
+ namedItem := map[string]*testItem{}
+ for i, ti := range items {
+ names[i] = ti.name
+ namedItem[ti.name] = ti
+ }
+ stdSets := partitionGoTests(names)
+ setForTest := map[string][]*testItem{}
+ for _, set := range stdSets {
+ tis := make([]*testItem, len(set))
+ for i, name := range set {
+ tis[i] = namedItem[name]
+ setForTest[name] = tis
+ }
+ }
+
+ ch := make(chan []*testItem)
+ go func() {
+ defer close(ch)
+ for _, ti := range items {
+ if !ti.tryTake() {
+ continue
+ }
+ send := []*testItem{ti}
+ for _, other := range setForTest[ti.name] {
+ if other != ti && other.tryTake() {
+ send = append(send, other)
+ }
+ }
+ select {
+ case ch <- send:
+ case <-s.st.donec:
+ return
+ }
+ }
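+ // All items have been offered once in order; now serve
+ // retries until the build finishes.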
+ for {
+ select {
+ case ti := <-s.retryc:
+ if ti.tryTake() {
+ select {
+ case ch <- []*testItem{ti}:
+ case <-s.st.donec:
+ return
+ }
+ }
+ case <-s.st.donec:
+ return
+ }
+ }
+ }()
+ return ch
+}
+
+type testItem struct {
+ set *testSet
+ name string // "go_test:sort"
+ duration time.Duration // optional approximate size
+
+ take chan token // buffered, size 1; sending takes ownership of the fields below
+
+ done chan token // closed when done; guards output, remoteErr & execDuration
+ numFail int // how many times it's failed to execute
+
+ // groupSize is the number of tests which were run together
+ // along with this one with "go dist test".
+ // This is 1 for non-std/cmd tests, and usually >1 for std/cmd tests.
+ groupSize int
+ shardIPPort string // buildlet's IPPort, for debugging
+
+ // the following are only set for the first item in a group:
+ output []byte
+ remoteErr error // real test failure (not a communications failure)
+ execDuration time.Duration // actual time
+}
+
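+// tryTake attempts to acquire ownership of ti by sending the single
+// token into its take channel. It reports whether the caller now owns
+// the item and may run it (or mark it failed).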
+func (ti *testItem) tryTake() bool {
+ select {
+ case ti.take <- token{}:
+ return true
+ default:
+ return false
+ }
+}
+
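+// isDone reports whether ti's results have already been recorded.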
+func (ti *testItem) isDone() bool {
+ select {
+ case <-ti.done:
+ return true
+ default:
+ return false
+ }
+}
+
+// retry reschedules the test to run again if a machine died before or
+// during execution, leaving its results unknown.
+// The caller must own the 'take' semaphore.
+func (ti *testItem) retry() {
+ // release it to make it available for somebody else to try later:
+ <-ti.take
+
+ // Enqueue this test to retry, unless the build is
+ // only proceeding to the first failure and it's
+ // already failed.
+ select {
+ case ti.set.retryc <- ti:
+ case <-ti.set.st.donec:
+ }
+}
+
+type byTestDuration []*testItem
+
+func (s byTestDuration) Len() int { return len(s) }
+func (s byTestDuration) Less(i, j int) bool { return s[i].duration < s[j].duration }
+func (s byTestDuration) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
+
type eventAndTime struct {
- evt string
- t time.Time
+ t time.Time
+ evt string
+ text string
}
// buildStatus is the status of a build.
@@ -1219,6 +1981,9 @@
trySet *trySet // or nil
donec chan struct{} // closed when done
+ onceInitHelpers sync.Once // guards call of onceInitHelpersFunc, to init helpers
+ helpers <-chan *buildlet.Client
+
mu sync.Mutex // guards following
bc *buildlet.Client // nil initially, until pool returns one
done time.Time // finished running
@@ -1245,10 +2010,25 @@
func (st *buildStatus) isRunningLocked() bool { return st.done.IsZero() }
-func (st *buildStatus) logEventTime(event string) {
+func (st *buildStatus) logf(format string, args ...interface{}) {
+ log.Printf("[build %s %s]: %s", st.name, st.rev, fmt.Sprintf(format, args...))
+}
+
+func (st *buildStatus) logEventTime(event string, optText ...string) {
st.mu.Lock()
defer st.mu.Unlock()
- st.events = append(st.events, eventAndTime{event, time.Now()})
+ var text string
+ if len(optText) > 0 {
+ if len(optText) > 1 {
+ panic("usage")
+ }
+ text = optText[0]
+ }
+ st.events = append(st.events, eventAndTime{
+ t: time.Now(),
+ evt: event,
+ text: text,
+ })
}
func (st *buildStatus) hasEvent(event string) bool {
@@ -1305,18 +2085,27 @@
}
// st.mu must be held.
-func (st *buildStatus) writeEventsLocked(w io.Writer, html bool) {
+func (st *buildStatus) writeEventsLocked(w io.Writer, htmlMode bool) {
+ var lastT time.Time
for i, evt := range st.events {
+ lastT = evt.t
var elapsed string
if i != 0 {
elapsed = fmt.Sprintf("+%0.1fs", evt.t.Sub(st.events[i-1].t).Seconds())
}
- msg := evt.evt
- if msg == "running_exec" && html {
- msg = fmt.Sprintf("<a href='%s'>%s</a>", st.logsURL(), msg)
+ e := evt.evt
+ text := evt.text
+ if htmlMode {
+ if e == "running_exec" {
+ e = fmt.Sprintf("<a href='%s'>%s</a>", st.logsURL(), e)
+ }
+ e = "<b>" + e + "</b>"
+ text = "<i>" + html.EscapeString(text) + "</i>"
}
- fmt.Fprintf(w, " %7s %v %s\n", elapsed, evt.t.Format(time.RFC3339), msg)
+ fmt.Fprintf(w, " %7s %v %s %s\n", elapsed, evt.t.Format(time.RFC3339), e, text)
}
+ fmt.Fprintf(w, " %7s (now)\n", fmt.Sprintf("+%0.1fs", time.Since(lastT).Seconds()))
}
func (st *buildStatus) logs() string {
@@ -1424,3 +2213,5 @@
check(zw.Close())
return bytes.NewReader(buf.Bytes())
}
+
+var nl = []byte("\n")