kubernetes: fix potential goroutine leak

Change-Id: I2e9343c7441db43fe7004d877ee0409d29cb9a23
Reviewed-on: https://go-review.googlesource.com/21764
Reviewed-by: Evan Brown <evanbrown@google.com>
diff --git a/kubernetes/client.go b/kubernetes/client.go
index 16c2c1f..a25d6fd 100644
--- a/kubernetes/client.go
+++ b/kubernetes/client.go
@@ -170,16 +170,23 @@
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	podStatusResult, err := c.WatchPod(ctx, podName, podResourceVersion)
+	podStatusUpdates, err := c.WatchPod(ctx, podName, podResourceVersion)
 	if err != nil {
 		return nil, err
 	}
-	var psr PodStatusResult
 	for {
 		select {
-		case psr = <-podStatusResult:
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		case psr := <-podStatusUpdates:
 			if psr.Err != nil {
-				return nil, psr.Err
+				// If the context is done, prefer its error:
+				select {
+				case <-ctx.Done():
+					return nil, ctx.Err()
+				default:
+					return nil, psr.Err
+				}
 			}
 			if psr.Pod.Status.Phase != api.PodPending {
 				return psr.Pod, nil
@@ -188,7 +195,7 @@
 	}
 }
 
-// PodStatusResult wraps a api.PodStatus and error
+// PodStatusResult wraps an api.Pod, a Type string, and an error.
 type PodStatusResult struct {
 	Pod  *api.Pod
 	Type string
@@ -215,7 +222,7 @@
 	if podResourceVersion == "" {
 		return nil, fmt.Errorf("resourceVersion for pod %v must be provided", podName)
 	}
-	statusChan := make(chan PodStatusResult)
+	statusChan := make(chan PodStatusResult, 1)
 
 	go func() {
 		defer close(statusChan)
@@ -229,15 +236,11 @@
 		}
 		res, err := ctxhttp.Do(ctx, c.httpClient, req)
 		if err != nil {
-			if err != context.Canceled {
-				statusChan <- PodStatusResult{Err: fmt.Errorf("failed to make request: GET %q: %v", getURL, err)}
-			}
-			statusChan <- PodStatusResult{Err: err} // context.Canceled
+			statusChan <- PodStatusResult{Err: err}
 			return
 		}
 		defer res.Body.Close()
 
-		var wps watchPodStatus
 		reader := bufio.NewReader(res.Body)
 
 		// bufio.Reader.ReadBytes is blocking, so we watch for
@@ -252,14 +255,11 @@
 
 		for {
 			line, err := reader.ReadBytes('\n')
-			if ctx.Err() != nil {
-				statusChan <- PodStatusResult{Err: ctx.Err()}
-				return
-			}
 			if err != nil {
 				statusChan <- PodStatusResult{Err: fmt.Errorf("error reading streaming response body: %v", err)}
 				return
 			}
+			var wps watchPodStatus
 			if err := json.Unmarshal(line, &wps); err != nil {
 				statusChan <- PodStatusResult{Err: fmt.Errorf("failed to decode watch pod status: %v", err)}
 				return