cmd/coordinator: discover kube cluster
Find the default Kubernetes cluster and configure a client to talk to it.
Use application default credentials.
Updates golang/go#12546
Change-Id: Ifb1ce57f52f4fbbee3267f8cc3cf02a78146bd5b
Reviewed-on: https://go-review.googlesource.com/14532
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
diff --git a/buildlet/kube.go b/buildlet/kube.go
new file mode 100644
index 0000000..f29bd41
--- /dev/null
+++ b/buildlet/kube.go
@@ -0,0 +1,104 @@
+// Copyright 2015 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package buildlet
+
+import (
+ "errors"
+ "fmt"
+ "time"
+
+ "golang.org/x/build/dashboard"
+ "golang.org/x/build/kubernetes"
+ "golang.org/x/build/kubernetes/api"
+)
+
+// PodOpts control how new pods are started.
+type PodOpts struct {
+ // ImageRegistry specifies the Docker registry Kubernetes
+ // will use to create the pod. Required.
+ ImageRegistry string
+
+ // TLS optionally specifies the TLS keypair to use.
+ // If zero, http without auth is used.
+ TLS KeyPair
+
+ // Description optionally describes the pod.
+ Description string
+
+ // Labels optionally specify key=value strings that Kubernetes
+ // can use to filter and group pods.
+ Labels map[string]string
+
+ // DeleteIn optionally specifies a duration after which
+ // to delete the pod.
+ DeleteIn time.Duration
+
+ // OnPodRequested optionally specifies a hook to run synchronously
+ // after the pod create call, but before
+ // waiting for its operation to proceed.
+ OnPodRequested func()
+
+ // OnPodCreated optionally specifies a hook to run synchronously
+ // after the pod operation succeeds.
+ OnPodCreated func()
+
+ // OnGotPodInfo optionally specifies a hook to run synchronously
+ // after the pod Get call.
+ OnGotPodInfo func()
+}
+
+// StartPod creates a new pod on a Kubernetes cluster and returns a buildlet client
+// configured to speak to it.
+func StartPod(kubeClient *kubernetes.Client, podName, builderType string, opts PodOpts) (*Client, error) {
+ conf, ok := dashboard.Builders[builderType]
+ if !ok || conf.KubeImage == "" {
+ return nil, fmt.Errorf("invalid builder type %q", builderType)
+ }
+ p := &api.Pod{
+ TypeMeta: api.TypeMeta{
+ APIVersion: "v1",
+ Kind: "Pod",
+ },
+ ObjectMeta: api.ObjectMeta{
+ Name: podName,
+ Labels: map[string]string{
+ "name": podName,
+ "type": builderType,
+ "role": "buildlet",
+ },
+ },
+ Spec: api.PodSpec{
+ RestartPolicy: "Never",
+ Containers: []api.Container{
+ {
+ Name: "buildlet",
+ Image: opts.ImageRegistry + conf.KubeImage,
+ ImagePullPolicy: api.PullAlways,
+ Command: []string{"/usr/local/bin/stage0"},
+ Ports: []api.ContainerPort{
+ {
+ ContainerPort: 80,
+ },
+ },
+ Env: []api.EnvVar{
+ {
+ Name: "IN_KUBERNETES",
+ Value: "1",
+ },
+ {
+ Name: "META_BUILDLET_BINARY_URL",
+ Value: conf.BuildletBinaryURL(),
+ },
+ },
+ },
+ },
+ },
+ }
+
+ if _, err := kubeClient.Run(p); err != nil {
+ return nil, fmt.Errorf("pod could not be created: %v", err)
+ }
+ return nil, errors.New("TODO")
+}
diff --git a/cmd/coordinator/coordinator.go b/cmd/coordinator/coordinator.go
index d734705..a59037e 100644
--- a/cmd/coordinator/coordinator.go
+++ b/cmd/coordinator/coordinator.go
@@ -129,12 +129,13 @@
const (
maxStatusDone = 30
- // vmDeleteTimeout is how long before we delete a VM.
+ // vmDeleteTimeout and podDeleteTimeout are how long before we delete a VM or pod, respectively.
// In practice this need only be as long as the slowest
// builder (plan9 currently), because on startup this program
// already deletes all buildlets it doesn't know about
// (i.e. ones from a previous instance of the coordinator).
- vmDeleteTimeout = 45 * time.Minute
+ vmDeleteTimeout = 45 * time.Minute
+ podDeleteTimeout = 45 * time.Minute
)
func readGCSFile(name string) ([]byte, error) {
@@ -254,6 +255,11 @@
*mode = "prod"
}
}
+
+ err = initKube()
+ if err != nil {
+ log.Printf("Kube support disabled due to eror initializing Kubernetes: %v", err)
+ }
switch *mode {
case "dev", "prod":
log.Printf("Running in %s mode", *mode)
diff --git a/cmd/coordinator/gce.go b/cmd/coordinator/gce.go
index a2b87d4..dcc0084 100644
--- a/cmd/coordinator/gce.go
+++ b/cmd/coordinator/gce.go
@@ -83,7 +83,13 @@
if err != nil {
return fmt.Errorf("failed to get current GCE ProjectID: %v", err)
}
- tokenSource = google.ComputeTokenSource("default")
+
+ inStaging = projectID == "go-dashboard-dev"
+ if inStaging {
+ log.Printf("Running in staging cluster (%q)", projectID)
+ }
+
+ tokenSource, _ = google.DefaultTokenSource(oauth2.NoContext)
httpClient := oauth2.NewClient(oauth2.NoContext, tokenSource)
serviceCtx = cloud.NewContext(projectID, httpClient)
@@ -112,10 +118,6 @@
log.Printf("TryBot builders enabled.")
}
- inStaging = projectID == "go-dashboard-dev"
- if inStaging {
- log.Printf("Running in staging cluster (%q)", projectID)
- }
go gcePool.pollQuotaLoop()
return nil
}
@@ -127,7 +129,7 @@
wr := storage.NewWriter(serviceCtx, buildLogBucket(), "hello.txt")
fmt.Fprintf(wr, "Hello, world! Coordinator start-up at %v", time.Now())
if err := wr.Close(); err != nil {
- return fmt.Errorf("test write of a GCS object failed: %v", err)
+ return fmt.Errorf("test write of a GCS object to bucket %q failed: %v", buildLogBucket(), err)
}
gobotPass, err := metadata.ProjectAttributeValue("gobot-password")
if err != nil {
@@ -516,8 +518,9 @@
}
func hasScope(want string) bool {
+ // If not on GCE, assume full access
if !metadata.OnGCE() {
- return false
+ return true
}
scopes, err := metadata.Scopes("default")
if err != nil {
diff --git a/cmd/coordinator/kube.go b/cmd/coordinator/kube.go
index df1cd1d..f90b817 100644
--- a/cmd/coordinator/kube.go
+++ b/cmd/coordinator/kube.go
@@ -5,20 +5,103 @@
package main
import (
+ "crypto/tls"
+ "crypto/x509"
+ "encoding/base64"
"errors"
"fmt"
"io"
+ "log"
+ "net/http"
+ "strings"
"sync"
"golang.org/x/build/buildlet"
"golang.org/x/build/dashboard"
- "golang.org/x/build/kubernetes/api"
+ "golang.org/x/build/kubernetes"
+ "golang.org/x/oauth2"
+ container "google.golang.org/api/container/v1"
)
/*
This file implements the Kubernetes-based buildlet pool.
*/
+// Initialized by initKube:
+var (
+ containerService *container.Service
+ kubeClient *kubernetes.Client
+ initKubeCalled bool
+ registryPrefix = "grc.io/" + projectID + "/"
+)
+
+const (
+ clusterName = "buildlets"
+)
+
+func initKube() error {
+ initKubeCalled = true
+
+ if !hasCloudPlatformScope() {
+ return errors.New("coordinator not running with access to the Cloud Platform scope.")
+
+ }
+ httpClient := oauth2.NewClient(oauth2.NoContext, tokenSource)
+ containerService, _ = container.New(httpClient)
+
+ cluster, err := containerService.Projects.Zones.Clusters.Get(projectID, projectZone, clusterName).Do()
+ if err != nil {
+ return fmt.Errorf("cluster %q could not be found in project %q, zone %q: %v", clusterName, projectID, projectZone, err)
+ }
+
+ // Decode certs
+ decode := func(which string, cert string) []byte {
+ if err != nil {
+ return nil
+ }
+ s, decErr := base64.StdEncoding.DecodeString(cert)
+ if decErr != nil {
+ err = fmt.Errorf("error decoding %s cert: %v", which, decErr)
+ }
+ return []byte(s)
+ }
+ clientCert := decode("client cert", cluster.MasterAuth.ClientCertificate)
+ clientKey := decode("client key", cluster.MasterAuth.ClientKey)
+ caCert := decode("cluster cert", cluster.MasterAuth.ClusterCaCertificate)
+ if err != nil {
+ return err
+ }
+
+ // HTTPS client
+ cert, err := tls.X509KeyPair(clientCert, clientKey)
+ if err != nil {
+ return fmt.Errorf("x509 client key pair could not be generated: %v", err)
+ }
+
+ // CA Cert from kube master
+ caCertPool := x509.NewCertPool()
+ caCertPool.AppendCertsFromPEM([]byte(caCert))
+
+ // Setup TLS config
+ tlsConfig := &tls.Config{
+ Certificates: []tls.Certificate{cert},
+ RootCAs: caCertPool,
+ }
+ tlsConfig.BuildNameToCertificate()
+
+ kubeHTTPClient := &http.Client{
+ Transport: &http.Transport{
+ TLSClientConfig: tlsConfig,
+ },
+ }
+
+ kubeClient, err = kubernetes.NewClient(cluster.Endpoint, kubeHTTPClient)
+ if err != nil {
+ return fmt.Errorf("kubernetes HTTP client could not be created: %v", err)
+ }
+ return nil
+}
+
var kubePool = &kubeBuildletPool{}
// kubeBuildletPool is the Kubernetes buildlet pool.
@@ -27,8 +110,61 @@
mu sync.Mutex
}
-func (p *kubeBuildletPool) GetBuildlet(cancel Cancel, machineType, rev string, el eventTimeLogger) (*buildlet.Client, error) {
- return nil, errors.New("TODO")
+func (p *kubeBuildletPool) GetBuildlet(cancel Cancel, typ, rev string, el eventTimeLogger) (*buildlet.Client, error) {
+ conf, ok := dashboard.Builders[typ]
+ if !ok || conf.KubeImage == "" {
+ return nil, fmt.Errorf("kubepool: invalid builder type %q", typ)
+ }
+
+ deleteIn := podDeleteTimeout
+ if strings.HasPrefix(rev, "user-") {
+ // Created by gomote (see remote.go), so don't kill it in 45 minutes.
+ // remote.go handles timeouts itself.
+ deleteIn = 0
+ rev = strings.TrimPrefix(rev, "user-")
+ }
+
+ // podName is the cluster-wide unique name of the Kubernetes pod. Max length
+ // is not documented, but it's kept <= 61 bytes, in line with GCE.
+ revPrefix := rev
+ if len(revPrefix) > 8 {
+ revPrefix = rev[:8]
+ }
+
+ podName := "buildlet-" + typ + "-" + revPrefix + "-rn" + randHex(6)
+
+ var needDelete bool
+
+ el.logEventTime("creating_kube_pod", podName)
+ log.Printf("Creating Kubernetes pod %q for %s at %s", podName, typ, rev)
+
+ bc, err := buildlet.StartPod(kubeClient, podName, typ, buildlet.PodOpts{
+ ImageRegistry: registryPrefix,
+ Description: fmt.Sprintf("Go Builder for %s at %s", typ, rev),
+ DeleteIn: deleteIn,
+ OnPodRequested: func() {
+ el.logEventTime("pod_create_requested", podName)
+ log.Printf("Pod %q starting", podName)
+ },
+ OnPodCreated: func() {
+ el.logEventTime("pod_created")
+ needDelete = true // redundant with OnPodRequested one, but fine.
+ },
+ OnGotPodInfo: func() {
+ el.logEventTime("got_pod_info", "waiting_for_buildlet...")
+ },
+ })
+ if err != nil {
+ el.logEventTime("kube_buildlet_create_failure", fmt.Sprintf("%s: %v", podName, err))
+ log.Printf("Failed to create kube pod for %s, %s: %v", typ, rev, err)
+ if needDelete {
+ //TODO(evanbrown): delete pod
+ }
+ //p.setInstanceUsed(instName, false)
+ return nil, err
+ }
+ bc.SetDescription("Kube Pod: " + podName)
+ return bc, nil
}
func (p *kubeBuildletPool) WriteHTMLStatus(w io.Writer) {
@@ -44,67 +180,6 @@
return fmt.Sprintf("Kubernetes pool capacity: %d/%d", inUse, total)
}
-// uid is caller-generated random id for the build
-func buildletPod(cfg dashboard.BuildConfig, uid string) (*api.Pod, error) {
- pn := fmt.Sprintf("%v-%v", cfg.Name, uid)
- p := &api.Pod{
- TypeMeta: api.TypeMeta{
- APIVersion: "v1",
- Kind: "Pod",
- },
- ObjectMeta: api.ObjectMeta{
- Name: pn,
- Labels: map[string]string{
- "type": "buildlet",
- "name": pn,
- },
- },
- Spec: api.PodSpec{
- Containers: []api.Container{
- {
- Name: pn,
- Image: cfg.KubeImage,
- Ports: []api.ContainerPort{
- {
- Name: "buildlet",
- ContainerPort: 80,
- },
- },
- Env: []api.EnvVar{
- {
- Name: "IN_KUBERNETES",
- Value: "1",
- },
- },
- },
- },
- },
- }
- return p, nil
-}
-
-func buildletService(p *api.Pod) (*api.Service, error) {
- s := &api.Service{
- TypeMeta: api.TypeMeta{
- APIVersion: "v1",
- Kind: "Service",
- },
- ObjectMeta: api.ObjectMeta{
- Name: p.ObjectMeta.Name,
- Labels: map[string]string{
- "type": "buildlet-service",
- "name": p.ObjectMeta.Name,
- },
- },
- Spec: api.ServiceSpec{
- Selector: p.ObjectMeta.Labels,
- Type: api.ServiceTypeNodePort,
- Ports: []api.ServicePort{
- {
- Protocol: api.ProtocolTCP,
- },
- },
- },
- }
- return s, nil
+func hasCloudPlatformScope() bool {
+ return hasScope(container.CloudPlatformScope)
}