blob: beba59a00e0473d537bf17fa7f4818cd73ab7e80 [file] [log] [blame]
// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// +build linux darwin
package main
/*
This file implements reverse buildlets. These are buildlets that are not
started by the coordinator. They dial the coordinator and then accept
instructions. This feature is used for machines that cannot be started by
an API, for example real OS X machines with iOS and Android devices attached.
You can test this setup locally. In one terminal start a coordinator.
It will default to dev mode, using a dummy TLS cert and not talking to GCE.
$ coordinator
In another terminal, start a reverse buildlet:
$ buildlet -reverse "darwin-amd64"
It will dial and register itself with the coordinator. To confirm the
coordinator can see the buildlet, check the logs output or visit its
diagnostics page: https://localhost:8119. To send the buildlet some
work, go to:
https://localhost:8119/dosomework
*/
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"math/rand"
"net"
"net/http"
"sort"
"sync"
"time"
"golang.org/x/build/buildlet"
"golang.org/x/build/dashboard"
"golang.org/x/build/revdial"
revdialv2 "golang.org/x/build/revdial/v2"
"golang.org/x/build/types"
)
const minBuildletVersion = 1
var reversePool = &reverseBuildletPool{
oldInUse: make(map[*buildlet.Client]bool),
}
const maxOldRevdialUsers = 10
type token struct{}
type reverseBuildletPool struct {
// mu guards all 4 fields below and also fields of
// *reverseBuildlet in buildlets
mu sync.Mutex
// buildlets are the currently connected buildlets.
// TODO: switch to a map[hostType][]buildlets or map of set.
buildlets []*reverseBuildlet
wakeChan map[string]chan token // hostType => best-effort wake-up chan when buildlet free
waiters map[string]int // hostType => number waiters blocked in GetBuildlet
// oldInUse tracks which buildlets with the old revdial code are currently in use.
// These are a liability due to runaway memory issues (Issue 31639) so
// we bound how many can be running at once. Fortunately there aren't many left.
oldInUse map[*buildlet.Client]bool
}
func (p *reverseBuildletPool) ServeReverseStatusJSON(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
status := p.buildReverseStatusJSON()
j, _ := json.MarshalIndent(status, "", "\t")
w.Write(j)
}
func (p *reverseBuildletPool) buildReverseStatusJSON() *types.ReverseBuilderStatus {
status := &types.ReverseBuilderStatus{}
p.mu.Lock()
defer p.mu.Unlock()
for _, b := range p.buildlets {
hs := status.Host(b.hostType)
if hs.Machines == nil {
hs.Machines = make(map[string]*types.ReverseBuilder)
}
hs.Connected++
bs := &types.ReverseBuilder{
Name: b.hostname,
HostType: b.hostType,
ConnectedSec: time.Since(b.regTime).Seconds(),
Version: b.version,
}
if b.inUse && !b.inHealthCheck {
hs.Busy++
bs.Busy = true
bs.BusySec = time.Since(b.inUseTime).Seconds()
} else {
hs.Idle++
bs.IdleSec = time.Since(b.inUseTime).Seconds()
}
hs.Machines[b.hostname] = bs
}
for hostType, waiters := range p.waiters {
status.Host(hostType).Waiters = waiters
}
for hostType, hc := range dashboard.Hosts {
if hc.ExpectNum > 0 {
status.Host(hostType).Expect = hc.ExpectNum
}
}
return status
}
// tryToGrab returns non-nil bc on success if a buildlet is free.
//
// Otherwise it returns how many were busy, which might be 0 if none
// were (yet?) registered. The busy valid is only valid if bc == nil.
func (p *reverseBuildletPool) tryToGrab(hostType string) (bc *buildlet.Client, busy int) {
p.mu.Lock()
defer p.mu.Unlock()
for _, b := range p.buildlets {
if b.hostType != hostType {
continue
}
if b.inUse {
busy++
continue
}
if b.isOldRevDial && len(p.oldInUse) >= maxOldRevdialUsers {
continue
}
// Found an unused match.
b.inUse = true
b.inUseTime = time.Now()
if b.isOldRevDial {
p.oldInUse[b.client] = true
}
return b.client, 0
}
return nil, busy
}
func (p *reverseBuildletPool) getWakeChan(hostType string) chan token {
p.mu.Lock()
defer p.mu.Unlock()
if p.wakeChan == nil {
p.wakeChan = make(map[string]chan token)
}
c, ok := p.wakeChan[hostType]
if !ok {
c = make(chan token)
p.wakeChan[hostType] = c
}
return c
}
func (p *reverseBuildletPool) noteBuildletAvailable(hostType string) {
wake := p.getWakeChan(hostType)
select {
case wake <- token{}:
default:
}
}
// nukeBuildlet wipes out victim as a buildlet we'll ever return again,
// and closes its TCP connection in hopes that it will fix itself
// later.
func (p *reverseBuildletPool) nukeBuildlet(victim *buildlet.Client) {
p.mu.Lock()
defer p.mu.Unlock()
delete(p.oldInUse, victim)
for i, rb := range p.buildlets {
if rb.client == victim {
defer rb.conn.Close()
p.buildlets = append(p.buildlets[:i], p.buildlets[i+1:]...)
return
}
}
}
// healthCheckBuildletLoop periodically requests the status from b.
// If the buildlet fails to respond promptly, it is removed from the pool.
func (p *reverseBuildletPool) healthCheckBuildletLoop(b *reverseBuildlet) {
for {
time.Sleep(time.Duration(10+rand.Intn(5)) * time.Second)
if !p.healthCheckBuildlet(b) {
return
}
}
}
func (p *reverseBuildletPool) healthCheckBuildlet(b *reverseBuildlet) bool {
if b.client.IsBroken() {
return false
}
p.mu.Lock()
if b.inHealthCheck { // sanity check
panic("previous health check still running")
}
if b.inUse {
p.mu.Unlock()
return true // skip busy buildlets
}
b.inUse = true
b.inHealthCheck = true
b.inUseTime = time.Now()
res := make(chan error, 1)
go func() {
_, err := b.client.Status()
res <- err
}()
p.mu.Unlock()
t := time.NewTimer(5 * time.Second) // give buildlets time to respond
var err error
select {
case err = <-res:
t.Stop()
case <-t.C:
err = errors.New("health check timeout")
}
if err != nil {
// remove bad buildlet
log.Printf("Health check fail; removing reverse buildlet %v (type %v): %v", b.hostname, b.hostType, err)
go b.client.Close()
go p.nukeBuildlet(b.client)
return false
}
p.mu.Lock()
defer p.mu.Unlock()
if !b.inHealthCheck {
// buildlet was grabbed while lock was released; harmless.
return true
}
b.inUse = false
b.inHealthCheck = false
b.inUseTime = time.Now()
go p.noteBuildletAvailable(b.hostType)
return true
}
var (
highPriorityBuildletMu sync.Mutex
highPriorityBuildlet = make(map[string]chan *buildlet.Client)
)
func highPriChan(hostType string) chan *buildlet.Client {
highPriorityBuildletMu.Lock()
defer highPriorityBuildletMu.Unlock()
if c, ok := highPriorityBuildlet[hostType]; ok {
return c
}
c := make(chan *buildlet.Client)
highPriorityBuildlet[hostType] = c
return c
}
func (p *reverseBuildletPool) updateWaiterCounter(hostType string, delta int) {
p.mu.Lock()
defer p.mu.Unlock()
if p.waiters == nil {
p.waiters = make(map[string]int)
}
p.waiters[hostType] += delta
}
func (p *reverseBuildletPool) HasCapacity(hostType string) bool {
p.mu.Lock()
defer p.mu.Unlock()
for _, b := range p.buildlets {
if b.hostType != hostType {
continue
}
if b.inUse {
continue
}
return true
}
return false
}
func (p *reverseBuildletPool) GetBuildlet(ctx context.Context, hostType string, lg logger) (*buildlet.Client, error) {
p.updateWaiterCounter(hostType, 1)
defer p.updateWaiterCounter(hostType, -1)
seenErrInUse := false
isHighPriority, _ := ctx.Value(highPriorityOpt{}).(bool)
sp := lg.CreateSpan("wait_static_builder", hostType)
for {
bc, busy := p.tryToGrab(hostType)
if bc != nil {
select {
case highPriChan(hostType) <- bc:
// Somebody else was more important.
default:
sp.Done(nil)
return p.cleanedBuildlet(bc, lg)
}
}
if busy > 0 && !seenErrInUse {
lg.LogEventTime("waiting_machine_in_use")
seenErrInUse = true
}
var highPri chan *buildlet.Client
if isHighPriority {
highPri = highPriChan(hostType)
}
select {
case <-ctx.Done():
return nil, sp.Done(ctx.Err())
case bc := <-highPri:
sp.Done(nil)
return p.cleanedBuildlet(bc, lg)
case <-time.After(10 * time.Second):
// As multiple goroutines can be listening for
// the available signal, it must be treated as
// a best effort signal. So periodically try
// to grab a buildlet again.
case <-p.getWakeChan(hostType):
}
}
}
func (p *reverseBuildletPool) cleanedBuildlet(b *buildlet.Client, lg logger) (*buildlet.Client, error) {
// Clean up any files from previous builds.
sp := lg.CreateSpan("clean_buildlet", b.String())
err := b.RemoveAll(".")
sp.Done(err)
if err != nil {
b.Close()
return nil, err
}
return b, nil
}
func (p *reverseBuildletPool) WriteHTMLStatus(w io.Writer) {
// total maps from a host type to the number of machines which are
// capable of that role.
total := make(map[string]int)
for typ, host := range dashboard.Hosts {
if host.ExpectNum > 0 {
total[typ] = 0
}
}
// inUse track the number of non-idle host types.
inUse := make(map[string]int)
var buf bytes.Buffer
p.mu.Lock()
buildlets := append([]*reverseBuildlet(nil), p.buildlets...)
sort.Sort(byTypeThenHostname(buildlets))
numInUse := 0
for _, b := range buildlets {
machStatus := "<i>idle</i>"
if b.inUse {
machStatus = "working"
numInUse++
}
fmt.Fprintf(&buf, "<li>%s (%s) version %s, %s: connected %s, %s for %s</li>\n",
b.hostname,
b.conn.RemoteAddr(),
b.version,
b.hostType,
friendlyDuration(time.Since(b.regTime)),
machStatus,
friendlyDuration(time.Since(b.inUseTime)))
total[b.hostType]++
if b.inUse && !b.inHealthCheck {
inUse[b.hostType]++
}
}
numOldInUse := len(p.oldInUse)
numConnected := len(buildlets)
p.mu.Unlock()
var typs []string
for typ := range total {
typs = append(typs, typ)
}
sort.Strings(typs)
io.WriteString(w, "<b>Reverse pool stats</b><ul>")
fmt.Fprintf(w, "<li>Buildlets connected: %d</li>\n", numConnected)
fmt.Fprintf(w, "<li>Buildlets in use: %d</li>\n", numInUse)
fmt.Fprintf(w, "<li>Old revdial buildlets in use: %d</li>\n", numOldInUse)
io.WriteString(w, "</ul>")
io.WriteString(w, "<b>Reverse pool by host type</b> (in use / total)<ul>")
if len(typs) == 0 {
io.WriteString(w, "<li>no connections</li>")
}
for _, typ := range typs {
if dashboard.Hosts[typ] != nil && total[typ] < dashboard.Hosts[typ].ExpectNum {
fmt.Fprintf(w, "<li>%s: %d/%d (%d missing)</li>",
typ, inUse[typ], total[typ], dashboard.Hosts[typ].ExpectNum-total[typ])
} else {
fmt.Fprintf(w, "<li>%s: %d/%d</li>", typ, inUse[typ], total[typ])
}
}
io.WriteString(w, "</ul>")
fmt.Fprintf(w, "<b>Reverse pool machine detail</b><ul>%s</ul>", buf.Bytes())
}
// hostTypeCount iterates through the running reverse buildlets, and
// constructs a count of running buildlets per hostType.
func (p *reverseBuildletPool) hostTypeCount() map[string]int {
total := map[string]int{}
p.mu.Lock()
for _, b := range p.buildlets {
total[b.hostType]++
}
p.mu.Unlock()
return total
}
func (p *reverseBuildletPool) String() string {
// This doesn't currently show up anywhere, so ignore it for now.
return "TODO: some reverse buildlet summary"
}
// HostTypes returns the a deduplicated list of buildlet types curently supported
// by the pool.
func (p *reverseBuildletPool) HostTypes() (types []string) {
s := make(map[string]bool)
p.mu.Lock()
for _, b := range p.buildlets {
s[b.hostType] = true
}
p.mu.Unlock()
for t := range s {
types = append(types, t)
}
sort.Strings(types)
return types
}
// CanBuild reports whether the pool has a machine capable of building mode,
// even if said machine isn't currently idle.
func (p *reverseBuildletPool) CanBuild(hostType string) bool {
p.mu.Lock()
defer p.mu.Unlock()
for _, b := range p.buildlets {
if b.hostType == hostType {
return true
}
}
return false
}
func (p *reverseBuildletPool) addBuildlet(b *reverseBuildlet) {
p.mu.Lock()
defer p.noteBuildletAvailable(b.hostType)
defer p.mu.Unlock()
p.buildlets = append(p.buildlets, b)
go p.healthCheckBuildletLoop(b)
}
// reverseBuildlet is a registered reverse buildlet.
// Its immediate fields are guarded by the reverseBuildletPool mutex.
type reverseBuildlet struct {
// hostname is the name of the buildlet host.
// It doesn't have to be a complete DNS name.
hostname string
// version is the reverse buildlet's version.
version string
isOldRevDial bool // version 22 or under: using the v1 revdial package (Issue 31639)
// sessRand is the unique random number for every unique buildlet session.
sessRand string
client *buildlet.Client
conn net.Conn
regTime time.Time // when it was first connected
// hostType is the configuration of this machine.
// It is the key into the dashboard.Hosts map.
hostType string
// inUseAs signifies that the buildlet is in use.
// inUseTime is when it entered that state.
// inHealthCheck is whether it's inUse due to a health check.
// All three are guarded by the mutex on reverseBuildletPool.
inUse bool
inUseTime time.Time
inHealthCheck bool
}
func handleReverse(w http.ResponseWriter, r *http.Request) {
if r.TLS == nil {
http.Error(w, "buildlet registration requires SSL", http.StatusInternalServerError)
return
}
// Check build keys.
// modes can be either 1 buildlet type (new way) or builder mode(s) (the old way)
hostType := r.Header.Get("X-Go-Host-Type")
modes := r.Header["X-Go-Builder-Type"] // old way
gobuildkeys := r.Header["X-Go-Builder-Key"]
buildletVersion := r.Header.Get("X-Go-Builder-Version")
revDialVersion := r.Header.Get("X-Revdial-Version")
switch revDialVersion {
case "":
// Old.
revDialVersion = "1"
case "2":
// Current.
default:
http.Error(w, "unknown revdial version", http.StatusBadRequest)
return
}
// Convert the new argument style (X-Go-Host-Type) into the
// old way, to minimize changes in the rest of this code.
if hostType != "" {
if len(modes) > 0 {
http.Error(w, "invalid mix of X-Go-Host-Type and X-Go-Builder-Type", http.StatusBadRequest)
return
}
modes = []string{hostType}
}
if len(modes) == 0 || len(modes) != len(gobuildkeys) {
http.Error(w, fmt.Sprintf("need at least one mode and matching key, got %d/%d", len(modes), len(gobuildkeys)), http.StatusPreconditionFailed)
return
}
hostname := r.Header.Get("X-Go-Builder-Hostname")
for i, m := range modes {
if gobuildkeys[i] != builderKey(m) {
http.Error(w, fmt.Sprintf("bad key for mode %q", m), http.StatusPreconditionFailed)
return
}
}
// For older builders using the buildlet's -reverse flag only,
// collapse their builder modes down into a singular hostType.
legacyNote := ""
if hostType == "" {
hostType = mapBuilderToHostType(modes)
legacyNote = fmt.Sprintf(" (mapped from legacy modes %q)", modes)
}
conn, bufrw, err := w.(http.Hijacker).Hijack()
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if err := (&http.Response{StatusCode: http.StatusSwitchingProtocols, Proto: "HTTP/1.1"}).Write(conn); err != nil {
log.Printf("error writing upgrade response to reverse buildlet %s (%s) at %s: %v", hostname, hostType, r.RemoteAddr, err)
conn.Close()
return
}
log.Printf("Registering reverse buildlet %q (%s) for host type %v %s; buildletVersion=%v; revDialVersion=%v",
hostname, r.RemoteAddr, hostType, legacyNote, buildletVersion, revDialVersion)
var dialer func(context.Context) (net.Conn, error)
var revDialerDone <-chan struct{}
switch revDialVersion {
case "1":
revDialer := revdial.NewDialer(bufrw, conn)
revDialerDone = revDialer.Done()
dialer = func(ctx context.Context) (net.Conn, error) {
// ignoring context.
return revDialer.Dial()
}
case "2":
revDialer := revdialv2.NewDialer(conn, "/revdial")
revDialerDone = revDialer.Done()
dialer = revDialer.Dial
}
client := buildlet.NewClient(hostname, buildlet.NoKeyPair)
client.SetHTTPClient(&http.Client{
Transport: &http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
return dialer(ctx)
},
},
})
client.SetDialer(dialer)
client.SetDescription(fmt.Sprintf("reverse peer %s/%s for host type %v", hostname, r.RemoteAddr, hostType))
var isDead struct {
sync.Mutex
v bool
}
client.SetOnHeartbeatFailure(func() {
isDead.Lock()
isDead.v = true
isDead.Unlock()
conn.Close()
reversePool.nukeBuildlet(client)
})
// If the reverse dialer (which is always reading from the
// conn) detects that the remote went away, close the buildlet
// client proactively show
go func() {
<-revDialerDone
isDead.Lock()
defer isDead.Unlock()
if !isDead.v {
client.Close()
}
}()
tstatus := time.Now()
status, err := client.Status()
if err != nil {
log.Printf("Reverse connection %s/%s for modes %v did not answer status after %v: %v",
hostname, r.RemoteAddr, modes, time.Since(tstatus), err)
conn.Close()
return
}
if status.Version < minBuildletVersion {
log.Printf("Buildlet too old: %s, %+v", r.RemoteAddr, status)
conn.Close()
return
}
log.Printf("Buildlet %s/%s: %+v for %s", hostname, r.RemoteAddr, status, modes)
now := time.Now()
b := &reverseBuildlet{
hostname: hostname,
version: buildletVersion,
isOldRevDial: status.Version < 23,
hostType: hostType,
client: client,
conn: conn,
inUseTime: now,
regTime: now,
}
reversePool.addBuildlet(b)
registerBuildlet(modes) // testing only
}
var registerBuildlet = func(modes []string) {} // test hook
type byTypeThenHostname []*reverseBuildlet
func (s byTypeThenHostname) Len() int { return len(s) }
func (s byTypeThenHostname) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s byTypeThenHostname) Less(i, j int) bool {
bi, bj := s[i], s[j]
ti, tj := bi.hostType, bj.hostType
if ti == tj {
return bi.hostname < bj.hostname
}
return ti < tj
}
// mapBuilderToHostType maps from the user's Request.Header["X-Go-Builder-Type"]
// mode list down into a single host type, or the empty string if unknown.
func mapBuilderToHostType(modes []string) string {
// First, see if any of the provided modes are a host type.
// If so, this is an updated client.
for _, v := range modes {
if _, ok := dashboard.Hosts[v]; ok {
return v
}
}
// Else, it's an old client, still speaking in terms of
// builder names. See if any are registered aliases. First
// one wins. (There are no ambiguities in the wild.)
for hostType, hconf := range dashboard.Hosts {
for _, alias := range hconf.ReverseAliases {
for _, v := range modes {
if v == alias {
return hostType
}
}
}
}
return ""
}