// Copyright 2014 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// The coordinator runs on GCE and coordinates builds in Docker containers.
package main // import "golang.org/x/build/cmd/coordinator"

import (
"archive/tar"
"bytes"
"compress/gzip"
"crypto/hmac"
"crypto/md5"
"crypto/rand"
"encoding/json"
"errors"
"flag"
"fmt"
"html"
"io"
"io/ioutil"
"log"
"net/http"
"net/url"
"os"
"os/exec"
"path"
"sort"
"strconv"
"strings"
"sync"
"time"
"camlistore.org/pkg/syncutil"
"golang.org/x/build/buildlet"
"golang.org/x/build/dashboard"
"golang.org/x/build/types"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/api/compute/v1"
"google.golang.org/cloud/compute/metadata"
)
func init() {
delete(dashboard.Builders, "plan9-386-gcepartial")
}
var (
masterKeyFile = flag.String("masterkey", "", "Path to builder master key. Else fetched using GCE project attribute 'builder-master-key'.")
maxLocalBuilds = flag.Int("maxbuilds", 6, "Max concurrent Docker builds (VM builds don't count)")
cleanZones = flag.String("zones", "us-central1-a,us-central1-b,us-central1-f", "Comma-separated list of zones to periodically clean of stale build VMs (ones that failed to shut themselves down)")
// Debug flags:
just = flag.String("just", "", "If non-empty, run single build in the foreground. Requires rev.")
rev = flag.String("rev", "", "Revision to build.")
)
var (
startTime = time.Now()
watchers = map[string]watchConfig{} // populated at startup, keyed by repo, e.g. "https://go.googlesource.com/go"
donec = make(chan builderRev) // reports of finished builders
statusMu sync.Mutex // guards both status (ongoing ones) and statusDone (just finished)
status = map[builderRev]*buildStatus{}
statusDone []*buildStatus // finished recently, capped to maxStatusDone
)
const (
maxStatusDone = 30
// vmDeleteTimeout is how long before we delete a VM.
// In practice this need only be as long as the slowest
// builder (plan9 currently), because on startup this program
// already deletes all buildlets it doesn't know about
// (i.e. ones from a previous instance of the coordinator).
vmDeleteTimeout = 45 * time.Minute
)
// Initialized by initGCE:
var (
projectID string
projectZone string
computeService *compute.Service
externalIP string
tokenSource oauth2.TokenSource
)
func initGCE() error {
if !metadata.OnGCE() {
return errors.New("not running on GCE; VM support disabled")
}
var err error
projectID, err = metadata.ProjectID()
if err != nil {
return fmt.Errorf("failed to get current GCE ProjectID: %v", err)
}
projectZone, err = metadata.Get("instance/zone")
if err != nil || projectZone == "" {
return fmt.Errorf("failed to get current GCE zone: %v", err)
}
// Convert the zone from "projects/1234/zones/us-central1-a" to "us-central1-a".
projectZone = path.Base(projectZone)
if !hasComputeScope() {
return errors.New("The coordinator is not running with access to read and write Compute resources. VM support disabled.")
}
externalIP, err = metadata.ExternalIP()
if err != nil {
return fmt.Errorf("ExternalIP: %v", err)
}
tokenSource = google.ComputeTokenSource("default")
computeService, _ = compute.New(oauth2.NewClient(oauth2.NoContext, tokenSource))
return nil
}
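// imageInfo records the Google Cloud Storage URL of a builder image
// tarball and the Last-Modified value of the copy most recently
// loaded into Docker.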
type imageInfo struct {
url string // of tar file
mu sync.Mutex
lastMod string
}
var images = map[string]*imageInfo{
"go-commit-watcher": {url: "https://storage.googleapis.com/go-builder-data/docker-commit-watcher.tar.gz"},
"gobuilders/linux-x86-base": {url: "https://storage.googleapis.com/go-builder-data/docker-linux.base.tar.gz"},
"gobuilders/linux-x86-clang": {url: "https://storage.googleapis.com/go-builder-data/docker-linux.clang.tar.gz"},
"gobuilders/linux-x86-gccgo": {url: "https://storage.googleapis.com/go-builder-data/docker-linux.gccgo.tar.gz"},
"gobuilders/linux-x86-nacl": {url: "https://storage.googleapis.com/go-builder-data/docker-linux.nacl.tar.gz"},
"gobuilders/linux-x86-sid": {url: "https://storage.googleapis.com/go-builder-data/docker-linux.sid.tar.gz"},
}
// recordResult sends build results to the dashboard
func recordResult(builderName string, ok bool, hash, buildLog string, runTime time.Duration) error {
req := map[string]interface{}{
"Builder": builderName,
"PackagePath": "",
"Hash": hash,
"GoHash": "",
"OK": ok,
"Log": buildLog,
"RunTime": runTime,
}
args := url.Values{"key": {builderKey(builderName)}, "builder": {builderName}}
return dash("POST", "result", args, req, nil)
}
// pingDashboard is a goroutine that periodically POSTs to build.golang.org/building
// to let it know that we're still working on a build.
func pingDashboard(st *buildStatus) {
u := "https://build.golang.org/building?" + url.Values{
"builder": []string{st.name},
"key": []string{builderKey(st.name)},
"hash": []string{st.rev},
"url": []string{fmt.Sprintf("http://%v/logs?name=%s&rev=%s&st=%p", externalIP, st.name, st.rev, st)},
}.Encode()
for {
st.mu.Lock()
done := st.done
st.mu.Unlock()
if !done.IsZero() {
return
}
if res, _ := http.PostForm(u, nil); res != nil {
res.Body.Close()
}
time.Sleep(60 * time.Second)
}
}
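// watchConfig describes a single go-commit-watcher instance: the
// repository it polls, the dashboard it reports to, and how often
// it polls.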
type watchConfig struct {
repo string // "https://go.googlesource.com/go"
dash string // "https://build.golang.org/" (must end in /)
interval time.Duration // Polling interval
}
func main() {
flag.Parse()
if err := initGCE(); err != nil {
log.Printf("VM support disabled due to error initializing GCE: %v", err)
}
addWatcher(watchConfig{repo: "https://go.googlesource.com/go", dash: "https://build.golang.org/"})
// TODO(adg,cmang): fix gccgo watcher
// addWatcher(watchConfig{repo: "https://code.google.com/p/gofrontend", dash: "https://build.golang.org/gccgo/"})
if (*just != "") != (*rev != "") {
log.Fatalf("--just and --rev must be used together")
}
if *just != "" {
conf, ok := dashboard.Builders[*just]
if !ok {
log.Fatalf("unknown builder %q", *just)
}
args, err := conf.DockerRunArgs(*rev, builderKey(*just))
if err != nil {
log.Fatal(err)
}
cmd := exec.Command("docker", append([]string{"run"}, args...)...)
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
log.Fatalf("Build failed: %v", err)
}
return
}
http.HandleFunc("/", handleStatus)
http.HandleFunc("/logs", handleLogs)
go http.ListenAndServe(":80", nil)
go cleanUpOldContainers()
go cleanUpOldVMs()
stopWatchers() // clean up before we start new ones
for _, watcher := range watchers {
if err := startWatching(watcher); err != nil {
log.Printf("Error starting watcher for %s: %v", watcher.repo, err)
}
}
workc := make(chan builderRev)
go findWorkLoop(workc)
// TODO(cmang): gccgo will need its own findWorkLoop
ticker := time.NewTicker(1 * time.Minute)
for {
select {
case work := <-workc:
log.Printf("workc received %+v; len(status) = %v, maxLocalBuilds = %v; cur = %p", work, len(status), *maxLocalBuilds, status[work])
if mayBuildRev(work) {
conf, _ := dashboard.Builders[work.name]
if st, err := startBuilding(conf, work.rev); err == nil {
setStatus(work, st)
go pingDashboard(st)
} else {
log.Printf("Error starting to build %v: %v", work, err)
}
}
case done := <-donec:
log.Printf("%v done", done)
markDone(done)
case <-ticker.C:
if numCurrentBuilds() == 0 && time.Now().After(startTime.Add(10*time.Minute)) {
// TODO: halt the whole machine to kill the VM or something
}
}
}
}
func numCurrentBuilds() int {
statusMu.Lock()
defer statusMu.Unlock()
return len(status)
}
func isBuilding(work builderRev) bool {
statusMu.Lock()
defer statusMu.Unlock()
_, building := status[work]
return building
}
// mayBuildRev reports whether the build type & revision should be started.
// It returns true if it's not already building, and there is capacity.
func mayBuildRev(work builderRev) bool {
conf, ok := dashboard.Builders[work.name]
if !ok {
return false
}
statusMu.Lock()
_, building := status[work]
statusMu.Unlock()
if building {
return false
}
if conf.UsesVM() {
// These don't count towards *maxLocalBuilds.
return true
}
numDocker, err := numDockerBuilds()
if err != nil {
log.Printf("not starting %v due to docker ps failure: %v", work, err)
return false
}
return numDocker < *maxLocalBuilds
}
func setStatus(work builderRev, st *buildStatus) {
statusMu.Lock()
defer statusMu.Unlock()
status[work] = st
}
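// markDone removes work from the set of active builds (if present)
// and records its status in statusDone, evicting the oldest entry
// once maxStatusDone is reached.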
func markDone(work builderRev) {
statusMu.Lock()
defer statusMu.Unlock()
st, ok := status[work]
if !ok {
return
}
delete(status, work)
if len(statusDone) == maxStatusDone {
copy(statusDone, statusDone[1:])
statusDone = statusDone[:len(statusDone)-1]
}
statusDone = append(statusDone, st)
}
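// vmIsBuilding reports whether the GCE instance with the given name
// is currently running a build.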
func vmIsBuilding(instName string) bool {
if instName == "" {
log.Printf("bogus empty instance name passed to vmIsBuilding")
return false
}
statusMu.Lock()
defer statusMu.Unlock()
for _, st := range status {
if st.instName == instName {
return true
}
}
return false
}
// getStatus returns the build status for work. The statusPtrStr
// argument disambiguates which status to return when the history
// holds multiple entries for work (e.g. recent failures where the
// build didn't finish for reasons outside of all.bash failing).
func getStatus(work builderRev, statusPtrStr string) *buildStatus {
statusMu.Lock()
defer statusMu.Unlock()
match := func(st *buildStatus) bool {
return statusPtrStr == "" || fmt.Sprintf("%p", st) == statusPtrStr
}
if st, ok := status[work]; ok && match(st) {
return st
}
for _, st := range statusDone {
if st.builderRev == work && match(st) {
return st
}
}
return nil
}
type byAge []*buildStatus
func (s byAge) Len() int { return len(s) }
func (s byAge) Less(i, j int) bool { return s[i].start.Before(s[j].start) }
func (s byAge) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
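// handleStatus serves the coordinator's status page at "/", listing
// active and recently completed builds plus disk usage.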
func handleStatus(w http.ResponseWriter, r *http.Request) {
var active []*buildStatus
var recent []*buildStatus
statusMu.Lock()
for _, st := range status {
active = append(active, st)
}
recent = append(recent, statusDone...)
numTotal := len(status)
numDocker, err := numDockerBuilds()
statusMu.Unlock()
sort.Sort(byAge(active))
sort.Sort(sort.Reverse(byAge(recent)))
io.WriteString(w, "<html><body><h1>Go build coordinator</h1>")
if err != nil {
fmt.Fprintf(w, "<h2>Error</h2>Error fetching Docker build count: <i>%s</i>\n", html.EscapeString(err.Error()))
}
fmt.Fprintf(w, "<h2>running</h2><p>%d total builds active (Docker: %d/%d; VMs: %d/∞):",
numTotal, numDocker, *maxLocalBuilds, numTotal-numDocker)
io.WriteString(w, "<pre>")
for _, st := range active {
io.WriteString(w, st.htmlStatusLine())
}
io.WriteString(w, "</pre>")
io.WriteString(w, "<h2>recently completed</h2><pre>")
for _, st := range recent {
io.WriteString(w, st.htmlStatusLine())
}
io.WriteString(w, "</pre>")
fmt.Fprintf(w, "<h2>disk space</h2><pre>%s</pre></body></html>", html.EscapeString(diskFree()))
}
func diskFree() string {
out, _ := exec.Command("df", "-h").Output()
return string(out)
}
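// handleLogs serves the plain-text log of a single build, identified
// by the name, rev, and (optionally) st query parameters.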
func handleLogs(w http.ResponseWriter, r *http.Request) {
st := getStatus(builderRev{r.FormValue("name"), r.FormValue("rev")}, r.FormValue("st"))
if st == nil {
http.NotFound(w, r)
return
}
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
writeStatusHeader(w, st)
io.WriteString(w, st.logs())
// TODO: if st is still building, stream them to the user with
// http.Flusher.Flush and CloseNotifier and registering interest
// of new writes with the buildStatus. Will require moving the
// BUILDERKEY scrubbing into the Write method.
}
func writeStatusHeader(w http.ResponseWriter, st *buildStatus) {
st.mu.Lock()
defer st.mu.Unlock()
fmt.Fprintf(w, " builder: %s\n", st.name)
fmt.Fprintf(w, " rev: %s\n", st.rev)
if st.container != "" {
fmt.Fprintf(w, "container: %s\n", st.container)
}
if st.instName != "" {
fmt.Fprintf(w, " vm name: %s\n", st.instName)
}
fmt.Fprintf(w, " started: %v\n", st.start)
done := !st.done.IsZero()
if done {
fmt.Fprintf(w, " ended: %v\n", st.done)
fmt.Fprintf(w, " success: %v\n", st.succeeded)
} else {
fmt.Fprintf(w, " status: still running\n")
}
if len(st.events) > 0 {
io.WriteString(w, "\nEvents:\n")
st.writeEventsLocked(w, false)
}
io.WriteString(w, "\nBuild log:\n")
}
// findWorkLoop polls https://build.golang.org/?mode=json looking for new work
// for the main dashboard. It does not support gccgo.
// TODO(bradfitz): it also currently does not support subrepos.
func findWorkLoop(work chan<- builderRev) {
ticker := time.NewTicker(15 * time.Second)
for {
if err := findWork(work); err != nil {
log.Printf("failed to find new work: %v", err)
}
<-ticker.C
}
}
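// findWork fetches the dashboard's JSON status once and sends any
// builder/revision pairs that still need building (and aren't
// already building) to work. It also sends work for builders the
// dashboard doesn't know about yet, so new builders can be
// bootstrapped.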
func findWork(work chan<- builderRev) error {
var bs types.BuildStatus
res, err := http.Get("https://build.golang.org/?mode=json")
if err != nil {
return err
}
defer res.Body.Close()
if err := json.NewDecoder(res.Body).Decode(&bs); err != nil {
return err
}
if res.StatusCode != 200 {
return fmt.Errorf("unexpected http status %v", res.Status)
}
knownToDashboard := map[string]bool{} // keys are builder
for _, b := range bs.Builders {
knownToDashboard[b] = true
}
var goRevisions []string
for _, br := range bs.Revisions {
if br.Repo == "go" {
goRevisions = append(goRevisions, br.Revision)
} else {
// TODO(bradfitz): support these: golang.org/issue/9506
continue
}
if len(br.Results) != len(bs.Builders) {
return errors.New("bogus JSON response from dashboard: results has unexpected length")
}
for i, res := range br.Results {
if res != "" {
// It's either "ok" or a failure URL.
continue
}
builder := bs.Builders[i]
if _, ok := dashboard.Builders[builder]; !ok {
// Not managed by the coordinator.
continue
}
br := builderRev{bs.Builders[i], br.Revision}
if !isBuilding(br) {
work <- br
}
}
}
// And to bootstrap new builders, see if we have any builders
// that the dashboard doesn't know about.
for b := range dashboard.Builders {
if knownToDashboard[b] {
continue
}
for _, rev := range goRevisions {
br := builderRev{b, rev}
if !isBuilding(br) {
work <- br
}
}
}
return nil
}
// builderRev is a build configuration type and a revision.
type builderRev struct {
name string // e.g. "linux-amd64-race"
rev string // lowercase hex git hash
}
// dockerRunArgs returns the arguments that follow "docker run" when
// starting the commit watcher container.
func (conf watchConfig) dockerRunArgs() (args []string) {
log.Printf("Running watcher with master key %q", masterKey())
if key := masterKey(); len(key) > 0 {
tmpKey := "/tmp/watcher.buildkey"
if _, err := os.Stat(tmpKey); err != nil {
if err := ioutil.WriteFile(tmpKey, key, 0600); err != nil {
log.Fatal(err)
}
}
// Images may look for .gobuildkey in / or /root, so provide both.
// TODO(adg): fix images that look in the wrong place.
args = append(args, "-v", tmpKey+":/.gobuildkey")
args = append(args, "-v", tmpKey+":/root/.gobuildkey")
}
args = append(args,
"go-commit-watcher",
"/usr/local/bin/watcher",
"-repo="+conf.repo,
"-dash="+conf.dash,
"-poll="+conf.interval.String(),
)
return
}
func addWatcher(c watchConfig) {
if c.repo == "" {
c.repo = "https://go.googlesource.com/go"
}
if c.dash == "" {
c.dash = "https://build.golang.org/"
}
if c.interval == 0 {
c.interval = 10 * time.Second
}
watchers[c.repo] = c
}
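// condUpdateImage does a "docker load" of the named image if the
// tarball in Google Cloud Storage has changed (per its Last-Modified
// header) since the last load.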
func condUpdateImage(img string) error {
ii := images[img]
if ii == nil {
return fmt.Errorf("image %q doesn't exist", img)
}
ii.mu.Lock()
defer ii.mu.Unlock()
res, err := http.Head(ii.url)
if err != nil {
return fmt.Errorf("Error checking %s: %v", ii.url, err)
}
if res.StatusCode != 200 {
return fmt.Errorf("Error checking %s: %v", ii.url, res.Status)
}
if res.Header.Get("Last-Modified") == ii.lastMod {
return nil
}
res, err = http.Get(ii.url)
if err != nil || res.StatusCode != 200 {
return fmt.Errorf("Get after Head failed for %s: %v, %v", ii.url, err, res)
}
defer res.Body.Close()
log.Printf("Running: docker load of %s\n", ii.url)
cmd := exec.Command("docker", "load")
cmd.Stdin = res.Body
var out bytes.Buffer
cmd.Stdout = &out
cmd.Stderr = &out
if err := cmd.Run(); err != nil {
log.Printf("Failed to pull latest %s from %s and pipe into docker load: %v, %s", img, ii.url, err, out.Bytes())
return err
}
ii.lastMod = res.Header.Get("Last-Modified")
return nil
}
// numDockerBuilds finds the number of go builder instances currently running.
func numDockerBuilds() (n int, err error) {
out, err := exec.Command("docker", "ps").Output()
if err != nil {
return 0, err
}
for _, line := range strings.Split(string(out), "\n") {
if strings.Contains(line, "gobuilders/") {
n++
}
}
return n, nil
}
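// startBuilding starts a build of rev with the given configuration,
// either in a GCE VM (for VM-based builders) or in a local Docker
// container.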
func startBuilding(conf dashboard.BuildConfig, rev string) (*buildStatus, error) {
if conf.UsesVM() {
return startBuildingInVM(conf, rev)
} else {
return startBuildingInDocker(conf, rev)
}
}
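// startBuildingInDocker starts rev building in a detached Docker
// container using conf's image. Goroutines follow the container's
// logs into the returned buildStatus and report completion on donec.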
func startBuildingInDocker(conf dashboard.BuildConfig, rev string) (*buildStatus, error) {
if err := condUpdateImage(conf.Image); err != nil {
log.Printf("Failed to setup container for %v %v: %v", conf.Name, rev, err)
return nil, err
}
runArgs, err := conf.DockerRunArgs(rev, builderKey(conf.Name))
if err != nil {
return nil, err
}
cmd := exec.Command("docker", append([]string{"run", "-d"}, runArgs...)...)
all, err := cmd.CombinedOutput()
log.Printf("Docker run for %v %v = err:%v, output:%s", conf.Name, rev, err, all)
if err != nil {
return nil, err
}
container := strings.TrimSpace(string(all))
brev := builderRev{
name: conf.Name,
rev: rev,
}
st := &buildStatus{
builderRev: brev,
container: container,
start: time.Now(),
}
log.Printf("%v now building in Docker container %v", brev, st.container)
go func() {
all, err := exec.Command("docker", "wait", container).CombinedOutput()
output := strings.TrimSpace(string(all))
var ok bool
if err == nil {
exit, err := strconv.Atoi(output)
ok = (err == nil && exit == 0)
}
st.setDone(ok)
log.Printf("docker wait %s/%s: %v, %s", container, rev, err, output)
donec <- builderRev{conf.Name, rev}
exec.Command("docker", "rm", container).Run()
}()
go func() {
cmd := exec.Command("docker", "logs", "-f", container)
cmd.Stdout = st
cmd.Stderr = st
if err := cmd.Run(); err != nil {
// The docker logs subcommand always returns
// success, even if the underlying process
// fails.
log.Printf("failed to follow docker logs of %s: %v", container, err)
}
}()
return st, nil
}
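// randHex returns a random hex string of n characters; n is expected
// to be even. It panics if the system source of randomness fails.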
func randHex(n int) string {
buf := make([]byte, n/2)
_, err := rand.Read(buf)
if err != nil {
panic("Failed to get randomness: " + err.Error())
}
return fmt.Sprintf("%x", buf)
}
// startBuildingInVM starts a VM on GCE running the buildlet binary to build rev.
// TODO(bradfitz): move this into a buildlet client package.
func startBuildingInVM(conf dashboard.BuildConfig, rev string) (*buildStatus, error) {
brev := builderRev{
name: conf.Name,
rev: rev,
}
// name is the project-wide unique name of the GCE instance. It can't be longer
// than 61 bytes, so we only use the first 8 bytes of the rev.
name := "buildlet-" + conf.Name + "-" + rev[:8] + "-rn" + randHex(6)
st := &buildStatus{
builderRev: brev,
start: time.Now(),
instName: name,
}
go func() {
err := buildInVM(conf, st)
if err != nil {
if st.hasEvent("instance_created") {
go deleteVM(projectZone, st.instName)
}
}
st.setDone(err == nil)
if err != nil {
fmt.Fprintf(st, "\n\nError: %v\n", err)
}
donec <- builderRev{conf.Name, rev}
}()
return st, nil
}
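// buildInVM creates a buildlet VM for conf, pushes a VERSION file,
// the Go tree at st.rev (fetched from Gerrit), and Go 1.4 if needed,
// runs the builder's all script, and records the result on the
// dashboard.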
func buildInVM(conf dashboard.BuildConfig, st *buildStatus) (retErr error) {
bc, err := buildlet.StartNewVM(tokenSource, st.instName, conf.Name, buildlet.VMOpts{
ProjectID: projectID,
Zone: projectZone,
Description: fmt.Sprintf("Go Builder building %s %s", conf.Name, st.rev),
DeleteIn: vmDeleteTimeout,
OnInstanceRequested: func() {
st.logEventTime("instance_create_requested")
log.Printf("%v now booting VM %v for build", st.builderRev, st.instName)
},
OnInstanceCreated: func() {
st.logEventTime("instance_created")
},
OnGotInstanceInfo: func() {
st.logEventTime("waiting_for_buildlet")
},
})
if err != nil {
return err
}
st.logEventTime("buildlet_up")
goodRes := func(res *http.Response, err error, what string) bool {
if err != nil {
retErr = fmt.Errorf("%s: %v", what, err)
return false
}
if res.StatusCode/100 != 2 {
slurp, _ := ioutil.ReadAll(io.LimitReader(res.Body, 4<<10))
retErr = fmt.Errorf("%s: %v; body: %s", what, res.Status, slurp)
res.Body.Close()
return false
}
return true
}
// Write the VERSION file.
st.logEventTime("start_write_version_tar")
if err := bc.PutTar(versionTgz(st.rev), "go"); err != nil {
return fmt.Errorf("writing VERSION tgz: %v", err)
}
// Feed the buildlet a tar file for it to extract.
// TODO: cache these.
st.logEventTime("start_fetch_gerrit_tgz")
tarRes, err := http.Get("https://go.googlesource.com/go/+archive/" + st.rev + ".tar.gz")
if !goodRes(tarRes, err, "fetching tarball from Gerrit") {
return
}
var grp syncutil.Group
grp.Go(func() error {
st.logEventTime("start_write_go_tar")
if err := bc.PutTar(tarRes.Body, "go"); err != nil {
tarRes.Body.Close()
return fmt.Errorf("writing tarball from Gerrit: %v", err)
}
st.logEventTime("end_write_go_tar")
return nil
})
if conf.Go14URL != "" {
grp.Go(func() error {
st.logEventTime("start_write_go14_tar")
if err := bc.PutTarFromURL(conf.Go14URL, "go1.4"); err != nil {
return err
}
st.logEventTime("end_write_go14_tar")
return nil
})
}
if err := grp.Err(); err != nil {
return err
}
execStartTime := time.Now()
st.logEventTime("pre_exec")
remoteErr, err := bc.Exec(path.Join("go", conf.AllScript()), buildlet.ExecOpts{
Output: st,
OnStartExec: func() { st.logEventTime("running_exec") },
ExtraEnv: conf.Env(),
})
if err != nil {
return err
}
st.logEventTime("done")
var buildLog string
if remoteErr != nil {
buildLog = st.logs()
}
if err := recordResult(st.name, remoteErr == nil, st.rev, buildLog, time.Since(execStartTime)); err != nil {
if remoteErr != nil {
return fmt.Errorf("Remote error was %q but failed to report it to the dashboard: %v", remoteErr, err)
}
return fmt.Errorf("Build succeeded but failed to report it to the dashboard: %v", err)
}
if remoteErr != nil {
return fmt.Errorf("%s failed: %v", conf.AllScript(), remoteErr)
}
return nil
}
type eventAndTime struct {
evt string
t time.Time
}
// buildStatus is the status of a build.
type buildStatus struct {
// Immutable:
builderRev
start time.Time
container string // container ID for docker, else it's a VM
// Immutable, used by VM only:
instName string
mu sync.Mutex // guards following
done time.Time // finished running
succeeded bool // set when done
output bytes.Buffer // stdout and stderr
events []eventAndTime
}
func (st *buildStatus) setDone(succeeded bool) {
st.mu.Lock()
defer st.mu.Unlock()
st.succeeded = succeeded
st.done = time.Now()
}
func (st *buildStatus) logEventTime(event string) {
st.mu.Lock()
defer st.mu.Unlock()
st.events = append(st.events, eventAndTime{event, time.Now()})
}
func (st *buildStatus) hasEvent(event string) bool {
st.mu.Lock()
defer st.mu.Unlock()
for _, e := range st.events {
if e.evt == event {
return true
}
}
return false
}
// htmlStatusLine returns the HTML to show within the <pre> block on
// the main page's list of active builds.
func (st *buildStatus) htmlStatusLine() string {
st.mu.Lock()
defer st.mu.Unlock()
urlPrefix := "https://go-review.googlesource.com/#/q/"
if strings.Contains(st.name, "gccgo") {
urlPrefix = "https://code.google.com/p/gofrontend/source/detail?r="
}
var buf bytes.Buffer
fmt.Fprintf(&buf, "<a href='https://github.com/golang/go/wiki/DashboardBuilders'>%s</a> rev <a href='%s%s'>%s</a>",
st.name, urlPrefix, st.rev, st.rev)
if st.done.IsZero() {
buf.WriteString(", running")
} else if st.succeeded {
buf.WriteString(", succeeded")
} else {
buf.WriteString(", failed")
}
if st.container != "" {
fmt.Fprintf(&buf, " in container <a href='%s'>%s</a>", st.logsURL(), st.container)
} else {
fmt.Fprintf(&buf, " in VM <a href='%s'>%s</a>", st.logsURL(), st.instName)
}
t := st.done
if t.IsZero() {
t = st.start
}
fmt.Fprintf(&buf, ", %v ago\n", time.Since(t))
st.writeEventsLocked(&buf, true)
return buf.String()
}
func (st *buildStatus) logsURL() string {
return fmt.Sprintf("/logs?name=%s&rev=%s&st=%p", st.name, st.rev, st)
}
// st.mu must be held.
func (st *buildStatus) writeEventsLocked(w io.Writer, html bool) {
for i, evt := range st.events {
var elapsed string
if i != 0 {
elapsed = fmt.Sprintf("+%0.1fs", evt.t.Sub(st.events[i-1].t).Seconds())
}
msg := evt.evt
if msg == "running_exec" && html {
msg = fmt.Sprintf("<a href='%s'>%s</a>", st.logsURL(), msg)
}
fmt.Fprintf(w, " %7s %v %s\n", elapsed, evt.t.Format(time.RFC3339), msg)
}
}
func (st *buildStatus) logs() string {
st.mu.Lock()
logs := st.output.String()
st.mu.Unlock()
key := builderKey(st.name)
if key == "" {
// No key to scrub (e.g. no master key available).
return logs
}
return strings.Replace(logs, key, "BUILDERKEY", -1)
}
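// Write implements io.Writer, appending p to the build's output
// buffer. Output beyond maxBufferSize is dropped, but Write always
// reports len(p) bytes written so the caller's pipe isn't broken.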
func (st *buildStatus) Write(p []byte) (n int, err error) {
st.mu.Lock()
defer st.mu.Unlock()
const maxBufferSize = 2 << 20 // 2MB of output is way more than we expect.
plen := len(p)
if st.output.Len()+len(p) > maxBufferSize {
p = p[:maxBufferSize-st.output.Len()]
}
st.output.Write(p) // bytes.Buffer can't fail
return plen, nil
}
// Stop any previous go-commit-watcher Docker tasks, so they don't
// pile up upon restarts of the coordinator.
func stopWatchers() {
out, err := exec.Command("docker", "ps").Output()
if err != nil {
return
}
for _, line := range strings.Split(string(out), "\n") {
if !strings.Contains(line, "go-commit-watcher:") {
continue
}
f := strings.Fields(line)
exec.Command("docker", "rm", "-f", "-v", f[0]).Run()
}
}
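// startWatching starts a go-commit-watcher container for conf,
// updating its image first if necessary. A goroutine waits for the
// container to exit, removes it, and schedules a restart; startup
// errors likewise schedule a restart via restartWatcherSoon.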
func startWatching(conf watchConfig) (err error) {
defer func() {
if err != nil {
restartWatcherSoon(conf)
}
}()
log.Printf("Starting watcher for %v", conf.repo)
if err := condUpdateImage("go-commit-watcher"); err != nil {
log.Printf("Failed to setup container for commit watcher: %v", err)
return err
}
cmd := exec.Command("docker", append([]string{"run", "-d"}, conf.dockerRunArgs()...)...)
all, err := cmd.CombinedOutput()
if err != nil {
log.Printf("Docker run for commit watcher = err:%v, output: %s", err, all)
return err
}
container := strings.TrimSpace(string(all))
// Start a goroutine to wait for the watcher to die.
go func() {
exec.Command("docker", "wait", container).Run()
exec.Command("docker", "rm", "-v", container).Run()
log.Printf("Watcher crashed. Restarting soon.")
restartWatcherSoon(conf)
}()
return nil
}
func restartWatcherSoon(conf watchConfig) {
time.AfterFunc(30*time.Second, func() {
startWatching(conf)
})
}
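// builderKey returns the per-builder key sent to the dashboard: the
// hex HMAC-MD5 of the builder name keyed by the master key. It
// returns the empty string if no master key is available.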
func builderKey(builder string) string {
master := masterKey()
if len(master) == 0 {
return ""
}
h := hmac.New(md5.New, master)
io.WriteString(h, builder)
return fmt.Sprintf("%x", h.Sum(nil))
}
func masterKey() []byte {
keyOnce.Do(loadKey)
return masterKeyCache
}
var (
keyOnce sync.Once
masterKeyCache []byte
)
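// loadKey populates masterKeyCache from the -masterkey file if one
// was given, else from the GCE project attribute "builder-master-key".
// It exits the program if the key cannot be loaded.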
func loadKey() {
if *masterKeyFile != "" {
b, err := ioutil.ReadFile(*masterKeyFile)
if err != nil {
log.Fatal(err)
}
masterKeyCache = bytes.TrimSpace(b)
return
}
masterKey, err := metadata.ProjectAttributeValue("builder-master-key")
if err != nil {
log.Fatalf("No builder master key available: %v", err)
}
masterKeyCache = []byte(strings.TrimSpace(masterKey))
}
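// cleanUpOldContainers loops forever, removing exited Docker
// containers every 30 seconds.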
func cleanUpOldContainers() {
for {
for _, cid := range oldContainers() {
log.Printf("Cleaning old container %v", cid)
exec.Command("docker", "rm", "-v", cid).Run()
}
time.Sleep(30 * time.Second)
}
}
func oldContainers() []string {
out, _ := exec.Command("docker", "ps", "-a", "--filter=status=exited", "--no-trunc", "-q").Output()
return strings.Fields(string(out))
}
// cleanUpOldVMs loops forever and periodically enumerates virtual
// machines and deletes those which have expired.
//
// A VM is considered expired if it has a "delete-at" metadata
// attribute having a unix timestamp before the current time.
//
// This is the safety mechanism to delete VMs which stray from the
// normal deleting process. VMs are created to run a single build and
// should be shut down by a controlling process. Due to various types
// of failures, they might get stranded. To prevent them from getting
// stranded and wasting resources forever, we instead set the
// "delete-at" metadata attribute on them when created to some time
// that's well beyond their expected lifetime.
func cleanUpOldVMs() {
if computeService == nil {
return
}
for {
for _, zone := range strings.Split(*cleanZones, ",") {
zone = strings.TrimSpace(zone)
if err := cleanZoneVMs(zone); err != nil {
log.Printf("Error cleaning VMs in zone %q: %v", zone, err)
}
}
time.Sleep(time.Minute)
}
}
// cleanZoneVMs is part of cleanUpOldVMs, operating on a single zone.
func cleanZoneVMs(zone string) error {
// Fetch the first 500 (default) running instances and clean
// those. We expect that we'll be running many fewer than
// that. Even if we have more, eventually the first 500 will
// either end or be cleaned, and then the next call will get a
// partially-different 500.
// TODO(bradfitz): revisit this code if we ever start running
// thousands of VMs.
list, err := computeService.Instances.List(projectID, zone).Do()
if err != nil {
return fmt.Errorf("listing instances: %v", err)
}
for _, inst := range list.Items {
if inst.Metadata == nil {
// Defensive. Not seen in practice.
continue
}
sawDeleteAt := false
for _, it := range inst.Metadata.Items {
if it.Key == "delete-at" {
sawDeleteAt = true
unixDeadline, err := strconv.ParseInt(it.Value, 10, 64)
if err != nil {
log.Printf("invalid delete-at value %q seen; ignoring", it.Value)
}
if err == nil && time.Now().Unix() > unixDeadline {
log.Printf("Deleting expired VM %q in zone %q ...", inst.Name, zone)
deleteVM(zone, inst.Name)
}
}
}
// Delete buildlets (things we made) from previous
// generations. The naming restriction (buildlet-*)
// prevents us from deleting buildlet VMs used by
// Gophers for interactive development & debugging
// (non-builder users); those are named "mote-*".
if sawDeleteAt && strings.HasPrefix(inst.Name, "buildlet-") && !vmIsBuilding(inst.Name) {
log.Printf("Deleting VM %q in zone %q from an earlier coordinator generation ...", inst.Name, zone)
deleteVM(zone, inst.Name)
}
}
return nil
}
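// deleteVM issues a delete request for the named GCE instance in
// zone and logs the resulting operation ID. It does not wait for the
// operation to complete.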
func deleteVM(zone, instName string) {
op, err := computeService.Instances.Delete(projectID, zone, instName).Do()
if err != nil {
log.Printf("Failed to delete instance %q in zone %q: %v", instName, zone, err)
return
}
log.Printf("Sent request to delete instance %q in zone %q. Operation ID == %v", instName, zone, op.Id)
}
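// hasComputeScope reports whether this instance's default service
// account has the Compute read/write scope.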
func hasComputeScope() bool {
if !metadata.OnGCE() {
return false
}
scopes, err := metadata.Scopes("default")
if err != nil {
log.Printf("failed to query metadata default scopes: %v", err)
return false
}
for _, v := range scopes {
if v == compute.ComputeScope {
return true
}
}
return false
}
// dash is copied from the builder binary. It runs the given method and command on the dashboard.
//
// TODO(bradfitz,adg): unify this somewhere?
//
// If args is non-nil it is encoded as the URL query string.
// If req is non-nil it is JSON-encoded and passed as the body of the HTTP POST.
// If resp is non-nil the server's response is decoded into the value pointed
// to by resp (resp must be a pointer).
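// For example, recordResult reports a build result with:
//
//	dash("POST", "result", url.Values{"key": {builderKey(name)}, "builder": {name}}, req, nil)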
func dash(meth, cmd string, args url.Values, req, resp interface{}) error {
const builderVersion = 1 // keep in sync with dashboard/app/build/handler.go
argsCopy := url.Values{"version": {fmt.Sprint(builderVersion)}}
for k, v := range args {
if k == "version" {
panic(`dash: reserved args key: "version"`)
}
argsCopy[k] = v
}
var r *http.Response
var err error
cmd = "https://build.golang.org/" + cmd + "?" + argsCopy.Encode()
switch meth {
case "GET":
if req != nil {
log.Panicf("%s to %s with req", meth, cmd)
}
r, err = http.Get(cmd)
case "POST":
var body io.Reader
if req != nil {
b, err := json.Marshal(req)
if err != nil {
return err
}
body = bytes.NewBuffer(b)
}
r, err = http.Post(cmd, "text/json", body)
default:
log.Panicf("%s: invalid method %q", cmd, meth)
panic("invalid method: " + meth)
}
if err != nil {
return err
}
defer r.Body.Close()
if r.StatusCode != http.StatusOK {
return fmt.Errorf("bad http response: %v", r.Status)
}
body := new(bytes.Buffer)
if _, err := body.ReadFrom(r.Body); err != nil {
return err
}
// Read JSON-encoded Response into provided resp
// and return an error if present.
var result = struct {
Response interface{}
Error string
}{
// Put the provided resp in here as it can be a pointer to
// some value we should unmarshal into.
Response: resp,
}
if err = json.Unmarshal(body.Bytes(), &result); err != nil {
log.Printf("json unmarshal %#q: %s\n", body.Bytes(), err)
return err
}
if result.Error != "" {
return errors.New(result.Error)
}
return nil
}
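// versionTgz returns a reader for a gzipped tar archive containing a
// single VERSION file that records rev as a development version; it
// is pushed into the buildlet's go directory before building.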
func versionTgz(rev string) io.Reader {
var buf bytes.Buffer
zw := gzip.NewWriter(&buf)
tw := tar.NewWriter(zw)
contents := fmt.Sprintf("devel %s", rev)
check(tw.WriteHeader(&tar.Header{
Name: "VERSION",
Mode: 0644,
Size: int64(len(contents)),
}))
_, err := io.WriteString(tw, contents)
check(err)
check(tw.Close())
check(zw.Close())
return bytes.NewReader(buf.Bytes())
}
// check is only for things which should be impossible (not even rare)
// to fail.
func check(err error) {
if err != nil {
panic("previously assumed to never fail: " + err.Error())
}
}