Add gke package, add kubernetes.Dialer type.

Updates golang/go#18817

Change-Id: Ifee53384486b0692899b77be2eaa42ca9006ef8e
Reviewed-on: https://go-review.googlesource.com/36016
Reviewed-by: Chris Broadfoot <cbro@golang.org>
diff --git a/cmd/coordinator/kube.go b/cmd/coordinator/kube.go
index b57677d..1f184e0 100644
--- a/cmd/coordinator/kube.go
+++ b/cmd/coordinator/kube.go
@@ -6,14 +6,10 @@
 
 import (
 	"context"
-	"crypto/tls"
-	"crypto/x509"
-	"encoding/base64"
 	"errors"
 	"fmt"
 	"io"
 	"log"
-	"net/http"
 	"sort"
 	"strconv"
 	"strings"
@@ -24,7 +20,7 @@
 	"golang.org/x/build/dashboard"
 	"golang.org/x/build/kubernetes"
 	"golang.org/x/build/kubernetes/api"
-	"golang.org/x/oauth2"
+	"golang.org/x/build/kubernetes/gke"
 	container "google.golang.org/api/container/v1"
 )
 
@@ -56,64 +52,18 @@
 	if !hasCloudPlatformScope() {
 		return errors.New("coordinator not running with access to the Cloud Platform scope.")
 	}
-	httpClient := oauth2.NewClient(oauth2.NoContext, tokenSource)
-	var err error
 
-	containerService, err = container.New(httpClient)
-	if err != nil {
-		return fmt.Errorf("could not create client for Google Container Engine: %v", err)
-	}
-
-	kubeCluster, err = containerService.Projects.Zones.Clusters.Get(buildEnv.ProjectName, buildEnv.Zone, clusterName).Do()
-	if err != nil {
-		return fmt.Errorf("cluster %q could not be found in project %q, zone %q: %v", clusterName, buildEnv.ProjectName, buildEnv.Zone, err)
-	}
-
-	// Decode certs
-	decode := func(which string, cert string) []byte {
-		if err != nil {
-			return nil
-		}
-		s, decErr := base64.StdEncoding.DecodeString(cert)
-		if decErr != nil {
-			err = fmt.Errorf("error decoding %s cert: %v", which, decErr)
-		}
-		return []byte(s)
-	}
-	clientCert := decode("client cert", kubeCluster.MasterAuth.ClientCertificate)
-	clientKey := decode("client key", kubeCluster.MasterAuth.ClientKey)
-	caCert := decode("cluster cert", kubeCluster.MasterAuth.ClusterCaCertificate)
+	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
+	defer cancel() // ctx is only used for discovery and connect; not retained.
+	kc, err := gke.NewClient(ctx,
+		clusterName,
+		gke.OptZone(buildEnv.Zone),
+		gke.OptProject(buildEnv.ProjectName),
+		gke.OptTokenSource(tokenSource))
 	if err != nil {
 		return err
 	}
-
-	// HTTPS client
-	cert, err := tls.X509KeyPair(clientCert, clientKey)
-	if err != nil {
-		return fmt.Errorf("x509 client key pair could not be generated: %v", err)
-	}
-
-	// CA Cert from kube master
-	caCertPool := x509.NewCertPool()
-	caCertPool.AppendCertsFromPEM([]byte(caCert))
-
-	// Setup TLS config
-	tlsConfig := &tls.Config{
-		Certificates: []tls.Certificate{cert},
-		RootCAs:      caCertPool,
-	}
-	tlsConfig.BuildNameToCertificate()
-
-	kubeHTTPClient := &http.Client{
-		Transport: &http.Transport{
-			TLSClientConfig: tlsConfig,
-		},
-	}
-
-	kubeClient, err = kubernetes.NewClient("https://"+kubeCluster.Endpoint, kubeHTTPClient)
-	if err != nil {
-		return fmt.Errorf("kubernetes HTTP client could not be created: %v", err)
-	}
+	kubeClient = kc
 
 	go kubePool.pollCapacityLoop()
 	return nil
diff --git a/kubernetes/client.go b/kubernetes/client.go
index 5aa82d6..80604d5 100644
--- a/kubernetes/client.go
+++ b/kubernetes/client.go
@@ -51,6 +51,14 @@
 	}, nil
 }
 
+// Close closes any idle HTTP connections still connected to the Kubernetes master.
+func (c *Client) Close() error {
+	if tr, ok := c.httpClient.Transport.(*http.Transport); ok {
+		tr.CloseIdleConnections()
+	}
+	return nil
+}
+
 // RunLongLivedPod creates a new pod resource in the default pod namespace with
 // the given pod API specification. It assumes the pod runs a
 // long-lived server (i.e. if the container exit quickly quickly, even
