all: display pool status and delete failed/old pods

* status page shows kube pool details
* pods created by the coordinator are tracked
* pods that fail to create are deleted
* pods older than delete-at are deleted
* pods created by a different coordinator are deleted

Updates golang/go#12546

Change-Id: I4c4f8ff906962b4a014a66d0a9d490ff17710d62
Reviewed-on: https://go-review.googlesource.com/16101
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
diff --git a/kubernetes/client.go b/kubernetes/client.go
index 2ee425a..1acc98e 100644
--- a/kubernetes/client.go
+++ b/kubernetes/client.go
@@ -11,6 +11,7 @@
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
+	"log"
 	"net/http"
 	"net/url"
 	"strings"
@@ -26,6 +27,7 @@
 	APIEndpoint     = "/api/v1"
 	defaultPod      = "/namespaces/default/pods"
 	defaultWatchPod = "/watch/namespaces/default/pods"
+	nodes           = "/nodes"
 )
 
 // Client is a client for the Kubernetes master.
@@ -55,7 +57,7 @@
 // An error is returned if the pod can not be created, if it does
 // does not enter the running phase within 2 minutes, or if ctx.Done
 // is closed.
-func (c *Client) Run(ctx context.Context, pod *api.Pod) (*api.PodStatus, error) {
+func (c *Client) RunPod(ctx context.Context, pod *api.Pod) (*api.PodStatus, error) {
 	var podJSON bytes.Buffer
 	if err := json.NewEncoder(&podJSON).Encode(pod); err != nil {
 		return nil, fmt.Errorf("failed to encode pod in json: %v", err)
@@ -86,11 +88,67 @@
 
 	status, err := c.AwaitPodNotPending(ctx, podResult.Name, podResult.ObjectMeta.ResourceVersion)
 	if err != nil {
+		log.Printf("Timed out waiting for pod to leave pending state. Pod will be deleted.")
+		// The pod did not leave the pending state. We should try to manually delete it before
+		// returning an error.
+		c.DeletePod(context.Background(), podResult.Name)
 		return nil, err
 	}
 	return status, nil
 }
 
+// GetPods returns every pod under the default namespace, regardless of
+// status. A non-200 response or an undecodable body is returned as an error.
+func (c *Client) GetPods(ctx context.Context) ([]api.Pod, error) {
+	getURL := c.endpointURL + defaultPod
+	// Make request to Kubernetes API
+	req, err := http.NewRequest("GET", getURL, nil)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create request: GET %q : %v", getURL, err)
+	}
+	res, err := ctxhttp.Do(ctx, c.httpClient, req)
+	if err != nil {
+		return nil, fmt.Errorf("failed to make request: GET %q: %v", getURL, err)
+	}
+
+	body, err := ioutil.ReadAll(res.Body)
+	res.Body.Close()
+	if err != nil {
+		return nil, fmt.Errorf("failed to read response body for GET %q: %v", getURL, err)
+	}
+	if res.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("http error %d GET %q: %q", res.StatusCode, getURL, string(body))
+	}
+
+	var podList api.PodList
+	if err := json.Unmarshal(body, &podList); err != nil {
+		return nil, fmt.Errorf("failed to decode list of pod resources: %v", err)
+	}
+	return podList.Items, nil
+}
+
+// DeletePod deletes the named pod; a non-200 response is returned as an error.
+func (c *Client) DeletePod(ctx context.Context, podName string) error {
+	podURL := c.endpointURL + defaultPod + "/" + podName
+	req, err := http.NewRequest("DELETE", podURL, nil)
+	if err != nil {
+		return fmt.Errorf("failed to create request: DELETE %q : %v", podURL, err)
+	}
+	res, err := ctxhttp.Do(ctx, c.httpClient, req)
+	if err != nil {
+		return fmt.Errorf("failed to make request: DELETE %q: %v", podURL, err)
+	}
+	body, err := ioutil.ReadAll(res.Body)
+	res.Body.Close()
+	if err != nil {
+		return fmt.Errorf("failed to read response body: DELETE %q: %v", podURL, err)
+	}
+	if res.StatusCode != http.StatusOK {
+		return fmt.Errorf("http error: %d DELETE %q: %q", res.StatusCode, podURL, string(body))
+	}
+	return nil
+}
+
 // awaitPodNotPending will return a pod's status in a
 // podStatusResult when the pod is no longer in the pending
 // state.
@@ -162,6 +220,7 @@
 			return
 		}
 		res, err := ctxhttp.Do(ctx, c.httpClient, req)
+		defer res.Body.Close()
 		if err != nil {
 			statusChan <- PodStatusResult{Err: fmt.Errorf("failed to make request: GET %q: %v", getURL, err)}
 			return
@@ -169,8 +228,23 @@
 
 		var wps watchPodStatus
 		reader := bufio.NewReader(res.Body)
+
+		// bufio.Reader.ReadBytes is blocking, so we watch for
+		// context timeout or cancellation in a goroutine
+		// and close the response body when we see it. The
+		// response body is also closed via defer when the
+		// request is made, but closing twice is OK.
+		go func() {
+			<-ctx.Done()
+			res.Body.Close()
+		}()
+
 		for {
 			line, err := reader.ReadBytes('\n')
+			if ctx.Err() != nil {
+				statusChan <- PodStatusResult{Err: ctx.Err()}
+				return
+			}
 			if err != nil {
 				statusChan <- PodStatusResult{Err: fmt.Errorf("error reading streaming response body: %v", err)}
 				return
@@ -230,9 +304,38 @@
 		return "", fmt.Errorf("failed to make request: GET %q: %v", url, err)
 	}
 	body, err := ioutil.ReadAll(res.Body)
+	res.Body.Close()
 	if err != nil {
 		return "", fmt.Errorf("failed to read response body: GET %q: %v", url, err)
 	}
-	res.Body.Close()
+	if res.StatusCode != http.StatusOK {
+		return "", fmt.Errorf("http error %d GET %q: %q: %v", res.StatusCode, url, string(body), err)
+	}
 	return string(body), nil
 }
+
+// GetNodes returns the list of nodes that comprise the Kubernetes cluster.
+func (c *Client) GetNodes(ctx context.Context) ([]api.Node, error) {
+	nodesURL := c.endpointURL + nodes
+	req, err := http.NewRequest("GET", nodesURL, nil)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create request: GET %q : %v", nodesURL, err)
+	}
+	res, err := ctxhttp.Do(ctx, c.httpClient, req)
+	if err != nil {
+		return nil, fmt.Errorf("failed to make request: GET %q: %v", nodesURL, err)
+	}
+	body, err := ioutil.ReadAll(res.Body)
+	res.Body.Close()
+	if err != nil {
+		return nil, fmt.Errorf("failed to read response body: GET %q: %v", nodesURL, err)
+	}
+	if res.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("http error %d GET %q: %q", res.StatusCode, nodesURL, string(body))
+	}
+	var nodeList api.NodeList // value, not pointer: a JSON null body must not leave a nil to dereference
+	if err := json.Unmarshal(body, &nodeList); err != nil {
+		return nil, fmt.Errorf("failed to decode node list: %v", err)
+	}
+	return nodeList.Items, nil
+}