kubernetes: avoid the WatchPod streaming API
Poll every 5 seconds instead.
Change-Id: I0a8a30fee1118b2aecf4b3cbbf342e07afc27602
Reviewed-on: https://go-review.googlesource.com/22820
Reviewed-by: Evan Brown <evanbrown@google.com>
diff --git a/kubernetes/client.go b/kubernetes/client.go
index 93c2103..9e77b8c 100644
--- a/kubernetes/client.go
+++ b/kubernetes/client.go
@@ -51,12 +51,15 @@
}, nil
}
-// Run creates a new pod resource in the default pod namespace with
-// the given pod API specification.
+// RunLongLivedPod creates a new pod resource in the default pod namespace with
+// the given pod API specification. It assumes the pod runs a
+// long-lived server (i.e. if the container exits quickly, even
+// with success, then that is an error).
+//
// It returns the pod status once it has entered the Running phase.
// An error is returned if the pod can not be created, or if ctx.Done
// is closed.
-func (c *Client) RunPod(ctx context.Context, pod *api.Pod) (*api.Pod, error) {
+func (c *Client) RunLongLivedPod(ctx context.Context, pod *api.Pod) (*api.PodStatus, error) {
var podJSON bytes.Buffer
if err := json.NewEncoder(&podJSON).Encode(pod); err != nil {
return nil, fmt.Errorf("failed to encode pod in json: %v", err)
@@ -83,25 +86,31 @@
return nil, fmt.Errorf("failed to decode pod resources: %v", err)
}
- retryWait := 1
- retryMax := retryWait << 3 // retry 3 times
for {
- createdPod, err := c.AwaitPodNotPending(ctx, podResult.Name, podResult.ObjectMeta.ResourceVersion)
+ // TODO(bradfitz,evanbrown): pass podResult.ObjectMeta.ResourceVersion to PodStatus?
+ ps, err := c.PodStatus(ctx, podResult.Name)
if err != nil {
- if err == context.Canceled {
- return nil, err
- }
- if retryWait < retryMax { // retry
- time.Sleep(time.Duration(retryWait) * time.Second)
- retryWait = retryWait << 1
- continue
- }
- // The pod did not leave the pending state. We should try to manually delete it before
- // returning an error.
- c.DeletePod(context.Background(), podResult.Name)
- return nil, fmt.Errorf("waiting for pod %q to leave pending state: %v", pod.Name, err)
+ return nil, err
}
- return createdPod, nil
+ switch ps.Phase {
+ case api.PodPending:
+ // The main phase we're waiting on
+ break
+ case api.PodRunning:
+ return ps, nil
+ case api.PodSucceeded, api.PodFailed:
+ return nil, fmt.Errorf("pod entered phase %q", ps.Phase)
+ default:
+ log.Printf("RunLongLivedPod poll loop: pod %q in unexpected phase %q; sleeping", podResult.Name, ps.Phase)
+ }
+ select {
+ case <-time.After(5 * time.Second):
+ case <-ctx.Done():
+ // The pod did not leave the pending
+ // state. Try to clean it up.
+ go c.DeletePod(context.Background(), podResult.Name)
+ return nil, ctx.Err()
+ }
}
}
@@ -157,21 +166,23 @@
return nil
}
-// awaitPodNotPending will return a pod's status in a
+// TODO(bradfitz): WatchPod is unreliable, so this is disabled.
+//
+// AwaitPodNotPending will return a pod's status in a
// podStatusResult when the pod is no longer in the pending
// state.
// The podResourceVersion is required to prevent a pod's entire
// history from being retrieved when the watch is initiated.
// If there is an error polling for the pod's status, or if
// ctx.Done is closed, podStatusResult will contain an error.
-func (c *Client) AwaitPodNotPending(ctx context.Context, podName, podResourceVersion string) (*api.Pod, error) {
+func (c *Client) _AwaitPodNotPending(ctx context.Context, podName, podResourceVersion string) (*api.Pod, error) {
if podResourceVersion == "" {
return nil, fmt.Errorf("resourceVersion for pod %v must be provided", podName)
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
- podStatusUpdates, err := c.WatchPod(ctx, podName, podResourceVersion)
+ podStatusUpdates, err := c._WatchPod(ctx, podName, podResourceVersion)
if err != nil {
return nil, err
}
@@ -210,6 +221,10 @@
Object api.Pod `json:"object"`
}
+// TODO(bradfitz): WatchPod is unreliable and sometimes hangs forever
+// without closing and sometimes ends prematurely, so this API is
+// disabled.
+//
// WatchPod long-polls the Kubernetes watch API to be notified
// of changes to the specified pod. Changes are sent on the returned
// PodStatusResult channel as they are received.
@@ -219,7 +234,7 @@
// If any error occurs communicating with the Kubernetes API, the
// error will be sent on the returned PodStatusResult channel and
// it will be closed.
-func (c *Client) WatchPod(ctx context.Context, podName, podResourceVersion string) (<-chan PodStatusResult, error) {
+func (c *Client) _WatchPod(ctx context.Context, podName, podResourceVersion string) (<-chan PodStatusResult, error) {
if podResourceVersion == "" {
return nil, fmt.Errorf("resourceVersion for pod %v must be provided", podName)
}