cmd/coordinator: discover kube cluster

Find the default Kubernetes cluster and configure a client to talk to it.
Use application default credentials.

Updates golang/go#12546

Change-Id: Ifb1ce57f52f4fbbee3267f8cc3cf02a78146bd5b
Reviewed-by: Brad Fitzpatrick <>
diff --git a/buildlet/kube.go b/buildlet/kube.go
new file mode 100644
index 0000000..f29bd41
--- /dev/null
+++ b/buildlet/kube.go
@@ -0,0 +1,104 @@
+// Copyright 2015 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+package buildlet
+import (
+	"errors"
+	"fmt"
+	"time"
+	""
+	""
+	""
+// PodOpts control how new pods are started.
+type PodOpts struct {
+	// ImageRegistry specifies the Docker registry Kubernetes
+	// will use to create the pod. Required.
+	ImageRegistry string
+	// TLS optionally specifies the TLS keypair to use.
+	// If zero, http without auth is used.
+	TLS KeyPair
+	// Description optionally describes the pod.
+	Description string
+	// Labels optionally specify key=value strings that Kubernetes
+	// can use to filter and group pods.
+	Labels map[string]string
+	// DeleteIn optionally specifies a duration at which
+	// to delete the pod.
+	DeleteIn time.Duration
+	// OnInstanceRequested optionally specifies a hook to run synchronously
+	// after the pod create call, but before
+	// waiting for its operation to proceed.
+	OnPodRequested func()
+	// OnPodCreated optionally specifies a hook to run synchronously
+	// after the pod operation succeeds.
+	OnPodCreated func()
+	// OnPodCreated optionally specifies a hook to run synchronously
+	// after the pod Get call.
+	OnGotPodInfo func()
+// StartPod creates a new pod on a Kubernetes cluster and returns a buildlet client
+// configured to speak to it.
+func StartPod(kubeClient *kubernetes.Client, podName, builderType string, opts PodOpts) (*Client, error) {
+	conf, ok := dashboard.Builders[builderType]
+	if !ok || conf.KubeImage == "" {
+		return nil, fmt.Errorf("invalid builder type %q", builderType)
+	}
+	p := &api.Pod{
+		TypeMeta: api.TypeMeta{
+			APIVersion: "v1",
+			Kind:       "Pod",
+		},
+		ObjectMeta: api.ObjectMeta{
+			Name: podName,
+			Labels: map[string]string{
+				"name": podName,
+				"type": builderType,
+				"role": "buildlet",
+			},
+		},
+		Spec: api.PodSpec{
+			RestartPolicy: "Never",
+			Containers: []api.Container{
+				{
+					Name:            "buildlet",
+					Image:           opts.ImageRegistry + conf.KubeImage,
+					ImagePullPolicy: api.PullAlways,
+					Command:         []string{"/usr/local/bin/stage0"},
+					Ports: []api.ContainerPort{
+						{
+							ContainerPort: 80,
+						},
+					},
+					Env: []api.EnvVar{
+						{
+							Name:  "IN_KUBERNETES",
+							Value: "1",
+						},
+						{
+							Value: conf.BuildletBinaryURL(),
+						},
+					},
+				},
+			},
+		},
+	}
+	if _, err := kubeClient.Run(p); err != nil {
+		return nil, fmt.Errorf("pod could not be created: %v", err)
+	}
+	return nil, errors.New("TODO")
diff --git a/cmd/coordinator/coordinator.go b/cmd/coordinator/coordinator.go
index d734705..a59037e 100644
--- a/cmd/coordinator/coordinator.go
+++ b/cmd/coordinator/coordinator.go
@@ -129,12 +129,13 @@
 const (
 	maxStatusDone = 30
-	// vmDeleteTimeout is how long before we delete a VM.
+	// vmDeleteTimeout and podDeleteTimeout is how long before we delete a VM.
 	// In practice this need only be as long as the slowest
 	// builder (plan9 currently), because on startup this program
 	// already deletes all buildlets it doesn't know about
 	// (i.e. ones from a previous instance of the coordinator).
-	vmDeleteTimeout = 45 * time.Minute
+	vmDeleteTimeout  = 45 * time.Minute
+	podDeleteTimeout = 45 * time.Minute
 func readGCSFile(name string) ([]byte, error) {
@@ -254,6 +255,11 @@
 			*mode = "prod"
+	err = initKube()
+	if err != nil {
+		log.Printf("Kube support disabled due to eror initializing Kubernetes: %v", err)
+	}
 	switch *mode {
 	case "dev", "prod":
 		log.Printf("Running in %s mode", *mode)
diff --git a/cmd/coordinator/gce.go b/cmd/coordinator/gce.go
index a2b87d4..dcc0084 100644
--- a/cmd/coordinator/gce.go
+++ b/cmd/coordinator/gce.go
@@ -83,7 +83,13 @@
 	if err != nil {
 		return fmt.Errorf("failed to get current GCE ProjectID: %v", err)
-	tokenSource = google.ComputeTokenSource("default")
+	inStaging = projectID == "go-dashboard-dev"
+	if inStaging {
+		log.Printf("Running in staging cluster (%q)", projectID)
+	}
+	tokenSource, _ = google.DefaultTokenSource(oauth2.NoContext)
 	httpClient := oauth2.NewClient(oauth2.NoContext, tokenSource)
 	serviceCtx = cloud.NewContext(projectID, httpClient)
@@ -112,10 +118,6 @@
 		log.Printf("TryBot builders enabled.")
-	inStaging = projectID == "go-dashboard-dev"
-	if inStaging {
-		log.Printf("Running in staging cluster (%q)", projectID)
-	}
 	go gcePool.pollQuotaLoop()
 	return nil
@@ -127,7 +129,7 @@
 	wr := storage.NewWriter(serviceCtx, buildLogBucket(), "hello.txt")
 	fmt.Fprintf(wr, "Hello, world! Coordinator start-up at %v", time.Now())
 	if err := wr.Close(); err != nil {
-		return fmt.Errorf("test write of a GCS object failed: %v", err)
+		return fmt.Errorf("test write of a GCS object to bucket %q failed: %v", buildLogBucket(), err)
 	gobotPass, err := metadata.ProjectAttributeValue("gobot-password")
 	if err != nil {
@@ -516,8 +518,9 @@
 func hasScope(want string) bool {
+	// If not on GCE, assume full access
 	if !metadata.OnGCE() {
-		return false
+		return true
 	scopes, err := metadata.Scopes("default")
 	if err != nil {
diff --git a/cmd/coordinator/kube.go b/cmd/coordinator/kube.go
index df1cd1d..f90b817 100644
--- a/cmd/coordinator/kube.go
+++ b/cmd/coordinator/kube.go
@@ -5,20 +5,103 @@
 package main
 import (
+	"crypto/tls"
+	"crypto/x509"
+	"encoding/base64"
+	"log"
+	"net/http"
+	"strings"
-	""
+	""
+	""
+	container ""
 This file implements the Kubernetes-based buildlet pool.
+// Initialized by initKube:
+var (
+	containerService *container.Service
+	kubeClient       *kubernetes.Client
+	initKubeCalled   bool
+	registryPrefix   = "" + projectID + "/"
+const (
+	clusterName = "buildlets"
+func initKube() error {
+	initKubeCalled = true
+	if !hasCloudPlatformScope() {
+		return errors.New("coordinator not running with access to the Cloud Platform scope.")
+	}
+	httpClient := oauth2.NewClient(oauth2.NoContext, tokenSource)
+	containerService, _ = container.New(httpClient)
+	cluster, err := containerService.Projects.Zones.Clusters.Get(projectID, projectZone, clusterName).Do()
+	if err != nil {
+		return fmt.Errorf("cluster %q could not be found in project %q, zone %q: %v", clusterName, projectID, projectZone, err)
+	}
+	// Decode certs
+	decode := func(which string, cert string) []byte {
+		if err != nil {
+			return nil
+		}
+		s, decErr := base64.StdEncoding.DecodeString(cert)
+		if decErr != nil {
+			err = fmt.Errorf("error decoding %s cert: %v", which, decErr)
+		}
+		return []byte(s)
+	}
+	clientCert := decode("client cert", cluster.MasterAuth.ClientCertificate)
+	clientKey := decode("client key", cluster.MasterAuth.ClientKey)
+	caCert := decode("cluster cert", cluster.MasterAuth.ClusterCaCertificate)
+	if err != nil {
+		return err
+	}
+	// HTTPS client
+	cert, err := tls.X509KeyPair(clientCert, clientKey)
+	if err != nil {
+		return fmt.Errorf("x509 client key pair could not be generated: %v", err)
+	}
+	// CA Cert from kube master
+	caCertPool := x509.NewCertPool()
+	caCertPool.AppendCertsFromPEM([]byte(caCert))
+	// Setup TLS config
+	tlsConfig := &tls.Config{
+		Certificates: []tls.Certificate{cert},
+		RootCAs:      caCertPool,
+	}
+	tlsConfig.BuildNameToCertificate()
+	kubeHTTPClient := &http.Client{
+		Transport: &http.Transport{
+			TLSClientConfig: tlsConfig,
+		},
+	}
+	kubeClient, err = kubernetes.NewClient(cluster.Endpoint, kubeHTTPClient)
+	if err != nil {
+		return fmt.Errorf("kubernetes HTTP client could not be created: %v", err)
+	}
+	return nil
 var kubePool = &kubeBuildletPool{}
 // kubeBuildletPool is the Kubernetes buildlet pool.
@@ -27,8 +110,61 @@
 	mu sync.Mutex
-func (p *kubeBuildletPool) GetBuildlet(cancel Cancel, machineType, rev string, el eventTimeLogger) (*buildlet.Client, error) {
-	return nil, errors.New("TODO")
+func (p *kubeBuildletPool) GetBuildlet(cancel Cancel, typ, rev string, el eventTimeLogger) (*buildlet.Client, error) {
+	conf, ok := dashboard.Builders[typ]
+	if !ok || conf.KubeImage == "" {
+		return nil, fmt.Errorf("kubepool: invalid builder type %q", typ)
+	}
+	deleteIn := podDeleteTimeout
+	if strings.HasPrefix(rev, "user-") {
+		// Created by gomote (see remote.go), so don't kill it in 45 minutes.
+		// remote.go handles timeouts itself.
+		deleteIn = 0
+		rev = strings.TrimPrefix(rev, "user-")
+	}
+	// name is the cluster-wide unique name of the kubernetes pod. Max length
+	// is not documented, but it's kept <= 61 bytes, in line with GCE
+	revPrefix := rev
+	if len(revPrefix) > 8 {
+		revPrefix = rev[:8]
+	}
+	podName := "buildlet-" + typ + "-" + revPrefix + "-rn" + randHex(6)
+	var needDelete bool
+	el.logEventTime("creating_kube_pod", podName)
+	log.Printf("Creating Kubernetes  pod %q for %s at %s", podName, typ, rev)
+	bc, err := buildlet.StartPod(kubeClient, podName, typ, buildlet.PodOpts{
+		ImageRegistry: registryPrefix,
+		Description:   fmt.Sprintf("Go Builder for %s at %s", typ, rev),
+		DeleteIn:      deleteIn,
+		OnPodRequested: func() {
+			el.logEventTime("pod_create_requested", podName)
+			log.Printf("Pod %q starting", podName)
+		},
+		OnPodCreated: func() {
+			el.logEventTime("pod_created")
+			needDelete = true // redundant with OnPodRequested one, but fine.
+		},
+		OnGotPodInfo: func() {
+			el.logEventTime("got_pod_info", "waiting_for_buildlet...")
+		},
+	})
+	if err != nil {
+		el.logEventTime("kube_buildlet_create_failure", fmt.Sprintf("%s: %v", podName, err))
+		log.Printf("Failed to create kube pod for %s, %s: %v", typ, rev, err)
+		if needDelete {
+			//TODO(evanbrown): delete pod
+		}
+		//p.setInstanceUsed(instName, false)
+		return nil, err
+	}
+	bc.SetDescription("Kube Pod: " + podName)
+	return bc, nil
 func (p *kubeBuildletPool) WriteHTMLStatus(w io.Writer) {
@@ -44,67 +180,6 @@
 	return fmt.Sprintf("Kubernetes pool capacity: %d/%d", inUse, total)
-// uid is caller-generated random id for the build
-func buildletPod(cfg dashboard.BuildConfig, uid string) (*api.Pod, error) {
-	pn := fmt.Sprintf("%v-%v", cfg.Name, uid)
-	p := &api.Pod{
-		TypeMeta: api.TypeMeta{
-			APIVersion: "v1",
-			Kind:       "Pod",
-		},
-		ObjectMeta: api.ObjectMeta{
-			Name: pn,
-			Labels: map[string]string{
-				"type": "buildlet",
-				"name": pn,
-			},
-		},
-		Spec: api.PodSpec{
-			Containers: []api.Container{
-				{
-					Name:  pn,
-					Image: cfg.KubeImage,
-					Ports: []api.ContainerPort{
-						{
-							Name:          "buildlet",
-							ContainerPort: 80,
-						},
-					},
-					Env: []api.EnvVar{
-						{
-							Name:  "IN_KUBERNETES",
-							Value: "1",
-						},
-					},
-				},
-			},
-		},
-	}
-	return p, nil
-func buildletService(p *api.Pod) (*api.Service, error) {
-	s := &api.Service{
-		TypeMeta: api.TypeMeta{
-			APIVersion: "v1",
-			Kind:       "Service",
-		},
-		ObjectMeta: api.ObjectMeta{
-			Name: p.ObjectMeta.Name,
-			Labels: map[string]string{
-				"type": "buildlet-service",
-				"name": p.ObjectMeta.Name,
-			},
-		},
-		Spec: api.ServiceSpec{
-			Selector: p.ObjectMeta.Labels,
-			Type:     api.ServiceTypeNodePort,
-			Ports: []api.ServicePort{
-				{
-					Protocol: api.ProtocolTCP,
-				},
-			},
-		},
-	}
-	return s, nil
+func hasCloudPlatformScope() bool {
+	return hasScope(container.CloudPlatformScope)