all: display pool status and delete failed/old pods

* status page shows kube pool details
* pods created by the coordinator are tracked
* pods that fail to create are deleted
* pods whose delete-at time has passed are deleted (see the sketch below)
* pods created by a different coordinator are deleted
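
The cleanup contract behind the last three bullets: every pod the
coordinator creates is stamped with a "delete-at" annotation holding a
Unix timestamp, and a background loop deletes any pod whose stamp has
passed. A minimal, illustrative sketch of that check (the podExpired
helper is hypothetical, not part of this change):

    package main

    import (
        "fmt"
        "strconv"
        "time"
    )

    // podExpired reports whether a pod's "delete-at" annotation (a Unix
    // timestamp stamped on the pod at creation) lies in the past.
    func podExpired(annotations map[string]string, now time.Time) bool {
        v, ok := annotations["delete-at"]
        if !ok || v == "" {
            return false // no deadline recorded; leave the pod alone
        }
        deadline, err := strconv.ParseInt(v, 10, 64)
        if err != nil {
            return false // malformed value; the real loop logs and ignores it
        }
        return now.Unix() > deadline
    }

    func main() {
        ann := map[string]string{
            // Stamped at pod creation, e.g. 45 minutes from now.
            "delete-at": fmt.Sprint(time.Now().Add(45 * time.Minute).Unix()),
        }
        fmt.Println(podExpired(ann, time.Now()))                  // false
        fmt.Println(podExpired(ann, time.Now().Add(2*time.Hour))) // true
    }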

Updates golang/go#12546

Change-Id: I4c4f8ff906962b4a014a66d0a9d490ff17710d62
Reviewed-on: https://go-review.googlesource.com/16101
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
diff --git a/buildlet/kube.go b/buildlet/kube.go
index 6c523aa..a894675 100644
--- a/buildlet/kube.go
+++ b/buildlet/kube.go
@@ -19,6 +19,13 @@
 	"golang.org/x/net/context/ctxhttp"
 )
 
+var (
+	// TODO(evanbrown): resource requirements should be
+	// defined per-builder in dashboard/builders.go
+	BuildletCPU    = api.MustParse("2")         // 2 Cores
+	BuildletMemory = api.MustParse("2000000Ki") // 2,000,000Ki RAM
+)
+
 // PodOpts control how new pods are started.
 type PodOpts struct {
 	// ImageRegistry specifies the Docker registry Kubernetes
@@ -40,11 +47,6 @@
 	// to delete the pod.
 	DeleteIn time.Duration
 
-	// OnInstanceRequested optionally specifies a hook to run synchronously
-	// after the pod create call, but before
-	// waiting for its operation to proceed.
-	OnPodRequested func()
-
 	// OnPodCreated optionally specifies a hook to run synchronously
 	// after the pod operation succeeds.
 	OnPodCreated func()
@@ -73,6 +75,7 @@
 				"type": builderType,
 				"role": "buildlet",
 			},
+			Annotations: map[string]string{},
 		},
 		Spec: api.PodSpec{
 			RestartPolicy: api.RestartPolicyNever,
@@ -81,7 +84,13 @@
 					Name:            "buildlet",
 					Image:           imageID(opts.ImageRegistry, conf.KubeImage),
 					ImagePullPolicy: api.PullAlways,
-					Command:         []string{"/usr/local/bin/stage0"},
+					Resources: api.ResourceRequirements{
+						Limits: api.ResourceList{
+							api.ResourceCPU:    BuildletCPU,
+							api.ResourceMemory: BuildletMemory,
+						},
+					},
+					Command: []string{"/usr/local/bin/stage0"},
 					Ports: []api.ContainerPort{
 						{
 							ContainerPort: 80,
@@ -120,17 +129,19 @@
 	if opts.DeleteIn != 0 {
 		// In case the pod gets away from us (generally: if the
 		// coordinator dies while a build is running), then we
-		// set this attribute of when it should be killed so
+		// set this annotation to the time at which it should be killed so
 		// we can kill it later when the coordinator is
 		// restarted. The cleanUpOldPods goroutine loop handles
 		// that killing.
-		addEnv("META_DELETE_AT", fmt.Sprint(time.Now().Add(opts.DeleteIn).Unix()))
+		pod.ObjectMeta.Annotations["delete-at"] = fmt.Sprint(time.Now().Add(opts.DeleteIn).Unix())
 	}
 
-	status, err := kubeClient.Run(ctx, pod)
+	status, err := kubeClient.RunPod(ctx, pod)
 	if err != nil {
 		return nil, fmt.Errorf("pod could not be created: %v", err)
 	}
+	condRun(opts.OnPodCreated)
+
 	// The new pod must be in Running phase. Possible phases are described at
 	// http://releases.k8s.io/HEAD/docs/user-guide/pod-states.md#pod-phase
 	if status.Phase != api.PodRunning {
diff --git a/cmd/coordinator/coordinator.go b/cmd/coordinator/coordinator.go
index ac1fb68..a996058 100644
--- a/cmd/coordinator/coordinator.go
+++ b/cmd/coordinator/coordinator.go
@@ -306,6 +306,7 @@
 		http.HandleFunc("/dosomework/", handleDoSomeWork(workc))
 	} else {
 		go gcePool.cleanUpOldVMs()
+		go kubePool.cleanUpOldPods(context.Background())
 
 		if inStaging {
 			dashboard.BuildletBucket = "dev-go-builder-data"
diff --git a/cmd/coordinator/gce.go b/cmd/coordinator/gce.go
index 337f85c..fb28254 100644
--- a/cmd/coordinator/gce.go
+++ b/cmd/coordinator/gce.go
@@ -393,11 +393,11 @@
 	return ok
 }
 
-func (p *gceBuildletPool) instancesActive() (ret []instanceTime) {
+func (p *gceBuildletPool) instancesActive() (ret []resourceTime) {
 	p.mu.Lock()
 	defer p.mu.Unlock()
 	for name, create := range p.inst {
-		ret = append(ret, instanceTime{
+		ret = append(ret, resourceTime{
 			name:     name,
 			creation: create,
 		})
@@ -406,13 +406,13 @@
 	return ret
 }
 
-// instanceTime is a GCE instance name and its creation time.
-type instanceTime struct {
+// resourceTime is a GCE instance or Kube pod name and its creation time.
+type resourceTime struct {
 	name     string
 	creation time.Time
 }
 
-type byCreationTime []instanceTime
+type byCreationTime []resourceTime
 
 func (s byCreationTime) Len() int           { return len(s) }
 func (s byCreationTime) Less(i, j int) bool { return s[i].creation.Before(s[j].creation) }
diff --git a/cmd/coordinator/kube.go b/cmd/coordinator/kube.go
index 0dba4bf..9ecf8d4 100644
--- a/cmd/coordinator/kube.go
+++ b/cmd/coordinator/kube.go
@@ -13,12 +13,16 @@
 	"io"
 	"log"
 	"net/http"
+	"sort"
+	"strconv"
 	"strings"
 	"sync"
+	"time"
 
 	"golang.org/x/build/buildlet"
 	"golang.org/x/build/dashboard"
 	"golang.org/x/build/kubernetes"
+	"golang.org/x/build/kubernetes/api"
 	"golang.org/x/net/context"
 	"golang.org/x/oauth2"
 	container "google.golang.org/api/container/v1"
@@ -104,15 +108,55 @@
 	if err != nil {
 		return fmt.Errorf("kubernetes HTTP client could not be created: %v", err)
 	}
+
+	go kubePool.pollCapacityLoop()
 	return nil
 }
 
-var kubePool = &kubeBuildletPool{}
+var kubePool = &kubeBuildletPool{
+	cpuCapacity:    api.NewQuantity(0, api.DecimalSI),
+	cpuUsage:       api.NewQuantity(0, api.DecimalSI),
+	memoryCapacity: api.NewQuantity(0, api.BinarySI),
+	memoryUsage:    api.NewQuantity(0, api.BinarySI),
+}
 
 // kubeBuildletPool is the Kubernetes buildlet pool.
 type kubeBuildletPool struct {
-	// ...
-	mu sync.Mutex
+	mu sync.Mutex // guards all following
+
+	pods           map[string]time.Time // pod instance name -> creationTime
+	cpuCapacity    *api.Quantity        // CPU capacity as reported by the Kubernetes API
+	memoryCapacity *api.Quantity
+	cpuUsage       *api.Quantity
+	memoryUsage    *api.Quantity
+}
+
+func (p *kubeBuildletPool) pollCapacityLoop() {
+	ctx := context.Background()
+	for {
+		p.pollCapacity(ctx)
+		time.Sleep(30 * time.Second)
+	}
+}
+
+func (p *kubeBuildletPool) pollCapacity(ctx context.Context) {
+	nodes, err := kubeClient.GetNodes(ctx)
+	if err != nil {
+		log.Printf("Failed to get Kubernetes cluster capacity for %s/%s: %v", projectID, projectRegion, err)
+		return
+	}
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	// Calculate the total CPU and memory capacity of the cluster
+	var sumCPU = api.NewQuantity(0, api.DecimalSI)
+	var sumMemory = api.NewQuantity(0, api.BinarySI)
+	for _, n := range nodes {
+		sumCPU.Add(n.Status.Capacity[api.ResourceCPU])
+		sumMemory.Add(n.Status.Capacity[api.ResourceMemory])
+	}
+	p.cpuCapacity = sumCPU
+	p.memoryCapacity = sumMemory
 }
 
 func (p *kubeBuildletPool) GetBuildlet(ctx context.Context, typ, rev string, el eventTimeLogger) (*buildlet.Client, error) {
@@ -153,13 +197,10 @@
 		ImageRegistry: registryPrefix,
 		Description:   fmt.Sprintf("Go Builder for %s at %s", typ, rev),
 		DeleteIn:      deleteIn,
-		OnPodRequested: func() {
-			el.logEventTime("pod_create_requested", podName)
-			log.Printf("Pod %q starting", podName)
-		},
 		OnPodCreated: func() {
 			el.logEventTime("pod_created")
-			needDelete = true // redundant with OnPodRequested one, but fine.
+			p.setPodUsed(podName, true)
+			needDelete = true
 		},
 		OnGotPodInfo: func() {
 			el.logEventTime("got_pod_info", "waiting_for_buildlet...")
@@ -167,19 +208,95 @@
 	})
 	if err != nil {
 		el.logEventTime("kube_buildlet_create_failure", fmt.Sprintf("%s: %v", podName, err))
-		log.Printf("Failed to create kube pod for %s, %s: %v", typ, rev, err)
+
 		if needDelete {
-			//TODO(evanbrown): delete pod
+			log.Printf("Deleting failed pod %q", podName)
+			kubeClient.DeletePod(ctx, podName)
+			p.setPodUsed(podName, false)
 		}
-		//p.setInstanceUsed(instName, false)
 		return nil, err
 	}
+
 	bc.SetDescription("Kube Pod: " + podName)
+
+	// The build's context will be canceled when the build completes (successfully
+	// or not), or if the buildlet becomes unavailable. In any case, delete the pod
+	// running the buildlet.
+	go func() {
+		<-ctx.Done()
+		log.Printf("Deleting pod %q after build context cancel received ", podName)
+		// Giving DeletePod a new context here as the build ctx has been canceled
+		kubeClient.DeletePod(context.Background(), podName)
+		p.setPodUsed(podName, false)
+	}()
+
 	return bc, nil
 }
 
 func (p *kubeBuildletPool) WriteHTMLStatus(w io.Writer) {
-	io.WriteString(w, "<b>Kubernetes pool summary</b><ul><li>(TODO)</li></ul>")
+	fmt.Fprintf(w, "<b>Kubernetes pool</b> capacity: %s", p.capacityString())
+	const show = 6 // must be even
+	active := p.podsActive()
+	if len(active) > 0 {
+		fmt.Fprintf(w, "<ul>")
+		for i, pod := range active {
+			if i < show/2 || i >= len(active)-(show/2) {
+				fmt.Fprintf(w, "<li>%v, %v</li>\n", pod.name, time.Since(pod.creation))
+			} else if i == show/2 {
+				fmt.Fprintf(w, "<li>... %d of %d total omitted ...</li>\n", len(active)-show, len(active))
+			}
+		}
+		fmt.Fprintf(w, "</ul>")
+	}
+}
+
+func (p *kubeBuildletPool) capacityString() string {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	return fmt.Sprintf("%v/%v CPUs; %v/%v Memory",
+		p.cpuUsage, p.cpuCapacity,
+		p.memoryUsage, p.memoryCapacity)
+}
+
+func (p *kubeBuildletPool) setPodUsed(podName string, used bool) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	if p.pods == nil {
+		p.pods = make(map[string]time.Time)
+	}
+	if used {
+		p.pods[podName] = time.Now()
+		// Track CPU and memory usage.
+		p.cpuUsage.Add(buildlet.BuildletCPU)
+		p.memoryUsage.Add(buildlet.BuildletMemory)
+	} else {
+		delete(p.pods, podName)
+		// Release the CPU and memory the pod was using.
+		p.cpuUsage.Sub(buildlet.BuildletCPU)
+		p.memoryUsage.Sub(buildlet.BuildletMemory)
+	}
+}
+
+func (p *kubeBuildletPool) podUsed(podName string) bool {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	_, ok := p.pods[podName]
+	return ok
+}
+
+func (p *kubeBuildletPool) podsActive() (ret []resourceTime) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	for name, create := range p.pods {
+		ret = append(ret, resourceTime{
+			name:     name,
+			creation: create,
+		})
+	}
+	sort.Sort(byCreationTime(ret))
+	return ret
 }
 
 func (p *kubeBuildletPool) String() string {
@@ -191,6 +308,69 @@
 	return fmt.Sprintf("Kubernetes pool capacity: %d/%d", inUse, total)
 }
 
+// cleanUpOldPods loops forever and periodically enumerates pods
+// and deletes those which have expired.
+//
+// A pod is considered expired if it has a "delete-at" annotation
+// whose Unix timestamp is before the current time.
+//
+// This is the safety mechanism for deleting pods that stray from the
+// normal deletion process. Pods are created to run a single build and
+// should be shut down by a controlling process. Due to various types
+// of failures, they might get stranded. To prevent them from wasting
+// resources forever, the "delete-at" annotation is set at creation
+// time to a point well beyond the pod's expected lifetime.
+func (p *kubeBuildletPool) cleanUpOldPods(ctx context.Context) {
+	if containerService == nil {
+		return
+	}
+	for {
+		pods, err := kubeClient.GetPods(ctx)
+		if err != nil {
+			log.Printf("Error cleaning pods: %v", err)
+		}
+		for _, pod := range pods {
+			if pod.ObjectMeta.Annotations == nil {
+				// Defensive. Not seen in practice.
+				continue
+			}
+			sawDeleteAt := false
+			for k, v := range pod.ObjectMeta.Annotations {
+				if k == "delete-at" {
+					sawDeleteAt = true
+					if v == "" {
+						log.Printf("missing delete-at value; ignoring")
+						continue
+					}
+					unixDeadline, err := strconv.ParseInt(v, 10, 64)
+					if err != nil {
+						log.Printf("invalid delete-at value %q seen; ignoring", v)
+					}
+					if err == nil && time.Now().Unix() > unixDeadline {
+						log.Printf("Deleting expired pod %q in zone %q ...", pod.Name)
+						err = kubeClient.DeletePod(ctx, pod.Name)
+						if err != nil {
+							log.Printf("problem deleting pod: %v", err)
+						}
+					}
+				}
+			}
+			// Delete pods from previous coordinator generations: pods we
+			// created (carrying a delete-at annotation) that this process
+			// isn't tracking. Restricting to names starting with
+			// "buildlet-" is extra paranoia.
+			if sawDeleteAt && strings.HasPrefix(pod.Name, "buildlet-") && !p.podUsed(pod.Name) {
+				log.Printf("Deleting pod %q from an earlier coordinator generation ...", pod.Name)
+				err = kubeClient.DeletePod(ctx, pod.Name)
+				if err != nil {
+					log.Printf("problem deleting pod: %v", err)
+				}
+			}
+		}
+		time.Sleep(time.Minute)
+	}
+}
+
 func hasCloudPlatformScope() bool {
 	return hasScope(container.CloudPlatformScope)
 }
diff --git a/cmd/coordinator/status.go b/cmd/coordinator/status.go
index bc26ab4..ca4e6fb 100644
--- a/cmd/coordinator/status.go
+++ b/cmd/coordinator/status.go
@@ -61,6 +61,11 @@
 	gcePool.WriteHTMLStatus(&buf)
 	data.GCEPoolStatus = template.HTML(buf.String())
 	buf.Reset()
+
+	kubePool.WriteHTMLStatus(&buf)
+	data.KubePoolStatus = template.HTML(buf.String())
+	buf.Reset()
+
 	reversePool.WriteHTMLStatus(&buf)
 	data.ReversePoolStatus = template.HTML(buf.String())
 
@@ -86,6 +91,7 @@
 	TrybotsErr        string
 	Trybots           template.HTML
 	GCEPoolStatus     template.HTML // TODO: embed template
+	KubePoolStatus    template.HTML // TODO: embed template
 	ReversePoolStatus template.HTML // TODO: embed template
 	RemoteBuildlets   template.HTML
 	DiskFree          string
@@ -129,6 +135,7 @@
 <h2>Buildlet pools</h2>
 <ul>
 <li>{{.GCEPoolStatus}}</li>
+<li>{{.KubePoolStatus}}</li>
 <li>{{.ReversePoolStatus}}</li>
 </ul>
 
diff --git a/kubernetes/client.go b/kubernetes/client.go
index 2ee425a..1acc98e 100644
--- a/kubernetes/client.go
+++ b/kubernetes/client.go
@@ -11,6 +11,7 @@
 	"encoding/json"
 	"fmt"
 	"io/ioutil"
+	"log"
 	"net/http"
 	"net/url"
 	"strings"
@@ -26,6 +27,7 @@
 	APIEndpoint     = "/api/v1"
 	defaultPod      = "/namespaces/default/pods"
 	defaultWatchPod = "/watch/namespaces/default/pods"
+	nodes           = "/nodes"
 )
 
 // Client is a client for the Kubernetes master.
@@ -55,7 +57,7 @@
 // An error is returned if the pod cannot be created, if it
 // does not enter the running phase within 2 minutes, or if ctx.Done
 // is closed.
-func (c *Client) Run(ctx context.Context, pod *api.Pod) (*api.PodStatus, error) {
+func (c *Client) RunPod(ctx context.Context, pod *api.Pod) (*api.PodStatus, error) {
 	var podJSON bytes.Buffer
 	if err := json.NewEncoder(&podJSON).Encode(pod); err != nil {
 		return nil, fmt.Errorf("failed to encode pod in json: %v", err)
@@ -86,11 +88,67 @@
 
 	status, err := c.AwaitPodNotPending(ctx, podResult.Name, podResult.ObjectMeta.ResourceVersion)
 	if err != nil {
+		log.Printf("Timed out waiting for pod to leave pending state. Pod will be deleted.")
+		// The pod did not leave the pending state. We should try to manually delete it before
+		// returning an error.
+		c.DeletePod(context.Background(), podResult.Name)
 		return nil, err
 	}
 	return status, nil
 }
 
+// GetPods returns all pods in the cluster, regardless of status.
+func (c *Client) GetPods(ctx context.Context) ([]api.Pod, error) {
+	getURL := c.endpointURL + defaultPod
+
+	// Make request to Kubernetes API
+	req, err := http.NewRequest("GET", getURL, nil)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create request: GET %q : %v", getURL, err)
+	}
+	res, err := ctxhttp.Do(ctx, c.httpClient, req)
+	if err != nil {
+		return nil, fmt.Errorf("failed to make request: GET %q: %v", getURL, err)
+	}
+
+	body, err := ioutil.ReadAll(res.Body)
+	res.Body.Close()
+	if err != nil {
+		return nil, fmt.Errorf("failed to read request body for GET %q: %v", getURL, err)
+	}
+	if res.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("http error %d GET %q: %q: %v", res.StatusCode, getURL, string(body), err)
+	}
+
+	var podList api.PodList
+	if err := json.Unmarshal(body, &podList); err != nil {
+		return nil, fmt.Errorf("failed to decode list of pod resources: %v", err)
+	}
+	return podList.Items, nil
+}
+
+// DeletePod deletes the specified Kubernetes pod.
+func (c *Client) DeletePod(ctx context.Context, podName string) error {
+	url := c.endpointURL + defaultPod + "/" + podName
+	req, err := http.NewRequest("DELETE", url, nil)
+	if err != nil {
+		return fmt.Errorf("failed to create request: DELETE %q : %v", url, err)
+	}
+	res, err := ctxhttp.Do(ctx, c.httpClient, req)
+	if err != nil {
+		return fmt.Errorf("failed to make request: DELETE %q: %v", url, err)
+	}
+	body, err := ioutil.ReadAll(res.Body)
+	res.Body.Close()
+	if err != nil {
+		return fmt.Errorf("failed to read response body: DELETE %q: %v, url, err")
+	}
+	if res.StatusCode != http.StatusOK {
+		return fmt.Errorf("http error: %d DELETE %q: %q: %v", res.StatusCode, url, string(body), err)
+	}
+	return nil
+}
+
 // awaitPodNotPending will return a pod's status in a
 // podStatusResult when the pod is no longer in the pending
 // state.
@@ -162,6 +220,7 @@
 			return
 		}
 		res, err := ctxhttp.Do(ctx, c.httpClient, req)
 		if err != nil {
 			statusChan <- PodStatusResult{Err: fmt.Errorf("failed to make request: GET %q: %v", getURL, err)}
 			return
 		}
+		defer res.Body.Close()
@@ -169,8 +228,23 @@
 
 		var wps watchPodStatus
 		reader := bufio.NewReader(res.Body)
+
+		// bufio.Reader.ReadBytes is blocking, so we watch for
+		// context timeout or cancellation in a goroutine
+		// and close the response body when we see it. The
+		// response body is also closed via the defer above,
+		// but closing twice is OK.
+		go func() {
+			<-ctx.Done()
+			res.Body.Close()
+		}()
+
 		for {
 			line, err := reader.ReadBytes('\n')
+			if ctx.Err() != nil {
+				statusChan <- PodStatusResult{Err: ctx.Err()}
+				return
+			}
 			if err != nil {
 				statusChan <- PodStatusResult{Err: fmt.Errorf("error reading streaming response body: %v", err)}
 				return
@@ -230,9 +304,38 @@
 		return "", fmt.Errorf("failed to make request: GET %q: %v", url, err)
 	}
 	body, err := ioutil.ReadAll(res.Body)
+	res.Body.Close()
 	if err != nil {
 		return "", fmt.Errorf("failed to read response body: GET %q: %v", url, err)
 	}
-	res.Body.Close()
+	if res.StatusCode != http.StatusOK {
+		return "", fmt.Errorf("http error %d GET %q: %q: %v", res.StatusCode, url, string(body), err)
+	}
 	return string(body), nil
 }
+
+// GetNodes returns the list of nodes that comprise the Kubernetes cluster.
+func (c *Client) GetNodes(ctx context.Context) ([]api.Node, error) {
+	url := c.endpointURL + nodes
+	req, err := http.NewRequest("GET", url, nil)
+	if err != nil {
+		return nil, fmt.Errorf("failed to create request: GET %q : %v", url, err)
+	}
+	res, err := ctxhttp.Do(ctx, c.httpClient, req)
+	if err != nil {
+		return nil, fmt.Errorf("failed to make request: GET %q: %v", url, err)
+	}
+	body, err := ioutil.ReadAll(res.Body)
+	res.Body.Close()
+	if err != nil {
+		return nil, fmt.Errorf("failed to read response body: GET %q: %v, url, err")
+	}
+	if res.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("http error %d GET %q: %q: %v", res.StatusCode, url, string(body), err)
+	}
+	var nodeList api.NodeList
+	if err := json.Unmarshal(body, &nodeList); err != nil {
+		return nil, fmt.Errorf("failed to decode node list: %v", err)
+	}
+	return nodeList.Items, nil
+}