kubernetes: avoid the WatchPod streaming API

Poll every 5 seconds instead.

Change-Id: I0a8a30fee1118b2aecf4b3cbbf342e07afc27602
Reviewed-on: https://go-review.googlesource.com/22820
Reviewed-by: Evan Brown <evanbrown@google.com>
diff --git a/kubernetes/client.go b/kubernetes/client.go
index 93c2103..9e77b8c 100644
--- a/kubernetes/client.go
+++ b/kubernetes/client.go
@@ -51,12 +51,15 @@
 	}, nil
 }
 
-// Run creates a new pod resource in the default pod namespace with
-// the given pod API specification.
+// RunLongLivedPod creates a new pod resource in the default pod namespace with
+// the given pod API specification. It assumes the pod runs a
+// long-lived server (i.e. if the container exits quickly, even
+// with success, then that is an error).
+//
 // It returns the pod status once it has entered the Running phase.
 // An error is returned if the pod can not be created, or if ctx.Done
 // is closed.
-func (c *Client) RunPod(ctx context.Context, pod *api.Pod) (*api.Pod, error) {
+func (c *Client) RunLongLivedPod(ctx context.Context, pod *api.Pod) (*api.PodStatus, error) {
 	var podJSON bytes.Buffer
 	if err := json.NewEncoder(&podJSON).Encode(pod); err != nil {
 		return nil, fmt.Errorf("failed to encode pod in json: %v", err)
@@ -83,25 +86,31 @@
 		return nil, fmt.Errorf("failed to decode pod resources: %v", err)
 	}
 
-	retryWait := 1
-	retryMax := retryWait << 3 // retry 3 times
 	for {
-		createdPod, err := c.AwaitPodNotPending(ctx, podResult.Name, podResult.ObjectMeta.ResourceVersion)
+		// TODO(bradfitz,evanbrown): pass podResult.ObjectMeta.ResourceVersion to PodStatus?
+		ps, err := c.PodStatus(ctx, podResult.Name)
 		if err != nil {
-			if err == context.Canceled {
-				return nil, err
-			}
-			if retryWait < retryMax { // retry
-				time.Sleep(time.Duration(retryWait) * time.Second)
-				retryWait = retryWait << 1
-				continue
-			}
-			// The pod did not leave the pending state. We should try to manually delete it before
-			// returning an error.
-			c.DeletePod(context.Background(), podResult.Name)
-			return nil, fmt.Errorf("waiting for pod %q to leave pending state: %v", pod.Name, err)
+			return nil, err
 		}
-		return createdPod, nil
+		switch ps.Phase {
+		case api.PodPending:
+			// The main phase we're waiting on
+			break
+		case api.PodRunning:
+			return ps, nil
+		case api.PodSucceeded, api.PodFailed:
+			return nil, fmt.Errorf("pod entered phase %q", ps.Phase)
+		default:
+			log.Printf("RunLongLivedPod poll loop: pod %q in unexpected phase %q; sleeping", podResult.Name, ps.Phase)
+		}
+		select {
+		case <-time.After(5 * time.Second):
+		case <-ctx.Done():
+			// The pod did not leave the pending
+			// state. Try to clean it up.
+			go c.DeletePod(context.Background(), podResult.Name)
+			return nil, ctx.Err()
+		}
 	}
 }
 
@@ -157,21 +166,23 @@
 	return nil
 }
 
-// awaitPodNotPending will return a pod's status in a
+// TODO(bradfitz): WatchPod is unreliable, so this is disabled.
+//
+// AwaitPodNotPending will return a pod's status in a
 // podStatusResult when the pod is no longer in the pending
 // state.
 // The podResourceVersion is required to prevent a pod's entire
 // history from being retrieved when the watch is initiated.
 // If there is an error polling for the pod's status, or if
 // ctx.Done is closed, podStatusResult will contain an error.
-func (c *Client) AwaitPodNotPending(ctx context.Context, podName, podResourceVersion string) (*api.Pod, error) {
+func (c *Client) _AwaitPodNotPending(ctx context.Context, podName, podResourceVersion string) (*api.Pod, error) {
 	if podResourceVersion == "" {
 		return nil, fmt.Errorf("resourceVersion for pod %v must be provided", podName)
 	}
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	podStatusUpdates, err := c.WatchPod(ctx, podName, podResourceVersion)
+	podStatusUpdates, err := c._WatchPod(ctx, podName, podResourceVersion)
 	if err != nil {
 		return nil, err
 	}
@@ -210,6 +221,10 @@
 	Object api.Pod `json:"object"`
 }
 
+// TODO(bradfitz): WatchPod is unreliable and sometimes hangs forever
+// without closing and sometimes ends prematurely, so this API is
+// disabled.
+//
 // WatchPod long-polls the Kubernetes watch API to be notified
 // of changes to the specified pod. Changes are sent on the returned
 // PodStatusResult channel as they are received.
@@ -219,7 +234,7 @@
 // If any error occurs communicating with the Kubernetes API, the
 // error will be sent on the returned PodStatusResult channel and
 // it will be closed.
-func (c *Client) WatchPod(ctx context.Context, podName, podResourceVersion string) (<-chan PodStatusResult, error) {
+func (c *Client) _WatchPod(ctx context.Context, podName, podResourceVersion string) (<-chan PodStatusResult, error) {
 	if podResourceVersion == "" {
 		return nil, fmt.Errorf("resourceVersion for pod %v must be provided", podName)
 	}