kubernetes: avoid the WatchPod streaming API

Poll every 5 seconds instead.

Change-Id: I0a8a30fee1118b2aecf4b3cbbf342e07afc27602
Reviewed-on: https://go-review.googlesource.com/22820
Reviewed-by: Evan Brown <evanbrown@google.com>
diff --git a/buildlet/kube.go b/buildlet/kube.go
index 91af442..c539a65 100644
--- a/buildlet/kube.go
+++ b/buildlet/kube.go
@@ -150,15 +150,15 @@
 	}
 
 	condRun(opts.OnPodCreating)
-	newPod, err := kubeClient.RunPod(ctx, pod)
+	podStatus, err := kubeClient.RunLongLivedPod(ctx, pod)
 	if err != nil {
 		return nil, err
 	}
 
 	// The new pod must be in Running phase. Possible phases are described at
 	// http://releases.k8s.io/HEAD/docs/user-guide/pod-states.md#pod-phase
-	if newPod.Status.Phase != api.PodRunning {
-		return nil, fmt.Errorf("pod is in invalid state %q: %v", newPod.Status.Phase, newPod.Status.Message)
+	if podStatus.Phase != api.PodRunning {
+		return nil, fmt.Errorf("pod is in invalid state %q: %v", podStatus.Phase, podStatus.Message)
 	}
 	condRun(opts.OnPodCreated)
 
@@ -166,11 +166,11 @@
 	var buildletURL string
 	var ipPort string
 	if !opts.TLS.IsZero() {
-		buildletURL = "https://" + newPod.Status.PodIP
-		ipPort = newPod.Status.PodIP + ":443"
+		buildletURL = "https://" + podStatus.PodIP
+		ipPort = podStatus.PodIP + ":443"
 	} else {
-		buildletURL = "http://" + newPod.Status.PodIP
-		ipPort = newPod.Status.PodIP + ":80"
+		buildletURL = "http://" + podStatus.PodIP
+		ipPort = podStatus.PodIP + ":80"
 	}
 	condRun(opts.OnGotPodInfo)
 
diff --git a/cmd/coordinator/kube.go b/cmd/coordinator/kube.go
index 8bb22e5..ebc78fb 100644
--- a/cmd/coordinator/kube.go
+++ b/cmd/coordinator/kube.go
@@ -539,14 +539,25 @@
 			log.Printf("Error cleaning pods: %v", err)
 			return
 		}
+		var stats struct {
+			Pods          int
+			WithAttr      int
+			WithDelete    int
+			DeletedOld    int // even if failed to delete
+			StillUsed     int
+			DeletedOldGen int // even if failed to delete
+		}
 		for _, pod := range pods {
 			if pod.ObjectMeta.Annotations == nil {
 				// Defensive. Not seen in practice.
 				continue
 			}
+			stats.Pods++
 			sawDeleteAt := false
+			stats.WithAttr++
 			for k, v := range pod.ObjectMeta.Annotations {
 				if k == "delete-at" {
+					stats.WithDelete++
 					sawDeleteAt = true
 					if v == "" {
 						log.Printf("missing delete-at value; ignoring")
@@ -557,10 +568,11 @@
 						log.Printf("invalid delete-at value %q seen; ignoring", v)
 					}
 					if err == nil && time.Now().Unix() > unixDeadline {
+						stats.DeletedOld++
 						log.Printf("Deleting expired pod %q in zone %q ...", pod.Name)
 						err = kubeClient.DeletePod(ctx, pod.Name)
 						if err != nil {
-							log.Printf("problem deleting pod: %v", err)
+							log.Printf("problem deleting old pod %q: %v", pod.Name, err)
 						}
 					}
 				}
@@ -568,14 +580,20 @@
 			// Delete buildlets (things we made) from previous
 			// generations. Only deleting things starting with "buildlet-"
 			// is a historical restriction, but still fine for paranoia.
-			if sawDeleteAt && strings.HasPrefix(pod.Name, "buildlet-") && !p.podUsed(pod.Name) {
-				log.Printf("Deleting pod %q from an earlier coordinator generation ...", pod.Name)
-				err = kubeClient.DeletePod(ctx, pod.Name)
-				if err != nil {
-					log.Printf("problem deleting pod: %v", err)
+			if sawDeleteAt && strings.HasPrefix(pod.Name, "buildlet-") {
+				if p.podUsed(pod.Name) {
+					stats.StillUsed++
+				} else {
+					stats.DeletedOldGen++
+					log.Printf("Deleting pod %q from an earlier coordinator generation ...", pod.Name)
+					err = kubeClient.DeletePod(ctx, pod.Name)
+					if err != nil {
+						log.Printf("problem deleting pod: %v", err)
+					}
 				}
 			}
 		}
+		log.Printf("Kubernetes pod cleanup loop stats: %+v", stats)
 		time.Sleep(time.Minute)
 	}
 }
diff --git a/kubernetes/client.go b/kubernetes/client.go
index 93c2103..9e77b8c 100644
--- a/kubernetes/client.go
+++ b/kubernetes/client.go
@@ -51,12 +51,15 @@
 	}, nil
 }
 
