kubernetes: be more defensive in WatchPod
Change-Id: I0fe9bfb9f0a440a33ff8eedf00940f53b2b7dc87
Reviewed-on: https://go-review.googlesource.com/22789
Reviewed-by: Evan Brown <evanbrown@google.com>
diff --git a/kubernetes/client.go b/kubernetes/client.go
index a25d6fd..93c2103 100644
--- a/kubernetes/client.go
+++ b/kubernetes/client.go
@@ -11,6 +11,7 @@
"encoding/json"
"fmt"
"io/ioutil"
+ "log"
"net/http"
"net/url"
"strings"
@@ -226,6 +227,9 @@
go func() {
defer close(statusChan)
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
// Make request to Kubernetes API
getURL := c.endpointURL + defaultWatchPod + "/" + podName
req, err := http.NewRequest("GET", getURL, nil)
@@ -240,7 +244,10 @@
return
}
defer res.Body.Close()
-
+ if res.StatusCode != 200 {
+ statusChan <- PodStatusResult{Err: fmt.Errorf("WatchPod status %v", res.Status)}
+ return
+ }
reader := bufio.NewReader(res.Body)
// bufio.Reader.ReadBytes is blocking, so we watch for
@@ -253,8 +260,22 @@
res.Body.Close()
}()
+ const backupPollDuration = 30 * time.Second
+ backupPoller := time.AfterFunc(backupPollDuration, func() {
+ log.Printf("kubernetes: backup poller in WatchPod checking on %q", podName)
+ st, err := c.PodStatus(ctx, podName)
+ log.Printf("kubernetes: backup poller in WatchPod PodStatus(%q) = %v, %v", podName, st, err)
+ if err != nil {
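+ // The backup PodStatus check failed: cancel this watch's context (presumably so the blocked read below is aborted; confirm against the goroutine that closes res.Body).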
+ cancel()
+ }
+ })
+ defer backupPoller.Stop()
+
for {
line, err := reader.ReadBytes('\n')
+ log.Printf("kubernetes WatchPod status line of %q: %q, %v", podName, line, err)
+ backupPoller.Reset(backupPollDuration)
if err != nil {
statusChan <- PodStatusResult{Err: fmt.Errorf("error reading streaming response body: %v", err)}
return