all: improved monitoring of buildlet pods after creation

* Replaced cancel with context.Context
* StartPod can be canceled
* Wait for buildlet to come online, but fail fast if pod fails first
* Support timeout waiting for pod to leave pending phase
* Use Kubernetes watch API (long poll)

Updates golang/go#12546

Change-Id: I792a3b8fed615362a0290feee7de0c2cefe43c0e
Reviewed-on: https://go-review.googlesource.com/15285
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
diff --git a/kubernetes/client.go b/kubernetes/client.go
index b44b650..9af7ed7 100644
--- a/kubernetes/client.go
+++ b/kubernetes/client.go
@@ -6,6 +6,7 @@
 package kubernetes
 
 import (
+	"bufio"
 	"bytes"
 	"encoding/json"
 	"fmt"
@@ -16,12 +17,15 @@
 	"time"
 
 	"golang.org/x/build/kubernetes/api"
+	"golang.org/x/net/context"
+	"golang.org/x/net/context/ctxhttp"
 )
 
 const (
 	// APIEndpoint defines the base path for kubernetes API resources.
-	APIEndpoint  = "/api/v1"
-	defaultPodNS = "/namespaces/default/pods"
+	APIEndpoint     = "/api/v1"
+	defaultPod      = "/namespaces/default/pods"
+	defaultWatchPod = "/watch/namespaces/default/pods"
 )
 
 // Client is a client for the Kubernetes master.
@@ -45,20 +49,23 @@
 	}, nil
 }
 
-// RunPod create a new pod resource in the default pod namespace with
+// Run creates a new pod resource in the default pod namespace with
 // the given pod API specification.
