// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package main

import (
"crypto/tls"
"crypto/x509"
"encoding/base64"
"errors"
"fmt"
"io"
"log"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"time"
"golang.org/x/build/buildlet"
"golang.org/x/build/dashboard"
"golang.org/x/build/kubernetes"
"golang.org/x/build/kubernetes/api"
"golang.org/x/net/context"
"golang.org/x/oauth2"
monitoring "google.golang.org/api/cloudmonitoring/v2beta2"
container "google.golang.org/api/container/v1"
googleapi "google.golang.org/api/googleapi"
)
/*
This file implements the Kubernetes-based buildlet pool.
*/
// Initialized by initKube:
var (
containerService *container.Service
monService *monitoring.Service
tsService *monitoring.TimeseriesService
metricDescService *monitoring.MetricDescriptorsService
kubeClient *kubernetes.Client
kubeErr error
initKubeCalled bool
registryPrefix = "gcr.io"
kubeCluster *container.Cluster
)
const (
clusterName = "buildlets"
cpuUsedMetric = "custom.cloudmonitoring.googleapis.com/cluster/cpu_used" // % of available CPU in the cluster that is scheduled
memoryUsedMetric = "custom.cloudmonitoring.googleapis.com/cluster/memory_used" // % of available memory in the cluster that is scheduled
serviceLabelKey = "cloud.googleapis.com/service" // allow selection of custom metric based on service name
clusterNameLabelKey = "custom.cloudmonitoring.googleapis.com/cluster_name" // allow selection of custom metric based on cluster name
)
// initKube sets up the Google Container Engine, Google Cloud Monitoring, and
// Kubernetes API clients used by the buildlet pool. initGCE must be called
// before initKube.
func initKube() error {
initKubeCalled = true
// buildEnv was set by initGCE.
registryPrefix += "/" + buildEnv.ProjectName
if !hasCloudPlatformScope() {
return errors.New("coordinator not running with access to the Cloud Platform scope")
}
httpClient := oauth2.NewClient(oauth2.NoContext, tokenSource)
var err error
containerService, err = container.New(httpClient)
if err != nil {
return fmt.Errorf("could not create client for Google Container Engine: %v", err)
}
monService, err = monitoring.New(httpClient)
if err != nil {
return fmt.Errorf("could not create client for Google Cloud Monitoring: %v", err)
}
tsService = monitoring.NewTimeseriesService(monService)
metricDescService = monitoring.NewMetricDescriptorsService(monService)
kubeCluster, err = containerService.Projects.Zones.Clusters.Get(buildEnv.ProjectName, buildEnv.Zone, clusterName).Do()
if err != nil {
return fmt.Errorf("cluster %q could not be found in project %q, zone %q: %v", clusterName, buildEnv.ProjectName, buildEnv.Zone, err)
}
// Decode certs
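// decode becomes a no-op once err has been set, so the three calls below can
// run unconditionally and the first decode failure is the one reported.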
decode := func(which string, cert string) []byte {
if err != nil {
return nil
}
s, decErr := base64.StdEncoding.DecodeString(cert)
if decErr != nil {
err = fmt.Errorf("error decoding %s cert: %v", which, decErr)
}
return s
}
clientCert := decode("client cert", kubeCluster.MasterAuth.ClientCertificate)
clientKey := decode("client key", kubeCluster.MasterAuth.ClientKey)
caCert := decode("cluster cert", kubeCluster.MasterAuth.ClusterCaCertificate)
if err != nil {
return err
}
// HTTPS client
cert, err := tls.X509KeyPair(clientCert, clientKey)
if err != nil {
return fmt.Errorf("x509 client key pair could not be generated: %v", err)
}
// CA Cert from kube master
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
// Set up the TLS config.
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
}
tlsConfig.BuildNameToCertificate()
kubeHTTPClient := &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
}
kubeClient, err = kubernetes.NewClient("https://"+kubeCluster.Endpoint, kubeHTTPClient)
if err != nil {
return fmt.Errorf("kubernetes HTTP client could not be created: %v", err)
}
// Create Google Cloud Monitoring metrics
tryCreateMetrics()
go kubePool.pollCapacityLoop()
return nil
}
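// kubePool is the shared Kubernetes buildlet pool. Its usage and capacity
// quantities start at zero and are updated by pollCapacity and setPodUsed.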
var kubePool = &kubeBuildletPool{
cpuCapacity: api.NewQuantity(0, api.DecimalSI),
cpuUsage: api.NewQuantity(0, api.DecimalSI),
memoryCapacity: api.NewQuantity(0, api.BinarySI),
memoryUsage: api.NewQuantity(0, api.BinarySI),
}
// kubeBuildletPool is the Kubernetes buildlet pool.
type kubeBuildletPool struct {
mu sync.Mutex // guards all following
pods map[string]time.Time // pod instance name -> creationTime
cpuCapacity *api.Quantity // cpu capacity as reported by the Kubernetes api
memoryCapacity *api.Quantity
cpuUsage *api.Quantity
memoryUsage *api.Quantity
}
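// tryCreateMetrics registers the custom CPU and memory utilization metric
// descriptors with Google Cloud Monitoring. A 403 (e.g. a local dev run
// without service account credentials) is only logged; other failures are fatal.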
func tryCreateMetrics() {
metric := &monitoring.MetricDescriptor{
Description: "Kubernetes Percent CPU Scheduled",
Name: cpuUsedMetric,
Labels: []*monitoring.MetricDescriptorLabelDescriptor{
{Key: clusterNameLabelKey},
{Key: serviceLabelKey},
},
Project: buildEnv.ProjectName,
TypeDescriptor: &monitoring.MetricDescriptorTypeDescriptor{
MetricType: "gauge",
ValueType: "double",
},
}
_, err := metricDescService.Create(buildEnv.ProjectName, metric).Do()
if err != nil {
if gerr, ok := err.(*googleapi.Error); ok && gerr.Code == 403 {
log.Printf("Error creating CPU metric: could not authenticate to Google Cloud Monitoring. If you are running the coordinator on a local machine in dev mode, configure service account credentials for authentication as described at https://cloud.google.com/monitoring/api/authentication#service_account_authorization. Error message: %v\n", err)
} else {
log.Fatalf("Failed to create CPU metric for project. Ensure the Google Cloud Monitoring API is enabled for project %v: %v.", buildEnv.ProjectName, err)
}
}
metric = &monitoring.MetricDescriptor{
Description: "Kubernetes Percent Memory Scheduled",
Name: memoryUsedMetric,
Labels: []*monitoring.MetricDescriptorLabelDescriptor{
{Key: clusterNameLabelKey},
{Key: serviceLabelKey},
},
Project: buildEnv.ProjectName,
TypeDescriptor: &monitoring.MetricDescriptorTypeDescriptor{
MetricType: "gauge",
ValueType: "double",
},
}
_, err = metricDescService.Create(buildEnv.ProjectName, metric).Do()
if err != nil {
if gerr, ok := err.(*googleapi.Error); ok && gerr.Code == 403 {
log.Printf("Error creating memory metric: could not authenticate to Google Cloud Monitoring. If you are running the coordinator on a local machine in dev mode, configure service account credentials for authentication as described at https://cloud.google.com/monitoring/api/authentication#service_account_authorization. Error message: %v\n", err)
} else {
log.Fatalf("Failed to create memory metric for project. Ensure the Google Cloud Monitoring API is enabled for project %v: %v.", buildEnv.ProjectName, err)
}
}
}
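// pollCapacityLoop runs forever, refreshing the pool's capacity and usage
// figures every 30 seconds.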
func (p *kubeBuildletPool) pollCapacityLoop() {
ctx := context.Background()
for {
p.pollCapacity(ctx)
time.Sleep(30 * time.Second)
}
}
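// pollCapacity queries the Kubernetes API for node capacity and pod resource
// requests, updates the pool's totals, and reports the utilization percentages
// to Google Cloud Monitoring.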
func (p *kubeBuildletPool) pollCapacity(ctx context.Context) {
nodes, err := kubeClient.GetNodes(ctx)
if err != nil {
log.Printf("failed to retrieve nodes to calculate cluster capacity for %s/%s: %v", buildEnv.ProjectName, buildEnv.Region(), err)
return
}
pods, err := kubeClient.GetPods(ctx)
if err != nil {
log.Printf("failed to retrieve pods to calculate cluster capacity for %s/%s: %v", buildEnv.ProjectName, buildEnv.Region(), err)
return
}
p.mu.Lock()
defer p.mu.Unlock()
// Calculate the total CPU and memory used by pods in the cluster
var sumCPUUsed = api.NewQuantity(0, api.DecimalSI)
var sumMemoryUsed = api.NewQuantity(0, api.BinarySI)
for _, pod := range pods {
for _, c := range pod.Spec.Containers {
sumCPUUsed.Add(c.Resources.Requests[api.ResourceCPU])
sumMemoryUsed.Add(c.Resources.Requests[api.ResourceMemory])
}
}
p.cpuUsage = sumCPUUsed
p.memoryUsage = sumMemoryUsed
// Calculate the total CPU and memory capacity of the cluster
var sumCPUCapacity = api.NewQuantity(0, api.DecimalSI)
var sumMemoryCapacity = api.NewQuantity(0, api.BinarySI)
for _, n := range nodes {
sumCPUCapacity.Add(n.Status.Capacity[api.ResourceCPU])
sumMemoryCapacity.Add(n.Status.Capacity[api.ResourceMemory])
}
p.cpuCapacity = sumCPUCapacity
p.memoryCapacity = sumMemoryCapacity
// Calculate the % of CPU and memory consumed. If the cluster reports no
// capacity yet, skip this report to avoid dividing by zero.
if p.cpuCapacity.Value() == 0 || p.memoryCapacity.Value() == 0 {
return
}
pctCPUConsumed := float64(p.cpuUsage.Value()) / float64(p.cpuCapacity.Value())
pctMemoryConsumed := float64(p.memoryUsage.Value()) / float64(p.memoryCapacity.Value())
t := time.Now().Format(time.RFC3339)
wtr := monitoring.WriteTimeseriesRequest{
Timeseries: []*monitoring.TimeseriesPoint{
{
Point: &monitoring.Point{
DoubleValue: &pctCPUConsumed,
Start: t,
End: t,
},
TimeseriesDesc: &monitoring.TimeseriesDescriptor{
Metric: cpuUsedMetric,
Project: buildEnv.ProjectName,
Labels: map[string]string{
clusterNameLabelKey: clusterName,
serviceLabelKey: "container",
},
},
},
{
Point: &monitoring.Point{
DoubleValue: &pctMemoryConsumed,
Start: t,
End: t,
},
TimeseriesDesc: &monitoring.TimeseriesDescriptor{
Metric: memoryUsedMetric,
Project: buildEnv.ProjectName,
Labels: map[string]string{
clusterNameLabelKey: clusterName,
serviceLabelKey: "container",
},
},
},
},
}
_, err = tsService.Write(buildEnv.ProjectName, &wtr).Do()
if err != nil {
log.Printf("custom cluster utilization metric could not be written to Google Cloud Monitoring: %v", err)
}
}
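// GetBuildlet creates a Kubernetes pod running a buildlet for the given
// builder type and revision and returns a client for it. The pod is deleted
// when the build's context is canceled.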
func (p *kubeBuildletPool) GetBuildlet(ctx context.Context, typ, rev string, el eventTimeLogger) (*buildlet.Client, error) {
conf, ok := dashboard.Builders[typ]
if !ok || conf.KubeImage == "" {
return nil, fmt.Errorf("kubepool: invalid builder type %q", typ)
}
if kubeErr != nil {
return nil, kubeErr
}
if kubeClient == nil {
panic("expect non-nil kubeClient")
}
deleteIn := podDeleteTimeout
if strings.HasPrefix(rev, "user-") {
// Created by gomote (see remote.go), so don't kill it in 45 minutes.
// remote.go handles timeouts itself.
deleteIn = 0
rev = strings.TrimPrefix(rev, "user-")
}
// podName is the cluster-wide unique name of the Kubernetes pod. The maximum
// length is not documented, but it is kept <= 61 bytes, in line with GCE
// naming limits.
revPrefix := rev
if len(revPrefix) > 8 {
revPrefix = rev[:8]
}
podName := "buildlet-" + typ + "-" + revPrefix + "-rn" + randHex(6)
var needDelete bool
el.logEventTime("creating_kube_pod", podName)
log.Printf("Creating Kubernetes pod %q for %s at %s", podName, typ, rev)
bc, err := buildlet.StartPod(ctx, kubeClient, podName, typ, buildlet.PodOpts{
ProjectID: buildEnv.ProjectName,
ImageRegistry: registryPrefix,
Description: fmt.Sprintf("Go Builder for %s at %s", typ, rev),
DeleteIn: deleteIn,
OnPodCreated: func() {
el.logEventTime("pod_created")
p.setPodUsed(podName, true)
needDelete = true
},
OnGotPodInfo: func() {
el.logEventTime("got_pod_info", "waiting_for_buildlet...")
},
})
if err != nil {
el.logEventTime("kube_buildlet_create_failure", fmt.Sprintf("%s: %v", podName, err))
if needDelete {
log.Printf("Deleting failed pod %q", podName)
kubeClient.DeletePod(ctx, podName)
p.setPodUsed(podName, false)
}
return nil, err
}
bc.SetDescription("Kube Pod: " + podName)
// The build's context will be canceled when the build completes (successfully
// or not), or if the buildlet becomes unavailable. In any case, delete the pod
// running the buildlet.
go func() {
<-ctx.Done()
log.Printf("Deleting pod %q after build context cancel received ", podName)
// Giving DeletePod a new context here as the build ctx has been canceled
kubeClient.DeletePod(context.Background(), podName)
p.setPodUsed(podName, false)
}()
return bc, nil
}
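// WriteHTMLStatus writes the pool's capacity and a bounded list of active
// pods as an HTML fragment to w.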
func (p *kubeBuildletPool) WriteHTMLStatus(w io.Writer) {
fmt.Fprintf(w, "<b>Kubernetes pool</b> capacity: %s", p.capacityString())
const show = 6 // must be even
active := p.podsActive()
if len(active) > 0 {
fmt.Fprintf(w, "<ul>")
for i, pod := range active {
if i < show/2 || i >= len(active)-(show/2) {
fmt.Fprintf(w, "<li>%v, %v</li>\n", pod.name, time.Since(pod.creation))
} else if i == show/2 {
fmt.Fprintf(w, "<li>... %d of %d total omitted ...</li>\n", len(active)-show, len(active))
}
}
fmt.Fprintf(w, "</ul>")
}
}
func (p *kubeBuildletPool) capacityString() string {
p.mu.Lock()
defer p.mu.Unlock()
return fmt.Sprintf("%v/%v CPUs; %v/%v Memory",
p.cpuUsage, p.cpuCapacity,
p.memoryUsage, p.memoryCapacity)
}
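// setPodUsed records whether podName is in use and adjusts the pool's
// tracked CPU and memory usage accordingly.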
func (p *kubeBuildletPool) setPodUsed(podName string, used bool) {
p.mu.Lock()
defer p.mu.Unlock()
if p.pods == nil {
p.pods = make(map[string]time.Time)
}
if used {
p.pods[podName] = time.Now()
// Track cpu and memory usage
p.cpuUsage.Add(buildlet.BuildletCPU)
p.memoryUsage.Add(buildlet.BuildletMemory)
} else {
delete(p.pods, podName)
// Track cpu and memory usage
p.cpuUsage.Sub(buildlet.BuildletCPU)
p.memoryUsage.Sub(buildlet.BuildletMemory)
}
}
func (p *kubeBuildletPool) podUsed(podName string) bool {
p.mu.Lock()
defer p.mu.Unlock()
_, ok := p.pods[podName]
return ok
}
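// podsActive returns the in-use pods sorted by creation time.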
func (p *kubeBuildletPool) podsActive() (ret []resourceTime) {
p.mu.Lock()
defer p.mu.Unlock()
for name, create := range p.pods {
ret = append(ret, resourceTime{
name: name,
creation: create,
})
}
sort.Sort(byCreationTime(ret))
return ret
}
// String returns a short human-readable summary of the pool.
func (p *kubeBuildletPool) String() string {
p.mu.Lock()
inUse := len(p.pods) // pods currently marked in use
p.mu.Unlock()
return fmt.Sprintf("Kubernetes pool: %d pods in use", inUse)
}
// cleanUpOldPods loops forever and periodically enumerates pods
// and deletes those which have expired.
//
// A Pod is considered expired if it has a "delete-at" metadata
// attribute having a unix timestamp before the current time.
//
// This is the safety mechanism to delete pods which stray from the
// normal deleting process. Pods are created to run a single build and
// should be shut down by a controlling process. Due to various types
// of failures, they might get stranded. To prevent them from getting
// stranded and wasting resources forever, we instead set the
// "delete-at" metadata attribute on them when created to some time
// that's well beyond their expected lifetime.
func (p *kubeBuildletPool) cleanUpOldPods(ctx context.Context) {
if containerService == nil {
return
}
for {
pods, err := kubeClient.GetPods(ctx)
if err != nil {
log.Printf("Error cleaning pods: %v", err)
return
}
for _, pod := range pods {
if pod.ObjectMeta.Annotations == nil {
// Defensive. Not seen in practice.
continue
}
sawDeleteAt := false
for k, v := range pod.ObjectMeta.Annotations {
if k == "delete-at" {
sawDeleteAt = true
if v == "" {
log.Printf("missing delete-at value; ignoring")
continue
}
unixDeadline, err := strconv.ParseInt(v, 10, 64)
if err != nil {
log.Printf("invalid delete-at value %q seen; ignoring", v)
}
if err == nil && time.Now().Unix() > unixDeadline {
log.Printf("Deleting expired pod %q in zone %q ...", pod.Name)
err = kubeClient.DeletePod(ctx, pod.Name)
if err != nil {
log.Printf("problem deleting pod: %v", err)
}
}
}
}
// Delete buildlets (things we made) from previous
// generations. Only deleting things starting with "buildlet-"
// is a historical restriction, but still fine for paranoia.
if sawDeleteAt && strings.HasPrefix(pod.Name, "buildlet-") && !p.podUsed(pod.Name) {
log.Printf("Deleting pod %q from an earlier coordinator generation ...", pod.Name)
err = kubeClient.DeletePod(ctx, pod.Name)
if err != nil {
log.Printf("problem deleting pod: %v", err)
}
}
}
time.Sleep(time.Minute)
}
}
func hasCloudPlatformScope() bool {
return hasScope(container.CloudPlatformScope)
}