Add gke package, add kubenetes.Dialer type.
Updates golang/go#18817
Change-Id: Ifee53384486b0692899b77be2eaa42ca9006ef8e
Reviewed-on: https://go-review.googlesource.com/36016
Reviewed-by: Chris Broadfoot <cbro@golang.org>
diff --git a/cmd/coordinator/kube.go b/cmd/coordinator/kube.go
index b57677d..1f184e0 100644
--- a/cmd/coordinator/kube.go
+++ b/cmd/coordinator/kube.go
@@ -6,14 +6,10 @@
import (
"context"
- "crypto/tls"
- "crypto/x509"
- "encoding/base64"
"errors"
"fmt"
"io"
"log"
- "net/http"
"sort"
"strconv"
"strings"
@@ -24,7 +20,7 @@
"golang.org/x/build/dashboard"
"golang.org/x/build/kubernetes"
"golang.org/x/build/kubernetes/api"
- "golang.org/x/oauth2"
+ "golang.org/x/build/kubernetes/gke"
container "google.golang.org/api/container/v1"
)
@@ -56,64 +52,18 @@
if !hasCloudPlatformScope() {
return errors.New("coordinator not running with access to the Cloud Platform scope.")
}
- httpClient := oauth2.NewClient(oauth2.NoContext, tokenSource)
- var err error
- containerService, err = container.New(httpClient)
- if err != nil {
- return fmt.Errorf("could not create client for Google Container Engine: %v", err)
- }
-
- kubeCluster, err = containerService.Projects.Zones.Clusters.Get(buildEnv.ProjectName, buildEnv.Zone, clusterName).Do()
- if err != nil {
- return fmt.Errorf("cluster %q could not be found in project %q, zone %q: %v", clusterName, buildEnv.ProjectName, buildEnv.Zone, err)
- }
-
- // Decode certs
- decode := func(which string, cert string) []byte {
- if err != nil {
- return nil
- }
- s, decErr := base64.StdEncoding.DecodeString(cert)
- if decErr != nil {
- err = fmt.Errorf("error decoding %s cert: %v", which, decErr)
- }
- return []byte(s)
- }
- clientCert := decode("client cert", kubeCluster.MasterAuth.ClientCertificate)
- clientKey := decode("client key", kubeCluster.MasterAuth.ClientKey)
- caCert := decode("cluster cert", kubeCluster.MasterAuth.ClusterCaCertificate)
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
+ defer cancel() // ctx is only used for discovery and connect; not retained.
+ kc, err := gke.NewClient(ctx,
+ clusterName,
+ gke.OptZone(buildEnv.Zone),
+ gke.OptProject(buildEnv.ProjectName),
+ gke.OptTokenSource(tokenSource))
if err != nil {
return err
}
-
- // HTTPS client
- cert, err := tls.X509KeyPair(clientCert, clientKey)
- if err != nil {
- return fmt.Errorf("x509 client key pair could not be generated: %v", err)
- }
-
- // CA Cert from kube master
- caCertPool := x509.NewCertPool()
- caCertPool.AppendCertsFromPEM([]byte(caCert))
-
- // Setup TLS config
- tlsConfig := &tls.Config{
- Certificates: []tls.Certificate{cert},
- RootCAs: caCertPool,
- }
- tlsConfig.BuildNameToCertificate()
-
- kubeHTTPClient := &http.Client{
- Transport: &http.Transport{
- TLSClientConfig: tlsConfig,
- },
- }
-
- kubeClient, err = kubernetes.NewClient("https://"+kubeCluster.Endpoint, kubeHTTPClient)
- if err != nil {
- return fmt.Errorf("kubernetes HTTP client could not be created: %v", err)
- }
+ kubeClient = kc
go kubePool.pollCapacityLoop()
return nil
diff --git a/kubernetes/client.go b/kubernetes/client.go
index 5aa82d6..80604d5 100644
--- a/kubernetes/client.go
+++ b/kubernetes/client.go
@@ -51,6 +51,14 @@
}, nil
}
+// Close closes any idle HTTP connections still connected to the Kubernetes master.
+func (c *Client) Close() error {
+ if tr, ok := c.httpClient.Transport.(*http.Transport); ok {
+ tr.CloseIdleConnections()
+ }
+ return nil
+}
+
// RunLongLivedPod creates a new pod resource in the default pod namespace with
// the given pod API specification. It assumes the pod runs a
// long-lived server (i.e. if the container exit quickly quickly, even
diff --git a/kubernetes/dialer.go b/kubernetes/dialer.go
new file mode 100644
index 0000000..6439635
--- /dev/null
+++ b/kubernetes/dialer.go
@@ -0,0 +1,35 @@
+// Copyright 2017 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package kubernetes
+
+import (
+ "context"
+ "fmt"
+ "net"
+ "strconv"
+)
+
+// Dialer dials Kubernetes pods.
+//
+// TODO: services also.
+type Dialer struct {
+ kc *Client
+}
+
+func NewDialer(kc *Client) *Dialer {
+ return &Dialer{kc: kc}
+}
+
+func (d *Dialer) Dial(ctx context.Context, podName string, port int) (net.Conn, error) {
+ status, err := d.kc.PodStatus(ctx, podName)
+ if err != nil {
+ return nil, fmt.Errorf("PodStatus of %q: %v", podName, err)
+ }
+ if status.Phase != "Running" {
+ return nil, fmt.Errorf("pod %q in state %q", podName, status.Phase)
+ }
+ var dialer net.Dialer
+ return dialer.DialContext(ctx, "tcp", net.JoinHostPort(status.PodIP, strconv.Itoa(port)))
+}
diff --git a/kubernetes/gke/gke.go b/kubernetes/gke/gke.go
new file mode 100644
index 0000000..0811b53
--- /dev/null
+++ b/kubernetes/gke/gke.go
@@ -0,0 +1,174 @@
+// Copyright 2017 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package gke contains code for interacting with Google Container Engine (GKE),
+// the hosted version of Kubernetes on Google Cloud Platform.
+//
+// The API is not subject to the Go 1 compatibility promise and may change at
+// any time. Users should vendor this package and deal with API changes.
+package gke
+
+import (
+ "context"
+ "crypto/tls"
+ "crypto/x509"
+ "encoding/base64"
+ "fmt"
+ "net/http"
+
+ "cloud.google.com/go/compute/metadata"
+
+ "golang.org/x/build/kubernetes"
+ "golang.org/x/oauth2"
+ "golang.org/x/oauth2/google"
+ compute "google.golang.org/api/compute/v1"
+ "google.golang.org/api/container/v1"
+)
+
+// ClientOpt represents an option that can be passed to the Client function.
+type ClientOpt interface {
+ modify(*clientOpt)
+}
+
+type clientOpt struct {
+ Project string
+ TokenSource oauth2.TokenSource
+ Zone string
+}
+
+type clientOptFunc func(*clientOpt)
+
+func (f clientOptFunc) modify(o *clientOpt) { f(o) }
+
+// OptProject returns an option setting the GCE Project ID to projectName.
+// This is the named project ID, not the numeric ID.
+// If unspecified, the current active project ID is used, if the program is running
+// on a GCE intance.
+func OptProject(projectName string) ClientOpt {
+ return clientOptFunc(func(o *clientOpt) {
+ o.Project = projectName
+ })
+}
+
+// OptZone specifies the GCP zone the cluster is located in.
+// This is necessary if and only if there are multiple GKE clusters with
+// the same name in different zones.
+func OptZone(zoneName string) ClientOpt {
+ return clientOptFunc(func(o *clientOpt) {
+ o.Zone = zoneName
+ })
+}
+
+// OptTokenSource sets the oauth2 token source for making
+// authenticated requests to the GKE API. If unset, the default token
+// source is used (https://godoc.org/golang.org/x/oauth2/google#DefaultTokenSource).
+func OptTokenSource(ts oauth2.TokenSource) ClientOpt {
+ return clientOptFunc(func(o *clientOpt) {
+ o.TokenSource = ts
+ })
+}
+
+// NewClient returns an Kubernetes client to a GKE cluster.
+func NewClient(ctx context.Context, clusterName string, opts ...ClientOpt) (*kubernetes.Client, error) {
+ var opt clientOpt
+ for _, o := range opts {
+ o.modify(&opt)
+ }
+ if opt.TokenSource == nil {
+ var err error
+ opt.TokenSource, err = google.DefaultTokenSource(ctx, compute.CloudPlatformScope)
+ if err != nil {
+ return nil, fmt.Errorf("failed to get a token source: %v", err)
+ }
+ }
+ if opt.Project == "" {
+ proj, err := metadata.ProjectID()
+ if err != nil {
+ return nil, fmt.Errorf("metadata.ProjectID: %v", err)
+ }
+ opt.Project = proj
+ }
+
+ httpClient := oauth2.NewClient(ctx, opt.TokenSource)
+ containerService, err := container.New(httpClient)
+ if err != nil {
+ return nil, fmt.Errorf("could not create client for Google Container Engine: %v", err)
+ }
+
+ var cluster *container.Cluster
+ if opt.Zone == "" {
+ clusters, err := containerService.Projects.Zones.Clusters.List(opt.Project, "-").Context(ctx).Do()
+ if err != nil {
+ return nil, err
+ }
+ if len(clusters.MissingZones) > 0 {
+ return nil, fmt.Errorf("GKE cluster list response contains missing zones: %v", clusters.MissingZones)
+ }
+ matches := 0
+ for _, cl := range clusters.Clusters {
+ if cl.Name == clusterName {
+ cluster = cl
+ matches++
+ }
+ }
+ if matches == 0 {
+ return nil, fmt.Errorf("cluster %q not found in any zone", clusterName)
+ }
+ if matches > 1 {
+ return nil, fmt.Errorf("cluster %q is ambiguous without using gke.OptZone to specify a zone", clusterName)
+ }
+ } else {
+ cluster, err = containerService.Projects.Zones.Clusters.Get(opt.Project, opt.Zone, clusterName).Context(ctx).Do()
+ if err != nil {
+ return nil, fmt.Errorf("cluster %q could not be found in project %q, zone %q: %v", clusterName, opt.Project, opt.Zone, err)
+ }
+ }
+
+ // Decode certs
+ decode := func(which string, cert string) []byte {
+ if err != nil {
+ return nil
+ }
+ s, decErr := base64.StdEncoding.DecodeString(cert)
+ if decErr != nil {
+ err = fmt.Errorf("error decoding %s cert: %v", which, decErr)
+ }
+ return []byte(s)
+ }
+ clientCert := decode("client cert", cluster.MasterAuth.ClientCertificate)
+ clientKey := decode("client key", cluster.MasterAuth.ClientKey)
+ caCert := decode("cluster cert", cluster.MasterAuth.ClusterCaCertificate)
+ if err != nil {
+ return nil, err
+ }
+
+ // HTTPS client
+ cert, err := tls.X509KeyPair(clientCert, clientKey)
+ if err != nil {
+ return nil, fmt.Errorf("x509 client key pair could not be generated: %v", err)
+ }
+
+ // CA Cert from kube master
+ caCertPool := x509.NewCertPool()
+ caCertPool.AppendCertsFromPEM([]byte(caCert))
+
+ // Setup TLS config
+ tlsConfig := &tls.Config{
+ Certificates: []tls.Certificate{cert},
+ RootCAs: caCertPool,
+ }
+ tlsConfig.BuildNameToCertificate()
+
+ kubeHTTPClient := &http.Client{
+ Transport: &http.Transport{
+ TLSClientConfig: tlsConfig,
+ },
+ }
+
+ kubeClient, err := kubernetes.NewClient("https://"+cluster.Endpoint, kubeHTTPClient)
+ if err != nil {
+ return nil, fmt.Errorf("kubernetes HTTP client could not be created: %v", err)
+ }
+ return kubeClient, nil
+}
diff --git a/kubernetes/gke/gke_test.go b/kubernetes/gke/gke_test.go
new file mode 100644
index 0000000..c86efa4
--- /dev/null
+++ b/kubernetes/gke/gke_test.go
@@ -0,0 +1,89 @@
+// Copyright 2017 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package gke_test
+
+import (
+ "context"
+ "strings"
+ "testing"
+
+ "cloud.google.com/go/compute/metadata"
+ "golang.org/x/build/kubernetes"
+ "golang.org/x/build/kubernetes/gke"
+ "golang.org/x/oauth2"
+ "golang.org/x/oauth2/google"
+ compute "google.golang.org/api/compute/v1"
+ container "google.golang.org/api/container/v1"
+)
+
+// Tests NewClient and also Dialer.
+func TestNewClient(t *testing.T) {
+ if !metadata.OnGCE() {
+ t.Skip("not on GCE; skipping")
+ }
+ ctx := context.Background()
+ ts, err := google.DefaultTokenSource(ctx, compute.CloudPlatformScope)
+ if err != nil {
+ t.Fatal(err)
+ }
+ httpClient := oauth2.NewClient(ctx, ts)
+ containerService, err := container.New(httpClient)
+ if err != nil {
+ t.Fatal(err)
+ }
+ proj, err := metadata.ProjectID()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ clusters, err := containerService.Projects.Zones.Clusters.List(proj, "-").Context(ctx).Do()
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if len(clusters.Clusters) == 0 {
+ t.Skip("no GKE clusters")
+ }
+ var candidates int
+ for _, cl := range clusters.Clusters {
+ kc, err := gke.NewClient(ctx, cl.Name, gke.OptZone(cl.Zone))
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer kc.Close()
+
+ pods, err := kc.GetPods(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ for _, pod := range pods {
+ if pod.Status.Phase != "Running" {
+ continue
+ }
+ for _, container := range pod.Spec.Containers {
+ name := container.Name
+ for _, port := range container.Ports {
+ if strings.ToLower(string(port.Protocol)) == "udp" || port.ContainerPort == 0 {
+ continue
+ }
+ candidates++
+ d := kubernetes.NewDialer(kc)
+ c, err := d.Dial(ctx, name, port.ContainerPort)
+ if err != nil {
+ t.Logf("Dial %q/%q/%d: %v", cl.Name, name, port.ContainerPort, err)
+ continue
+ }
+ c.Close()
+ t.Logf("Dialed %q/%q/%d.", cl.Name, name, port.ContainerPort)
+ return
+ }
+ }
+ }
+ }
+ if candidates == 0 {
+ t.Skip("no pods to dial")
+ }
+ t.Errorf("dial failures")
+}