all: display pool status and delete failed/old pods
* status page shows kube pool details
* pods created by the coordinator are tracked
* pods that fail to create are deleted
* pods older than delete-at are deleted
* pods created by a different coordinator are deleted
Updates golang/go#12546
Change-Id: I4c4f8ff906962b4a014a66d0a9d490ff17710d62
Reviewed-on: https://go-review.googlesource.com/16101
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
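The bullets above hinge on a "delete-at" pod annotation: it is written at creation time (see buildlet/kube.go below) and checked by the coordinator's cleanup loop. As a minimal sketch of the reading side, assuming imports of context, log, strconv, time, golang.org/x/build/kubernetes and golang.org/x/build/kubernetes/api, and with deleteIfExpired as a purely hypothetical helper name:

    // Sketch only, not part of this change: delete a pod whose
    // "delete-at" annotation (a Unix timestamp) has already passed.
    func deleteIfExpired(ctx context.Context, c *kubernetes.Client, pod api.Pod) {
    	v, ok := pod.ObjectMeta.Annotations["delete-at"]
    	if !ok {
    		return // no deadline recorded for this pod
    	}
    	deadline, err := strconv.ParseInt(v, 10, 64)
    	if err != nil {
    		log.Printf("invalid delete-at value %q; ignoring", v)
    		return
    	}
    	if time.Now().Unix() > deadline {
    		if err := c.DeletePod(ctx, pod.Name); err != nil {
    			log.Printf("problem deleting pod %q: %v", pod.Name, err)
    		}
    	}
    }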
diff --git a/buildlet/kube.go b/buildlet/kube.go
index 6c523aa..a894675 100644
--- a/buildlet/kube.go
+++ b/buildlet/kube.go
@@ -19,6 +19,13 @@
"golang.org/x/net/context/ctxhttp"
)
+var (
+ // TODO(evanbrown): resource requirements should be
+ // defined per-builder in dashboard/builders.go
+ BuildletCPU = api.MustParse("2") // 2 Cores
+ BuildletMemory = api.MustParse("2000000Ki") // 2,000,000Ki RAM
+)
+
// PodOpts control how new pods are started.
type PodOpts struct {
// ImageRegistry specifies the Docker registry Kubernetes
@@ -40,11 +47,6 @@
// to delete the pod.
DeleteIn time.Duration
- // OnInstanceRequested optionally specifies a hook to run synchronously
- // after the pod create call, but before
- // waiting for its operation to proceed.
- OnPodRequested func()
-
// OnPodCreated optionally specifies a hook to run synchronously
// after the pod operation succeeds.
OnPodCreated func()
@@ -73,6 +75,7 @@
"type": builderType,
"role": "buildlet",
},
+ Annotations: map[string]string{},
},
Spec: api.PodSpec{
RestartPolicy: api.RestartPolicyNever,
@@ -81,7 +84,13 @@
Name: "buildlet",
Image: imageID(opts.ImageRegistry, conf.KubeImage),
ImagePullPolicy: api.PullAlways,
- Command: []string{"/usr/local/bin/stage0"},
+ Resources: api.ResourceRequirements{
+ Limits: api.ResourceList{
+ api.ResourceCPU: BuildletCPU,
+ api.ResourceMemory: BuildletMemory,
+ },
+ },
+ Command: []string{"/usr/local/bin/stage0"},
Ports: []api.ContainerPort{
{
ContainerPort: 80,
@@ -120,17 +129,19 @@
if opts.DeleteIn != 0 {
// In case the pod gets away from us (generally: if the
// coordinator dies while a build is running), then we
- // set this attribute of when it should be killed so
+ // set this annotation of when it should be killed so
// we can kill it later when the coordinator is
// restarted. The cleanUpOldPods goroutine loop handles
// that killing.
- addEnv("META_DELETE_AT", fmt.Sprint(time.Now().Add(opts.DeleteIn).Unix()))
+ pod.ObjectMeta.Annotations["delete-at"] = fmt.Sprint(time.Now().Add(opts.DeleteIn).Unix())
}
- status, err := kubeClient.Run(ctx, pod)
+ status, err := kubeClient.RunPod(ctx, pod)
if err != nil {
return nil, fmt.Errorf("pod could not be created: %v", err)
}
+ condRun(opts.OnPodCreated)
+
// The new pod must be in Running phase. Possible phases are described at
// http://releases.k8s.io/HEAD/docs/user-guide/pod-states.md#pod-phase
if status.Phase != api.PodRunning {
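For reference, the coordinator side of this change (cmd/coordinator/kube.go, later in this diff) fills in these PodOpts roughly as follows. This is only an illustrative sketch: the hooks are reduced to log statements, and registryPrefix, typ, rev and deleteIn are assumed to be supplied by the caller.

    opts := buildlet.PodOpts{
    	ImageRegistry: registryPrefix, // e.g. a gcr.io project prefix (assumed)
    	Description:   fmt.Sprintf("Go Builder for %s at %s", typ, rev),
    	DeleteIn:      deleteIn, // causes the "delete-at" annotation to be set
    	OnPodCreated:  func() { log.Printf("pod created") },
    	OnGotPodInfo:  func() { log.Printf("got pod info; waiting for buildlet") },
    }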
diff --git a/cmd/coordinator/coordinator.go b/cmd/coordinator/coordinator.go
index ac1fb68..a996058 100644
--- a/cmd/coordinator/coordinator.go
+++ b/cmd/coordinator/coordinator.go
@@ -306,6 +306,7 @@
http.HandleFunc("/dosomework/", handleDoSomeWork(workc))
} else {
go gcePool.cleanUpOldVMs()
+ go kubePool.cleanUpOldPods(context.Background())
if inStaging {
dashboard.BuildletBucket = "dev-go-builder-data"
diff --git a/cmd/coordinator/gce.go b/cmd/coordinator/gce.go
index 337f85c..fb28254 100644
--- a/cmd/coordinator/gce.go
+++ b/cmd/coordinator/gce.go
@@ -393,11 +393,11 @@
return ok
}
-func (p *gceBuildletPool) instancesActive() (ret []instanceTime) {
+func (p *gceBuildletPool) instancesActive() (ret []resourceTime) {
p.mu.Lock()
defer p.mu.Unlock()
for name, create := range p.inst {
- ret = append(ret, instanceTime{
+ ret = append(ret, resourceTime{
name: name,
creation: create,
})
@@ -406,13 +406,13 @@
return ret
}
-// instanceTime is a GCE instance name and its creation time.
-type instanceTime struct {
+// resourceTime is a GCE instance or Kube pod name and its creation time.
+type resourceTime struct {
name string
creation time.Time
}
-type byCreationTime []instanceTime
+type byCreationTime []resourceTime
func (s byCreationTime) Len() int { return len(s) }
func (s byCreationTime) Less(i, j int) bool { return s[i].creation.Before(s[j].creation) }
diff --git a/cmd/coordinator/kube.go b/cmd/coordinator/kube.go
index 0dba4bf..9ecf8d4 100644
--- a/cmd/coordinator/kube.go
+++ b/cmd/coordinator/kube.go
@@ -13,12 +13,16 @@
"io"
"log"
"net/http"
+ "sort"
+ "strconv"
"strings"
"sync"
+ "time"
"golang.org/x/build/buildlet"
"golang.org/x/build/dashboard"
"golang.org/x/build/kubernetes"
+ "golang.org/x/build/kubernetes/api"
"golang.org/x/net/context"
"golang.org/x/oauth2"
container "google.golang.org/api/container/v1"
@@ -104,15 +108,55 @@
if err != nil {
return fmt.Errorf("kubernetes HTTP client could not be created: %v", err)
}
+
+ go kubePool.pollCapacityLoop()
return nil
}
-var kubePool = &kubeBuildletPool{}
+var kubePool = &kubeBuildletPool{
+ cpuCapacity: api.NewQuantity(0, api.DecimalSI),
+ cpuUsage: api.NewQuantity(0, api.DecimalSI),
+ memoryCapacity: api.NewQuantity(0, api.BinarySI),
+ memoryUsage: api.NewQuantity(0, api.BinarySI),
+}
// kubeBuildletPool is the Kubernetes buildlet pool.
type kubeBuildletPool struct {
- // ...
- mu sync.Mutex
+ mu sync.Mutex // guards all following
+
+ pods map[string]time.Time // pod instance name -> creationTime
+ cpuCapacity *api.Quantity // cpu capacity as reported by the Kubernetes api
+ memoryCapacity *api.Quantity
+ cpuUsage *api.Quantity
+ memoryUsage *api.Quantity
+}
+
+func (p *kubeBuildletPool) pollCapacityLoop() {
+ ctx := context.Background()
+ for {
+ p.pollCapacity(ctx)
+ time.Sleep(30 * time.Second)
+ }
+}
+
+func (p *kubeBuildletPool) pollCapacity(ctx context.Context) {
+ nodes, err := kubeClient.GetNodes(ctx)
+ if err != nil {
+ log.Printf("Failed to get Kubernetes cluster capacity for %s/%s: %v", projectID, projectRegion, err)
+ return
+ }
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ // Calculate the total CPU and memory capacity of the cluster
+ var sumCPU = api.NewQuantity(0, api.DecimalSI)
+ var sumMemory = api.NewQuantity(0, api.BinarySI)
+ for _, n := range nodes {
+ sumCPU.Add(n.Status.Capacity[api.ResourceCPU])
+ sumMemory.Add(n.Status.Capacity[api.ResourceMemory])
+ }
+ p.cpuCapacity = sumCPU
+ p.memoryCapacity = sumMemory
}
func (p *kubeBuildletPool) GetBuildlet(ctx context.Context, typ, rev string, el eventTimeLogger) (*buildlet.Client, error) {
@@ -153,13 +197,10 @@
ImageRegistry: registryPrefix,
Description: fmt.Sprintf("Go Builder for %s at %s", typ, rev),
DeleteIn: deleteIn,
- OnPodRequested: func() {
- el.logEventTime("pod_create_requested", podName)
- log.Printf("Pod %q starting", podName)
- },
OnPodCreated: func() {
el.logEventTime("pod_created")
- needDelete = true // redundant with OnPodRequested one, but fine.
+ p.setPodUsed(podName, true)
+ needDelete = true
},
OnGotPodInfo: func() {
el.logEventTime("got_pod_info", "waiting_for_buildlet...")
@@ -167,19 +208,95 @@
})
if err != nil {
el.logEventTime("kube_buildlet_create_failure", fmt.Sprintf("%s: %v", podName, err))
- log.Printf("Failed to create kube pod for %s, %s: %v", typ, rev, err)
+
if needDelete {
- //TODO(evanbrown): delete pod
+ log.Printf("Deleting failed pod %q", podName)
+ kubeClient.DeletePod(ctx, podName)
+ p.setPodUsed(podName, false)
}
- //p.setInstanceUsed(instName, false)
return nil, err
}
+
bc.SetDescription("Kube Pod: " + podName)
+
+ // The build's context will be canceled when the build completes (successfully
+ // or not), or if the buildlet becomes unavailable. In any case, delete the pod
+ // running the buildlet.
+ go func() {
+ <-ctx.Done()
+ log.Printf("Deleting pod %q after build context cancel received ", podName)
+ // Giving DeletePod a new context here as the build ctx has been canceled
+ kubeClient.DeletePod(context.Background(), podName)
+ p.setPodUsed(podName, false)
+ }()
+
return bc, nil
}
func (p *kubeBuildletPool) WriteHTMLStatus(w io.Writer) {
- io.WriteString(w, "<b>Kubernetes pool summary</b><ul><li>(TODO)</li></ul>")
+ fmt.Fprintf(w, "<b>Kubernetes pool</b> capacity: %s", p.capacityString())
+ const show = 6 // must be even
+ active := p.podsActive()
+ if len(active) > 0 {
+ fmt.Fprintf(w, "<ul>")
+ for i, pod := range active {
+ if i < show/2 || i >= len(active)-(show/2) {
+ fmt.Fprintf(w, "<li>%v, %v</li>\n", pod.name, time.Since(pod.creation))
+ } else if i == show/2 {
+ fmt.Fprintf(w, "<li>... %d of %d total omitted ...</li>\n", len(active)-show, len(active))
+ }
+ }
+ fmt.Fprintf(w, "</ul>")
+ }
+}
+
+func (p *kubeBuildletPool) capacityString() string {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ return fmt.Sprintf("%v/%v CPUs; %v/%v Memory",
+ p.cpuUsage, p.cpuCapacity,
+ p.memoryUsage, p.memoryCapacity)
+}
+
+func (p *kubeBuildletPool) setPodUsed(podName string, used bool) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ if p.pods == nil {
+ p.pods = make(map[string]time.Time)
+ }
+ if used {
+ p.pods[podName] = time.Now()
+ // Track cpu and memory usage
+ p.cpuUsage.Add(buildlet.BuildletCPU)
+ p.memoryUsage.Add(buildlet.BuildletMemory)
+
+ } else {
+ delete(p.pods, podName)
+ // Track cpu and memory usage
+ p.cpuUsage.Sub(buildlet.BuildletCPU)
+ p.memoryUsage.Sub(buildlet.BuildletMemory)
+
+ }
+}
+
+func (p *kubeBuildletPool) podUsed(podName string) bool {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ _, ok := p.pods[podName]
+ return ok
+}
+
+func (p *kubeBuildletPool) podsActive() (ret []resourceTime) {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+ for name, create := range p.pods {
+ ret = append(ret, resourceTime{
+ name: name,
+ creation: create,
+ })
+ }
+ sort.Sort(byCreationTime(ret))
+ return ret
}
func (p *kubeBuildletPool) String() string {
@@ -191,6 +308,69 @@
return fmt.Sprintf("Kubernetes pool capacity: %d/%d", inUse, total)
}
+// cleanUpOldPods loops forever and periodically enumerates pods
+// and deletes those which have expired.
+//
+// A pod is considered expired if its "delete-at" annotation
+// contains a Unix timestamp before the current time.
+//
+// This is the safety mechanism to delete pods which stray from the
+// normal deleting process. Pods are created to run a single build and
+// should be shut down by a controlling process. Due to various types
+// of failures, they might get stranded. To keep stranded pods from
+// wasting resources forever, the "delete-at" annotation is set at
+// creation time to a point well beyond the pod's expected lifetime.
+func (p *kubeBuildletPool) cleanUpOldPods(ctx context.Context) {
+ if containerService == nil {
+ return
+ }
+ for {
+ pods, err := kubeClient.GetPods(ctx)
+ if err != nil {
+ log.Printf("Error cleaning pods: %v", err)
+ }
+ for _, pod := range pods {
+ if pod.ObjectMeta.Annotations == nil {
+ // Defensive. Not seen in practice.
+ continue
+ }
+ sawDeleteAt := false
+ for k, v := range pod.ObjectMeta.Annotations {
+ if k == "delete-at" {
+ sawDeleteAt = true
+ if v == "" {
+ log.Printf("missing delete-at value; ignoring")
+ continue
+ }
+ unixDeadline, err := strconv.ParseInt(v, 10, 64)
+ if err != nil {
+ log.Printf("invalid delete-at value %q seen; ignoring", v)
+ }
+ if err == nil && time.Now().Unix() > unixDeadline {
+ log.Printf("Deleting expired pod %q in zone %q ...", pod.Name)
+ err = kubeClient.DeletePod(ctx, pod.Name)
+ if err != nil {
+ log.Printf("problem deleting pod: %v", err)
+ }
+ }
+ }
+ }
+ // Delete buildlet pods (ones we created) left over from a
+ // previous coordinator generation. Restricting this to names
+ // starting with "buildlet-" is just an extra sanity check.
+ if sawDeleteAt && strings.HasPrefix(pod.Name, "buildlet-") && !p.podUsed(pod.Name) {
+ log.Printf("Deleting pod %q from an earlier coordinator generation ...", pod.Name)
+ err = kubeClient.DeletePod(ctx, pod.Name)
+ if err != nil {
+ log.Printf("problem deleting pod: %v", err)
+ }
+ }
+ }
+ time.Sleep(time.Minute)
+ }
+}
+
func hasCloudPlatformScope() bool {
return hasScope(container.CloudPlatformScope)
}
diff --git a/cmd/coordinator/status.go b/cmd/coordinator/status.go
index bc26ab4..ca4e6fb 100644
--- a/cmd/coordinator/status.go
+++ b/cmd/coordinator/status.go
@@ -61,6 +61,11 @@
gcePool.WriteHTMLStatus(&buf)
data.GCEPoolStatus = template.HTML(buf.String())
buf.Reset()
+
+ kubePool.WriteHTMLStatus(&buf)
+ data.KubePoolStatus = template.HTML(buf.String())
+ buf.Reset()
+
reversePool.WriteHTMLStatus(&buf)
data.ReversePoolStatus = template.HTML(buf.String())
@@ -86,6 +91,7 @@
TrybotsErr string
Trybots template.HTML
GCEPoolStatus template.HTML // TODO: embed template
+ KubePoolStatus template.HTML // TODO: embed template
ReversePoolStatus template.HTML // TODO: embed template
RemoteBuildlets template.HTML
DiskFree string
@@ -129,6 +135,7 @@
<h2>Buildlet pools</h2>
<ul>
<li>{{.GCEPoolStatus}}</li>
+<li>{{.KubePoolStatus}}</li>
<li>{{.ReversePoolStatus}}</li>
</ul>
diff --git a/kubernetes/client.go b/kubernetes/client.go
index 2ee425a..1acc98e 100644
--- a/kubernetes/client.go
+++ b/kubernetes/client.go
@@ -11,6 +11,7 @@
"encoding/json"
"fmt"
"io/ioutil"
+ "log"
"net/http"
"net/url"
"strings"
@@ -26,6 +27,7 @@
APIEndpoint = "/api/v1"
defaultPod = "/namespaces/default/pods"
defaultWatchPod = "/watch/namespaces/default/pods"
+ nodes = "/nodes"
)
// Client is a client for the Kubernetes master.
@@ -55,7 +57,7 @@
// An error is returned if the pod can not be created, if it does
// does not enter the running phase within 2 minutes, or if ctx.Done
// is closed.
-func (c *Client) Run(ctx context.Context, pod *api.Pod) (*api.PodStatus, error) {
+func (c *Client) RunPod(ctx context.Context, pod *api.Pod) (*api.PodStatus, error) {
var podJSON bytes.Buffer
if err := json.NewEncoder(&podJSON).Encode(pod); err != nil {
return nil, fmt.Errorf("failed to encode pod in json: %v", err)
@@ -86,11 +88,67 @@
status, err := c.AwaitPodNotPending(ctx, podResult.Name, podResult.ObjectMeta.ResourceVersion)
if err != nil {
+ log.Printf("Timed out waiting for pod to leave pending state. Pod will be deleted.")
+ // The pod did not leave the pending state. We should try to manually delete it before
+ // returning an error.
+ c.DeletePod(context.Background(), podResult.Name)
return nil, err
}
return status, nil
}
+// GetPods returns all pods in the cluster, regardless of status.
+func (c *Client) GetPods(ctx context.Context) ([]api.Pod, error) {
+ getURL := c.endpointURL + defaultPod
+
+ // Make request to Kubernetes API
+ req, err := http.NewRequest("GET", getURL, nil)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create request: GET %q : %v", getURL, err)
+ }
+ res, err := ctxhttp.Do(ctx, c.httpClient, req)
+ if err != nil {
+ return nil, fmt.Errorf("failed to make request: GET %q: %v", getURL, err)
+ }
+
+ body, err := ioutil.ReadAll(res.Body)
+ res.Body.Close()
+ if err != nil {
+ return nil, fmt.Errorf("failed to read request body for GET %q: %v", getURL, err)
+ }
+ if res.StatusCode != http.StatusOK {
+ return nil, fmt.Errorf("http error %d GET %q: %q: %v", res.StatusCode, getURL, string(body), err)
+ }
+
+ var podList api.PodList
+ if err := json.Unmarshal(body, &podList); err != nil {
+ return nil, fmt.Errorf("failed to decode list of pod resources: %v", err)
+ }
+ return podList.Items, nil
+}
+
+// DeletePod deletes the specified Kubernetes pod.
+func (c *Client) DeletePod(ctx context.Context, podName string) error {
+ url := c.endpointURL + defaultPod + "/" + podName
+ req, err := http.NewRequest("DELETE", url, nil)
+ if err != nil {
+ return fmt.Errorf("failed to create request: DELETE %q : %v", url, err)
+ }
+ res, err := ctxhttp.Do(ctx, c.httpClient, req)
+ if err != nil {
+ return fmt.Errorf("failed to make request: DELETE %q: %v", url, err)
+ }
+ body, err := ioutil.ReadAll(res.Body)
+ res.Body.Close()
+ if err != nil {
+ return fmt.Errorf("failed to read response body: DELETE %q: %v, url, err")
+ }
+ if res.StatusCode != http.StatusOK {
+ return fmt.Errorf("http error: %d DELETE %q: %q: %v", res.StatusCode, url, string(body), err)
+ }
+ return nil
+}
+
// awaitPodNotPending will return a pod's status in a
// podStatusResult when the pod is no longer in the pending
// state.
@@ -162,6 +220,7 @@
return
}
res, err := ctxhttp.Do(ctx, c.httpClient, req)
if err != nil {
statusChan <- PodStatusResult{Err: fmt.Errorf("failed to make request: GET %q: %v", getURL, err)}
return
}
+ defer res.Body.Close()
@@ -169,8 +228,23 @@
var wps watchPodStatus
reader := bufio.NewReader(res.Body)
+
+ // bufio.Reader.ReadBytes is blocking, so we watch for
+ // context timeout or cancellation in a goroutine
+ // and close the response body when we see it. The
+ // response body is also closed via defer once the
+ // request succeeds, but closing twice is OK.
+ go func() {
+ <-ctx.Done()
+ res.Body.Close()
+ }()
+
for {
line, err := reader.ReadBytes('\n')
+ if ctx.Err() != nil {
+ statusChan <- PodStatusResult{Err: ctx.Err()}
+ return
+ }
if err != nil {
statusChan <- PodStatusResult{Err: fmt.Errorf("error reading streaming response body: %v", err)}
return
@@ -230,9 +304,38 @@
return "", fmt.Errorf("failed to make request: GET %q: %v", url, err)
}
body, err := ioutil.ReadAll(res.Body)
+ res.Body.Close()
if err != nil {
return "", fmt.Errorf("failed to read response body: GET %q: %v", url, err)
}
- res.Body.Close()
+ if res.StatusCode != http.StatusOK {
+ return "", fmt.Errorf("http error %d GET %q: %q: %v", res.StatusCode, url, string(body), err)
+ }
return string(body), nil
}
+
+// GetNodes returns the list of nodes that comprise the Kubernetes cluster.
+func (c *Client) GetNodes(ctx context.Context) ([]api.Node, error) {
+ url := c.endpointURL + nodes
+ req, err := http.NewRequest("GET", url, nil)
+ if err != nil {
+ return nil, fmt.Errorf("failed to create request: GET %q : %v", url, err)
+ }
+ res, err := ctxhttp.Do(ctx, c.httpClient, req)
+ if err != nil {
+ return nil, fmt.Errorf("failed to make request: GET %q: %v", url, err)
+ }
+ body, err := ioutil.ReadAll(res.Body)
+ res.Body.Close()
+ if err != nil {
+ return nil, fmt.Errorf("failed to read response body: GET %q: %v, url, err")
+ }
+ if res.StatusCode != http.StatusOK {
+ return nil, fmt.Errorf("http error %d GET %q: %q: %v", res.StatusCode, url, string(body), err)
+ }
+ var nodeList *api.NodeList
+ if err := json.Unmarshal(body, &nodeList); err != nil {
+ return nil, fmt.Errorf("failed to decode node list: %v", err)
+ }
+ return nodeList.Items, nil
+}