blob: cb6e546829a2503a07d4c492c5bc1ecf876c0a24 [file] [log] [blame]
// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
//go:build linux || darwin
package pool
import (
"context"
"errors"
"fmt"
"html"
"io"
"log"
"sync"
"time"
"golang.org/x/build/buildenv"
"golang.org/x/build/buildlet"
"golang.org/x/build/dashboard"
"golang.org/x/build/internal"
"golang.org/x/build/internal/cloud"
"golang.org/x/build/internal/coordinator/pool/queue"
"golang.org/x/build/internal/spanlog"
)
var _ Buildlet = (*EC2Buildlet)(nil)
// ec2Buildlet is the package level buildlet pool.
//
// TODO(golang.org/issues/38337) remove once a package level variable is no longer
// required by the main package.
var ec2Buildlet *EC2Buildlet
// EC2BuildetPool retrieves the package level EC2Buildlet pool set by the constructor.
//
// TODO(golang.org/issues/38337) remove once a package level variable is no longer
// required by the main package.
func EC2BuildetPool() *EC2Buildlet {
return ec2Buildlet
}
func init() {
// initializes a basic package level ec2Buildlet pool to enable basic testing in other
// packages.
//
// TODO(golang.org/issues/38337) remove once a package level variable is no longer
// required by the main package.
ec2Buildlet = &EC2Buildlet{
ledger: newLedger(),
}
}
// awsClient represents the aws client used to interact with AWS. This is a partial
// implementation of pool.AWSClient.
type awsClient interface {
DestroyInstances(ctx context.Context, instIDs ...string) error
Quota(ctx context.Context, service, code string) (int64, error)
InstanceTypesARM(ctx context.Context) ([]*cloud.InstanceType, error)
RunningInstances(ctx context.Context) ([]*cloud.Instance, error)
}
// EC2Opt is optional configuration for the buildlet.
type EC2Opt func(*EC2Buildlet)
// EC2Buildlet manages a pool of AWS EC2 buildlets.
type EC2Buildlet struct {
// awsClient is the client used to interact with AWS services.
awsClient awsClient
// buildEnv contains the build environment settings.
buildEnv *buildenv.Environment
// buildletClient is the client used to create a buildlet.
buildletClient ec2BuildletClient
// hosts provides the host configuration for all hosts. It is passed in to facilitate
// testing.
hosts map[string]*dashboard.HostConfig
// isRemoteBuildletFunc informs the caller is a VM instance is being used as a remote
// buildlet.
//
// TODO(golang.org/issues/38337) remove once we find a way to pass in remote buildlet
// information at the get buidlet request.
isRemoteBuildlet IsRemoteBuildletFunc
// ledger tracks instances and their resource allocations.
ledger *ledger
// cancelPoll will signal to the pollers to discontinue polling.
cancelPoll context.CancelFunc
// pollWait waits for all pollers to terminate polling.
pollWait sync.WaitGroup
}
// ec2BuildletClient represents an EC2 buildlet client in the buildlet package.
type ec2BuildletClient interface {
StartNewVM(ctx context.Context, buildEnv *buildenv.Environment, hconf *dashboard.HostConfig, vmName, hostType string, opts *buildlet.VMOpts) (buildlet.Client, error)
}
// NewEC2Buildlet creates a new EC2 buildlet pool used to create and manage the lifecycle of
// EC2 buildlets. Information about ARM64 instance types is retrieved before starting the pool.
// EC2 quota types are also retrieved before starting the pool. The pool will continuously poll
// for quotas which limit the resources that can be consumed by the pool. It will also periodically
// search for VMs which are no longer in use or are untracked by the pool in order to delete them.
func NewEC2Buildlet(client *cloud.AWSClient, buildEnv *buildenv.Environment, hosts map[string]*dashboard.HostConfig, fn IsRemoteBuildletFunc, opts ...EC2Opt) (*EC2Buildlet, error) {
if fn == nil {
return nil, errors.New("remote buildlet check function is not set")
}
ctx, cancel := context.WithCancel(context.Background())
b := &EC2Buildlet{
awsClient: client,
buildEnv: buildEnv,
buildletClient: buildlet.NewEC2Client(client),
cancelPoll: cancel,
hosts: hosts,
isRemoteBuildlet: fn,
ledger: newLedger(),
}
for _, opt := range opts {
opt(b)
}
if err := b.retrieveAndSetQuota(ctx); err != nil {
return nil, fmt.Errorf("unable to create EC2 pool: %w", err)
}
if err := b.retrieveAndSetInstanceTypes(); err != nil {
return nil, fmt.Errorf("unable to create EC2 pool: %w", err)
}
b.pollWait.Add(1)
// polls for the EC2 quota data and sets the quota data in
// the ledger. When the context has been cancelled, the polling will stop.
go func() {
go internal.PeriodicallyDo(ctx, time.Hour, func(ctx context.Context, _ time.Time) {
log.Printf("retrieveing EC2 quota")
_ = b.retrieveAndSetQuota(ctx)
})
b.pollWait.Done()
}()
b.pollWait.Add(1)
// poll queries for VMs which are not tracked in the ledger and
// deletes them. When the context has been cancelled, the polling will stop.
go func() {
go internal.PeriodicallyDo(ctx, 2*time.Minute, func(ctx context.Context, _ time.Time) {
log.Printf("cleaning up unused EC2 instances")
b.destroyUntrackedInstances(ctx)
})
b.pollWait.Done()
}()
// TODO(golang.org/issues/38337) remove once a package level variable is no longer
// required by the main package.
ec2Buildlet = b
return b, nil
}
// GetBuildlet retrieves a buildlet client for a newly created buildlet.
func (eb *EC2Buildlet) GetBuildlet(ctx context.Context, hostType string, lg Logger, si *queue.SchedItem) (buildlet.Client, error) {
hconf, ok := eb.hosts[hostType]
if !ok {
return nil, fmt.Errorf("ec2 pool: unknown host type %q", hostType)
}
instName := instanceName(hostType, 7)
log.Printf("Creating EC2 VM %q for %s", instName, hostType)
kp, err := buildlet.NewKeyPair()
if err != nil {
log.Printf("failed to create TLS key pair for %s: %s", hostType, err)
return nil, fmt.Errorf("failed to create TLS key pair: %w", err)
}
qsp := lg.CreateSpan("awaiting_ec2_quota")
err = eb.ledger.ReserveResources(ctx, instName, hconf.MachineType(), si)
qsp.Done(err)
if err != nil {
return nil, err
}
ec2BuildletSpan := lg.CreateSpan("create_ec2_buildlet", instName)
defer func() { ec2BuildletSpan.Done(err) }()
var (
createSpan = lg.CreateSpan("create_ec2_instance", instName)
waitBuildlet spanlog.Span
curSpan = createSpan
instanceCreated bool
)
bc, err := eb.buildletClient.StartNewVM(ctx, eb.buildEnv, hconf, instName, hostType, &buildlet.VMOpts{
Zone: "", // allow the EC2 api pick an availability zone with capacity
TLS: kp,
Meta: make(map[string]string),
DeleteIn: determineDeleteTimeout(hconf),
OnInstanceRequested: func() {
log.Printf("EC2 VM %q now booting", instName)
},
OnInstanceCreated: func() {
log.Printf("EC2 VM %q now running", instName)
createSpan.Done(nil)
instanceCreated = true
waitBuildlet = lg.CreateSpan("wait_buildlet_start", instName)
curSpan = waitBuildlet
},
OnGotEC2InstanceInfo: func(inst *cloud.Instance) {
lg.LogEventTime("got_instance_info", "waiting_for_buildlet...")
eb.ledger.UpdateReservation(instName, inst.ID)
},
})
if err != nil {
curSpan.Done(err)
log.Printf("EC2 VM creation failed for %s: %v", hostType, err)
if instanceCreated {
log.Printf("EC2 VM %q failed initialize buildlet client. deleting...", instName)
eb.buildletDone(instName)
} else {
eb.ledger.Remove(instName)
}
return nil, err
}
waitBuildlet.Done(nil)
bc.SetDescription(fmt.Sprintf("EC2 VM: %s", instName))
bc.SetOnHeartbeatFailure(func() {
log.Printf("EC2 VM %q failed heartbeat", instName)
eb.buildletDone(instName)
})
bc.SetInstanceName(instName)
return bc, nil
}
func (eb *EC2Buildlet) QuotaStats() map[string]*queue.QuotaStats {
return map[string]*queue.QuotaStats{
"ec2-cpu": eb.ledger.cpuQueue.ToExported(),
}
}
// String gives a report of capacity usage for the EC2 buildlet pool.
func (eb *EC2Buildlet) String() string {
return fmt.Sprintf("EC2 pool capacity: %s", eb.capacityString())
}
// capacityString() gives a report of capacity usage.
func (eb *EC2Buildlet) capacityString() string {
r := eb.ledger.Resources()
return fmt.Sprintf("%d instances; %d/%d CPUs", r.InstCount, r.CPUUsed, r.CPULimit)
}
// WriteHTMLStatus writes the status of the EC2 buildlet pool to an io.Writer.
func (eb *EC2Buildlet) WriteHTMLStatus(w io.Writer) {
fmt.Fprintf(w, "<b>EC2 pool</b> capacity: %s", eb.capacityString())
active := eb.ledger.ResourceTime()
if len(active) > 0 {
fmt.Fprintf(w, "<ul>")
for _, inst := range active {
fmt.Fprintf(w, "<li>%v, %s</li>\n", html.EscapeString(inst.Name), friendlyDuration(time.Since(inst.Creation)))
}
fmt.Fprintf(w, "</ul>")
}
}
// buildletDone issues a call to destroy the EC2 instance and removes
// the instance from the ledger. Removing the instance from the ledger
// also releases any resources allocated to that instance. If an instance
// is not found in the ledger or on EC2 then an error is logged. All
// untracked instances will be cleaned up by the polling cleanupUnusedVMs
// method.
func (eb *EC2Buildlet) buildletDone(instName string) {
vmID := eb.ledger.InstanceID(instName)
if vmID == "" {
log.Printf("EC2 vm %s not found", instName)
return
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := eb.awsClient.DestroyInstances(ctx, vmID); err != nil {
log.Printf("EC2 VM %s deletion failed: %s", instName, err)
}
eb.ledger.Remove(instName)
}
// Close stops the pollers used by the EC2Buildlet pool from running.
func (eb *EC2Buildlet) Close() {
eb.cancelPoll()
eb.pollWait.Wait()
}
// retrieveAndSetQuota queries EC2 for account relevant quotas and sets the quota in the ledger.
func (eb *EC2Buildlet) retrieveAndSetQuota(ctx context.Context) error {
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
cpuQuota, err := eb.awsClient.Quota(ctx, cloud.QuotaServiceEC2, cloud.QuotaCodeCPUOnDemand)
if err != nil {
log.Printf("unable to query for EC2 cpu quota: %s", err)
return err
}
eb.ledger.SetCPULimit(cpuQuota)
return nil
}
// retrieveAndSetInstanceTypes retrieves the ARM64 instance types from the EC2
// service and sets them in the ledger.
func (eb *EC2Buildlet) retrieveAndSetInstanceTypes() error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
its, err := eb.awsClient.InstanceTypesARM(ctx)
if err != nil {
return fmt.Errorf("unable to retrieve EC2 instance types: %w", err)
}
eb.ledger.UpdateInstanceTypes(its)
log.Printf("ec2 buildlet pool instance types updated")
return nil
}
// destroyUntrackedInstances searches for VMs which exist but are not being tracked in the
// ledger and deletes them.
func (eb *EC2Buildlet) destroyUntrackedInstances(ctx context.Context) {
ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
insts, err := eb.awsClient.RunningInstances(ctx)
if err != nil {
log.Printf("failed to query for instances: %s", err)
return
}
deleteInsts := make([]string, 0, len(insts))
for _, inst := range insts {
if !isBuildlet(inst.Name) {
// Non-buildlets have not been created by the EC2 buildlet pool. Their lifecycle
// should not be managed by the pool.
log.Printf("destroyUntrackedInstances: skipping non-buildlet %q", inst.Name)
continue
}
if eb.isRemoteBuildlet(inst.Name) {
// Remote buildlets have their own expiration mechanism that respects active SSH sessions.
log.Printf("destroyUntrackedInstances: skipping remote buildlet %q", inst.Name)
continue
}
if id := eb.ledger.InstanceID(inst.Name); id != "" {
continue
}
deleteInsts = append(deleteInsts, inst.ID)
log.Printf("queued for deleting untracked EC2 VM %q with id %q", inst.Name, inst.ID)
}
if len(deleteInsts) == 0 {
return
}
if err := eb.awsClient.DestroyInstances(ctx, deleteInsts...); err != nil {
log.Printf("failed cleaning EC2 VMs: %s", err)
}
}