diff --git a/kubernetes/dialer.go b/kubernetes/dialer.go
new file mode 100644
index 0000000..6439635
--- /dev/null
+++ b/kubernetes/dialer.go
@@ -0,0 +1,35 @@
+// Copyright 2017 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package kubernetes
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"strconv"
+)
+
+// Dialer dials Kubernetes pods.
+//
+// TODO: services also.
+type Dialer struct {
+	kc *Client
+}
+
+func NewDialer(kc *Client) *Dialer {
+	return &Dialer{kc: kc}
+}
+
+func (d *Dialer) Dial(ctx context.Context, podName string, port int) (net.Conn, error) {
+	status, err := d.kc.PodStatus(ctx, podName)
+	if err != nil {
+		return nil, fmt.Errorf("PodStatus of %q: %v", podName, err)
+	}
+	if status.Phase != "Running" {
+		return nil, fmt.Errorf("pod %q in state %q", podName, status.Phase)
+	}
+	var dialer net.Dialer
+	return dialer.DialContext(ctx, "tcp", net.JoinHostPort(status.PodIP, strconv.Itoa(port)))
+}
diff --git a/kubernetes/gke/gke.go b/kubernetes/gke/gke.go
new file mode 100644
index 0000000..0811b53
--- /dev/null
+++ b/kubernetes/gke/gke.go
@@ -0,0 +1,174 @@
+// Copyright 2017 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package gke contains code for interacting with Google Container Engine (GKE),
+// the hosted version of Kubernetes on Google Cloud Platform.
+//
+// The API is not subject to the Go 1 compatibility promise and may change at
+// any time. Users should vendor this package and deal with API changes.
+package gke
+
+import (
+	"context"
+	"crypto/tls"
+	"crypto/x509"
+	"encoding/base64"
+	"fmt"
+	"net/http"
+
+	"cloud.google.com/go/compute/metadata"
+
+	"golang.org/x/build/kubernetes"
+	"golang.org/x/oauth2"
+	"golang.org/x/oauth2/google"
+	compute "google.golang.org/api/compute/v1"
+	"google.golang.org/api/container/v1"
+)
+
+// ClientOpt represents an option that can be passed to the NewClient function.
+type ClientOpt interface {
+	modify(*clientOpt)
+}
+
+type clientOpt struct {
+	Project     string
+	TokenSource oauth2.TokenSource
+	Zone        string
+}
+
+type clientOptFunc func(*clientOpt)
+
+func (f clientOptFunc) modify(o *clientOpt) { f(o) }
+
+// OptProject returns an option setting the GCE Project ID to projectName.
+// This is the named project ID, not the numeric ID.
+// If unspecified, the current active project ID is used, if the program is running
+// on a GCE instance.
+func OptProject(projectName string) ClientOpt {
+	return clientOptFunc(func(o *clientOpt) {
+		o.Project = projectName
+	})
+}
+
+// OptZone specifies the GCP zone the cluster is located in.
+// This is necessary if and only if there are multiple GKE clusters with
+// the same name in different zones.
+func OptZone(zoneName string) ClientOpt {
+	return clientOptFunc(func(o *clientOpt) {
+		o.Zone = zoneName
+	})
+}
+
+// OptTokenSource sets the oauth2 token source for making
+// authenticated requests to the GKE API. If unset, the default token
+// source is used (https://godoc.org/golang.org/x/oauth2/google#DefaultTokenSource).
+func OptTokenSource(ts oauth2.TokenSource) ClientOpt {
+	return clientOptFunc(func(o *clientOpt) {
+		o.TokenSource = ts
+	})
+}
+
+// NewClient returns a Kubernetes client to a GKE cluster.
+func NewClient(ctx context.Context, clusterName string, opts ...ClientOpt) (*kubernetes.Client, error) {
+	var opt clientOpt
+	for _, o := range opts {
+		o.modify(&opt)
+	}
+	if opt.TokenSource == nil {
+		var err error
+		opt.TokenSource, err = google.DefaultTokenSource(ctx, compute.CloudPlatformScope)
+		if err != nil {
+			return nil, fmt.Errorf("failed to get a token source: %v", err)
+		}
+	}
+	if opt.Project == "" {
+		proj, err := metadata.ProjectID()
+		if err != nil {
+			return nil, fmt.Errorf("metadata.ProjectID: %v", err)
+		}
+		opt.Project = proj
+	}
+
+	httpClient := oauth2.NewClient(ctx, opt.TokenSource)
+	containerService, err := container.New(httpClient)
+	if err != nil {
+		return nil, fmt.Errorf("could not create client for Google Container Engine: %v", err)
+	}
+
+	var cluster *container.Cluster
+	if opt.Zone == "" {
+		clusters, err := containerService.Projects.Zones.Clusters.List(opt.Project, "-").Context(ctx).Do()
+		if err != nil {
+			return nil, err
+		}
+		if len(clusters.MissingZones) > 0 {
+			return nil, fmt.Errorf("GKE cluster list response contains missing zones: %v", clusters.MissingZones)
+		}
+		matches := 0
+		for _, cl := range clusters.Clusters {
+			if cl.Name == clusterName {
+				cluster = cl
+				matches++
+			}
+		}
+		if matches == 0 {
+			return nil, fmt.Errorf("cluster %q not found in any zone", clusterName)
+		}
+		if matches > 1 {
+			return nil, fmt.Errorf("cluster %q is ambiguous without using gke.OptZone to specify a zone", clusterName)
+		}
+	} else {
+		cluster, err = containerService.Projects.Zones.Clusters.Get(opt.Project, opt.Zone, clusterName).Context(ctx).Do()
+		if err != nil {
+			return nil, fmt.Errorf("cluster %q could not be found in project %q, zone %q: %v", clusterName, opt.Project, opt.Zone, err)
+		}
+	}
+
+	// Decode certs
+	decode := func(which string, cert string) []byte {
+		if err != nil {
+			return nil
+		}
+		s, decErr := base64.StdEncoding.DecodeString(cert)
+		if decErr != nil {
+			err = fmt.Errorf("error decoding %s cert: %v", which, decErr)
+		}
+		return []byte(s)
+	}
+	clientCert := decode("client cert", cluster.MasterAuth.ClientCertificate)
+	clientKey := decode("client key", cluster.MasterAuth.ClientKey)
+	caCert := decode("cluster cert", cluster.MasterAuth.ClusterCaCertificate)
+	if err != nil {
+		return nil, err
+	}
+
+	// HTTPS client
+	cert, err := tls.X509KeyPair(clientCert, clientKey)
+	if err != nil {
+		return nil, fmt.Errorf("x509 client key pair could not be generated: %v", err)
+	}
+
+	// CA Cert from kube master
+	caCertPool := x509.NewCertPool()
+	caCertPool.AppendCertsFromPEM([]byte(caCert))
+
+	// Setup TLS config
+	tlsConfig := &tls.Config{
+		Certificates: []tls.Certificate{cert},
+		RootCAs:      caCertPool,
+	}
+	tlsConfig.BuildNameToCertificate()
+
+	kubeHTTPClient := &http.Client{
+		Transport: &http.Transport{
+			TLSClientConfig: tlsConfig,
+		},
+	}
+
+	kubeClient, err := kubernetes.NewClient("https://"+cluster.Endpoint, kubeHTTPClient)
+	if err != nil {
+		return nil, fmt.Errorf("kubernetes HTTP client could not be created: %v", err)
+	}
+	return kubeClient, nil
+}
diff --git a/kubernetes/gke/gke_test.go b/kubernetes/gke/gke_test.go
new file mode 100644
index 0000000..c86efa4
--- /dev/null
+++ b/kubernetes/gke/gke_test.go
@@ -0,0 +1,89 @@
+// Copyright 2017 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package gke_test
+
+import (
+	"context"
+	"strings"
+	"testing"
+
+	"cloud.google.com/go/compute/metadata"
+	"golang.org/x/build/kubernetes"
+	"golang.org/x/build/kubernetes/gke"
+	"golang.org/x/oauth2"
+	"golang.org/x/oauth2/google"
+	compute "google.golang.org/api/compute/v1"
+	container "google.golang.org/api/container/v1"
+)
+
+// TestNewClient builds a client per GKE cluster and passes once Dialer reaches a running pod's TCP port.
+func TestNewClient(t *testing.T) {
+	if !metadata.OnGCE() {
+		t.Skip("not on GCE; skipping")
+	}
+	ctx := context.Background()
+	ts, err := google.DefaultTokenSource(ctx, compute.CloudPlatformScope)
+	if err != nil {
+		t.Fatal(err)
+	}
+	httpClient := oauth2.NewClient(ctx, ts)
+	containerService, err := container.New(httpClient)
+	if err != nil {
+		t.Fatal(err)
+	}
+	proj, err := metadata.ProjectID()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	clusters, err := containerService.Projects.Zones.Clusters.List(proj, "-").Context(ctx).Do()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if len(clusters.Clusters) == 0 {
+		t.Skip("no GKE clusters")
+	}
+	var candidates int
+	for _, cl := range clusters.Clusters {
+		kc, err := gke.NewClient(ctx, cl.Name, gke.OptZone(cl.Zone))
+		if err != nil {
+			t.Fatal(err)
+		}
+		defer kc.Close()
+
+		pods, err := kc.GetPods(ctx)
+		if err != nil {
+			t.Fatal(err)
+		}
+		for _, pod := range pods {
+			if pod.Status.Phase != "Running" {
+				continue
+			}
+			for _, container := range pod.Spec.Containers {
+				name := pod.Name // Dial resolves its target via PodStatus, which takes the pod name, not the container name.
+				for _, port := range container.Ports {
+					if strings.ToLower(string(port.Protocol)) == "udp" || port.ContainerPort == 0 {
+						continue
+					}
+					candidates++
+					d := kubernetes.NewDialer(kc)
+					c, err := d.Dial(ctx, name, port.ContainerPort)
+					if err != nil {
+						t.Logf("Dial %q/%q/%d: %v", cl.Name, name, port.ContainerPort, err)
+						continue
+					}
+					c.Close()
+					t.Logf("Dialed %q/%q/%d.", cl.Name, name, port.ContainerPort)
+					return
+				}
+			}
+		}
+	}
+	if candidates == 0 {
+		t.Skip("no pods to dial")
+	}
+	t.Errorf("dial failures")
+}