kubernetes: avoid the WatchPod streaming API. Poll every 5 seconds instead. Change-Id: I0a8a30fee1118b2aecf4b3cbbf342e07afc27602 Reviewed-on: https://go-review.googlesource.com/22820 Reviewed-by: Evan Brown <evanbrown@google.com>
diff --git a/buildlet/kube.go b/buildlet/kube.go index 91af442..c539a65 100644 --- a/buildlet/kube.go +++ b/buildlet/kube.go
@@ -150,15 +150,15 @@ } condRun(opts.OnPodCreating) - newPod, err := kubeClient.RunPod(ctx, pod) + podStatus, err := kubeClient.RunLongLivedPod(ctx, pod) if err != nil { return nil, err } // The new pod must be in Running phase. Possible phases are described at // http://releases.k8s.io/HEAD/docs/user-guide/pod-states.md#pod-phase - if newPod.Status.Phase != api.PodRunning { - return nil, fmt.Errorf("pod is in invalid state %q: %v", newPod.Status.Phase, newPod.Status.Message) + if podStatus.Phase != api.PodRunning { + return nil, fmt.Errorf("pod is in invalid state %q: %v", podStatus.Phase, podStatus.Message) } condRun(opts.OnPodCreated) @@ -166,11 +166,11 @@ var buildletURL string var ipPort string if !opts.TLS.IsZero() { - buildletURL = "https://" + newPod.Status.PodIP - ipPort = newPod.Status.PodIP + ":443" + buildletURL = "https://" + podStatus.PodIP + ipPort = podStatus.PodIP + ":443" } else { - buildletURL = "http://" + newPod.Status.PodIP - ipPort = newPod.Status.PodIP + ":80" + buildletURL = "http://" + podStatus.PodIP + ipPort = podStatus.PodIP + ":80" } condRun(opts.OnGotPodInfo)
diff --git a/cmd/coordinator/kube.go b/cmd/coordinator/kube.go index 8bb22e5..ebc78fb 100644 --- a/cmd/coordinator/kube.go +++ b/cmd/coordinator/kube.go
@@ -539,14 +539,25 @@ log.Printf("Error cleaning pods: %v", err) return } + var stats struct { + Pods int + WithAttr int + WithDelete int + DeletedOld int // even if failed to delete + StillUsed int + DeletedOldGen int // even if failed to delete + } for _, pod := range pods { if pod.ObjectMeta.Annotations == nil { // Defensive. Not seen in practice. continue } + stats.Pods++ sawDeleteAt := false + stats.WithAttr++ for k, v := range pod.ObjectMeta.Annotations { if k == "delete-at" { + stats.WithDelete++ sawDeleteAt = true if v == "" { log.Printf("missing delete-at value; ignoring") @@ -557,10 +568,11 @@ log.Printf("invalid delete-at value %q seen; ignoring", v) } if err == nil && time.Now().Unix() > unixDeadline { + stats.DeletedOld++ log.Printf("Deleting expired pod %q in zone %q ...", pod.Name) err = kubeClient.DeletePod(ctx, pod.Name) if err != nil { - log.Printf("problem deleting pod: %v", err) + log.Printf("problem deleting old pod %q: %v", pod.Name, err) } } } @@ -568,14 +580,20 @@ // Delete buildlets (things we made) from previous // generations. Only deleting things starting with "buildlet-" // is a historical restriction, but still fine for paranoia. - if sawDeleteAt && strings.HasPrefix(pod.Name, "buildlet-") && !p.podUsed(pod.Name) { - log.Printf("Deleting pod %q from an earlier coordinator generation ...", pod.Name) - err = kubeClient.DeletePod(ctx, pod.Name) - if err != nil { - log.Printf("problem deleting pod: %v", err) + if sawDeleteAt && strings.HasPrefix(pod.Name, "buildlet-") { + if p.podUsed(pod.Name) { + stats.StillUsed++ + } else { + stats.DeletedOldGen++ + log.Printf("Deleting pod %q from an earlier coordinator generation ...", pod.Name) + err = kubeClient.DeletePod(ctx, pod.Name) + if err != nil { + log.Printf("problem deleting pod: %v", err) + } } } } + log.Printf("Kubernetes pod cleanup loop stats: %+v", stats) time.Sleep(time.Minute) } }
diff --git a/kubernetes/client.go b/kubernetes/client.go index 93c2103..9e77b8c 100644 --- a/kubernetes/client.go +++ b/kubernetes/client.go
@@ -51,12 +51,15 @@ }, nil } -// Run creates a new pod resource in the default pod namespace with -// the given pod API specification. +// RunLongLivedPod creates a new pod resource in the default pod namespace with +// the given pod API specification. It assumes the pod runs a +// long-lived server (i.e. if the container exits quickly, even +// with success, then that is an error). +// // It returns the pod status once it has entered the Running phase. // An error is returned if the pod can not be created, or if ctx.Done // is closed. -func (c *Client) RunPod(ctx context.Context, pod *api.Pod) (*api.Pod, error) { +func (c *Client) RunLongLivedPod(ctx context.Context, pod *api.Pod) (*api.PodStatus, error) { var podJSON bytes.Buffer if err := json.NewEncoder(&podJSON).Encode(pod); err != nil { return nil, fmt.Errorf("failed to encode pod in json: %v", err) @@ -83,25 +86,31 @@ return nil, fmt.Errorf("failed to decode pod resources: %v", err) } - retryWait := 1 - retryMax := retryWait << 3 // retry 3 times for { - createdPod, err := c.AwaitPodNotPending(ctx, podResult.Name, podResult.ObjectMeta.ResourceVersion) + // TODO(bradfitz,evanbrown): pass podResult.ObjectMeta.ResourceVersion to PodStatus? + ps, err := c.PodStatus(ctx, podResult.Name) if err != nil { - if err == context.Canceled { - return nil, err - } - if retryWait < retryMax { // retry - time.Sleep(time.Duration(retryWait) * time.Second) - retryWait = retryWait << 1 - continue - } - // The pod did not leave the pending state. We should try to manually delete it before - // returning an error. 
- c.DeletePod(context.Background(), podResult.Name) - return nil, fmt.Errorf("waiting for pod %q to leave pending state: %v", pod.Name, err) + return nil, err } - return createdPod, nil + switch ps.Phase { + case api.PodPending: + // The main phase we're waiting on + break + case api.PodRunning: + return ps, nil + case api.PodSucceeded, api.PodFailed: + return nil, fmt.Errorf("pod entered phase %q", ps.Phase) + default: + log.Printf("RunLongLivedPod poll loop: pod %q in unexpected phase %q; sleeping", podResult.Name, ps.Phase) + } + select { + case <-time.After(5 * time.Second): + case <-ctx.Done(): + // The pod did not leave the pending + // state. Try to clean it up. + go c.DeletePod(context.Background(), podResult.Name) + return nil, ctx.Err() + } } } @@ -157,21 +166,23 @@ return nil } -// awaitPodNotPending will return a pod's status in a +// TODO(bradfitz): WatchPod is unreliable, so this is disabled. +// +// AwaitPodNotPending will return a pod's status in a // podStatusResult when the pod is no longer in the pending // state. // The podResourceVersion is required to prevent a pod's entire // history from being retrieved when the watch is initiated. // If there is an error polling for the pod's status, or if // ctx.Done is closed, podStatusResult will contain an error. 
-func (c *Client) AwaitPodNotPending(ctx context.Context, podName, podResourceVersion string) (*api.Pod, error) { +func (c *Client) _AwaitPodNotPending(ctx context.Context, podName, podResourceVersion string) (*api.Pod, error) { if podResourceVersion == "" { return nil, fmt.Errorf("resourceVersion for pod %v must be provided", podName) } ctx, cancel := context.WithCancel(ctx) defer cancel() - podStatusUpdates, err := c.WatchPod(ctx, podName, podResourceVersion) + podStatusUpdates, err := c._WatchPod(ctx, podName, podResourceVersion) if err != nil { return nil, err } @@ -210,6 +221,10 @@ Object api.Pod `json:"object"` } +// TODO(bradfitz): WatchPod is unreliable and sometimes hangs forever +// without closing and sometimes ends prematurely, so this API is +// disabled. +// // WatchPod long-polls the Kubernetes watch API to be notified // of changes to the specified pod. Changes are sent on the returned // PodStatusResult channel as they are received. @@ -219,7 +234,7 @@ // If any error occurs communicating with the Kubernetes API, the // error will be sent on the returned PodStatusResult channel and // it will be closed. -func (c *Client) WatchPod(ctx context.Context, podName, podResourceVersion string) (<-chan PodStatusResult, error) { +func (c *Client) _WatchPod(ctx context.Context, podName, podResourceVersion string) (<-chan PodStatusResult, error) { if podResourceVersion == "" { return nil, fmt.Errorf("resourceVersion for pod %v must be provided", podName) }