buildlet, cmd/coordinator: GCE quota accounting, fixes

And deal with Preemptible resource exhaustion errors.

And change all-compile to misc-compile and only do the builders
not covered otherwise (Fixes #11073)

And make the watcher serve git source.

And cache and singleflight fetching of git source.

And a million other things.

Fixes golang/go#11073

Change-Id: I0f45610f0c6a06bd0c8ba9632b8624e00aeb52fc
Reviewed-on: https://go-review.googlesource.com/10750
Reviewed-by: Andrew Gerrand <adg@golang.org>
diff --git a/cmd/coordinator/coordinator.go b/cmd/coordinator/coordinator.go
index 0aac886..d7c9ccf 100644
--- a/cmd/coordinator/coordinator.go
+++ b/cmd/coordinator/coordinator.go
@@ -35,6 +35,8 @@
 	"golang.org/x/build/buildlet"
 	"golang.org/x/build/dashboard"
 	"golang.org/x/build/gerrit"
+	"golang.org/x/build/internal/lru"
+	"golang.org/x/build/internal/singleflight"
 	"golang.org/x/build/types"
 	"google.golang.org/cloud/storage"
 )
@@ -90,7 +92,7 @@
 
 func init() {
 	tryList := []string{
-		"all-compile",
+		"misc-compile",
 		"darwin-amd64-10_10",
 		"linux-386",
 		"linux-amd64",
@@ -573,7 +575,8 @@
 func findWorkLoop(work chan<- builderRev) {
 	// Useful for debugging a single run:
 	if devCluster && false {
-		work <- builderRev{name: "linux-amd64-race", rev: "54789eff385780c54254f822e09505b6222918e2"}
+		work <- builderRev{name: "linux-amd64", rev: "54789eff385780c54254f822e09505b6222918e2"}
+		work <- builderRev{name: "windows-amd64-gce", rev: "54789eff385780c54254f822e09505b6222918e2"}
 		return
 	}
 	ticker := time.NewTicker(15 * time.Second)
@@ -780,12 +783,17 @@
 	msg := "TryBots beginning. Status page: http://farmer.golang.org/try?commit=" + ts.Commit[:8]
 
 	if ci, err := gerritClient.GetChangeDetail(ts.ChangeID); err == nil {
+		if len(ci.Messages) == 0 {
+			log.Printf("No Gerrit comments retrieved on %v", ts.ChangeID)
+		}
 		for _, cmi := range ci.Messages {
-			if cmi.Message == msg {
+			if strings.Contains(cmi.Message, msg) {
 				// Dup. Don't spam.
 				return
 			}
 		}
+	} else {
+		log.Printf("Error getting Gerrit comments on %s: %v", ts.ChangeID, err)
 	}
 
 	// Ignore error. This isn't critical.
@@ -1137,7 +1145,7 @@
 	st.helpers = GetBuildlets(st.donec, pool, st.conf.NumTestHelpers, st.buildletType(), st.rev, st)
 }
 
-func (st *buildStatus) build() (retErr error) {
+func (st *buildStatus) build() error {
 	pool, err := st.buildletPool()
 	if err != nil {
 		return err
@@ -1153,20 +1161,6 @@
 	st.mu.Unlock()
 
 	st.logEventTime("got_buildlet", bc.IPPort())
-	goodRes := func(res *http.Response, err error, what string) bool {
-		if err != nil {
-			retErr = fmt.Errorf("%s: %v", what, err)
-			return false
-		}
-		if res.StatusCode/100 != 2 {
-			slurp, _ := ioutil.ReadAll(io.LimitReader(res.Body, 4<<10))
-			retErr = fmt.Errorf("%s: %v; body: %s", what, res.Status, slurp)
-			res.Body.Close()
-			return false
-
-		}
-		return true
-	}
 
 	// Write the VERSION file.
 	st.logEventTime("start_write_version_tar")
@@ -1174,19 +1168,15 @@
 		return fmt.Errorf("writing VERSION tgz: %v", err)
 	}
 
-	// Feed the buildlet a tar file for it to extract.
-	// TODO: cache these.
-	st.logEventTime("start_fetch_gerrit_tgz")
-	tarRes, err := http.Get("https://go.googlesource.com/go/+archive/" + st.rev + ".tar.gz")
-	if !goodRes(tarRes, err, "fetching tarball from Gerrit") {
-		return
-	}
-
 	var grp syncutil.Group
 	grp.Go(func() error {
+		st.logEventTime("fetch_go_tar")
+		tarReader, err := getSourceTgz(st, "go", st.rev)
+		if err != nil {
+			return err
+		}
 		st.logEventTime("start_write_go_tar")
-		if err := bc.PutTar(tarRes.Body, "go"); err != nil {
-			tarRes.Body.Close()
+		if err := bc.PutTar(tarReader, "go"); err != nil {
 			return fmt.Errorf("writing tarball from Gerrit: %v", err)
 		}
 		st.logEventTime("end_write_go_tar")
@@ -1356,8 +1346,7 @@
 
 func (st *buildStatus) newTestSet(names []string) *testSet {
 	set := &testSet{
-		st:     st,
-		retryc: make(chan *testItem, len(names)),
+		st: st,
 	}
 	for _, name := range names {
 		set.items = append(set.items, &testItem{
@@ -1635,12 +1624,16 @@
 	// lumpy.  The rest of the buildlets run the largest tests
 	// first (critical path scheduling).
 	go func() {
-		goroot := "" // no need to override; main buildlet's GOROOT is baked into binaries
-		for tis := range set.itemsInOrder() {
+		for {
+			tis, ok := set.testsToRunInOrder()
+			if !ok {
+				st.logEventTime("in_order_tests_complete")
+				return
+			}
+			goroot := "" // no need to override; main buildlet's GOROOT is baked into binaries
 			st.runTestsOnBuildlet(st.bc, tis, goroot)
 		}
 	}()
-	helperWork := set.itemsBiggestFirst()
 	go func() {
 		for helper := range helpers {
 			go func(bc *buildlet.Client) {
@@ -1660,9 +1653,14 @@
 					log.Printf("error discovering workdir for helper %s: %v", bc.IPPort(), err)
 					return
 				}
-				goroot := st.conf.FilePathJoin(workDir, "go")
 				st.logEventTime("setup_helper", bc.IPPort())
-				for tis := range helperWork {
+				goroot := st.conf.FilePathJoin(workDir, "go")
+				for {
+					tis, ok := set.testsToRunBiggestFirst()
+					if !ok {
+						st.logEventTime("biggest_tests_complete", bc.IPPort())
+						return
+					}
 					st.runTestsOnBuildlet(bc, tis, goroot)
 				}
 			}(helper)
@@ -1672,7 +1670,15 @@
 	var lastBanner string
 	var serialDuration time.Duration
 	for _, ti := range set.items {
-		<-ti.done // wait for success
+	AwaitDone:
+		for {
+			select {
+			case <-ti.done: // wait for success
+				break AwaitDone
+			case <-time.After(30 * time.Second):
+				st.logEventTime("still_waiting_on_test", ti.name)
+			}
+		}
 
 		serialDuration += ti.execDuration
 		if len(ti.output) > 0 {
@@ -1818,11 +1824,9 @@
 	st    *buildStatus
 	items []*testItem
 
-	// retryc communicates failures to watch a test. The channel is
-	// never closed. Sends should also select on reading st.donec
-	// to see if the things have stopped early due to another test
-	// failing and aborting the build.
-	retryc chan *testItem
+	mu           sync.Mutex
+	inOrder      [][]*testItem
+	biggestFirst [][]*testItem
 }
 
 // cancelAll cancels all pending tests.
@@ -1832,76 +1836,72 @@
 	}
 }
 
-// itemsInOrder returns a channel of items mostly in their original order.
-// The exception is that an item which fails to execute may happen later
-// in a different order.
-// Each item sent in the channel has been took. (ti.tryTake returned true)
-// The returned channel is closed when no items remain.
-func (s *testSet) itemsInOrder() <-chan []*testItem {
-	return s.itemChan(s.items)
+func (s *testSet) testsToRunInOrder() (chunk []*testItem, ok bool) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.inOrder == nil {
+		s.initInOrder()
+	}
+	return s.testsFromSlice(s.inOrder)
 }
 
-func (s *testSet) itemsBiggestFirst() <-chan []*testItem {
-	items := append([]*testItem(nil), s.items...)
-	sort.Sort(sort.Reverse(byTestDuration(items)))
-	return s.itemChan(items)
+func (s *testSet) testsToRunBiggestFirst() (chunk []*testItem, ok bool) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	if s.biggestFirst == nil {
+		s.initBiggestFirst()
+	}
+	return s.testsFromSlice(s.biggestFirst)
 }
 
-// itemChan returns a channel which yields the provided items, usually
-// in the same order given items, but grouped with others tests they
-// should be run with. (only stdlib tests are are grouped)
-func (s *testSet) itemChan(items []*testItem) <-chan []*testItem {
-	names := make([]string, len(items))
+func (s *testSet) testsFromSlice(chunkList [][]*testItem) (chunk []*testItem, ok bool) {
+	for _, candChunk := range chunkList {
+		for _, ti := range candChunk {
+			if ti.tryTake() {
+				chunk = append(chunk, ti)
+			}
+		}
+		if len(chunk) > 0 {
+			return chunk, true
+		}
+	}
+	return nil, false
+}
+
+func (s *testSet) initInOrder() {
+	names := make([]string, len(s.items))
 	namedItem := map[string]*testItem{}
-	for i, ti := range items {
+	for i, ti := range s.items {
 		names[i] = ti.name
 		namedItem[ti.name] = ti
 	}
+
+	// First do the go_test:* ones. partitionGoTests
+	// only returns those, which are the ones we merge together.
 	stdSets := partitionGoTests(names)
-	setForTest := map[string][]*testItem{}
 	for _, set := range stdSets {
 		tis := make([]*testItem, len(set))
 		for i, name := range set {
 			tis[i] = namedItem[name]
-			setForTest[name] = tis
 		}
+		s.inOrder = append(s.inOrder, tis)
 	}
 
-	ch := make(chan []*testItem)
-	go func() {
-		defer close(ch)
-		for _, ti := range items {
-			if !ti.tryTake() {
-				continue
-			}
-			send := []*testItem{ti}
-			for _, other := range setForTest[ti.name] {
-				if other != ti && other.tryTake() {
-					send = append(send, other)
-				}
-			}
-			select {
-			case ch <- send:
-			case <-s.st.donec:
-				return
-			}
+	// Then do the misc tests, which are always by themselves.
+	// (No benefit to merging them)
+	for _, ti := range s.items {
+		if !strings.HasPrefix(ti.name, "go_test:") {
+			s.inOrder = append(s.inOrder, []*testItem{ti})
 		}
-		for {
-			select {
-			case ti := <-s.retryc:
-				if ti.tryTake() {
-					select {
-					case ch <- []*testItem{ti}:
-					case <-s.st.donec:
-						return
-					}
-				}
-			case <-s.st.donec:
-				return
-			}
-		}
-	}()
-	return ch
+	}
+}
+
+func (s *testSet) initBiggestFirst() {
+	items := append([]*testItem(nil), s.items...)
+	sort.Sort(sort.Reverse(byTestDuration(items)))
+	for _, item := range items {
+		s.biggestFirst = append(s.biggestFirst, []*testItem{item})
+	}
 }
 
 type testItem struct {
@@ -1950,14 +1950,6 @@
 func (ti *testItem) retry() {
 	// release it to make it available for somebody else to try later:
 	<-ti.take
-
-	// Enqueue this test to retry, unless the build is
-	// only proceeding to the first failure and it's
-	// already failed.
-	select {
-	case ti.set.retryc <- ti:
-	case <-ti.set.st.donec:
-	}
 }
 
 type byTestDuration []*testItem
@@ -2104,7 +2096,9 @@
 		}
 		fmt.Fprintf(w, " %7s %v %s %s\n", elapsed, evt.t.Format(time.RFC3339), e, text)
 	}
-	fmt.Fprintf(w, " %7s (now)\n", fmt.Sprintf("+%0.1fs", time.Since(lastT).Seconds()))
+	if st.isRunningLocked() {
+		fmt.Fprintf(w, " %7s (now)\n", fmt.Sprintf("+%0.1fs", time.Since(lastT).Seconds()))
+	}
 
 }
 
@@ -2214,4 +2208,75 @@
 	return bytes.NewReader(buf.Bytes())
 }
 
+var sourceGroup singleflight.Group
+
+var sourceCache = lru.New(20) // git rev -> []byte
+
+// repo is go.googlesource.com repo ("go", "net", etc)
+// rev is git revision.
+func getSourceTgz(el eventTimeLogger, repo, rev string) (tgz io.Reader, err error) {
+	fromCache := false
+	vi, err, shared := sourceGroup.Do(rev, func() (interface{}, error) {
+		if tgzBytes, ok := sourceCache.Get(rev); ok {
+			fromCache = true
+			return tgzBytes, nil
+		}
+
+		for i := 0; i < 10; i++ {
+			el.logEventTime("fetching_source", "from watcher")
+			tgzBytes, err := getSourceTgzFromWatcher(repo, rev)
+			if err == nil {
+				sourceCache.Add(rev, tgzBytes)
+				return tgzBytes, nil
+			}
+			log.Printf("Error fetching source %s/%s from watcher (after %v uptime): %v",
+				repo, rev, time.Since(processStartTime), err)
+			// Wait for watcher to start up. Give it a minute (10 tries,
+			// 6 seconds apart) before we fall back to Gerrit.
+			time.Sleep(6 * time.Second)
+		}
+
+		el.logEventTime("fetching_source", "from gerrit")
+		tgzBytes, err := getSourceTgzFromGerrit(repo, rev)
+		if err == nil {
+			sourceCache.Add(rev, tgzBytes)
+		}
+		return tgzBytes, err
+	})
+	if err != nil {
+		return nil, err
+	}
+	el.logEventTime("got_source", fmt.Sprintf("cache=%v shared=%v", fromCache, shared))
+	return bytes.NewReader(vi.([]byte)), nil
+}
+
+func getSourceTgzFromGerrit(repo, rev string) (tgz []byte, err error) {
+	return getSourceTgzFromURL("gerrit", repo, rev, "https://go.googlesource.com/"+repo+"/+archive/"+rev+".tar.gz")
+}
+
+func getSourceTgzFromWatcher(repo, rev string) (tgz []byte, err error) {
+	return getSourceTgzFromURL("watcher", repo, rev, "http://"+gitArchiveAddr+"/"+repo+".tar.gz?rev="+rev)
+}
+
+func getSourceTgzFromURL(source, repo, rev, urlStr string) (tgz []byte, err error) {
+	res, err := http.Get(urlStr)
+	if err != nil {
+		return nil, fmt.Errorf("fetching %s/%s from %s: %v", repo, rev, source, err)
+	}
+	defer res.Body.Close()
+	if res.StatusCode/100 != 2 {
+		slurp, _ := ioutil.ReadAll(io.LimitReader(res.Body, 4<<10))
+		return nil, fmt.Errorf("fetching %s/%s from %s: %v; body: %s", repo, rev, source, res.Status, slurp)
+	}
+	const maxSize = 25 << 20 // seems unlikely; go source is 7.8MB on 2015-06-15
+	slurp, err := ioutil.ReadAll(io.LimitReader(res.Body, maxSize+1))
+	if len(slurp) > maxSize && err == nil {
+		err = fmt.Errorf("body over %d bytes", maxSize)
+	}
+	if err != nil {
+		return nil, fmt.Errorf("reading %s/%s from %s: %v", repo, rev, source, err)
+	}
+	return slurp, nil
+}
+
 var nl = []byte("\n")