kubernetes: fix potential goroutine leak
Change-Id: I2e9343c7441db43fe7004d877ee0409d29cb9a23
Reviewed-on: https://go-review.googlesource.com/21764
Reviewed-by: Evan Brown <evanbrown@google.com>
diff --git a/kubernetes/client.go b/kubernetes/client.go
index 16c2c1f..a25d6fd 100644
--- a/kubernetes/client.go
+++ b/kubernetes/client.go
@@ -170,16 +170,23 @@
ctx, cancel := context.WithCancel(ctx)
defer cancel()
- podStatusResult, err := c.WatchPod(ctx, podName, podResourceVersion)
+ podStatusUpdates, err := c.WatchPod(ctx, podName, podResourceVersion)
if err != nil {
return nil, err
}
- var psr PodStatusResult
for {
select {
- case psr = <-podStatusResult:
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ case psr := <-podStatusUpdates:
if psr.Err != nil {
- return nil, psr.Err
+ // If the context is done, prefer its error:
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ default:
+ return nil, psr.Err
+ }
}
if psr.Pod.Status.Phase != api.PodPending {
return psr.Pod, nil
@@ -188,7 +195,7 @@
}
}
-// PodStatusResult wraps a api.PodStatus and error
+// PodStatusResult wraps an api.PodStatus and error.
type PodStatusResult struct {
Pod *api.Pod
Type string
@@ -215,7 +222,7 @@
if podResourceVersion == "" {
return nil, fmt.Errorf("resourceVersion for pod %v must be provided", podName)
}
- statusChan := make(chan PodStatusResult)
+ statusChan := make(chan PodStatusResult, 1)
go func() {
defer close(statusChan)
@@ -229,15 +236,11 @@
}
res, err := ctxhttp.Do(ctx, c.httpClient, req)
if err != nil {
- if err != context.Canceled {
- statusChan <- PodStatusResult{Err: fmt.Errorf("failed to make request: GET %q: %v", getURL, err)}
- }
- statusChan <- PodStatusResult{Err: err} // context.Canceled
+ statusChan <- PodStatusResult{Err: err}
return
}
defer res.Body.Close()
- var wps watchPodStatus
reader := bufio.NewReader(res.Body)
// bufio.Reader.ReadBytes is blocking, so we watch for
@@ -252,14 +255,11 @@
for {
line, err := reader.ReadBytes('\n')
- if ctx.Err() != nil {
- statusChan <- PodStatusResult{Err: ctx.Err()}
- return
- }
if err != nil {
statusChan <- PodStatusResult{Err: fmt.Errorf("error reading streaming response body: %v", err)}
return
}
+ var wps watchPodStatus
if err := json.Unmarshal(line, &wps); err != nil {
statusChan <- PodStatusResult{Err: fmt.Errorf("failed to decode watch pod status: %v", err)}
return