all: kubernetes builder autoscaling

Improvements to support rapid scheduling of many build jobs:

- Retry logic in the Kubernetes client to handle sporadic connection
  closes from the API server under heavy load

- Cluster autoscaler scales on default CPU utilization metric

- Debug mode allows scheduling multiple builds to test scaling

- Account for scheduled vs. provisioned resources in a cluster
  and use that information to estimate when a build's pod
  will be scheduled and enter the running state (sketched below)

- Use the estimated scheduling time to set the build context timeout

- Track pod lifecycle (requested time, estimated available time,
  actual available time, termination time, etc.)
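
Roughly, the worst-case estimate works like this (simplified from the
new estTimeToStartPod in cmd/coordinator/kube.go; variable names are
shortened for the sketch):

	scheduled := pendingCPU + runningCPU                  // cores requested by pending+running pods
	capacity := clusterCPU - clusterCPUOverhead*nodeCount // provisioned cores less system overhead
	if buildletCPU+scheduled < capacity {
		estReady = timeToSchedule // room available: normal scheduling latency only
	} else {
		// oversubscribed: scale the per-build time by the requested/provisioned ratio
		estReady = time.Second * time.Duration(float64(scheduled)/float64(capacity)*timePerBuild.Seconds())
	}

The result is used as the build pod's context timeout and recorded as
the pod's estimated-ready time in its history.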

Change-Id: I14d6c5e01af0970dbb3390a29d1ee5c43049fff8
Reviewed-on: https://go-review.googlesource.com/19524
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
diff --git a/buildenv/envs.go b/buildenv/envs.go
index 6e55c8d..733a6b2 100644
--- a/buildenv/envs.go
+++ b/buildenv/envs.go
@@ -84,6 +84,12 @@
 
 	// The GCS bucket that snapshots are written to.
 	SnapBucket string
+
+	// The maximum number of concurrent builds that can run.
+	// The zero value indicates unlimited builds and is the default.
+	// MaxBuilds is typically used to limit the number of builds in
+	// a development or staging environment.
+	MaxBuilds int
 }
 
 // MachineTypeURI returns the URI for the environment's Machine Type.
diff --git a/buildlet/kube.go b/buildlet/kube.go
index 1d57351..91af442 100644
--- a/buildlet/kube.go
+++ b/buildlet/kube.go
@@ -51,11 +51,16 @@
 	// to delete the pod.
 	DeleteIn time.Duration
 
-	// OnPodCreated optionally specifies a hook to run synchronously
-	// after the pod operation succeeds.
-	OnPodCreated func()
+	// OnPodCreating optionally specifies a hook to run synchronously
+	// after the pod create request has been made, but before the create
+	// has succeeded.
+	OnPodCreating func()
 
 	// OnPodCreated optionally specifies a hook to run synchronously
+	// after the pod create request succeeds.
+	OnPodCreated func()
+
+	// OnGotPodInfo optionally specifies a hook to run synchronously
 	// after the pod Get call.
 	OnGotPodInfo func()
 }
@@ -144,17 +149,18 @@
 		pod.ObjectMeta.Annotations["delete-at"] = fmt.Sprint(time.Now().Add(opts.DeleteIn).Unix())
 	}
 
+	condRun(opts.OnPodCreating)
 	newPod, err := kubeClient.RunPod(ctx, pod)
 	if err != nil {
-		return nil, fmt.Errorf("pod could not be created: %v", err)
+		return nil, err
 	}
-	condRun(opts.OnPodCreated)
 
 	// The new pod must be in Running phase. Possible phases are described at
 	// http://releases.k8s.io/HEAD/docs/user-guide/pod-states.md#pod-phase
 	if newPod.Status.Phase != api.PodRunning {
 		return nil, fmt.Errorf("pod is in invalid state %q: %v", newPod.Status.Phase, newPod.Status.Message)
 	}
+	condRun(opts.OnPodCreated)
 
 	// Wait for the pod to boot and its buildlet to come up.
 	var buildletURL string
diff --git a/cmd/coordinator/buildongce/create.go b/cmd/coordinator/buildongce/create.go
index 88e188e..e7f0b9d 100644
--- a/cmd/coordinator/buildongce/create.go
+++ b/cmd/coordinator/buildongce/create.go
@@ -103,14 +103,9 @@
     autoscalingPolicy:
       minNumReplicas: {{ .KubeMinNodes }}
       maxNumReplicas: {{ .KubeMaxNodes }}
-      coolDownPeriodSec: 600
-      customMetricUtilizations:
-        - metric: custom.cloudmonitoring.googleapis.com/cluster/cpu_used
-          utilizationTarget: .5
-          utilizationTargetType: GAUGE
-        - metric: custom.cloudmonitoring.googleapis.com/cluster/memory_used
-          utilizationTarget: .5
-          utilizationTargetType: GAUGE`
+      coolDownPeriodSec: 1200
+      cpuUtilization:
+        utilizationTarget: .6`
 
 func readFile(v string) string {
 	slurp, err := ioutil.ReadFile(v)
diff --git a/cmd/coordinator/buildongce/create_test.go b/cmd/coordinator/buildongce/create_test.go
index 20bf662..1d9e019 100644
--- a/cmd/coordinator/buildongce/create_test.go
+++ b/cmd/coordinator/buildongce/create_test.go
@@ -29,7 +29,7 @@
       logging_service: "logging.googleapis.com"
       monitoring_service: "none"
       node_config:
-        machine_type: "n1-standard-8"
+        machine_type: "n1-standard-16"
         oauth_scopes:
           - "https://www.googleapis.com/auth/cloud-platform"
       master_auth:
@@ -44,14 +44,9 @@
     autoscalingPolicy:
       minNumReplicas: 1
       maxNumReplicas: 5
-      coolDownPeriodSec: 600
-      customMetricUtilizations:
-        - metric: custom.cloudmonitoring.googleapis.com/cluster/cpu_used
-          utilizationTarget: .5
-          utilizationTargetType: GAUGE
-        - metric: custom.cloudmonitoring.googleapis.com/cluster/memory_used
-          utilizationTarget: .5
-          utilizationTargetType: GAUGE`},
+      coolDownPeriodSec: 1200
+      cpuUtilization:
+        utilizationTarget: .6`},
 	}
 	for _, test := range tests {
 		tpl, err := template.New("kube").Parse(kubeConfig)
diff --git a/cmd/coordinator/coordinator.go b/cmd/coordinator/coordinator.go
index f7f1f90..6878bb1 100644
--- a/cmd/coordinator/coordinator.go
+++ b/cmd/coordinator/coordinator.go
@@ -65,9 +65,15 @@
 	role          = flag.String("role", "coordinator", "Which role this binary should run as. Valid options: coordinator, watcher")
 	masterKeyFile = flag.String("masterkey", "", "Path to builder master key. Else fetched using GCE project attribute 'builder-master-key'.")
 
-	mode         = flag.String("mode", "", "Valid modes are 'dev', 'prod', or '' for auto-detect. dev means localhost development, not be confused with staging on go-dashboard-dev, which is still the 'prod' mode.")
-	buildEnvName = flag.String("env", "", "The build environment configuration to use. Not required if running on GCE, but will override GCE default config if set.")
-	devEnableGCE = flag.Bool("dev_gce", false, "Whether or not to enable the GCE pool when in dev mode. The pool is enabled by default in prod mode.")
+	// TODO(bradfitz): remove this list and just query it from the compute API:
+	// http://godoc.org/google.golang.org/api/compute/v1#RegionsService.Get
+	// and Region.Zones: http://godoc.org/google.golang.org/api/compute/v1#Region
+	cleanZones = flag.String("zones", "us-central1-a,us-central1-b,us-central1-f", "Comma-separated list of zones to periodically clean of stale build VMs (ones that failed to shut themselves down)")
+
+	mode            = flag.String("mode", "", "Valid modes are 'dev', 'prod', or '' for auto-detect. dev means localhost development, not to be confused with staging on go-dashboard-dev, which is still the 'prod' mode.")
+	buildEnvName    = flag.String("env", "", "The build environment configuration to use. Not required if running on GCE.")
+	devEnableGCE    = flag.Bool("dev_gce", false, "Whether or not to enable the GCE pool when in dev mode. The pool is enabled by default in prod mode.")
+	enableDebugProd = flag.Bool("debug_prod", false, "Enable the /dosomework URL to manually schedule a build on a prod coordinator. Enabled by default in dev mode.")
 )
 
 // LOCK ORDER:
@@ -375,7 +381,7 @@
 	if isBuilding(rev) {
 		return false
 	}
-	if inStaging && numCurrentBuilds() != 0 {
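+	// A MaxBuilds of zero (the default) means no limit; dev and staging
+	// environments typically set it to cap concurrent builds.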
+	if buildEnv.MaxBuilds > 0 && numCurrentBuilds() >= buildEnv.MaxBuilds {
 		return false
 	}
 	if strings.Contains(rev.name, "netbsd") {
diff --git a/cmd/coordinator/debug.go b/cmd/coordinator/debug.go
index 525312f..4e84fcb 100644
--- a/cmd/coordinator/debug.go
+++ b/cmd/coordinator/debug.go
@@ -11,6 +11,7 @@
 	"fmt"
 	"log"
 	"net/http"
+	"strconv"
 	"strings"
 	"text/template"
 
@@ -37,59 +38,80 @@
 		}
 
 		mode := strings.TrimPrefix(r.URL.Path, "/dosomework/")
-		log.Printf("looking for work: %q", mode)
+
+		count, err := strconv.Atoi(r.FormValue("count"))
+		if err != nil {
+			count = 1
+		}
+
+		// Cap number of jobs that can be scheduled from debug UI. If
+		// buildEnv.MaxBuilds is zero, there is no cap.
+		if buildEnv.MaxBuilds > 0 && count > buildEnv.MaxBuilds {
+			count = buildEnv.MaxBuilds
+		}
+		log.Printf("looking for %v work items for %q", count, mode)
 
 		w.Header().Set("Content-Type", "text/plain; charset=utf-8")
 		fmt.Fprintf(w, "looking for work for %s...\n", mode)
 		if f, ok := w.(http.Flusher); ok {
 			f.Flush()
 		}
-		rev, err := latestBuildableGoRev()
+		revs, err := latestBuildableGoRev(count)
 		if err != nil {
 			fmt.Fprintf(w, "cannot find revision: %v", err)
 			return
 		}
-		fmt.Fprintf(w, "found work: %s\n", rev)
-		work <- builderRev{name: mode, rev: rev}
+		fmt.Fprintf(w, "found work: %v\n", revs)
+		for _, rev := range revs {
+			work <- builderRev{name: mode, rev: rev}
+		}
 	}
 }
 
-func latestBuildableGoRev() (string, error) {
+// latestBuildableGoRev returns the count most recent buildable Go
+// revisions. If there are not enough buildable revisions available,
+// unbuildable revisions are used to make up the requested count.
+func latestBuildableGoRev(count int) ([]string, error) {
 	var bs types.BuildStatus
+	var revisions []string
 	res, err := http.Get("https://build.golang.org/?mode=json")
 	if err != nil {
-		return "", err
+		return nil, err
 	}
 	defer res.Body.Close()
 	if err := json.NewDecoder(res.Body).Decode(&bs); err != nil {
-		return "", err
+		return nil, err
 	}
 	if res.StatusCode != 200 {
-		return "", fmt.Errorf("unexpected build.golang.org http status %v", res.Status)
+		return nil, fmt.Errorf("unexpected build.golang.org http status %v", res.Status)
 	}
-	// Find first "ok" revision.
+	// Find the first count "ok" revisions.
 	for _, br := range bs.Revisions {
 		if br.Repo == "go" {
-			ok := false
 			for _, res := range br.Results {
 				if res == "ok" {
-					ok = true
+					revisions = append(revisions, br.Revision)
 					break
 				}
 			}
-			if !ok {
-				continue
-			}
-			return br.Revision, nil
+		}
+		if len(revisions) == count {
+			return revisions, nil
 		}
 	}
-	// No "ok" revisions, return the first go revision.
+
+	// If there weren't enough "ok" revisions, add enough "not ok"
+	// revisions to satisfy count.
 	for _, br := range bs.Revisions {
 		if br.Repo == "go" {
-			return br.Revision, nil
+			revisions = append(revisions, br.Revision)
+			if len(revisions) == count {
+				return revisions, nil
+			}
 		}
 	}
-	return "", errors.New("no revisions on build.golang.org")
+	return nil, errors.New("no revisions on build.golang.org")
 }
 
 var tmplDoSomeWork = template.Must(template.New("").Parse(`
@@ -98,6 +120,6 @@
 {{range .}}
 <form action="/dosomework/{{.}}" method="POST"><button>{{.}}</button></form><br\>
 {{end}}
-<form action="/dosomework/linux-amd64-kube" method="POST"><button>linux-amd64-kube</button></form><br\>
+<form action="/dosomework/linux-amd64-kube" method="POST"><input type="text" name="count" id="count" value="1"></input><button>linux-amd64-kube</button></form><br\>
 </body></html>
 `))
diff --git a/cmd/coordinator/kube.go b/cmd/coordinator/kube.go
index bb6ed6e..2fedd6c 100644
--- a/cmd/coordinator/kube.go
+++ b/cmd/coordinator/kube.go
@@ -45,6 +45,7 @@
 	initKubeCalled    bool
 	registryPrefix    = "gcr.io"
 	kubeCluster       *container.Cluster
+	nodeCount         int
 )
 
 const (
@@ -53,6 +54,15 @@
 	memoryUsedMetric    = "custom.cloudmonitoring.googleapis.com/cluster/memory_used" // % of available memory in the cluster that is scheduled
 	serviceLabelKey     = "cloud.googleapis.com/service"                              // allow selection of custom metric based on service name
 	clusterNameLabelKey = "custom.cloudmonitoring.googleapis.com/cluster_name"        // allow selection of custom metric based on cluster name
+	// This is a conservative estimate of the amount of time required for a
+	// build to run, exclusive of scheduling time. It is used to calculate
+	// a worst-case estimate of the time for a build to complete, freeing
+	// up available resources for the next build to run. This would ideally
+	// be adjusted based on observed values during the lifetime of the
+	// coordinator process.
+	timePerBuild       = 10 * time.Minute // estimated time to complete a build
+	timeToSchedule     = 90 * time.Second // estimated time for pod to enter running state when cluster resources are available
+	clusterCPUOverhead = 2                // Number of CPUs per node required by the system
 )
 
 // initGCE must be called before initKube
@@ -137,22 +147,45 @@
 	return nil
 }
 
-var kubePool = &kubeBuildletPool{
-	cpuCapacity:    api.NewQuantity(0, api.DecimalSI),
-	cpuUsage:       api.NewQuantity(0, api.DecimalSI),
-	memoryCapacity: api.NewQuantity(0, api.BinarySI),
-	memoryUsage:    api.NewQuantity(0, api.BinarySI),
-}
-
 // kubeBuildletPool is the Kubernetes buildlet pool.
 type kubeBuildletPool struct {
 	mu sync.Mutex // guards all following
 
-	pods           map[string]time.Time // pod instance name -> creationTime
-	cpuCapacity    *api.Quantity        // cpu capacity as reported by the Kubernetes api
-	memoryCapacity *api.Quantity
-	cpuUsage       *api.Quantity
-	memoryUsage    *api.Quantity
+	pods             map[string]podHistory // pod instance name -> podHistory
+	clusterResources *kubeResource         // cpu and memory resources of the Kubernetes cluster
+	pendingResources *kubeResource         // cpu and memory resources waiting to be scheduled
+	runningResources *kubeResource         // cpu and memory resources already running (periodically updated from API)
+}
+
+var kubePool = &kubeBuildletPool{
+	clusterResources: &kubeResource{
+		cpu:    api.NewQuantity(0, api.DecimalSI),
+		memory: api.NewQuantity(0, api.BinarySI),
+	},
+	pendingResources: &kubeResource{
+		cpu:    api.NewQuantity(0, api.DecimalSI),
+		memory: api.NewQuantity(0, api.BinarySI),
+	},
+	runningResources: &kubeResource{
+		cpu:    api.NewQuantity(0, api.DecimalSI),
+		memory: api.NewQuantity(0, api.BinarySI),
+	},
+}
+
+type kubeResource struct {
+	cpu    *api.Quantity
+	memory *api.Quantity
+}
+
+type podHistory struct {
+	requestedAt   time.Time
+	estReadyAt    time.Time
+	actualReadyAt time.Time
+	deletedAt     time.Time
+}
+
+func (p podHistory) String() string {
+	return fmt.Sprintf("requested at %v, estimated ready at %v, actual ready at %v, deleted at %v", p.requestedAt, p.estReadyAt, p.actualReadyAt, p.deletedAt)
 }
 
 func tryCreateMetrics() {
@@ -205,7 +238,21 @@
 	ctx := context.Background()
 	for {
 		p.pollCapacity(ctx)
-		time.Sleep(30 * time.Second)
+		time.Sleep(15 * time.Second)
+	}
+}
+
+// estTimeToStartPod returns a worst-case estimate of the amount of time
+// before a pod would enter the running state if it were scheduled now.
+func (p *kubeBuildletPool) estTimeToStartPod(ctx context.Context) time.Duration {
+	p.pollCapacity(ctx)
+	scheduledCores := p.pendingResources.cpu.Value() + p.runningResources.cpu.Value()    // Running or pending cores
+	clusterCores := p.clusterResources.cpu.Value() - int64(clusterCPUOverhead*nodeCount) // Cores in cluster less system requirements
+
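+	// If the cluster still has room for one more buildlet's CPU request,
+	// only the normal scheduling latency applies. Otherwise, scale the
+	// per-build estimate by how oversubscribed the cluster's cores are.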
+	if buildlet.BuildletCPU.Value()+scheduledCores < clusterCores {
+		return timeToSchedule
+	} else {
+		return time.Second * time.Duration(float64(scheduledCores)/float64(clusterCores)*timePerBuild.Seconds())
 	}
 }
 
@@ -222,40 +269,67 @@
 	}
 
 	p.mu.Lock()
-	defer p.mu.Unlock()
+	// Calculate the total provisioned, pending, and running CPU and memory
+	// in the cluster
+	provisioned := &kubeResource{
+		cpu:    api.NewQuantity(0, api.DecimalSI),
+		memory: api.NewQuantity(0, api.BinarySI),
+	}
+	running := &kubeResource{
+		cpu:    api.NewQuantity(0, api.DecimalSI),
+		memory: api.NewQuantity(0, api.BinarySI),
+	}
+	pending := &kubeResource{
+		cpu:    api.NewQuantity(0, api.DecimalSI),
+		memory: api.NewQuantity(0, api.BinarySI),
+	}
 
-	// Calculate the total CPU and memory used by pods in the cluster
-	var sumCPUUsed = api.NewQuantity(0, api.DecimalSI)
-	var sumMemoryUsed = api.NewQuantity(0, api.BinarySI)
-	for _, p := range pods {
-		for _, c := range p.Spec.Containers {
-			sumCPUUsed.Add(c.Resources.Requests[api.ResourceCPU])
-			sumMemoryUsed.Add(c.Resources.Requests[api.ResourceMemory])
+	// Resources used by running and pending pods
+	var resourceCounter *kubeResource
+	for _, pod := range pods {
+		switch pod.Status.Phase {
+		case api.PodPending:
+			resourceCounter = pending
+		case api.PodRunning:
+			resourceCounter = running
+		default:
+			// Completed (Succeeded/Failed) or Unknown pods don't count
+			// toward scheduled resources; skip them.
+			continue
+		}
+		for _, c := range pod.Spec.Containers {
+			// The Kubernetes API can, on rare occasions, return a
+			// response with an empty Requests map. Check to be sure.
+			if _, ok := c.Resources.Requests[api.ResourceCPU]; ok {
+				resourceCounter.cpu.Add(c.Resources.Requests[api.ResourceCPU])
+			}
+			if _, ok := c.Resources.Requests[api.ResourceMemory]; ok {
+				resourceCounter.memory.Add(c.Resources.Requests[api.ResourceMemory])
+			}
 		}
 	}
-	p.cpuUsage = sumCPUUsed
-	p.memoryUsage = sumMemoryUsed
+	p.runningResources = running
+	p.pendingResources = pending
 
-	// Calculate the total CPU and memory capacity of the cluster
-	var sumCPUCapacity = api.NewQuantity(0, api.DecimalSI)
-	var sumMemoryCapacity = api.NewQuantity(0, api.BinarySI)
+	// Resources provisioned to the cluster
+	nodeCount = len(nodes)
 	for _, n := range nodes {
-		sumCPUCapacity.Add(n.Status.Capacity[api.ResourceCPU])
-		sumMemoryCapacity.Add(n.Status.Capacity[api.ResourceMemory])
+		provisioned.cpu.Add(n.Status.Capacity[api.ResourceCPU])
+		provisioned.memory.Add(n.Status.Capacity[api.ResourceMemory])
 	}
-	p.cpuCapacity = sumCPUCapacity
-	p.memoryCapacity = sumMemoryCapacity
+	p.clusterResources = provisioned
+	p.mu.Unlock()
 
-	// Calculate the % of CPU and memory consumed
-	pctCPUConsumed := float64(p.cpuUsage.Value()) / float64(p.cpuCapacity.Value())
-	pctMemoryConsumed := float64(p.memoryUsage.Value()) / float64(p.memoryCapacity.Value())
+	// Calculate requested CPU and memory (both running and pending pods)
+	// vs. provisioned capacity in the cluster; these fractions are
+	// written to the custom monitoring metrics below.
+	pctCPUWanted := float64(p.pendingResources.cpu.Value()+p.runningResources.cpu.Value()) / float64(p.clusterResources.cpu.Value())
+	pctMemoryWanted := float64(p.pendingResources.memory.Value()+p.runningResources.memory.Value()) / float64(p.clusterResources.memory.Value())
 	t := time.Now().Format(time.RFC3339)
 
 	wtr := monitoring.WriteTimeseriesRequest{
 		Timeseries: []*monitoring.TimeseriesPoint{
 			{
 				Point: &monitoring.Point{
-					DoubleValue: &pctCPUConsumed,
+					DoubleValue: &pctCPUWanted,
 					Start:       t,
 					End:         t,
 				},
@@ -270,7 +344,7 @@
 			},
 			{
 				Point: &monitoring.Point{
-					DoubleValue: &pctMemoryConsumed,
+					DoubleValue: &pctMemoryWanted,
 					Start:       t,
 					End:         t,
 				},
@@ -311,6 +385,10 @@
 
 	podName := "buildlet-" + typ + "-rn" + randHex(7)
 
+	// Get an estimate for when the pod will be started/running and set
+	// the context timeout based on that
+	estReady := p.estTimeToStartPod(ctx)
+	ctx, _ = context.WithTimeout(ctx, estReady)
 	var needDelete bool
 
 	lg.logEventTime("creating_kube_pod", podName)
@@ -321,10 +399,15 @@
 		ImageRegistry: registryPrefix,
 		Description:   fmt.Sprintf("Go Builder for %s at %s", typ),
 		DeleteIn:      deleteIn,
+		OnPodCreating: func() {
+			lg.logEventTime("pod_creating")
+			p.setPodUsed(podName, true)
+			p.updatePodHistory(podName, podHistory{requestedAt: time.Now(), estReadyAt: time.Now().Add(estReady)})
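+			// The pod may exist even if the create ultimately fails
+			// (e.g. it never leaves Pending), so mark it for cleanup now.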
+			needDelete = true
+		},
 		OnPodCreated: func() {
 			lg.logEventTime("pod_created")
-			p.setPodUsed(podName, true)
-			needDelete = true
+			p.updatePodHistory(podName, podHistory{actualReadyAt: time.Now()})
 		},
 		OnGotPodInfo: func() {
 			lg.logEventTime("got_pod_info", "waiting_for_buildlet...")
@@ -335,7 +418,9 @@
 
 		if needDelete {
 			log.Printf("Deleting failed pod %q", podName)
-			kubeClient.DeletePod(ctx, podName)
+			if err := kubeClient.DeletePod(context.Background(), podName); err != nil {
+				log.Printf("Error deleting pod %q: %v", podName, err)
+			}
 			p.setPodUsed(podName, false)
 		}
 		return nil, err
@@ -348,7 +433,7 @@
 	// running the buildlet.
 	go func() {
 		<-ctx.Done()
-		log.Printf("Deleting pod %q after build context cancel received ", podName)
+		log.Printf("Deleting pod %q after build context completed", podName)
 		// Giving DeletePod a new context here as the build ctx has been canceled
 		kubeClient.DeletePod(context.Background(), podName)
 		p.setPodUsed(podName, false)
@@ -377,31 +462,52 @@
 func (p *kubeBuildletPool) capacityString() string {
 	p.mu.Lock()
 	defer p.mu.Unlock()
-	return fmt.Sprintf("%v/%v CPUs; %v/%v Memory",
-		p.cpuUsage, p.cpuCapacity,
-		p.memoryUsage, p.memoryCapacity)
+	return fmt.Sprintf("<ul><li>%v CPUs running, %v CPUs pending, %v total CPUs in cluster</li><li>%v memory running, %v memory pending, %v total memory in cluster</li></ul>",
+		p.runningResources.cpu, p.pendingResources.cpu, p.clusterResources.cpu,
+		p.runningResources.memory, p.pendingResources.memory, p.clusterResources.memory)
 }
 
 func (p *kubeBuildletPool) setPodUsed(podName string, used bool) {
 	p.mu.Lock()
 	defer p.mu.Unlock()
 	if p.pods == nil {
-		p.pods = make(map[string]time.Time)
+		p.pods = make(map[string]podHistory)
 	}
 	if used {
-		p.pods[podName] = time.Now()
-		// Track cpu and memory usage
-		p.cpuUsage.Add(buildlet.BuildletCPU)
-		p.memoryUsage.Add(buildlet.BuildletMemory)
+		p.pods[podName] = podHistory{requestedAt: time.Now()}
 
 	} else {
+		p.pods[podName] = podHistory{deletedAt: time.Now()}
+		// TODO(evanbrown): log this podHistory data for analytics purposes before deleting
 		delete(p.pods, podName)
-		// Track cpu and memory usage
-		p.cpuUsage.Sub(buildlet.BuildletCPU)
-		p.memoryUsage.Sub(buildlet.BuildletMemory)
 	}
 }
 
+func (p *kubeBuildletPool) updatePodHistory(podName string, updatedHistory podHistory) error {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	ph, ok := p.pods[podName]
+	if !ok {
+		return fmt.Errorf("pod %q does not exist", podName)
+	}
+
+	if !updatedHistory.actualReadyAt.IsZero() {
+		ph.actualReadyAt = updatedHistory.actualReadyAt
+	}
+	if !updatedHistory.estReadyAt.IsZero() {
+		ph.estReadyAt = updatedHistory.estReadyAt
+	}
+	if !updatedHistory.requestedAt.IsZero() {
+		ph.requestedAt = updatedHistory.requestedAt
+	}
+	if !updatedHistory.deletedAt.IsZero() {
+		ph.deletedAt = updatedHistory.deletedAt
+	}
+	p.pods[podName] = ph
+	return nil
+}
+
 func (p *kubeBuildletPool) podUsed(podName string) bool {
 	p.mu.Lock()
 	defer p.mu.Unlock()
@@ -412,10 +518,10 @@
 func (p *kubeBuildletPool) podsActive() (ret []resourceTime) {
 	p.mu.Lock()
 	defer p.mu.Unlock()
-	for name, create := range p.pods {
+	for name, ph := range p.pods {
 		ret = append(ret, resourceTime{
 			name:     name,
-			creation: create,
+			creation: ph.requestedAt,
 		})
 	}
 	sort.Sort(byCreationTime(ret))
diff --git a/kubernetes/client.go b/kubernetes/client.go
index 5767465..16c2c1f 100644
--- a/kubernetes/client.go
+++ b/kubernetes/client.go
@@ -53,8 +53,7 @@
 // Run creates a new pod resource in the default pod namespace with
 // the given pod API specification.
 // It returns the pod status once it has entered the Running phase.
-// An error is returned if the pod can not be created, if it does
-// does not enter the running phase within 2 minutes, or if ctx.Done
+// An error is returned if the pod cannot be created, or if ctx.Done
 // is closed.
 func (c *Client) RunPod(ctx context.Context, pod *api.Pod) (*api.Pod, error) {
 	var podJSON bytes.Buffer
@@ -82,17 +81,27 @@
 	if err := json.Unmarshal(body, &podResult); err != nil {
 		return nil, fmt.Errorf("failed to decode pod resources: %v", err)
 	}
-	ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
-	defer cancel()
 
-	createdPod, err := c.AwaitPodNotPending(ctx, podResult.Name, podResult.ObjectMeta.ResourceVersion)
-	if err != nil {
-		// The pod did not leave the pending state. We should try to manually delete it before
-		// returning an error.
-		c.DeletePod(context.Background(), podResult.Name)
-		return nil, fmt.Errorf("timed out waiting for pod %q to leave pending state: %v", pod.Name, err)
+	retryWait := 1             // seconds; doubled after each failed attempt
+	retryMax := retryWait << 3 // retry at most 3 times (1s, 2s, 4s of backoff)
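+	// AwaitPodNotPending can fail transiently when the API server closes
+	// connections under heavy load, so retry with exponential backoff
+	// before giving up and deleting the pod.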
+	for {
+		createdPod, err := c.AwaitPodNotPending(ctx, podResult.Name, podResult.ObjectMeta.ResourceVersion)
+		if err != nil {
+			if err == context.Canceled {
+				return nil, err
+			}
+			if retryWait < retryMax { // retry
+				time.Sleep(time.Duration(retryWait) * time.Second)
+				retryWait = retryWait << 1
+				continue
+			}
+			// The pod did not leave the pending state. We should try to manually delete it before
+			// returning an error.
+			c.DeletePod(context.Background(), podResult.Name)
+			return nil, fmt.Errorf("waiting for pod %q to leave pending state: %v", pod.Name, err)
+		}
+		return createdPod, nil
 	}
-	return createdPod, nil
 }
 
 // GetPods returns all pods in the cluster, regardless of status.
@@ -219,11 +228,14 @@
 			return
 		}
 		res, err := ctxhttp.Do(ctx, c.httpClient, req)
-		defer res.Body.Close()
 		if err != nil {
-			statusChan <- PodStatusResult{Err: fmt.Errorf("failed to make request: GET %q: %v", getURL, err)}
+			if err != context.Canceled {
+				err = fmt.Errorf("failed to make request: GET %q: %v", getURL, err)
+			}
+			statusChan <- PodStatusResult{Err: err}
 			return
 		}
+		defer res.Body.Close()
 
 		var wps watchPodStatus
 		reader := bufio.NewReader(res.Body)