all: display pool status and delete failed/old pods
* status page shows kube pool details
* pods created by the coordinator are tracked
* pods that fail to create are deleted
* pods older than delete-at are deleted
* pods created by a different coordinator are deleted
Updates golang/go#12546
Change-Id: I4c4f8ff906962b4a014a66d0a9d490ff17710d62
Reviewed-by: Brad Fitzpatrick <>
diff --git a/kubernetes/client.go b/kubernetes/client.go
index 2ee425a..1acc98e 100644
--- a/kubernetes/client.go
+++ b/kubernetes/client.go
@@ -11,6 +11,7 @@
+ "log"
@@ -26,6 +27,7 @@
APIEndpoint = "/api/v1"
defaultPod = "/namespaces/default/pods"
defaultWatchPod = "/watch/namespaces/default/pods"
+ nodes = "/nodes"
// Client is a client for the Kubernetes master.
@@ -55,7 +57,7 @@
// An error is returned if the pod can not be created, if it does
// not enter the running phase within 2 minutes, or if ctx.Done
// is closed.
-func (c *Client) Run(ctx context.Context, pod *api.Pod) (*api.PodStatus, error) {
+func (c *Client) RunPod(ctx context.Context, pod *api.Pod) (*api.PodStatus, error) {
var podJSON bytes.Buffer
if err := json.NewEncoder(&podJSON).Encode(pod); err != nil {
return nil, fmt.Errorf("failed to encode pod in json: %v", err)
@@ -86,11 +88,67 @@
status, err := c.AwaitPodNotPending(ctx, podResult.Name, podResult.ObjectMeta.ResourceVersion)
if err != nil {
+ log.Printf("Timed out waiting for pod to leave pending state. Pod will be deleted.")
+ // The pod did not leave the pending state. We should try to manually delete it before
+ // returning an error.
+ c.DeletePod(context.Background(), podResult.Name)
return nil, err
return status, nil
+// GetPods returns all pods in the default namespace, regardless of status.
+//
+// It issues a GET against the pod collection endpoint and decodes the
+// response as an api.PodList. An error is returned if the request cannot
+// be built or sent, if the response body cannot be read, if the server
+// answers with anything other than 200 OK, or if the body does not decode
+// as a pod list.
+func (c *Client) GetPods(ctx context.Context) ([]api.Pod, error) {
+	getURL := c.endpointURL + defaultPod
+
+	// Make request to Kubernetes API
+	req, err := http.NewRequest("GET", getURL, nil)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create request: GET %q : %v", getURL, err)
+	}
+	res, err := ctxhttp.Do(ctx, c.httpClient, req)
+	if err != nil {
+		return nil, fmt.Errorf("failed to make request: GET %q: %v", getURL, err)
+	}
+
+	// Read the whole body before checking the status so that it can be
+	// included in the error message for non-200 responses.
+	body, err := ioutil.ReadAll(res.Body)
+	res.Body.Close()
+	if err != nil {
+		return nil, fmt.Errorf("failed to read request body for GET %q: %v", getURL, err)
+	}
+	if res.StatusCode != http.StatusOK {
+		// err is always nil here, so it is deliberately left out of the message.
+		return nil, fmt.Errorf("http error %d GET %q: %q", res.StatusCode, getURL, string(body))
+	}
+
+	var podList api.PodList
+	if err := json.Unmarshal(body, &podList); err != nil {
+		return nil, fmt.Errorf("failed to decode list of pod resources: %v", err)
+	}
+	return podList.Items, nil
+}
+// DeletePod deletes the pod named podName from the default namespace.
+//
+// It issues a DELETE against the pod's resource URL. An error is returned
+// if the request cannot be built or sent, if the response body cannot be
+// read, or if the server answers with anything other than 200 OK.
+func (c *Client) DeletePod(ctx context.Context, podName string) error {
+	url := c.endpointURL + defaultPod + "/" + podName
+	req, err := http.NewRequest("DELETE", url, nil)
+	if err != nil {
+		return fmt.Errorf("failed to create request: DELETE %q : %v", url, err)
+	}
+	res, err := ctxhttp.Do(ctx, c.httpClient, req)
+	if err != nil {
+		return fmt.Errorf("failed to make request: DELETE %q: %v", url, err)
+	}
+
+	// Read the whole body before checking the status so that it can be
+	// included in the error message for non-200 responses.
+	body, err := ioutil.ReadAll(res.Body)
+	res.Body.Close()
+	if err != nil {
+		// The arguments must follow the format string; they were previously
+		// inside the string literal, so the URL and error were never printed.
+		return fmt.Errorf("failed to read response body: DELETE %q: %v", url, err)
+	}
+	if res.StatusCode != http.StatusOK {
+		// err is always nil here, so it is deliberately left out of the message.
+		return fmt.Errorf("http error: %d DELETE %q: %q", res.StatusCode, url, string(body))
+	}
+	return nil
+}
// awaitPodNotPending will return a pod's status in a
// podStatusResult when the pod is no longer in the pending
// state.
@@ -162,6 +220,7 @@
res, err := ctxhttp.Do(ctx, c.httpClient, req)
+ defer res.Body.Close()
if err != nil {
statusChan <- PodStatusResult{Err: fmt.Errorf("failed to make request: GET %q: %v", getURL, err)}
@@ -169,8 +228,23 @@
var wps watchPodStatus
reader := bufio.NewReader(res.Body)
+ // bufio.Reader.ReadBytes is blocking, so we watch for
+ // context timeout or cancellation in a goroutine
+ // and close the response body when we see it. The
+ // response body is also closed via defer when the
+ // request is made, but closing twice is OK.
+ go func() {
+ <-ctx.Done()
+ res.Body.Close()
+ }()
for {
line, err := reader.ReadBytes('\n')
+ if ctx.Err() != nil {
+ statusChan <- PodStatusResult{Err: ctx.Err()}
+ return
+ }
if err != nil {
statusChan <- PodStatusResult{Err: fmt.Errorf("error reading streaming response body: %v", err)}
@@ -230,9 +304,38 @@
return "", fmt.Errorf("failed to make request: GET %q: %v", url, err)
body, err := ioutil.ReadAll(res.Body)
+ res.Body.Close()
if err != nil {
return "", fmt.Errorf("failed to read response body: GET %q: %v", url, err)
- res.Body.Close()
+ if res.StatusCode != http.StatusOK {
+ return "", fmt.Errorf("http error %d GET %q: %q: %v", res.StatusCode, url, string(body), err)
+ }
return string(body), nil
+// GetNodes returns the list of nodes that comprise the Kubernetes cluster.
+//
+// It issues a GET against the nodes endpoint and decodes the response as
+// an api.NodeList. An error is returned if the request cannot be built or
+// sent, if the response body cannot be read, if the server answers with
+// anything other than 200 OK, or if the body does not decode as a node list.
+func (c *Client) GetNodes(ctx context.Context) ([]api.Node, error) {
+	url := c.endpointURL + nodes
+	req, err := http.NewRequest("GET", url, nil)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create request: GET %q : %v", url, err)
+	}
+	res, err := ctxhttp.Do(ctx, c.httpClient, req)
+	if err != nil {
+		return nil, fmt.Errorf("failed to make request: GET %q: %v", url, err)
+	}
+
+	// Read the whole body before checking the status so that it can be
+	// included in the error message for non-200 responses.
+	body, err := ioutil.ReadAll(res.Body)
+	res.Body.Close()
+	if err != nil {
+		// The arguments must follow the format string; they were previously
+		// inside the string literal, so the URL and error were never printed.
+		return nil, fmt.Errorf("failed to read response body: GET %q: %v", url, err)
+	}
+	if res.StatusCode != http.StatusOK {
+		// err is always nil here, so it is deliberately left out of the message.
+		return nil, fmt.Errorf("http error %d GET %q: %q", res.StatusCode, url, string(body))
+	}
+
+	// Decode into a value rather than a nil pointer: a JSON "null" body would
+	// leave a *api.NodeList nil and make the Items access below panic.
+	var nodeList api.NodeList
+	if err := json.Unmarshal(body, &nodeList); err != nil {
+		return nil, fmt.Errorf("failed to decode node list: %v", err)
+	}
+	return nodeList.Items, nil
+}