all: improved monitoring of buildlet pods after creation
* Replaced cancel with context.Context
* StartPod can be canceled
* Wait for buildlet to come online, but fail fast if pod fails first
* Support timeout waiting for pod to leave pending phase
* Use Kubernetes watch API (long poll)
Updates golang/go#12546
Change-Id: I792a3b8fed615362a0290feee7de0c2cefe43c0e
Reviewed-on: https://go-review.googlesource.com/15285
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
diff --git a/kubernetes/client.go b/kubernetes/client.go
index b44b650..9af7ed7 100644
--- a/kubernetes/client.go
+++ b/kubernetes/client.go
@@ -6,6 +6,7 @@
package kubernetes
import (
+ "bufio"
"bytes"
"encoding/json"
"fmt"
@@ -16,12 +17,15 @@
"time"
"golang.org/x/build/kubernetes/api"
+ "golang.org/x/net/context"
+ "golang.org/x/net/context/ctxhttp"
)
const (
// APIEndpoint defines the base path for kubernetes API resources.
- APIEndpoint = "/api/v1"
- defaultPodNS = "/namespaces/default/pods"
+ APIEndpoint = "/api/v1"
+ defaultPod = "/namespaces/default/pods"
+ defaultWatchPod = "/watch/namespaces/default/pods"
)
// Client is a client for the Kubernetes master.
@@ -45,20 +49,23 @@
}, nil
}
-// RunPod create a new pod resource in the default pod namespace with
+// Run creates a new pod resource in the default pod namespace with
// the given pod API specification.
-// It returns the pod status once it is not pending anymore.
-func (c *Client) Run(pod *api.Pod) (*api.PodStatus, error) {
+// It returns the pod status once the pod has left the Pending phase.
+// An error is returned if the pod cannot be created, if it does
+// not leave the Pending phase within 2 minutes, or if ctx.Done
+// is closed.
+func (c *Client) Run(ctx context.Context, pod *api.Pod) (*api.PodStatus, error) {
var podJSON bytes.Buffer
if err := json.NewEncoder(&podJSON).Encode(pod); err != nil {
return nil, fmt.Errorf("failed to encode pod in json: %v", err)
}
- postURL := c.endpointURL + defaultPodNS
- r, err := http.NewRequest("POST", postURL, &podJSON)
+ postURL := c.endpointURL + defaultPod
+ req, err := http.NewRequest("POST", postURL, &podJSON)
if err != nil {
return nil, fmt.Errorf("failed to create request: POST %q : %v", postURL, err)
}
- res, err := c.httpClient.Do(r)
+ res, err := ctxhttp.Do(ctx, c.httpClient, req)
if err != nil {
return nil, fmt.Errorf("failed to make request: POST %q: %v", postURL, err)
}
@@ -74,30 +81,158 @@
if err := json.Unmarshal(body, &podResult); err != nil {
return nil, fmt.Errorf("failed to decode pod resources: %v", err)
}
- for podResult.Status.Phase == "Pending" {
- getURL := c.endpointURL + defaultPodNS + "/" + pod.Name
- r, err := http.NewRequest("GET", getURL, nil)
- if err != nil {
- return nil, fmt.Errorf("failed to create request: GET %q : %v", getURL, err)
- }
- res, err := c.httpClient.Do(r)
- if err != nil {
- return nil, fmt.Errorf("failed to make request: GET %q: %v", getURL, err)
- }
- body, err := ioutil.ReadAll(res.Body)
- res.Body.Close()
- if err != nil {
- return nil, fmt.Errorf("failed to read request body for GET %q: %v", getURL, err)
- }
- if res.StatusCode != http.StatusOK {
- return nil, fmt.Errorf("http error %d GET %q: %q: %v", res.StatusCode, getURL, string(body), err)
- }
- if err := json.Unmarshal(body, &podResult); err != nil {
- return nil, fmt.Errorf("failed to decode pod resources: %v", err)
- }
- time.Sleep(1 * time.Second)
- // TODO(proppy): add a Cancel type to this func later
- // so this can select on it.
+ ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
+ defer cancel()
+
+ status, err := c.AwaitPodNotPending(ctx, podResult.Name, podResult.ObjectMeta.ResourceVersion)
+ if err != nil {
+ return nil, err
}
- return &podResult.Status, nil
+ return status, nil
+}
+
+// awaitPodNotPending will return a pod's status in a
+// podStatusResult when the pod is no longer in the pending
+// state.
+// The podResourceVersion is required to prevent a pod's entire
+// history from being retrieved when the watch is initiated.
+// If there is an error polling for the pod's status, or if
+// ctx.Done is closed, podStatusResult will contain an error.
+func (c *Client) AwaitPodNotPending(ctx context.Context, podName, podResourceVersion string) (*api.PodStatus, error) {
+ if podResourceVersion == "" {
+ return nil, fmt.Errorf("resourceVersion for pod %v must be provided", podName)
+ }
+ ctx, cancel := context.WithCancel(ctx)
+ defer cancel()
+
+ podStatusResult, err := c.WatchPodStatus(ctx, podName, podResourceVersion)
+ if err != nil {
+ return nil, err
+ }
+ var psr PodStatusResult
+ for {
+ select {
+ case psr = <-podStatusResult:
+ if psr.Err != nil {
+ return nil, psr.Err
+ }
+ if psr.Status.Phase != api.PodPending {
+ return psr.Status, nil
+ }
+ }
+ }
+}
+
+// PodStatusResult wraps an api.PodStatus and an error. It is the
+// element type of the channel returned by WatchPodStatus.
+type PodStatusResult struct {
+ // Status is the pod status carried by a watch update.
+ Status *api.PodStatus
+ // Err is the error reported by the watch, if any.
+ Err error
+}
+
+// watchPodStatus is the JSON shape of one message decoded from the
+// watch response stream by WatchPodStatus.
+type watchPodStatus struct {
+ // The type of watch update contained in the message
+ Type string `json:"type"`
+ // Pod details
+ Object api.Pod `json:"object"`
+}
+
+// WatchPodStatus long-polls the Kubernetes watch API to be notified
+// of changes to the specified pod. Changes are sent on the returned
+// PodStatusResult channel as they are received.
+// The podResourceVersion is required to prevent a pod's entire
+// history from being retrieved when the watch is initiated; it is
+// sent as the resourceVersion query parameter.
+// The provided context must be canceled or timed out to stop the watch.
+// If any error occurs communicating with the Kubernetes API
+// (including a non-200 response), the error is sent on the returned
+// PodStatusResult channel and the channel is closed. Once ctx is
+// done, a result that cannot be handed over immediately is dropped
+// and the channel is closed, so the watching goroutine never outlives
+// a receiver that has gone away.
+func (c *Client) WatchPodStatus(ctx context.Context, podName, podResourceVersion string) (<-chan PodStatusResult, error) {
+	if podResourceVersion == "" {
+		return nil, fmt.Errorf("resourceVersion for pod %v must be provided", podName)
+	}
+	// One slot of buffering lets the final error be queued even when
+	// the receiver is momentarily busy with the previous result.
+	statusChan := make(chan PodStatusResult, 1)
+
+	// deliver hands r to the receiver and reports whether it was
+	// accepted. An immediately possible send always wins; otherwise it
+	// waits for either the receiver or ctx.Done.
+	deliver := func(r PodStatusResult) bool {
+		select {
+		case statusChan <- r:
+			return true
+		default:
+		}
+		select {
+		case statusChan <- r:
+			return true
+		case <-ctx.Done():
+			return false
+		}
+	}
+
+	go func() {
+		defer close(statusChan)
+		// Make request to Kubernetes API
+		getURL := c.endpointURL + defaultWatchPod + "/" + podName
+		req, err := http.NewRequest("GET", getURL, nil)
+		if err != nil {
+			deliver(PodStatusResult{Err: fmt.Errorf("failed to create request: GET %q : %v", getURL, err)})
+			return
+		}
+		// URL.Query returns a copy, so the encoded values must be
+		// written back to RawQuery to take effect.
+		query := req.URL.Query()
+		query.Set("resourceVersion", podResourceVersion)
+		req.URL.RawQuery = query.Encode()
+
+		res, err := ctxhttp.Do(ctx, c.httpClient, req)
+		if err != nil {
+			deliver(PodStatusResult{Err: fmt.Errorf("failed to make request: GET %q: %v", getURL, err)})
+			return
+		}
+		defer res.Body.Close()
+		if res.StatusCode != http.StatusOK {
+			// Without this check an error payload would decode into an
+			// empty pod status and be mistaken for a real update.
+			deliver(PodStatusResult{Err: fmt.Errorf("http error %d GET %q", res.StatusCode, getURL)})
+			return
+		}
+
+		reader := bufio.NewReader(res.Body)
+		for {
+			line, err := reader.ReadBytes('\n')
+			if err != nil {
+				deliver(PodStatusResult{Err: fmt.Errorf("error reading streaming response body: %v", err)})
+				return
+			}
+			// Decode into a fresh value each time: the receiver keeps
+			// a pointer into it, so it must not be overwritten later.
+			var wps watchPodStatus
+			if err := json.Unmarshal(line, &wps); err != nil {
+				deliver(PodStatusResult{Err: fmt.Errorf("failed to decode watch pod status: %v", err)})
+				return
+			}
+			if !deliver(PodStatusResult{Status: &wps.Object.Status}) {
+				return
+			}
+		}
+	}()
+	return statusChan, nil
+}
+
+// PodStatus retrieves the status of a pod synchronously from the
+// Kubernetes API server.
+// An error is returned if the request cannot be built or sent, if the
+// response body cannot be read, if the server answers with anything
+// other than 200 OK, or if the body is not valid pod JSON.
+func (c *Client) PodStatus(ctx context.Context, podName string) (*api.PodStatus, error) {
+	getURL := c.endpointURL + defaultPod + "/" + podName
+
+	// Make request to Kubernetes API
+	req, err := http.NewRequest("GET", getURL, nil)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create request: GET %q : %v", getURL, err)
+	}
+	res, err := ctxhttp.Do(ctx, c.httpClient, req)
+	if err != nil {
+		return nil, fmt.Errorf("failed to make request: GET %q: %v", getURL, err)
+	}
+	defer res.Body.Close()
+
+	body, err := ioutil.ReadAll(res.Body)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read request body for GET %q: %v", getURL, err)
+	}
+	if res.StatusCode != http.StatusOK {
+		// err is always nil here, so it is left out of the message.
+		return nil, fmt.Errorf("http error %d GET %q: %q", res.StatusCode, getURL, string(body))
+	}
+
+	// Decode into a value rather than a nil pointer so that a JSON
+	// "null" body cannot lead to a nil dereference below.
+	var pod api.Pod
+	if err := json.Unmarshal(body, &pod); err != nil {
+		return nil, fmt.Errorf("failed to decode pod resources: %v", err)
+	}
+	return &pod.Status, nil
+}
+
+// PodLog retrieves the container log for the first container
+// in the pod.
+// An error is returned if the request cannot be built or sent, if the
+// response body cannot be read, or if the server answers with
+// anything other than 200 OK.
+func (c *Client) PodLog(ctx context.Context, podName string) (string, error) {
+	// TODO(evanbrown): support multiple containers
+	logURL := c.endpointURL + defaultPod + "/" + podName + "/log"
+	req, err := http.NewRequest("GET", logURL, nil)
+	if err != nil {
+		return "", fmt.Errorf("failed to create request: GET %q : %v", logURL, err)
+	}
+	res, err := ctxhttp.Do(ctx, c.httpClient, req)
+	if err != nil {
+		return "", fmt.Errorf("failed to make request: GET %q: %v", logURL, err)
+	}
+	// Close the body on every path, including a failed read.
+	defer res.Body.Close()
+
+	body, err := ioutil.ReadAll(res.Body)
+	if err != nil {
+		return "", fmt.Errorf("failed to read response body: GET %q: %v", logURL, err)
+	}
+	if res.StatusCode != http.StatusOK {
+		// Do not hand an API error payload back as if it were log text.
+		return "", fmt.Errorf("http error %d GET %q: %q", res.StatusCode, logURL, string(body))
+	}
+	return string(body), nil
}