cmd/coordinator, buildlet: reliability fixes
Add background heartbeats to detect dead GCE VMs (the OpenBSD
buildlets seem to hang a lot),
Add timeouts to test executions.
Take helper buildlets out of service once they're marked bad.
Keep the in-order buildlet running forever when sharding tests, in
case all the helpers die. (observed once)
Keep a cache of recently deleted VMs and don't try to delete VMs again
if we've recently deleted them. (they're slow to delete)
More reverse buildlets more paranoid in their health checking and closing
of the connection.
Make status page link to /try set URLs.
Also, better logging (more sometimes, less others0, and misc bug fixes.
Change-Id: I57a5e8e39381234006cac4dd799b655d64be71bb
Reviewed-on: https://go-review.googlesource.com/10981
Reviewed-by: Andrew Gerrand <adg@golang.org>
diff --git a/buildlet/buildletclient.go b/buildlet/buildletclient.go
index 1400eb1..7ca7b22 100644
--- a/buildlet/buildletclient.go
+++ b/buildlet/buildletclient.go
@@ -33,6 +33,7 @@
ipPort: ipPort,
tls: kp,
password: kp.Password(),
+ peerDead: make(chan struct{}),
httpClient: &http.Client{
Transport: &http.Transport{
Dial: defaultDialer(),
@@ -49,6 +50,7 @@
}
func (c *Client) Close() error {
+ c.setPeerDead(errors.New("Close called"))
var err error
if c.closeFunc != nil {
err = c.closeFunc()
@@ -57,6 +59,14 @@
return err
}
+// To be called only via c.setPeerDeadOnce.Do(s.setPeerDead)
+func (c *Client) setPeerDead(err error) {
+ c.setPeerDeadOnce.Do(func() {
+ c.deadErr = err
+ close(c.peerDead)
+ })
+}
+
// SetDescription sets a short description of where the buildlet
// connection came from. This is used by the build coordinator status
// page, mostly for debugging.
@@ -65,10 +75,21 @@
}
// SetHTTPClient replaces the underlying HTTP client.
+// It should only be called before the Client is used.
func (c *Client) SetHTTPClient(httpClient *http.Client) {
c.httpClient = httpClient
}
+// EnableHeartbeats enables background heartbeating
+// against the peer.
+// It should only be called before the Client is used.
+func (c *Client) EnableHeartbeats() {
+ // TODO(bradfitz): make this always enabled, once the
+ // reverse buildlet connection model supports
+ // multiple connections at once.
+ c.heartbeat = true
+}
+
// defaultDialer returns the net/http package's default Dial function.
// Notably, this sets TCP keep-alive values, so when we kill VMs
// (whose TCP stacks stop replying, forever), we don't leak file
@@ -86,10 +107,16 @@
tls KeyPair
password string // basic auth password or empty for none
httpClient *http.Client
+ heartbeat bool // whether to heartbeat in the background
closeFunc func() error
desc string
+ initHeartbeatOnce sync.Once
+ setPeerDeadOnce sync.Once
+ peerDead chan struct{} // closed on peer death
+ deadErr error // guarded by peerDead's close
+
mu sync.Mutex
broken bool // client is broken in some way
}
@@ -126,12 +153,41 @@
}
func (c *Client) do(req *http.Request) (*http.Response, error) {
+ c.initHeartbeatOnce.Do(c.initHeartbeats)
if c.password != "" {
req.SetBasicAuth("gomote", c.password)
}
return c.httpClient.Do(req)
}
+func (c *Client) initHeartbeats() {
+ if !c.heartbeat {
+ // TODO(bradfitz): make this always enabled later, once
+ // reverse buildlets are fixed.
+ return
+ }
+ go c.heartbeatLoop()
+}
+
+func (c *Client) heartbeatLoop() {
+ for {
+ select {
+ case <-c.peerDead:
+ // Already dead by something else.
+ // Most likely: c.Close was called.
+ return
+ case <-time.After(10 * time.Second):
+ t0 := time.Now()
+ if _, err := c.Status(); err != nil {
+ err := fmt.Errorf("Buildlet %v failed heartbeat after %v; marking dead; err=%v", c, time.Since(t0), err)
+ c.MarkBroken()
+ c.setPeerDead(err)
+ return
+ }
+ }
+ }
+}
+
var errHeaderTimeout = errors.New("timeout waiting for headers")
// doHeaderTimeout calls c.do(req) and returns its results, or
@@ -150,15 +206,20 @@
timer := time.NewTimer(max)
defer timer.Stop()
+ cleanup := func() {
+ if re := <-resErrc; re.res != nil {
+ re.res.Body.Close()
+ }
+ }
+
select {
case re := <-resErrc:
return re.res, re.err
+ case <-c.peerDead:
+ go cleanup()
+ return nil, c.deadErr
case <-timer.C:
- go func() {
- if re := <-resErrc; re.res != nil {
- res.Body.Close()
- }
- }()
+ go cleanup()
return nil, errHeaderTimeout
}
}
@@ -279,8 +340,13 @@
// response from the buildlet, but before the output begins
// writing to Output.
OnStartExec func()
+
+ // Timeout is an optional duration before ErrTimeout is returned.
+ Timeout time.Duration
}
+var ErrTimeout = errors.New("buildlet: timeout waiting for command to complete")
+
// Exec runs cmd on the buildlet.
//
// Two errors are returned: one is whether the command succeeded
@@ -315,9 +381,9 @@
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
// The first thing the buildlet's exec handler does is flush the headers, so
- // 5 seconds should be plenty of time, regardless of where on the planet
+ // 10 seconds should be plenty of time, regardless of where on the planet
// (Atlanta, Paris, etc) the reverse buildlet is:
- res, err := c.doHeaderTimeout(req, 5*time.Second)
+ res, err := c.doHeaderTimeout(req, 10*time.Second)
if err == errHeaderTimeout {
c.MarkBroken()
return nil, errors.New("buildlet: timeout waiting for exec header response")
@@ -332,28 +398,52 @@
}
condRun(opts.OnStartExec)
- // Stream the output:
- out := opts.Output
- if out == nil {
- out = ioutil.Discard
+ type errs struct {
+ remoteErr, execErr error
}
- if _, err := io.Copy(out, res.Body); err != nil {
- return nil, fmt.Errorf("error copying response: %v", err)
- }
+ resc := make(chan errs, 1)
+ go func() {
+ // Stream the output:
+ out := opts.Output
+ if out == nil {
+ out = ioutil.Discard
+ }
+ if _, err := io.Copy(out, res.Body); err != nil {
+ resc <- errs{execErr: fmt.Errorf("error copying response: %v", err)}
+ return
+ }
- // Don't record to the dashboard unless we heard the trailer from
- // the buildlet, otherwise it was probably some unrelated error
- // (like the VM being killed, or the buildlet crashing due to
- // e.g. https://golang.org/issue/9309, since we require a tip
- // build of the buildlet to get Trailers support)
- state := res.Trailer.Get("Process-State")
- if state == "" {
- return nil, errors.New("missing Process-State trailer from HTTP response; buildlet built with old (<= 1.4) Go?")
+ // Don't record to the dashboard unless we heard the trailer from
+ // the buildlet, otherwise it was probably some unrelated error
+ // (like the VM being killed, or the buildlet crashing due to
+ // e.g. https://golang.org/issue/9309, since we require a tip
+ // build of the buildlet to get Trailers support)
+ state := res.Trailer.Get("Process-State")
+ if state == "" {
+ resc <- errs{execErr: errors.New("missing Process-State trailer from HTTP response; buildlet built with old (<= 1.4) Go?")}
+ return
+ }
+ if state != "ok" {
+ resc <- errs{remoteErr: errors.New(state)}
+ } else {
+ resc <- errs{} // success
+ }
+ }()
+ var timer <-chan time.Time
+ if opts.Timeout > 0 {
+ t := time.NewTimer(opts.Timeout)
+ defer t.Stop()
+ timer = t.C
}
- if state != "ok" {
- return errors.New(state), nil
+ select {
+ case <-timer:
+ c.MarkBroken()
+ return nil, ErrTimeout
+ case res := <-resc:
+ return res.remoteErr, res.execErr
+ case <-c.peerDead:
+ return nil, c.deadErr
}
- return nil, nil
}
// Destroy shuts down the buildlet, destroying all state immediately.
@@ -437,7 +527,7 @@
if err != nil {
return Status{}, err
}
- resp, err := c.do(req)
+ resp, err := c.doHeaderTimeout(req, 10*time.Second) // plenty of time
if err != nil {
return Status{}, err
}
@@ -462,7 +552,7 @@
if err != nil {
return "", err
}
- resp, err := c.do(req)
+ resp, err := c.doHeaderTimeout(req, 10*time.Second) // plenty of time
if err != nil {
return "", err
}
diff --git a/cmd/coordinator/coordinator.go b/cmd/coordinator/coordinator.go
index 01345e5..24cf217 100644
--- a/cmd/coordinator/coordinator.go
+++ b/cmd/coordinator/coordinator.go
@@ -115,12 +115,13 @@
"plan9-386",
"nacl-386",
"nacl-amd64p32",
- "linux-arm-shard_test",
- "linux-arm-shard_std_am",
- "linux-arm-shard_std_nz",
- "linux-arm-shard_runtimecpu",
- "linux-arm-shard_cgotest",
- "linux-arm-shard_misc",
+ /* "linux-arm-shard_test",
+ "linux-arm-shard_std_am",
+ "linux-arm-shard_std_nz",
+ "linux-arm-shard_runtimecpu",
+ "linux-arm-shard_cgotest",
+ "linux-arm-shard_misc",
+ */
}
for _, bname := range tryList {
conf, ok := dashboard.Builders[bname]
@@ -512,8 +513,11 @@
w.Header().Set("X-Content-Type-Options", "nosniff")
writeStatusHeader(w, st)
- if r.FormValue("nostream") != "" {
- fmt.Fprintf(w, "\n\n(no live streaming. reload manually to see status)\n")
+ nostream := r.FormValue("nostream") != ""
+ if nostream || !st.isRunning() {
+ if nostream {
+ fmt.Fprintf(w, "\n\n(live streaming disabled; reload manually to see status)\n")
+ }
st.mu.Lock()
defer st.mu.Unlock()
w.Write(st.output.Bytes())
@@ -1268,6 +1272,13 @@
doneMsg := "all tests passed"
if remoteErr != nil {
doneMsg = "with test failures"
+ } else if err != nil {
+ doneMsg = "comm error: " + err.Error()
+ }
+ if err != nil {
+ // Return the error *before* we create the magic
+ // "done" event. (which the try coordinator looks for)
+ return err
}
st.logEventTime("done", doneMsg) // "done" is a magic value
@@ -1857,7 +1868,7 @@
// runTests is only called for builders which support a split make/run
// (should be everything, at least soon). Currently (2015-05-27) iOS
-// and Android and Nacl may not. Untested.
+// and Android and Nacl do not.
func (st *buildStatus) runTests(helpers <-chan *buildlet.Client) (remoteErr, err error) {
testNames, err := st.distTestList()
if err != nil {
@@ -1875,8 +1886,12 @@
for {
tis, ok := set.testsToRunInOrder()
if !ok {
- st.logEventTime("in_order_tests_complete")
- return
+ select {
+ case <-st.donec:
+ return
+ case <-time.After(5 * time.Second):
+ }
+ continue
}
goroot := "" // no need to override; main buildlet's GOROOT is baked into binaries
st.runTestsOnBuildlet(st.bc, tis, goroot)
@@ -1892,7 +1907,7 @@
defer time.Sleep(5 * time.Minute)
defer st.logEventTime("DEV_HELPER_SLEEP", bc.IPPort())
}
- st.logEventTime("got_helper", bc.IPPort())
+ st.logEventTime("got_helper", bc.String())
if err := bc.PutTarFromURL(st.snapshotURL(), "go"); err != nil {
log.Printf("failed to extract snapshot for helper %s: %v", bc.IPPort(), err)
return
@@ -1902,9 +1917,9 @@
log.Printf("error discovering workdir for helper %s: %v", bc.IPPort(), err)
return
}
- st.logEventTime("setup_helper", bc.IPPort())
+ st.logEventTime("setup_helper", bc.String())
goroot := st.conf.FilePathJoin(workDir, "go")
- for {
+ for !bc.IsBroken() {
tis, ok := set.testsToRunBiggestFirst()
if !ok {
st.logEventTime("biggest_tests_complete", bc.IPPort())
@@ -1950,8 +1965,14 @@
return fmt.Errorf("dist test failed: %s: %v", ti.name, ti.remoteErr), nil
}
}
- shardedDuration := time.Since(startTime)
- st.logEventTime("tests_complete", fmt.Sprintf("took %v; aggregate %v; saved %v", shardedDuration, serialDuration, serialDuration-shardedDuration))
+ elapsed := time.Since(startTime)
+ var msg string
+ if st.conf.NumTestHelpers > 0 {
+ msg = fmt.Sprintf("took %v; aggregate %v; saved %v", elapsed, serialDuration, serialDuration-elapsed)
+ } else {
+ msg = fmt.Sprintf("took %v", elapsed)
+ }
+ st.logEventTime("tests_complete", msg)
fmt.Fprintf(st, "\nAll tests passed.\n")
return nil, nil
}
@@ -1982,6 +2003,11 @@
// (A communication error)
const maxTestExecErrors = 3
+func execTimeout(testNames []string) time.Duration {
+ // TODO(bradfitz): something smarter probably.
+ return 10 * time.Minute
+}
+
// runTestsOnBuildlet runs tis on bc, using the optional goroot environment variable.
func (st *buildStatus) runTestsOnBuildlet(bc *buildlet.Client, tis []*testItem, goroot string) {
names := make([]string, len(tis))
@@ -1994,19 +2020,6 @@
which := fmt.Sprintf("%s: %v", bc.IPPort(), names)
st.logEventTime("start_tests", which)
- // TODO(bradfitz,adg): a few weeks after
- // https://go-review.googlesource.com/10688 is submitted,
- // around Jun 18th 2015, remove this innerRx stuff and just
- // pass a list of test names to dist instead. We don't want to
- // do it right away, so people don't have to rebase their CLs
- // to avoid trybot failures.
- var innerRx string
- if len(tis) > 1 {
- innerRx = "(" + strings.Join(names, "|") + ")"
- } else {
- innerRx = names[0]
- }
-
var buf bytes.Buffer
t0 := time.Now()
remoteErr, err := bc.Exec(path.Join("go", "bin", "go"), buildlet.ExecOpts{
@@ -2019,8 +2032,11 @@
Dir: ".",
Output: &buf, // see "maybe stream lines" TODO below
ExtraEnv: append(st.conf.Env(), "GOROOT="+goroot),
+ Timeout: execTimeout(names),
Path: []string{"$WORKDIR/go/bin", "$PATH"},
- Args: []string{"tool", "dist", "test", "--no-rebuild", "--banner=" + banner, "--run=^" + innerRx + "$"},
+ Args: append([]string{
+ "tool", "dist", "test", "--no-rebuild", "--banner=" + banner,
+ }, names...),
})
summary := "ok"
if err != nil {
@@ -2029,7 +2045,7 @@
summary = "test failed remotely"
}
execDuration := time.Since(t0)
- st.logEventTime("end_tests", fmt.Sprintf("%s; %s after %v", which, summary, execDuration))
+ st.logEventTime("end_tests", fmt.Sprintf("%s; %s (test exec = %v)", which, summary, execDuration))
if err != nil {
for _, ti := range tis {
ti.numFail++
@@ -2306,26 +2322,27 @@
var buf bytes.Buffer
fmt.Fprintf(&buf, "<a href='https://github.com/golang/go/wiki/DashboardBuilders'>%s</a> rev <a href='%s%s'>%s</a>",
- st.name, urlPrefix, st.rev, st.rev)
+ st.name, urlPrefix, st.rev, st.rev[:8])
if st.isSubrepo() {
fmt.Fprintf(&buf, " (sub-repo %s rev <a href='%s%s'>%s</a>)",
- st.subName, urlPrefix, st.subRev, st.subRev)
+ st.subName, urlPrefix, st.subRev, st.subRev[:8])
}
if ts := st.trySet; ts != nil {
- fmt.Fprintf(&buf, " (trying <a href='https://go-review.googlesource.com/#/q/%s'>%s</a>)",
+ fmt.Fprintf(&buf, " (<a href='/try?commit=%v'>trybot set</a> for <a href='https://go-review.googlesource.com/#/q/%s'>%s</a>)",
+ ts.Commit[:8],
ts.ChangeID, ts.ChangeID[:8])
}
if st.done.IsZero() {
buf.WriteString(", running")
+ fmt.Fprintf(&buf, "; <a href='%s'>build log</a>; %s", st.logsURLLocked(), html.EscapeString(st.bc.String()))
} else if st.succeeded {
buf.WriteString(", succeeded")
} else {
buf.WriteString(", failed")
+ fmt.Fprintf(&buf, "; <a href='%s'>build log</a>; %s", st.logsURLLocked(), html.EscapeString(st.bc.String()))
}
- fmt.Fprintf(&buf, "; <a href='%s'>build log</a>; %s", st.logsURLLocked(), html.EscapeString(st.bc.String()))
-
t := st.done
if t.IsZero() {
t = st.startTime
diff --git a/cmd/coordinator/gce.go b/cmd/coordinator/gce.go
index 6b73ad0..3ca7004 100644
--- a/cmd/coordinator/gce.go
+++ b/cmd/coordinator/gce.go
@@ -23,6 +23,7 @@
"golang.org/x/build/buildlet"
"golang.org/x/build/dashboard"
"golang.org/x/build/gerrit"
+ "golang.org/x/build/internal/lru"
"golang.org/x/net/context"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
@@ -252,6 +253,7 @@
p.setInstanceUsed(instName, false)
return nil, err
}
+ bc.EnableHeartbeats()
bc.SetDescription("GCE VM: " + instName)
bc.SetCloseFunc(func() error { return p.putBuildlet(bc, typ, instName) })
return bc, nil
@@ -461,18 +463,23 @@
// Gophers for interactive development & debugging
// (non-builder users); those are named "mote-*".
if sawDeleteAt && strings.HasPrefix(inst.Name, "buildlet-") && !p.instanceUsed(inst.Name) {
- log.Printf("Deleting VM %q in zone %q from an earlier coordinator generation ...", inst.Name, zone)
- deleteVM(zone, inst.Name)
+ if _, ok := deletedVMCache.Get(inst.Name); !ok {
+ log.Printf("Deleting VM %q in zone %q from an earlier coordinator generation ...", inst.Name, zone)
+ deleteVM(zone, inst.Name)
+ }
}
}
return nil
}
+var deletedVMCache = lru.New(100) // keyed by instName
+
// deleteVM starts a delete of an instance in a given zone.
//
// It either returns an operation name (if delete is pending) or the
// empty string if the instance didn't exist.
func deleteVM(zone, instName string) (operation string, err error) {
+ deletedVMCache.Add(instName, token{})
gceAPIGate()
op, err := computeService.Instances.Delete(projectID, zone, instName).Do()
apiErr, ok := err.(*googleapi.Error)
diff --git a/cmd/coordinator/reverse.go b/cmd/coordinator/reverse.go
index 01a85e5..668edb5 100644
--- a/cmd/coordinator/reverse.go
+++ b/cmd/coordinator/reverse.go
@@ -29,6 +29,7 @@
import (
"bufio"
+ "bytes"
"errors"
"fmt"
"io"
@@ -36,6 +37,7 @@
"net"
"net/http"
"sort"
+ "strings"
"sync"
"time"
@@ -51,7 +53,7 @@
func init() {
go func() {
for {
- time.Sleep(5 * time.Second)
+ time.Sleep(15 * time.Second)
reversePool.reverseHealthCheck()
}
}()
@@ -84,14 +86,13 @@
if usable && b.inUseAs == "" {
// Found an unused match.
b.inUseAs = machineType
+ b.inUseTime = time.Now()
b.client.SetCloseFunc(func() error {
p.mu.Lock()
b.inUseAs = ""
+ b.inUseTime = time.Now()
p.mu.Unlock()
- select {
- case p.buildletReturned <- token{}:
- default:
- }
+ p.noteBuildletReturned()
return nil
})
return b.client, nil
@@ -103,6 +104,13 @@
return nil, errInUse
}
+func (p *reverseBuildletPool) noteBuildletReturned() {
+ select {
+ case p.buildletReturned <- token{}:
+ default:
+ }
+}
+
// nukeBuildlet wipes out victim as a buildlet we'll ever return again,
// and closes its TCP connection in hopes that it will fix itself
// later.
@@ -131,6 +139,7 @@
continue // skip busy buildlets
}
b.inUseAs = "health"
+ b.inUseTime = time.Now()
res := make(chan error, 1)
responses[b] = res
client := b.client
@@ -152,6 +161,8 @@
continue
}
b.inUseAs = ""
+ b.inUseTime = time.Now()
+ p.noteBuildletReturned()
var err error
select {
case err = <-res:
@@ -220,8 +231,15 @@
inUse := make(map[string]int)
inUseOther := make(map[string]int)
+ var machineBuf bytes.Buffer
p.mu.Lock()
for _, b := range p.buildlets {
+ machStatus := "<i>idle</i>"
+ if b.inUseAs != "" {
+ machStatus = "working as <b>" + b.inUseAs + "</b>"
+ }
+ fmt.Fprintf(&machineBuf, "<li>%s, %s: %s for %v</li>\n",
+ b.conn.RemoteAddr(), strings.Join(b.modes, ", "), machStatus, time.Since(b.inUseTime))
for _, mode := range b.modes {
if b.inUseAs != "" && b.inUseAs != "health" {
if mode == b.inUseAs {
@@ -241,7 +259,7 @@
}
sort.Strings(modes)
- io.WriteString(w, "<b>Reverse pool</b><ul>")
+ io.WriteString(w, "<b>Reverse pool summary</b><ul>")
if len(modes) == 0 {
io.WriteString(w, "<li>no connections</li>")
}
@@ -254,6 +272,8 @@
}
}
io.WriteString(w, "</ul>")
+
+ fmt.Fprintf(w, "<b>Reverse pool machine detail</b><ul>%s</ul>", machineBuf.Bytes())
}
func (p *reverseBuildletPool) String() string {
@@ -320,9 +340,10 @@
modes []string
// inUseAs signifies that the buildlet is in use as the named mode.
- // guarded by mutex on reverseBuildletPool.
- inUseAs string
- // TODO: inUseTime time.Time + more HTML status
+ // inUseTime is when it entered that state.
+ // Both are guarded by the mutex on reverseBuildletPool.
+ inUseAs string
+ inUseTime time.Time
}
func handleReverse(w http.ResponseWriter, r *http.Request) {
@@ -349,18 +370,20 @@
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
- log.Printf("Registering reverse buildlet %s", r.RemoteAddr)
+ log.Printf("Registering reverse buildlet %s for modes %v", r.RemoteAddr, modes)
// The server becomes a (very simple) http client.
(&http.Response{StatusCode: 200, Proto: "HTTP/1.1"}).Write(conn)
client := buildlet.NewClient("none", buildlet.NoKeyPair)
client.SetHTTPClient(&http.Client{
- Transport: newRoundTripper(bufrw),
+ Transport: newRoundTripper(conn, bufrw),
})
+ client.SetDescription(fmt.Sprintf("reverse peer %s for modes %v", r.RemoteAddr, modes))
+ tstatus := time.Now()
status, err := client.Status()
if err != nil {
- log.Printf("Reverse connection did not answer status: %v", err)
+ log.Printf("Reverse connection %s for modes %v did not answer status after %v: %v", r.RemoteAddr, modes, time.Since(tstatus), err)
conn.Close()
return
}
@@ -376,9 +399,10 @@
reversePool.mu.Lock()
defer reversePool.mu.Unlock()
b := &reverseBuildlet{
- modes: modes,
- client: client,
- conn: conn,
+ modes: modes,
+ client: client,
+ conn: conn,
+ inUseTime: time.Now(),
}
reversePool.buildlets = append(reversePool.buildlets, b)
registerBuildlet(modes)
@@ -386,8 +410,9 @@
var registerBuildlet = func(modes []string) {} // test hook
-func newRoundTripper(bufrw *bufio.ReadWriter) *reverseRoundTripper {
+func newRoundTripper(conn net.Conn, bufrw *bufio.ReadWriter) *reverseRoundTripper {
return &reverseRoundTripper{
+ conn: conn,
bufrw: bufrw,
sema: make(chan bool, 1),
}
@@ -398,6 +423,7 @@
//
// Attempts at concurrent requests return an error.
type reverseRoundTripper struct {
+ conn net.Conn
bufrw *bufio.ReadWriter
sema chan bool
}
@@ -406,29 +432,37 @@
// Serialize trips. It is up to callers to avoid deadlocking.
c.sema <- true
if err := req.Write(c.bufrw); err != nil {
+ go c.conn.Close()
<-c.sema
return nil, err
}
if err := c.bufrw.Flush(); err != nil {
+ go c.conn.Close()
<-c.sema
return nil, err
}
resp, err = http.ReadResponse(c.bufrw.Reader, req)
if err != nil {
+ go c.conn.Close()
<-c.sema
return nil, err
}
- resp.Body = &reverseLockedBody{resp.Body, c.sema}
+ resp.Body = &reverseLockedBody{c, resp.Body, c.sema}
return resp, err
}
type reverseLockedBody struct {
+ rt *reverseRoundTripper
body io.ReadCloser
sema chan bool
}
func (b *reverseLockedBody) Read(p []byte) (n int, err error) {
- return b.body.Read(p)
+ n, err = b.body.Read(p)
+ if err != nil && err != io.EOF {
+ go b.rt.conn.Close()
+ }
+ return
}
func (b *reverseLockedBody) Close() error {
diff --git a/cmd/coordinator/status.go b/cmd/coordinator/status.go
index c5c0d88..4a59d26 100644
--- a/cmd/coordinator/status.go
+++ b/cmd/coordinator/status.go
@@ -37,7 +37,8 @@
for _, key := range tryList {
if ts := tries[key]; ts != nil {
state := ts.state()
- fmt.Fprintf(&buf, "Change-ID: %v Commit: %v\n", key.ChangeID, key.Commit)
+ fmt.Fprintf(&buf, "Change-ID: %v Commit: %v (<a href='/try?commit=%v'>status</a>)\n",
+ key.ChangeID, key.Commit, key.Commit[:8])
fmt.Fprintf(&buf, " Remain: %d, fails: %v\n", state.remain, state.failed)
for _, bs := range ts.builds {
fmt.Fprintf(&buf, " %s: running=%v\n", bs.name, bs.isRunning())
@@ -51,7 +52,7 @@
if errTryDeps != nil {
data.TrybotsErr = errTryDeps.Error()
} else {
- data.Trybots = buf.String()
+ data.Trybots = template.HTML(buf.String())
}
buf.Reset()
@@ -81,7 +82,7 @@
Active []*buildStatus
Recent []*buildStatus
TrybotsErr string
- Trybots string
+ Trybots template.HTML
GCEPoolStatus template.HTML // TODO: embed template
ReversePoolStatus template.HTML // TODO: embed template
DiskFree string