-// Run creates a new pod resource in the default pod namespace with
-// the given pod API specification.
+// RunLongLivedPod creates a new pod resource in the default pod namespace with
+// the given pod API specification. It assumes the pod runs a
+// long-lived server (i.e. if the container exits quickly, even
+// with success, then that is an error).
+//
 // It returns the pod status once it has entered the Running phase.
 // An error is returned if the pod can not be created, or if ctx.Done
 // is closed.
-func (c *Client) RunPod(ctx context.Context, pod *api.Pod) (*api.Pod, error) {
+func (c *Client) RunLongLivedPod(ctx context.Context, pod *api.Pod) (*api.PodStatus, error) {
 	var podJSON bytes.Buffer
 	if err := json.NewEncoder(&podJSON).Encode(pod); err != nil {
 		return nil, fmt.Errorf("failed to encode pod in json: %v", err)
@@ -83,25 +86,31 @@
 		return nil, fmt.Errorf("failed to decode pod resources: %v", err)
 	}
 
-	retryWait := 1
-	retryMax := retryWait << 3 // retry 3 times
 	for {
-		createdPod, err := c.AwaitPodNotPending(ctx, podResult.Name, podResult.ObjectMeta.ResourceVersion)
+		// TODO(bradfitz,evanbrown): pass podResult.ObjectMeta.ResourceVersion to PodStatus?
+		ps, err := c.PodStatus(ctx, podResult.Name)
 		if err != nil {
-			if err == context.Canceled {
-				return nil, err
-			}
-			if retryWait < retryMax { // retry
-				time.Sleep(time.Duration(retryWait) * time.Second)
-				retryWait = retryWait << 1
-				continue
-			}
-			// The pod did not leave the pending state. We should try to manually delete it before
-			// returning an error.
-			c.DeletePod(context.Background(), podResult.Name)
-			return nil, fmt.Errorf("waiting for pod %q to leave pending state: %v", pod.Name, err)
+			return nil, err
 		}
-		return createdPod, nil
+		switch ps.Phase {
+		case api.PodPending:
+			// The main phase we're waiting on
+			break
+		case api.PodRunning:
+			return ps, nil
+		case api.PodSucceeded, api.PodFailed:
+			return nil, fmt.Errorf("pod entered phase %q", ps.Phase)
+		default:
+			log.Printf("RunLongLivedPod poll loop: pod %q in unexpected phase %q; sleeping", podResult.Name, ps.Phase)
+		}
+		select {
+		case <-time.After(5 * time.Second):
+		case <-ctx.Done():
+			// The pod did not leave the pending
+			// state. Try to clean it up.
+			go c.DeletePod(context.Background(), podResult.Name)
+			return nil, ctx.Err()
+		}
 	}
 }
 
@@ -157,21 +166,23 @@
 	return nil
 }
 
-// awaitPodNotPending will return a pod's status in a
+// TODO(bradfitz): WatchPod is unreliable, so this is disabled.
+//
+// AwaitPodNotPending will return a pod's status in a
 // podStatusResult when the pod is no longer in the pending
 // state.
 // The podResourceVersion is required to prevent a pod's entire
 // history from being retrieved when the watch is initiated.
 // If there is an error polling for the pod's status, or if
 // ctx.Done is closed, podStatusResult will contain an error.
-func (c *Client) AwaitPodNotPending(ctx context.Context, podName, podResourceVersion string) (*api.Pod, error) {
+func (c *Client) _AwaitPodNotPending(ctx context.Context, podName, podResourceVersion string) (*api.Pod, error) {
 	if podResourceVersion == "" {
 		return nil, fmt.Errorf("resourceVersion for pod %v must be provided", podName)
 	}
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	podStatusUpdates, err := c.WatchPod(ctx, podName, podResourceVersion)
+	podStatusUpdates, err := c._WatchPod(ctx, podName, podResourceVersion)
 	if err != nil {
 		return nil, err
 	}
@@ -210,6 +221,10 @@
 	Object api.Pod `json:"object"`
 }
 
+// TODO(bradfitz): WatchPod is unreliable and sometimes hangs forever
+// without closing and sometimes ends prematurely, so this API is
+// disabled.
+//
 // WatchPod long-polls the Kubernetes watch API to be notified
 // of changes to the specified pod. Changes are sent on the returned
 // PodStatusResult channel as they are received.
@@ -219,7 +234,7 @@
 // If any error occurs communicating with the Kubernetes API, the
 // error will be sent on the returned PodStatusResult channel and
 // it will be closed.
-func (c *Client) WatchPod(ctx context.Context, podName, podResourceVersion string) (<-chan PodStatusResult, error) {
+func (c *Client) _WatchPod(ctx context.Context, podName, podResourceVersion string) (<-chan PodStatusResult, error) {
 	if podResourceVersion == "" {
 		return nil, fmt.Errorf("resourceVersion for pod %v must be provided", podName)
 	}