buildlet, cmd/coordinator: GCE quota accounting, fixes
And deal with Preemptible resource exhaustion errors.
And change all-compile to misc-compile, which only runs the builders
not otherwise covered (Fixes #11073)
And make the watcher serve git source.
And cache and singleflight fetching of git source.
And a million other things.
Fixes golang/go#11073
Change-Id: I0f45610f0c6a06bd0c8ba9632b8624e00aeb52fc
Reviewed-on: https://go-review.googlesource.com/10750
Reviewed-by: Andrew Gerrand <adg@golang.org>
diff --git a/cmd/coordinator/coordinator.go b/cmd/coordinator/coordinator.go
index 0aac886..d7c9ccf 100644
--- a/cmd/coordinator/coordinator.go
+++ b/cmd/coordinator/coordinator.go
@@ -35,6 +35,8 @@
"golang.org/x/build/buildlet"
"golang.org/x/build/dashboard"
"golang.org/x/build/gerrit"
+ "golang.org/x/build/internal/lru"
+ "golang.org/x/build/internal/singleflight"
"golang.org/x/build/types"
"google.golang.org/cloud/storage"
)
@@ -90,7 +92,7 @@
func init() {
tryList := []string{
- "all-compile",
+ "misc-compile",
"darwin-amd64-10_10",
"linux-386",
"linux-amd64",
@@ -573,7 +575,8 @@
func findWorkLoop(work chan<- builderRev) {
// Useful for debugging a single run:
if devCluster && false {
- work <- builderRev{name: "linux-amd64-race", rev: "54789eff385780c54254f822e09505b6222918e2"}
+ work <- builderRev{name: "linux-amd64", rev: "54789eff385780c54254f822e09505b6222918e2"}
+ work <- builderRev{name: "windows-amd64-gce", rev: "54789eff385780c54254f822e09505b6222918e2"}
return
}
ticker := time.NewTicker(15 * time.Second)
@@ -780,12 +783,17 @@
msg := "TryBots beginning. Status page: http://farmer.golang.org/try?commit=" + ts.Commit[:8]
if ci, err := gerritClient.GetChangeDetail(ts.ChangeID); err == nil {
+ if len(ci.Messages) == 0 {
+ log.Printf("No Gerrit comments retrieved on %v", ts.ChangeID)
+ }
for _, cmi := range ci.Messages {
- if cmi.Message == msg {
+ if strings.Contains(cmi.Message, msg) {
// Dup. Don't spam.
return
}
}
+ } else {
+ log.Printf("Error getting Gerrit comments on %s: %v", ts.ChangeID, err)
}
// Ignore error. This isn't critical.
@@ -1137,7 +1145,7 @@
st.helpers = GetBuildlets(st.donec, pool, st.conf.NumTestHelpers, st.buildletType(), st.rev, st)
}
-func (st *buildStatus) build() (retErr error) {
+func (st *buildStatus) build() error {
pool, err := st.buildletPool()
if err != nil {
return err
@@ -1153,20 +1161,6 @@
st.mu.Unlock()
st.logEventTime("got_buildlet", bc.IPPort())
- goodRes := func(res *http.Response, err error, what string) bool {
- if err != nil {
- retErr = fmt.Errorf("%s: %v", what, err)
- return false
- }
- if res.StatusCode/100 != 2 {
- slurp, _ := ioutil.ReadAll(io.LimitReader(res.Body, 4<<10))
- retErr = fmt.Errorf("%s: %v; body: %s", what, res.Status, slurp)
- res.Body.Close()
- return false
-
- }
- return true
- }
// Write the VERSION file.
st.logEventTime("start_write_version_tar")
@@ -1174,19 +1168,15 @@
return fmt.Errorf("writing VERSION tgz: %v", err)
}
- // Feed the buildlet a tar file for it to extract.
- // TODO: cache these.
- st.logEventTime("start_fetch_gerrit_tgz")
- tarRes, err := http.Get("https://go.googlesource.com/go/+archive/" + st.rev + ".tar.gz")
- if !goodRes(tarRes, err, "fetching tarball from Gerrit") {
- return
- }
-
var grp syncutil.Group
grp.Go(func() error {
+ st.logEventTime("fetch_go_tar")
+ tarReader, err := getSourceTgz(st, "go", st.rev)
+ if err != nil {
+ return err
+ }
st.logEventTime("start_write_go_tar")
- if err := bc.PutTar(tarRes.Body, "go"); err != nil {
- tarRes.Body.Close()
+ if err := bc.PutTar(tarReader, "go"); err != nil {
return fmt.Errorf("writing tarball from Gerrit: %v", err)
}
st.logEventTime("end_write_go_tar")
@@ -1356,8 +1346,7 @@
func (st *buildStatus) newTestSet(names []string) *testSet {
set := &testSet{
- st: st,
- retryc: make(chan *testItem, len(names)),
+ st: st,
}
for _, name := range names {
set.items = append(set.items, &testItem{
@@ -1635,12 +1624,16 @@
// lumpy. The rest of the buildlets run the largest tests
// first (critical path scheduling).
go func() {
- goroot := "" // no need to override; main buildlet's GOROOT is baked into binaries
- for tis := range set.itemsInOrder() {
+ for {
+ tis, ok := set.testsToRunInOrder()
+ if !ok {
+ st.logEventTime("in_order_tests_complete")
+ return
+ }
+ goroot := "" // no need to override; main buildlet's GOROOT is baked into binaries
st.runTestsOnBuildlet(st.bc, tis, goroot)
}
}()
- helperWork := set.itemsBiggestFirst()
go func() {
for helper := range helpers {
go func(bc *buildlet.Client) {
@@ -1660,9 +1653,14 @@
log.Printf("error discovering workdir for helper %s: %v", bc.IPPort(), err)
return
}
- goroot := st.conf.FilePathJoin(workDir, "go")
st.logEventTime("setup_helper", bc.IPPort())
- for tis := range helperWork {
+ goroot := st.conf.FilePathJoin(workDir, "go")
+ for {
+ tis, ok := set.testsToRunBiggestFirst()
+ if !ok {
+ st.logEventTime("biggest_tests_complete", bc.IPPort())
+ return
+ }
st.runTestsOnBuildlet(bc, tis, goroot)
}
}(helper)
@@ -1672,7 +1670,15 @@
var lastBanner string
var serialDuration time.Duration
for _, ti := range set.items {
- <-ti.done // wait for success
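+ // Wait for the test to finish, logging every 30 seconds so a
+ // stuck test is visible in the build's event log.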
+ AwaitDone:
+ for {
+ select {
+ case <-ti.done: // wait for success
+ break AwaitDone
+ case <-time.After(30 * time.Second):
+ st.logEventTime("still_waiting_on_test", ti.name)
+ }
+ }
serialDuration += ti.execDuration
if len(ti.output) > 0 {
@@ -1818,11 +1824,9 @@
st *buildStatus
items []*testItem
- // retryc communicates failures to watch a test. The channel is
- // never closed. Sends should also select on reading st.donec
- // to see if the things have stopped early due to another test
- // failing and aborting the build.
- retryc chan *testItem
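+ // mu guards inOrder and biggestFirst, the lazily built
+ // partitions of items handed out by testsToRunInOrder and
+ // testsToRunBiggestFirst.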
+ mu sync.Mutex
+ inOrder [][]*testItem
+ biggestFirst [][]*testItem
}
// cancelAll cancels all pending tests.
@@ -1832,76 +1836,72 @@
}
}
-// itemsInOrder returns a channel of items mostly in their original order.
-// The exception is that an item which fails to execute may happen later
-// in a different order.
-// Each item sent in the channel has been took. (ti.tryTake returned true)
-// The returned channel is closed when no items remain.
-func (s *testSet) itemsInOrder() <-chan []*testItem {
- return s.itemChan(s.items)
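+// testsToRunInOrder returns a chunk of not-yet-started tests in
+// roughly their original order, with go_test:* tests grouped into
+// the sets produced by partitionGoTests. ok is false once every
+// test has been taken.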
+func (s *testSet) testsToRunInOrder() (chunk []*testItem, ok bool) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if s.inOrder == nil {
+ s.initInOrder()
+ }
+ return s.testsFromSlice(s.inOrder)
}
-func (s *testSet) itemsBiggestFirst() <-chan []*testItem {
- items := append([]*testItem(nil), s.items...)
- sort.Sort(sort.Reverse(byTestDuration(items)))
- return s.itemChan(items)
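+// testsToRunBiggestFirst returns the next not-yet-started test as a
+// single-item chunk, longest expected duration first. ok is false
+// once every test has been taken.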
+func (s *testSet) testsToRunBiggestFirst() (chunk []*testItem, ok bool) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+ if s.biggestFirst == nil {
+ s.initBiggestFirst()
+ }
+ return s.testsFromSlice(s.biggestFirst)
}
-// itemChan returns a channel which yields the provided items, usually
-// in the same order given items, but grouped with others tests they
-// should be run with. (only stdlib tests are are grouped)
-func (s *testSet) itemChan(items []*testItem) <-chan []*testItem {
- names := make([]string, len(items))
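+// testsFromSlice returns the not-yet-started tests from the first
+// chunk in chunkList that still has any, taking (claiming) them as a
+// side effect. ok is false if every test has already been taken.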
+func (s *testSet) testsFromSlice(chunkList [][]*testItem) (chunk []*testItem, ok bool) {
+ for _, candChunk := range chunkList {
+ for _, ti := range candChunk {
+ if ti.tryTake() {
+ chunk = append(chunk, ti)
+ }
+ }
+ if len(chunk) > 0 {
+ return chunk, true
+ }
+ }
+ return nil, false
+}
+
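+// initInOrder builds s.inOrder: the go_test:* tests merged into
+// chunks by partitionGoTests, followed by each remaining test in its
+// own chunk.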
+func (s *testSet) initInOrder() {
+ names := make([]string, len(s.items))
namedItem := map[string]*testItem{}
- for i, ti := range items {
+ for i, ti := range s.items {
names[i] = ti.name
namedItem[ti.name] = ti
}
+
+ // First do the go_test:* ones. partitionGoTests
+ // only returns those, which are the ones we merge together.
stdSets := partitionGoTests(names)
- setForTest := map[string][]*testItem{}
for _, set := range stdSets {
tis := make([]*testItem, len(set))
for i, name := range set {
tis[i] = namedItem[name]
- setForTest[name] = tis
}
+ s.inOrder = append(s.inOrder, tis)
}
- ch := make(chan []*testItem)
- go func() {
- defer close(ch)
- for _, ti := range items {
- if !ti.tryTake() {
- continue
- }
- send := []*testItem{ti}
- for _, other := range setForTest[ti.name] {
- if other != ti && other.tryTake() {
- send = append(send, other)
- }
- }
- select {
- case ch <- send:
- case <-s.st.donec:
- return
- }
+ // Then do the misc tests, which are always by themselves.
+ // (No benefit to merging them)
+ for _, ti := range s.items {
+ if !strings.HasPrefix(ti.name, "go_test:") {
+ s.inOrder = append(s.inOrder, []*testItem{ti})
}
- for {
- select {
- case ti := <-s.retryc:
- if ti.tryTake() {
- select {
- case ch <- []*testItem{ti}:
- case <-s.st.donec:
- return
- }
- }
- case <-s.st.donec:
- return
- }
- }
- }()
- return ch
+ }
+}
+
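+// initBiggestFirst builds s.biggestFirst: one chunk per test, sorted
+// by expected duration, longest first.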
+func (s *testSet) initBiggestFirst() {
+ items := append([]*testItem(nil), s.items...)
+ sort.Sort(sort.Reverse(byTestDuration(items)))
+ for _, item := range items {
+ s.biggestFirst = append(s.biggestFirst, []*testItem{item})
+ }
}
type testItem struct {
@@ -1950,14 +1950,6 @@
func (ti *testItem) retry() {
// release it to make it available for somebody else to try later:
<-ti.take
-
- // Enqueue this test to retry, unless the build is
- // only proceeding to the first failure and it's
- // already failed.
- select {
- case ti.set.retryc <- ti:
- case <-ti.set.st.donec:
- }
}
type byTestDuration []*testItem
@@ -2104,7 +2096,9 @@
}
fmt.Fprintf(w, " %7s %v %s %s\n", elapsed, evt.t.Format(time.RFC3339), e, text)
}
- fmt.Fprintf(w, " %7s (now)\n", fmt.Sprintf("+%0.1fs", time.Since(lastT).Seconds()))
+ if st.isRunningLocked() {
+ fmt.Fprintf(w, " %7s (now)\n", fmt.Sprintf("+%0.1fs", time.Since(lastT).Seconds()))
+ }
}
@@ -2214,4 +2208,75 @@
return bytes.NewReader(buf.Bytes())
}
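+// sourceGroup coalesces concurrent fetches of the same revision's
+// source tarball.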
+var sourceGroup singleflight.Group
+
+var sourceCache = lru.New(20) // git rev -> []byte
+
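+// getSourceTgz returns a reader for a .tar.gz of the given repo at
+// the given revision, trying the watcher first and falling back to
+// Gerrit. Concurrent calls for the same rev are coalesced via
+// sourceGroup and successful results are cached in sourceCache.
+//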
+// repo is go.googlesource.com repo ("go", "net", etc)
+// rev is git revision.
+func getSourceTgz(el eventTimeLogger, repo, rev string) (tgz io.Reader, err error) {
+ fromCache := false
+ vi, err, shared := sourceGroup.Do(rev, func() (interface{}, error) {
+ if tgzBytes, ok := sourceCache.Get(rev); ok {
+ fromCache = true
+ return tgzBytes, nil
+ }
+
+ for i := 0; i < 10; i++ {
+ el.logEventTime("fetching_source", "from watcher")
+ tgzBytes, err := getSourceTgzFromWatcher(repo, rev)
+ if err == nil {
+ sourceCache.Add(rev, tgzBytes)
+ return tgzBytes, nil
+ }
+ log.Printf("Error fetching source %s/%s from watcher (after %v uptime): %v",
+ repo, rev, time.Since(processStartTime), err)
+ // Wait for watcher to start up. Give it a minute until
+ // we try Gerrit.
+ time.Sleep(6 * time.Second)
+ }
+
+ el.logEventTime("fetching_source", "from gerrit")
+ tgzBytes, err := getSourceTgzFromGerrit(repo, rev)
+ if err == nil {
+ sourceCache.Add(rev, tgzBytes)
+ }
+ return tgzBytes, err
+ })
+ if err != nil {
+ return nil, err
+ }
+ el.logEventTime("got_source", fmt.Sprintf("cache=%v shared=%v", fromCache, shared))
+ return bytes.NewReader(vi.([]byte)), nil
+}
+
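+// getSourceTgzFromGerrit fetches the tarball directly from
+// go.googlesource.com.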
+func getSourceTgzFromGerrit(repo, rev string) (tgz []byte, err error) {
+ return getSourceTgzFromURL("gerrit", repo, rev, "https://go.googlesource.com/"+repo+"/+archive/"+rev+".tar.gz")
+}
+
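+// getSourceTgzFromWatcher fetches the tarball from the watcher
+// process at gitArchiveAddr.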
+func getSourceTgzFromWatcher(repo, rev string) (tgz []byte, err error) {
+ return getSourceTgzFromURL("watcher", repo, rev, "http://"+gitArchiveAddr+"/"+repo+".tar.gz?rev="+rev)
+}
+
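+// getSourceTgzFromURL fetches urlStr and returns its body, rejecting
+// responses over maxSize (25 MB). source, repo, and rev appear only
+// in error messages.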
+func getSourceTgzFromURL(source, repo, rev, urlStr string) (tgz []byte, err error) {
+ res, err := http.Get(urlStr)
+ if err != nil {
+ return nil, fmt.Errorf("fetching %s/%s from %s: %v", repo, rev, source, err)
+ }
+ defer res.Body.Close()
+ if res.StatusCode/100 != 2 {
+ slurp, _ := ioutil.ReadAll(io.LimitReader(res.Body, 4<<10))
+ return nil, fmt.Errorf("fetching %s/%s from %s: %v; body: %s", repo, rev, source, res.Status, slurp)
+ }
+ const maxSize = 25 << 20 // seems unlikely; go source is 7.8MB on 2015-06-15
+ slurp, err := ioutil.ReadAll(io.LimitReader(res.Body, maxSize+1))
+ if len(slurp) > maxSize && err == nil {
+ err = fmt.Errorf("body over %d bytes", maxSize)
+ }
+ if err != nil {
+ return nil, fmt.Errorf("reading %s/%s from %s: %v", repo, rev, source, err)
+ }
+ return slurp, nil
+}
+
var nl = []byte("\n")