kubernetes: be more defensive in WatchPod

Change-Id: I0fe9bfb9f0a440a33ff8eedf00940f53b2b7dc87
Reviewed-on: https://go-review.googlesource.com/22789
Reviewed-by: Evan Brown <evanbrown@google.com>
diff --git a/kubernetes/client.go b/kubernetes/client.go
index a25d6fd..93c2103 100644
--- a/kubernetes/client.go
+++ b/kubernetes/client.go
@@ -11,6 +11,7 @@
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
+	"log"
 	"net/http"
 	"net/url"
 	"strings"
@@ -226,6 +227,9 @@
 
 	go func() {
 		defer close(statusChan)
+		ctx, cancel := context.WithCancel(ctx)
+		defer cancel()
+
 		// Make request to Kubernetes API
 		getURL := c.endpointURL + defaultWatchPod + "/" + podName
 		req, err := http.NewRequest("GET", getURL, nil)
@@ -240,7 +244,10 @@
 			return
 		}
 		defer res.Body.Close()
-
+		if res.StatusCode != 200 {
+			statusChan <- PodStatusResult{Err: fmt.Errorf("WatchPod status %v", res.Status)}
+			return
+		}
 		reader := bufio.NewReader(res.Body)
 
 		// bufio.Reader.ReadBytes is blocking, so we watch for
@@ -253,8 +260,22 @@
 			res.Body.Close()
 		}()
 
+		const backupPollDuration = 30 * time.Second
+		backupPoller := time.AfterFunc(backupPollDuration, func() {
+			log.Printf("kubernetes: backup poller in WatchPod checking on %q", podName)
+			st, err := c.PodStatus(ctx, podName)
+			log.Printf("kubernetes: backup poller in WatchPod PodStatus(%q) = %v, %v", podName, st, err)
+			if err != nil {
+				// PodStatus returned an error: cancel the context derived for this watch.
+				cancel()
+			}
+		})
+		defer backupPoller.Stop()
+
 		for {
 			line, err := reader.ReadBytes('\n')
+			log.Printf("kubernetes WatchPod status line of %q: %q, %v", podName, line, err)
+			backupPoller.Reset(backupPollDuration)
 			if err != nil {
 				statusChan <- PodStatusResult{Err: fmt.Errorf("error reading streaming response body: %v", err)}
 				return