// blob: e4e335736237de8c40c6382058538c040de6ae61 [file] [log] [blame]
// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build go1.13 && (linux || darwin)
// +build go1.13
// +build linux darwin
// Code interacting with Google Compute Engine (GCE) and
// a GCE implementation of the BuildletPool interface.
package pool
import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"log"
"net/http"
"os"
"path"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"cloud.google.com/go/compute/metadata"
"cloud.google.com/go/datastore"
"cloud.google.com/go/errorreporting"
"cloud.google.com/go/storage"
"golang.org/x/build/buildenv"
"golang.org/x/build/buildlet"
"golang.org/x/build/dashboard"
"golang.org/x/build/gerrit"
"golang.org/x/build/internal/buildgo"
"golang.org/x/build/internal/buildstats"
"golang.org/x/build/internal/lru"
"golang.org/x/build/internal/secret"
"golang.org/x/build/internal/spanlog"
"golang.org/x/oauth2"
"golang.org/x/oauth2/google"
"google.golang.org/api/compute/v1"
"google.golang.org/api/googleapi"
)
// init installs gceAPIGate as buildlet.GCEGate.
// NOTE(review): presumably the buildlet package calls GCEGate before its
// own GCE API requests so they share this package's rate limit — confirm
// against the buildlet package.
func init() {
buildlet.GCEGate = gceAPIGate
}
// apiCallTicker paces our GCE API calls so that we do not issue them
// too quickly by accident. Our quota is 20 QPS, but for now we stay
// below it: the ticker fires once every 100ms.
var apiCallTicker = time.NewTicker(100 * time.Millisecond)