-// It returns the pod status once it is not pending anymore.
-func (c *Client) Run(pod *api.Pod) (*api.PodStatus, error) {
+// It returns the pod status once it has left the Pending phase.
+// An error is returned if the pod can not be created, if it does
+// not leave the Pending phase within 2 minutes, or if ctx.Done
+// is closed.
+func (c *Client) Run(ctx context.Context, pod *api.Pod) (*api.PodStatus, error) {
 	var podJSON bytes.Buffer
 	if err := json.NewEncoder(&podJSON).Encode(pod); err != nil {
 		return nil, fmt.Errorf("failed to encode pod in json: %v", err)
 	}
-	postURL := c.endpointURL + defaultPodNS
-	r, err := http.NewRequest("POST", postURL, &podJSON)
+	postURL := c.endpointURL + defaultPod
+	req, err := http.NewRequest("POST", postURL, &podJSON)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create request: POST %q : %v", postURL, err)
 	}
-	res, err := c.httpClient.Do(r)
+	res, err := ctxhttp.Do(ctx, c.httpClient, req)
 	if err != nil {
 		return nil, fmt.Errorf("failed to make request: POST %q: %v", postURL, err)
 	}
@@ -74,30 +81,158 @@
 	if err := json.Unmarshal(body, &podResult); err != nil {
 		return nil, fmt.Errorf("failed to decode pod resources: %v", err)
 	}
-	for podResult.Status.Phase == "Pending" {
-		getURL := c.endpointURL + defaultPodNS + "/" + pod.Name
-		r, err := http.NewRequest("GET", getURL, nil)
-		if err != nil {
-			return nil, fmt.Errorf("failed to create request: GET %q : %v", getURL, err)
-		}
-		res, err := c.httpClient.Do(r)
-		if err != nil {
-			return nil, fmt.Errorf("failed to make request: GET %q: %v", getURL, err)
-		}
-		body, err := ioutil.ReadAll(res.Body)
-		res.Body.Close()
-		if err != nil {
-			return nil, fmt.Errorf("failed to read request body for GET %q: %v", getURL, err)
-		}
-		if res.StatusCode != http.StatusOK {
-			return nil, fmt.Errorf("http error %d GET %q: %q: %v", res.StatusCode, getURL, string(body), err)
-		}
-		if err := json.Unmarshal(body, &podResult); err != nil {
-			return nil, fmt.Errorf("failed to decode pod resources: %v", err)
-		}
-		time.Sleep(1 * time.Second)
-		// TODO(proppy): add a Cancel type to this func later
-		// so this can select on it.
+	ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
+	defer cancel()
+
+	status, err := c.AwaitPodNotPending(ctx, podResult.Name, podResult.ObjectMeta.ResourceVersion)
+	if err != nil {
+		return nil, err
 	}
-	return &podResult.Status, nil
+	return status, nil
+}
+
+// AwaitPodNotPending watches the named pod and returns its status once
+// the pod has left the Pending phase. The podResourceVersion is required
+// so that the watch does not replay the pod's entire history. An error
+// is returned if the watch reports an error or ends before that happens.
+func (c *Client) AwaitPodNotPending(ctx context.Context, podName, podResourceVersion string) (*api.PodStatus, error) {
+	if podResourceVersion == "" {
+		return nil, fmt.Errorf("resourceVersion for pod %v must be provided", podName)
+	}
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	podStatusResult, err := c.WatchPodStatus(ctx, podName, podResourceVersion)
+	if err != nil {
+		return nil, err
+	}
+	// Keep draining after returning so the watch goroutine never blocks forever on a send.
+	defer func() {
+		go func() {
+			for range podStatusResult {
+			}
+		}()
+	}()
+	for psr := range podStatusResult {
+		if psr.Err != nil {
+			return nil, psr.Err
+		}
+		if psr.Status != nil && psr.Status.Phase != api.PodPending {
+			return psr.Status, nil
+		}
+	}
+	return nil, fmt.Errorf("watch for pod %v ended before it left the pending phase", podName)
+}
+
+// PodStatusResult wraps an api.PodStatus and an error.
+type PodStatusResult struct {
+	Status *api.PodStatus // pod status carried by a watch update
+	Err    error          // error reported instead of a status
+}
+
+type watchPodStatus struct {
+	// Type is the kind of watch update contained in the message.
+	Type string `json:"type"`
+	// Object holds the pod details delivered with the update.
+	Object api.Pod `json:"object"`
+}
+
+// WatchPodStatus long-polls the Kubernetes watch API and sends every
+// change to the specified pod on the returned channel. The
+// podResourceVersion is required to prevent the pod's entire history
+// from being retrieved. The context must be canceled or timed out to
+// stop the watch. Any error is sent on the channel, which is then closed.
+// Sends are unbuffered, so callers must receive until the channel is closed.
+func (c *Client) WatchPodStatus(ctx context.Context, podName, podResourceVersion string) (<-chan PodStatusResult, error) {
+	if podResourceVersion == "" {
+		return nil, fmt.Errorf("resourceVersion for pod %v must be provided", podName)
+	}
+	statusChan := make(chan PodStatusResult)
+
+	go func() {
+		defer close(statusChan)
+		getURL := c.endpointURL + defaultWatchPod + "/" + podName
+		req, err := http.NewRequest("GET", getURL, nil)
+		if err != nil {
+			statusChan <- PodStatusResult{Err: fmt.Errorf("failed to create request: GET %q : %v", getURL, err)}
+			return
+		}
+		// Query returns a copy, so the values must be encoded back into the URL.
+		query := req.URL.Query()
+		query.Add("resourceVersion", podResourceVersion)
+		req.URL.RawQuery = query.Encode()
+		res, err := ctxhttp.Do(ctx, c.httpClient, req)
+		if err != nil {
+			statusChan <- PodStatusResult{Err: fmt.Errorf("failed to make request: GET %q: %v", getURL, err)}
+			return
+		}
+		defer res.Body.Close()
+		reader := bufio.NewReader(res.Body)
+		for {
+			line, err := reader.ReadBytes('\n')
+			if err != nil {
+				statusChan <- PodStatusResult{Err: fmt.Errorf("error reading streaming response body: %v", err)}
+				return
+			}
+			// Decode into a fresh value so results already sent are not overwritten.
+			var wps watchPodStatus
+			if err := json.Unmarshal(line, &wps); err != nil {
+				statusChan <- PodStatusResult{Err: fmt.Errorf("failed to decode watch pod status: %v", err)}
+				return
+			}
+			statusChan <- PodStatusResult{Status: &wps.Object.Status}
+		}
+	}()
+	return statusChan, nil
+}
+
+// PodStatus retrieves the status of a pod synchronously from the
+// Kubernetes API server.
+func (c *Client) PodStatus(ctx context.Context, podName string) (*api.PodStatus, error) {
+	getURL := c.endpointURL + defaultPod + "/" + podName
+
+	// Make request to Kubernetes API
+	req, err := http.NewRequest("GET", getURL, nil)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create request: GET %q : %v", getURL, err)
+	}
+	res, err := ctxhttp.Do(ctx, c.httpClient, req)
+	if err != nil {
+		return nil, fmt.Errorf("failed to make request: GET %q: %v", getURL, err)
+	}
+
+	body, err := ioutil.ReadAll(res.Body)
+	res.Body.Close()
+	if err != nil {
+		return nil, fmt.Errorf("failed to read request body for GET %q: %v", getURL, err)
+	}
+	if res.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("http error %d GET %q: %q", res.StatusCode, getURL, string(body))
+	}
+	// Decode into a value: a JSON "null" body would leave a pointer nil and panic below.
+	var pod api.Pod
+	if err := json.Unmarshal(body, &pod); err != nil {
+		return nil, fmt.Errorf("failed to decode pod resources: %v", err)
+	}
+	return &pod.Status, nil
+}
+
+// PodLog retrieves the container log for the first container
+// in the pod.
+func (c *Client) PodLog(ctx context.Context, podName string) (string, error) {
+	// TODO(evanbrown): support multiple containers
+	logURL := c.endpointURL + defaultPod + "/" + podName + "/log"
+	req, err := http.NewRequest("GET", logURL, nil)
+	if err != nil {
+		return "", fmt.Errorf("failed to create request: GET %q : %v", logURL, err)
+	}
+	res, err := ctxhttp.Do(ctx, c.httpClient, req)
+	if err != nil {
+		return "", fmt.Errorf("failed to make request: GET %q: %v", logURL, err)
+	}
+	body, err := ioutil.ReadAll(res.Body)
+	res.Body.Close() // close on every path, including a failed read
+	if err != nil {
+		return "", fmt.Errorf("failed to read response body: GET %q: %v", logURL, err)
+	}
+	return string(body), nil
 }