// gceAPIGate blocks until the next tick of apiCallTicker. Call it
// immediately before each GCE API request.
func gceAPIGate() {
	<-apiCallTicker.C
}
// Initialized by InitGCE:
//
// TODO(golang.org/issue/38337): These should be moved into a struct as
// part of the effort to reduce package level variables.
var (
// buildEnv is the build environment selected by InitGCE (or replaced via SetBuildEnv).
buildEnv *buildenv.Environment
// dsClient is a datastore client for the build project (symbolic-datum-552), where build progress is stored.
dsClient *datastore.Client
// goDSClient is a datastore client for golang-org, where build status is stored.
goDSClient *datastore.Client
// oAuthHTTPClient is the OAuth2 HTTP client used to make API calls to Google Cloud APIs.
oAuthHTTPClient *http.Client
// computeService is the Compute API client built on top of oAuthHTTPClient.
computeService *compute.Service
// gcpCreds holds the credentials obtained from buildEnv.Credentials.
gcpCreds *google.Credentials
errTryDeps error // non-nil if try bots are disabled
// gerritClient is set by checkTryBuildDeps.
gerritClient *gerrit.Client
// storageClient is the GCS client; InitGCE leaves it nil in "dev" mode.
storageClient *storage.Client
inStaging bool // are we running in the staging project? (named -dev)
errorsClient *errorreporting.Client // Stackdriver errors client
// gkeNodeHostname is the "instance/hostname" metadata value, read only when running on GCE.
gkeNodeHostname string
// values created due to separating the buildlet pools into a separate package
// gceMode is the mode string passed to InitGCE; "dev" disables most GCE interaction.
gceMode string
// deleteTimeout is the VM delete timeout passed to InitGCE.
deleteTimeout time.Duration
// testFiles maps file names to contents; ReadGCSFile serves from it in "dev" mode.
testFiles map[string]string
// basePinErr receives the error string (or "") from createBasepinDisks.
basePinErr *atomic.Value
// isGCERemoteBuildlet reports whether an instance name is a remote buildlet; cleanZoneVMs skips those.
isGCERemoteBuildlet IsRemoteBuildletFunc
)
// InitGCE initializes the GCE buildlet pool and the package-level
// clients it depends on.
//
// Parameters:
//   - sc: secret client handed to checkTryBuildDeps to fetch the Gerrit password.
//   - vmDeleteTimeout: default lifetime used when creating VMs.
//   - tFiles: file contents served by ReadGCSFile when mode is "dev".
//   - basePin: receives the status string written by createBasepinDisks.
//   - fn: reports whether an instance name belongs to a remote buildlet.
//   - buildEnvName: build environment name; if empty it is derived from
//     the mode ("dev") or from the GCE project ID.
//   - mode: coordinator mode; "dev" skips or tolerates failures of most
//     cloud client setup.
//
// It returns an error for metadata, scope, or compute-client failures.
// Several other failures terminate the process via log.Fatalf when not
// in "dev" mode.
func InitGCE(sc *secret.Client, vmDeleteTimeout time.Duration, tFiles map[string]string, basePin *atomic.Value, fn IsRemoteBuildletFunc, buildEnvName, mode string) error {
	gceMode = mode
	deleteTimeout = vmDeleteTimeout
	testFiles = tFiles
	basePinErr = basePin
	isGCERemoteBuildlet = fn

	ctx := context.Background()
	var err error

	// If the coordinator is running on a GCE instance and a
	// buildEnv was not specified with the env flag, set the
	// buildEnvName to the project ID
	if buildEnvName == "" {
		if mode == "dev" {
			buildEnvName = "dev"
		} else if metadata.OnGCE() {
			buildEnvName, err = metadata.ProjectID()
			if err != nil {
				log.Fatalf("metadata.ProjectID: %v", err)
			}
		}
	}

	buildEnv = buildenv.ByProjectID(buildEnvName)
	inStaging = buildEnv == buildenv.Staging

	// If running on GCE, override the zone and static IP, and check service account permissions.
	if metadata.OnGCE() {
		projectZone, err := metadata.Get("instance/zone")
		if err != nil {
			return fmt.Errorf("failed to get current GCE zone: %v", err)
		}
		if projectZone == "" {
			// Report the empty value explicitly instead of formatting a nil error.
			return errors.New("failed to get current GCE zone: metadata returned an empty zone")
		}

		gkeNodeHostname, err = metadata.Get("instance/hostname")
		if err != nil {
			return fmt.Errorf("failed to get current instance hostname: %v", err)
		}

		// Convert the zone from "projects/1234/zones/us-central1-a" to "us-central1-a".
		projectZone = path.Base(projectZone)
		buildEnv.ControlZone = projectZone

		if buildEnv.StaticIP == "" {
			buildEnv.StaticIP, err = metadata.ExternalIP()
			if err != nil {
				return fmt.Errorf("ExternalIP: %v", err)
			}
		}

		if !hasComputeScope() {
			return errors.New("coordinator is not running with access to read and write Compute resources. VM support disabled")
		}
	}

	// The dump is only used for the log line below, so a marshalling
	// error is deliberately ignored.
	cfgDump, _ := json.MarshalIndent(buildEnv, "", " ")
	log.Printf("Loaded configuration %q for project %q:\n%s", buildEnvName, buildEnv.ProjectName, cfgDump)

	if mode != "dev" {
		storageClient, err = storage.NewClient(ctx)
		if err != nil {
			log.Fatalf("storage.NewClient: %v", err)
		}
	}

	dsClient, err = datastore.NewClient(ctx, buildEnv.ProjectName)
	if err != nil {
		if mode == "dev" {
			log.Printf("Error creating datastore client for %q: %v", buildEnv.ProjectName, err)
		} else {
			log.Fatalf("Error creating datastore client for %q: %v", buildEnv.ProjectName, err)
		}
	}
	goDSClient, err = datastore.NewClient(ctx, buildEnv.GoProjectName)
	if err != nil {
		if mode == "dev" {
			log.Printf("Error creating datastore client for %q: %v", buildEnv.GoProjectName, err)
		} else {
			log.Fatalf("Error creating datastore client for %q: %v", buildEnv.GoProjectName, err)
		}
	}

	// don't send dev errors to Stackdriver.
	if mode != "dev" {
		errorsClient, err = errorreporting.NewClient(ctx, buildEnv.ProjectName, errorreporting.Config{
			ServiceName: "coordinator",
		})
		if err != nil {
			// don't exit, we still want to run coordinator
			log.Printf("Error creating errors client: %v", err)
		}
	}

	gcpCreds, err = buildEnv.Credentials(ctx)
	if err != nil {
		if mode == "dev" {
			// don't try to do anything else with GCE, as it will likely fail
			return nil
		}
		log.Fatalf("failed to get a token source: %v", err)
	}
	oAuthHTTPClient = oauth2.NewClient(ctx, gcpCreds.TokenSource)

	// Surface a failure here rather than leaving computeService nil for
	// later callers (pollQuota, cleanZoneVMs, deleteVM) to dereference.
	computeService, err = compute.New(oAuthHTTPClient)
	if err != nil {
		return fmt.Errorf("failed to create compute service client: %v", err)
	}

	errTryDeps = checkTryBuildDeps(ctx, sc)
	if errTryDeps != nil {
		log.Printf("TryBot builders disabled due to error: %v", errTryDeps)
	} else {
		log.Printf("TryBot builders enabled.")
	}

	// The background loops are started only for the real production and
	// staging environments when actually running on GCE.
	if mode != "dev" && metadata.OnGCE() && (buildEnv == buildenv.Production || buildEnv == buildenv.Staging) {
		go syncBuildStatsLoop(buildEnv)
		go gcePool.pollQuotaLoop()
		go createBasepinDisks(ctx)
	}

	return nil
}
// GCEConfiguration manages and contains all of the GCE configuration.
//
// It holds no state of its own: every accessor reads (or, for
// SetBuildEnv, writes) a package-level variable.
//
// TODO(golang.org/issue/38337): These should be moved into a struct as
// part of the effort to reduce package level variables.
type GCEConfiguration struct{}
// NewGCEConfiguration creates a new GCEConfiguration.
func NewGCEConfiguration() *GCEConfiguration { return &GCEConfiguration{} }
// StorageClient retrieves the GCE storage client.
func (c *GCEConfiguration) StorageClient() *storage.Client {
return storageClient
}
// BuildEnv retrieves the GCE build env.
func (c *GCEConfiguration) BuildEnv() *buildenv.Environment {
return buildEnv
}
// SetBuildEnv sets the GCE build env. This is primarily reserved for
// testing purposes.
func (c *GCEConfiguration) SetBuildEnv(b *buildenv.Environment) {
buildEnv = b
}
// BuildletPool retrieves the GCE buildlet pool.
func (c *GCEConfiguration) BuildletPool() *GCEBuildlet {
return gcePool
}
// InStaging reports whether the environment is staging.
func (c *GCEConfiguration) InStaging() bool {
return inStaging
}
// GerritClient retrieves a gerrit client.
func (c *GCEConfiguration) GerritClient() *gerrit.Client {
return gerritClient
}
// GKENodeHostname retrieves the GKE node hostname.
func (c *GCEConfiguration) GKENodeHostname() string {
return gkeNodeHostname
}
// DSClient retrieves the datastore client.
func (c *GCEConfiguration) DSClient() *datastore.Client {
return dsClient
}
// GoDSClient retrieves the datastore client for golang.org project.
func (c *GCEConfiguration) GoDSClient() *datastore.Client {
return goDSClient
}
// TryDepsErr retrieves any Trybot dependency error.
func (c *GCEConfiguration) TryDepsErr() error {
return errTryDeps
}
// ErrorsClient retrieves the stackdriver errors client.
func (c *GCEConfiguration) ErrorsClient() *errorreporting.Client {
return errorsClient
}
// OAuthHTTPClient retrieves an OAuth2 HTTP client used to make API calls to GCP.
func (c *GCEConfiguration) OAuthHTTPClient() *http.Client {
return oAuthHTTPClient
}
// GCPCredentials retrieves the GCP credentials.
func (c *GCEConfiguration) GCPCredentials() *google.Credentials {
return gcpCreds
}
// checkTryBuildDeps verifies the prerequisites for running trybots and
// returns a non-nil error describing the first one that is missing.
//
// It requires a storage scope, refuses to run in dev mode, performs a
// test write of "hello.txt" to the log bucket, and finally sets
// gerritClient: unauthenticated in staging, otherwise authenticated
// with the gobot password retrieved from sc.
func checkTryBuildDeps(ctx context.Context, sc *secret.Client) error {
	switch {
	case !hasStorageScope():
		return errors.New("coordinator's GCE instance lacks the storage service scope")
	case gceMode == "dev":
		return errors.New("running in dev mode")
	}

	// Prove that we can write to the log bucket.
	probe := storageClient.Bucket(buildEnv.LogBucket).Object("hello.txt")
	probeWriter := probe.NewWriter(context.Background())
	fmt.Fprintf(probeWriter, "Hello, world! Coordinator start-up at %v", time.Now())
	if err := probeWriter.Close(); err != nil {
		return fmt.Errorf("test write of a GCS object to bucket %q failed: %v", buildEnv.LogBucket, err)
	}

	const gerritURL = "https://go-review.googlesource.com"
	if inStaging {
		// Don't expect to write to Gerrit in staging mode.
		gerritClient = gerrit.NewClient(gerritURL, gerrit.NoAuth)
		return nil
	}

	// Bound the secret lookup so start-up cannot hang on it.
	secretCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()
	gobotPass, err := sc.Retrieve(secretCtx, secret.NameGobotPassword)
	if err != nil {
		return fmt.Errorf("failed to get project metadata 'gobot-password': %v", err)
	}
	auth := gerrit.BasicAuth("git-gobot.golang.org", strings.TrimSpace(string(gobotPass)))
	gerritClient = gerrit.NewClient(gerritURL, auth)
	return nil
}
// gcePool is the package's single GCE buildlet pool; it is what
// GCEConfiguration.BuildletPool returns.
var gcePool = &GCEBuildlet{}
// Compile-time check that *GCEBuildlet implements the Buildlet interface.
var _ Buildlet = (*GCEBuildlet)(nil)
// maxInstances is a temporary hack because we can't get buildlets to boot
// without IPs, and we only have 200 IP addresses.
// TODO(bradfitz): remove this once fixed.
const maxInstances = 190
// GCEBuildlet manages a pool of GCE buildlets.
//
// The quota fields are refreshed from the Compute API by pollQuota and
// adjusted locally ("dead reckoning") by tryAllocateQuota and
// putVMCountQuota between refreshes.
type GCEBuildlet struct {
mu sync.Mutex // guards all following
disabled bool // when true, tryAllocateQuota refuses every request; set via SetEnabled
// CPU quota usage & limits.
cpuLeft int // dead-reckoning CPUs remain
instLeft int // dead-reckoning instances remain
instUsage int // usage of the "INSTANCES" quota as last reported by pollQuota
cpuUsage int // usage of the "CPUS" quota, adjusted locally on allocate/release
addrUsage int // usage of the "IN_USE_ADDRESSES" quota, incremented locally on allocate
inst map[string]time.Time // GCE VM instance name -> creationTime
}
// pollQuotaLoop refreshes the pool's quota numbers forever, pausing
// five seconds after each refresh. It never returns, so run it in its
// own goroutine.
func (p *GCEBuildlet) pollQuotaLoop() {
	for ; ; time.Sleep(5 * time.Second) {
		p.pollQuota()
	}
}
// pollQuota fetches the region's quotas from the Compute API and
// overwrites the pool's CPU, instance and address counters with the
// reported values. On an API error it logs and leaves the counters
// untouched.
func (p *GCEBuildlet) pollQuota() {
	gceAPIGate()
	reg, err := computeService.Regions.Get(buildEnv.ProjectName, buildEnv.Region()).Do()
	if err != nil {
		log.Printf("Failed to get quota for %s/%s: %v", buildEnv.ProjectName, buildEnv.Region(), err)
		return
	}

	p.mu.Lock()
	defer p.mu.Unlock()
	for _, q := range reg.Quotas {
		used := int(q.Usage)
		remaining := int(q.Limit) - used
		switch q.Metric {
		case "CPUS":
			p.cpuLeft, p.cpuUsage = remaining, used
		case "INSTANCES":
			p.instLeft, p.instUsage = remaining, used
		case "IN_USE_ADDRESSES":
			// Only the usage is tracked for addresses.
			p.addrUsage = used
		}
	}
}
// SetEnabled enables or disables the buildlet pool. While the pool is
// disabled, quota allocation attempts are refused.
func (p *GCEBuildlet) SetEnabled(enabled bool) {
	p.mu.Lock()
	p.disabled = !enabled
	p.mu.Unlock()
}
// GetBuildlet creates a new GCE VM of the given host type and returns
// a buildlet client connected to it.
//
// It first waits (subject to ctx) until the pool's quota accounting
// permits another VM, then starts the VM in a randomly chosen zone.
// Progress is recorded as spans and events on lg. An unknown hostType
// or a failed VM start results in a nil client and a non-nil error.
func (p *GCEBuildlet) GetBuildlet(ctx context.Context, hostType string, lg Logger) (bc *buildlet.Client, err error) {
hconf, ok := dashboard.Hosts[hostType]
if !ok {
return nil, fmt.Errorf("gcepool: unknown host type %q", hostType)
}
// Reserve quota before doing anything else; this may block until
// quota frees up or ctx is done.
qsp := lg.CreateSpan("awaiting_gce_quota")
err = p.awaitVMCountQuota(ctx, hconf.GCENumCPU())
qsp.Done(err)
if err != nil {
return nil, err
}
deleteIn := deleteTimeoutFromContextOrValue(ctx, deleteTimeout)
instName := instanceName(hostType, 7)
instName = strings.Replace(instName, "_", "-", -1) // Issue 22905; can't use underscores in GCE VMs
// Register the name as in use before the VM exists so that
// cleanZoneVMs does not treat it as a stray instance.
p.setInstanceUsed(instName, true)
gceBuildletSpan := lg.CreateSpan("create_gce_buildlet", instName)
// err is the named result, so the span records the final outcome.
defer func() { gceBuildletSpan.Done(err) }()
var (
needDelete bool
createSpan = lg.CreateSpan("create_gce_instance", instName)
waitBuildlet spanlog.Span // made after create is done
curSpan = createSpan // either instSpan or waitBuildlet
)
zone := buildEnv.RandomVMZone()
log.Printf("Creating GCE VM %q for %s at %s", instName, hostType, zone)
bc, err = buildlet.StartNewVM(gcpCreds, buildEnv, instName, hostType, buildlet.VMOpts{
DeleteIn: deleteIn,
OnInstanceRequested: func() {
log.Printf("GCE VM %q now booting", instName)
},
OnInstanceCreated: func() {
// From here on the VM exists and must be deleted on failure.
needDelete = true
createSpan.Done(nil)
waitBuildlet = lg.CreateSpan("wait_buildlet_start", instName)
curSpan = waitBuildlet
},
OnGotInstanceInfo: func(*compute.Instance) {
lg.LogEventTime("got_instance_info", "waiting_for_buildlet...")
},
Zone: zone,
})
if err != nil {
// Close whichever span was in progress when the failure happened.
curSpan.Done(err)
log.Printf("Failed to create VM for %s at %s: %v", hostType, zone, err)
// NOTE(review): the quota reserved above is only handed back when
// the instance was actually created; if the failure happened
// earlier, the locally adjusted counters stay decremented until
// pollQuota next overwrites them — confirm this is intended.
if needDelete {
deleteVM(zone, instName)
p.putVMCountQuota(hconf.GCENumCPU())
}
p.setInstanceUsed(instName, false)
return nil, err
}
// NOTE(review): waitBuildlet is only assigned in OnInstanceCreated, so
// this assumes StartNewVM invoked that callback before succeeding.
waitBuildlet.Done(nil)
bc.SetDescription("GCE VM: " + instName)
bc.SetGCEInstanceName(instName)
// If the buildlet stops responding, tear down its VM and release its quota.
bc.SetOnHeartbeatFailure(func() {
p.putBuildlet(bc, hostType, zone, instName)
})
return bc, nil
}
// putBuildlet retires a buildlet: it requests deletion of the VM,
// removes the instance from the in-use set and hands the host type's
// CPU quota back to the pool. It always returns nil, and panics if
// hostType is not a known host configuration.
func (p *GCEBuildlet) putBuildlet(bc *buildlet.Client, hostType, zone, instName string) error {
	// TODO(bradfitz): add the buildlet to a freelist (of max N
	// items) for up to 10 minutes since when it got started if
	// it's never seen a command execution failure, and we can
	// wipe all its disk content? (perhaps wipe its disk content
	// when it's retrieved, like the reverse buildlet pool) But
	// this will require re-introducing a distinction in the
	// buildlet client library between Close, Destroy/Halt, and
	// tracking execution errors. That was all half-baked before
	// and thus removed. Now Close always destroys everything.
	deleteVM(zone, instName)
	p.setInstanceUsed(instName, false)

	if hconf, found := dashboard.Hosts[hostType]; found {
		p.putVMCountQuota(hconf.GCENumCPU())
		return nil
	}
	panic("failed to lookup conf") // should've worked if we did it before
}
// WriteHTMLStatus writes an HTML summary of the pool to w: the capacity
// line, followed by a list of active instances ordered by creation
// time. At most six instances are listed (the first three and the last
// three); the rest are collapsed into a single "omitted" entry.
func (p *GCEBuildlet) WriteHTMLStatus(w io.Writer) {
	fmt.Fprintf(w, "<b>GCE pool</b> capacity: %s", p.capacityString())

	const show = 6 // must be even
	const half = show / 2

	active := p.instancesActive()
	total := len(active)
	if total == 0 {
		return
	}

	fmt.Fprintf(w, "<ul>")
	for idx, vm := range active {
		switch {
		case idx < half || idx >= total-half:
			fmt.Fprintf(w, "<li>%v, %s</li>\n", vm.Name, friendlyDuration(time.Since(vm.Creation)))
		case idx == half:
			// First hidden entry: emit the summary line once.
			fmt.Fprintf(w, "<li>... %d of %d total omitted ...</li>\n", total-show, total)
		}
	}
	fmt.Fprintf(w, "</ul>")
}
// String returns a one-line, human-readable summary of the pool's capacity.
func (p *GCEBuildlet) String() string {
	return "GCE pool capacity: " + p.capacityString()
}
// capacityString formats the pool's instance and CPU usage as
// "used/total" pairs. The instance numerator is the number of
// instances this pool is tracking; the totals are usage plus what is
// believed to remain.
func (p *GCEBuildlet) capacityString() string {
	p.mu.Lock()
	defer p.mu.Unlock()

	tracked := len(p.inst)
	instTotal := p.instUsage + p.instLeft
	cpuTotal := p.cpuUsage + p.cpuLeft
	return fmt.Sprintf("%d/%d instances; %d/%d CPUs", tracked, instTotal, p.cpuUsage, cpuTotal)
}
// awaitVMCountQuota waits for numCPU CPUs of quota to become available
// and reserves them, or returns ctx.Err() if ctx is done first.
//
// It returns nil as soon as an allocation attempt succeeds; the first
// attempt is made immediately, without waiting.
func (p *GCEBuildlet) awaitVMCountQuota(ctx context.Context, numCPU int) error {
	// Poll every 2 seconds, which could be better, but works and
	// is simple.
	const pollInterval = 2 * time.Second

	if p.tryAllocateQuota(numCPU) {
		return nil
	}

	// Use one timer for the whole wait rather than a fresh time.After
	// per iteration, and stop it on return so a cancelled wait does not
	// leave a pending timer behind.
	retry := time.NewTimer(pollInterval)
	defer retry.Stop()
	for {
		select {
		case <-retry.C:
		case <-ctx.Done():
			return ctx.Err()
		}
		if p.tryAllocateQuota(numCPU) {
			return nil
		}
		// The channel was just drained above, so Reset is safe here.
		retry.Reset(pollInterval)
	}
}
// haveQuotaLocked reports whether the pool's current view of the GCE
// quota allows one more instance using numCPU CPUs. Every limit must
// hold: CPUs, instances, and the maxInstances cap on both tracked
// instances and in-use addresses.
//
// precondition: p.mu must be held.
func (p *GCEBuildlet) haveQuotaLocked(numCPU int) bool {
	switch {
	case p.cpuLeft < numCPU:
		return false
	case p.instLeft < 1:
		return false
	case len(p.inst) >= maxInstances:
		return false
	case p.addrUsage >= maxInstances:
		return false
	}
	return true
}
// tryAllocateQuota attempts to reserve quota for one instance with
// numCPU CPUs and reports whether it succeeded. It fails when the pool
// is disabled or the quota is insufficient; on success the local
// counters are adjusted to reflect the reservation.
func (p *GCEBuildlet) tryAllocateQuota(numCPU int) bool {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.disabled || !p.haveQuotaLocked(numCPU) {
		return false
	}
	p.cpuLeft -= numCPU
	p.cpuUsage += numCPU
	p.instLeft--
	p.addrUsage++
	return true
}
// putVMCountQuota returns one instance and cpu CPUs to the pool's
// dead-reckoned quota counters.
//
// NOTE(review): tryAllocateQuota also increments addrUsage, which is
// not decremented here; it is only corrected when pollQuota next
// overwrites it — confirm whether that is intentional.
func (p *GCEBuildlet) putVMCountQuota(cpu int) {
	p.mu.Lock()
	p.cpuLeft += cpu
	p.cpuUsage -= cpu
	p.instLeft++
	p.mu.Unlock()
}
// setInstanceUsed adds instName to the set of instances owned by this
// pool (stamped with the current time) when used is true, and removes
// it when used is false.
func (p *GCEBuildlet) setInstanceUsed(instName string, used bool) {
	p.mu.Lock()
	defer p.mu.Unlock()

	// The map is created lazily so the zero GCEBuildlet is usable.
	if p.inst == nil {
		p.inst = make(map[string]time.Time)
	}
	if !used {
		delete(p.inst, instName)
		return
	}
	p.inst[instName] = time.Now()
}
// instanceUsed reports whether instName is currently registered as an
// instance owned by this pool.
func (p *GCEBuildlet) instanceUsed(instName string) bool {
	p.mu.Lock()
	_, found := p.inst[instName]
	p.mu.Unlock()
	return found
}
// instancesActive returns a snapshot of the instances owned by this
// pool, sorted from oldest to newest creation time. The result is nil
// when there are none.
func (p *GCEBuildlet) instancesActive() (ret []ResourceTime) {
	p.mu.Lock()
	defer p.mu.Unlock()

	for instName, created := range p.inst {
		entry := ResourceTime{Name: instName, Creation: created}
		ret = append(ret, entry)
	}
	sort.Sort(ByCreationTime(ret))
	return ret
}
// ResourceTime pairs the name of a resource (a GCE instance or a Kube
// pod) with the time it was created.
type ResourceTime struct {
	Name     string
	Creation time.Time
}

// ByCreationTime implements sort.Interface, ordering resource times
// from earliest to latest creation.
type ByCreationTime []ResourceTime

// Len returns the number of elements.
func (s ByCreationTime) Len() int {
	return len(s)
}

// Less reports whether element i was created strictly before element j.
func (s ByCreationTime) Less(i, j int) bool {
	first, second := s[i].Creation, s[j].Creation
	return first.Before(second)
}

// Swap exchanges elements i and j.
func (s ByCreationTime) Swap(i, j int) {
	held := s[i]
	s[i] = s[j]
	s[j] = held
}
// CleanUpOldVMs runs forever, sweeping every configured VM zone once a
// minute and deleting virtual machines that have expired. It returns
// immediately in dev mode or when no compute service is configured.
//
// A VM counts as expired when its "delete-at" metadata attribute holds
// a unix timestamp that is already in the past.
//
// This is a safety net for VMs that escape the normal deletion path.
// Each VM is created for a single build and is supposed to be shut
// down by the process controlling it, but failures of various kinds
// can leave one stranded. So that stranded VMs do not waste resources
// forever, every VM is given a "delete-at" attribute at creation time,
// set well beyond its expected lifetime.
func (p *GCEBuildlet) CleanUpOldVMs() {
	if gceMode == "dev" || computeService == nil {
		return
	}

	// TODO(bradfitz): remove this list and just query it from the compute API?
	// https://godoc.org/google.golang.org/api/compute/v1#RegionsService.Get
	// and Region.Zones: https://godoc.org/google.golang.org/api/compute/v1#Region
	for ; ; time.Sleep(time.Minute) {
		for _, zone := range buildEnv.VMZones {
			err := p.cleanZoneVMs(zone)
			if err == nil {
				continue
			}
			log.Printf("Error cleaning VMs in zone %q: %v", zone, err)
		}
	}
}
// cleanZoneVMs is part of CleanUpOldVMs, operating on a single zone.
//
// It lists the instances in zone and requests deletion of each one for
// which a delete reason is found:
//   - its "delete-at" metadata timestamp has passed;
//   - it is a buildlet without "delete-at", not in use by this pool,
//     and created more than three hours ago;
//   - it is a buildlet with "delete-at", not in use by this pool, and
//     absent from deletedVMCache (so from an earlier coordinator).
//
// Remote buildlets and instances without metadata are skipped. Only a
// failure to list instances is returned as an error.
func (p *GCEBuildlet) cleanZoneVMs(zone string) error {
// Fetch the first 500 (default) running instances and clean
// those. We expect that we'll be running many fewer than
// that. Even if we have more, eventually the first 500 will
// either end or be cleaned, and then the next call will get a
// partially-different 500.
// TODO(bradfitz): revisit this code if we ever start running
// thousands of VMs.
gceAPIGate()
list, err := computeService.Instances.List(buildEnv.ProjectName, zone).Do()
if err != nil {
return fmt.Errorf("listing instances: %v", err)
}
for _, inst := range list.Items {
if inst.Metadata == nil {
// Defensive. Not seen in practice.
continue
}
if isGCERemoteBuildlet(inst.Name) {
// Remote buildlets have their own expiration mechanism that respects active SSH sessions.
log.Printf("cleanZoneVMs: skipping remote buildlet %q", inst.Name)
continue
}
var sawDeleteAt bool
var deleteReason string
// Scan the metadata for a usable "delete-at" entry. The continue
// statements below skip to the next metadata item, not the next
// instance.
for _, it := range inst.Metadata.Items {
if it.Key == "delete-at" {
if it.Value == nil {
log.Printf("missing delete-at value; ignoring")
continue
}
unixDeadline, err := strconv.ParseInt(*it.Value, 10, 64)
if err != nil {
log.Printf("invalid delete-at value %q seen; ignoring", *it.Value)
continue
}
sawDeleteAt = true
if time.Now().Unix() > unixDeadline {
deleteReason = "delete-at expiration"
}
}
}
// The local variable shadows the package-level isBuildlet function
// for the rest of this loop body.
isBuildlet := isBuildlet(inst.Name)
if isBuildlet && !sawDeleteAt && !p.instanceUsed(inst.Name) {
// NOTE(review): the parse error is ignored; an unparseable
// timestamp yields the zero time, which is treated as older
// than three hours — confirm that is the desired fallback.
createdAt, _ := time.Parse(time.RFC3339Nano, inst.CreationTimestamp)
if createdAt.Before(time.Now().Add(-3 * time.Hour)) {
deleteReason = fmt.Sprintf("no delete-at, created at %s", inst.CreationTimestamp)
}
}
// Delete buildlets (things we made) from previous
// generations. Only deleting things starting with "buildlet-"
// is a historical restriction, but still fine for paranoia.
if deleteReason == "" && sawDeleteAt && isBuildlet && !p.instanceUsed(inst.Name) {
// Instances this process already asked to delete are in
// deletedVMCache and are not reported as an earlier generation.
if _, ok := deletedVMCache.Get(inst.Name); !ok {
deleteReason = "from earlier coordinator generation"
}
}
if deleteReason != "" {
log.Printf("deleting VM %q in zone %q; %s ...", inst.Name, zone, deleteReason)
deleteVM(zone, inst.Name)
}
}
return nil
}
// deletedVMCache remembers the names of instances that deleteVM has
// been asked to delete. cleanZoneVMs consults it so that such instances
// are not reported as coming from an earlier coordinator generation.
var deletedVMCache = lru.New(100) // keyed by instName
// token is the empty value stored in deletedVMCache; only the presence
// of the key matters.
type token struct{}
// deleteVM asks GCE to delete the named instance in the given zone.
//
// The instance name is recorded in deletedVMCache before the request
// is made. The result is the pending operation's name, or the empty
// string with a nil error when the API answers 404 (the instance did
// not exist). Any other failure is logged and returned.
func deleteVM(zone, instName string) (operation string, err error) {
	deletedVMCache.Add(instName, token{})

	gceAPIGate()
	op, err := computeService.Instances.Delete(buildEnv.ProjectName, zone, instName).Do()
	if gerr, isAPIErr := err.(*googleapi.Error); isAPIErr && gerr.Code == 404 {
		// Already gone; nothing to wait for.
		return "", nil
	}
	if err != nil {
		log.Printf("Failed to delete instance %q in zone %q: %v", instName, zone, err)
		return "", err
	}

	log.Printf("Sent request to delete instance %q in zone %q. Operation ID, Name: %v, %v", instName, zone, op.Id, op.Name)
	return op.Name, nil
}
// HasScope reports whether the "default" service account of the GCE
// instance we are running on has been granted the scope want.
//
// When not running on GCE it assumes full access and returns true. If
// the scopes cannot be read from the metadata service, the failure is
// logged and false is returned.
func HasScope(want string) bool {
	// If not on GCE, assume full access
	if !metadata.OnGCE() {
		return true
	}

	granted, err := metadata.Scopes("default")
	if err != nil {
		log.Printf("failed to query metadata default scopes: %v", err)
		return false
	}
	for i := range granted {
		if granted[i] == want {
			return true
		}
	}
	return false
}
func hasComputeScope() bool {
return HasScope(compute.ComputeScope) || HasScope(compute.CloudPlatformScope)
}
func hasStorageScope() bool {
return HasScope(storage.ScopeReadWrite) || HasScope(storage.ScopeFullControl) || HasScope(compute.CloudPlatformScope)
}
// ReadGCSFile returns the full contents of the named object in the
// buildlet GCS bucket.
//
// In dev mode nothing is fetched from GCS: the contents come from the
// test files supplied to InitGCE, and a name that is not among them
// yields an *os.PathError wrapping os.ErrNotExist.
func ReadGCSFile(name string) ([]byte, error) {
	if gceMode != "dev" {
		obj := storageClient.Bucket(buildEnv.BuildletBucket).Object(name)
		rd, err := obj.NewReader(context.Background())
		if err != nil {
			return nil, err
		}
		defer rd.Close()
		return ioutil.ReadAll(rd)
	}

	if contents, found := testFiles[name]; found {
		return []byte(contents), nil
	}
	return nil, &os.PathError{
		Op:   "open",
		Path: name,
		Err:  os.ErrNotExist,
	}
}
// syncBuildStatsLoop never returns and is meant to run in its own
// goroutine. It syncs the coordinator's datastore Build & Span
// entities to BigQuery immediately and then again on every tick of a
// five-minute ticker. Each round gets a ten-minute deadline, and
// failures are logged without stopping the loop.
func syncBuildStatsLoop(env *buildenv.Environment) {
	syncOnce := func() {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
		defer cancel()
		if err := buildstats.SyncBuilds(ctx, env); err != nil {
			log.Printf("buildstats: SyncBuilds: %v", err)
		}
		if err := buildstats.SyncSpans(ctx, env); err != nil {
			log.Printf("buildstats: SyncSpans: %v", err)
		}
	}

	tick := time.NewTicker(5 * time.Minute)
	for {
		syncOnce()
		<-tick.C
	}
}
// createBasepinDisks makes zone-local copies of the VM disk images so
// that later VM creations are faster.
//
// Apart from a list call, it does nothing unless VM images were added
// or updated recently.
//
// It retries every five minutes until the disks are created, recording
// the latest error text (or "" on success) in basePinErr. If the
// client itself cannot be constructed it logs and gives up.
func createBasepinDisks(ctx context.Context) {
	const retryDelay = 5 * time.Minute
	for {
		start := time.Now()
		client, err := buildgo.NewClient(ctx, buildEnv)
		if err != nil {
			log.Printf("basepin: NewClient: %v", err)
			return
		}

		log.Printf("basepin: creating basepin disks...")
		err = client.MakeBasepinDisks(ctx)
		elapsed := time.Since(start).Round(time.Second / 10)
		if err == nil {
			basePinErr.Store("")
			log.Printf("basepin: created basepin disks after %v", elapsed)
			return
		}

		basePinErr.Store(err.Error())
		log.Printf("basepin: error creating basepin disks, after %v: %v", elapsed, err)
		time.Sleep(retryDelay)
	}
}