all: kubernetes builder autoscaling
Improvements to support rapid scheduling of many build jobs:
- Retry logic in the Kubernetes client to handle connections sporadically
closed by the API server under heavy load
- Cluster autoscaler now scales on the default CPU utilization metric
instead of custom metrics
- Debug mode allows scheduling multiple builds to test scaling
- Account for scheduled vs. provisioned resources in the cluster
and use that information to estimate when a build's pod will be
scheduled and enter the running state (see the sketch below)
- Use the estimated schedule time to set the build context timeout
- Track pod lifecycle (requested time, estimated ready time,
actual ready time, deletion time, etc.)
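
The worst-case estimate of when a new build pod will start reduces to
roughly the following standalone sketch (plain int64 core counts stand
in for the Kubernetes resource.Quantity values used in the real code;
the function signature and the numbers in main are simplified for
illustration):

    package main

    import (
    	"fmt"
    	"time"
    )

    const (
    	timePerBuild       = 10 * time.Minute // worst-case estimate for one build
    	timeToSchedule     = 90 * time.Second // pod start latency when capacity is free
    	clusterCPUOverhead = 2                // cores per node reserved for the system
    )

    // estTimeToStartPod: if the cluster still has room for one more
    // buildlet, only the fixed scheduling latency applies; otherwise the
    // wait scales with how oversubscribed the usable cores are.
    func estTimeToStartPod(buildletCPU, pendingCPU, runningCPU, clusterCPU int64, nodeCount int) time.Duration {
    	scheduledCores := pendingCPU + runningCPU
    	clusterCores := clusterCPU - int64(clusterCPUOverhead*nodeCount)
    	if buildletCPU+scheduledCores < clusterCores {
    		return timeToSchedule
    	}
    	return time.Duration(float64(scheduledCores) / float64(clusterCores) * float64(timePerBuild))
    }

    func main() {
    	// 5 nodes x 16 cores = 80 cores, 10 of them reserved for the system.
    	fmt.Println(estTimeToStartPod(2, 4, 30, 80, 5))  // capacity free: 1m30s
    	fmt.Println(estTimeToStartPod(2, 40, 66, 80, 5)) // saturated: ~15m
    }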
Change-Id: I14d6c5e01af0970dbb3390a29d1ee5c43049fff8
Reviewed-on: https://go-review.googlesource.com/19524
Reviewed-by: Brad Fitzpatrick <bradfitz@golang.org>
diff --git a/buildenv/envs.go b/buildenv/envs.go
index 6e55c8d..733a6b2 100644
--- a/buildenv/envs.go
+++ b/buildenv/envs.go
@@ -84,6 +84,12 @@
// The GCS bucket that snapshots are written to.
SnapBucket string
+
+ // The maximum number of concurrent builds that can run.
+ // The zero value indicates unlimited builds and is the default.
+ // MaxBuilds is typically used to limit the number of builds in
+ // a development or staging environment.
+ MaxBuilds int
}
// MachineTypeURI returns the URI for the environment's Machine Type.
diff --git a/buildlet/kube.go b/buildlet/kube.go
index 1d57351..91af442 100644
--- a/buildlet/kube.go
+++ b/buildlet/kube.go
@@ -51,11 +51,16 @@
// to delete the pod.
DeleteIn time.Duration
- // OnPodCreated optionally specifies a hook to run synchronously
- // after the pod operation succeeds.
- OnPodCreated func()
+ // OnPodCreating optionally specifies a hook to run synchronously
+ // after the pod create request has been made, but before the create
+ // has succeeded.
+ OnPodCreating func()
// OnPodCreated optionally specifies a hook to run synchronously
+ // after the pod create request succeeds.
+ OnPodCreated func()
+
+ // OnGotPodInfo optionally specifies a hook to run synchronously
// after the pod Get call.
OnGotPodInfo func()
}
@@ -144,17 +149,18 @@
pod.ObjectMeta.Annotations["delete-at"] = fmt.Sprint(time.Now().Add(opts.DeleteIn).Unix())
}
+ condRun(opts.OnPodCreating)
newPod, err := kubeClient.RunPod(ctx, pod)
if err != nil {
- return nil, fmt.Errorf("pod could not be created: %v", err)
+ return nil, err
}
- condRun(opts.OnPodCreated)
// The new pod must be in Running phase. Possible phases are described at
// http://releases.k8s.io/HEAD/docs/user-guide/pod-states.md#pod-phase
if newPod.Status.Phase != api.PodRunning {
return nil, fmt.Errorf("pod is in invalid state %q: %v", newPod.Status.Phase, newPod.Status.Message)
}
+ condRun(opts.OnPodCreated)
// Wait for the pod to boot and its buildlet to come up.
var buildletURL string
diff --git a/cmd/coordinator/buildongce/create.go b/cmd/coordinator/buildongce/create.go
index 88e188e..e7f0b9d 100644
--- a/cmd/coordinator/buildongce/create.go
+++ b/cmd/coordinator/buildongce/create.go
@@ -103,14 +103,9 @@
autoscalingPolicy:
minNumReplicas: {{ .KubeMinNodes }}
maxNumReplicas: {{ .KubeMaxNodes }}
- coolDownPeriodSec: 600
- customMetricUtilizations:
- - metric: custom.cloudmonitoring.googleapis.com/cluster/cpu_used
- utilizationTarget: .5
- utilizationTargetType: GAUGE
- - metric: custom.cloudmonitoring.googleapis.com/cluster/memory_used
- utilizationTarget: .5
- utilizationTargetType: GAUGE`
+ coolDownPeriodSec: 1200
+ cpuUtilization:
+ utilizationTarget: .6`
func readFile(v string) string {
slurp, err := ioutil.ReadFile(v)
diff --git a/cmd/coordinator/buildongce/create_test.go b/cmd/coordinator/buildongce/create_test.go
index 20bf662..1d9e019 100644
--- a/cmd/coordinator/buildongce/create_test.go
+++ b/cmd/coordinator/buildongce/create_test.go
@@ -29,7 +29,7 @@
logging_service: "logging.googleapis.com"
monitoring_service: "none"
node_config:
- machine_type: "n1-standard-8"
+ machine_type: "n1-standard-16"
oauth_scopes:
- "https://www.googleapis.com/auth/cloud-platform"
master_auth:
@@ -44,14 +44,9 @@
autoscalingPolicy:
minNumReplicas: 1
maxNumReplicas: 5
- coolDownPeriodSec: 600
- customMetricUtilizations:
- - metric: custom.cloudmonitoring.googleapis.com/cluster/cpu_used
- utilizationTarget: .5
- utilizationTargetType: GAUGE
- - metric: custom.cloudmonitoring.googleapis.com/cluster/memory_used
- utilizationTarget: .5
- utilizationTargetType: GAUGE`},
+ coolDownPeriodSec: 1200
+ cpuUtilization:
+ utilizationTarget: .6`},
}
for _, test := range tests {
tpl, err := template.New("kube").Parse(kubeConfig)
diff --git a/cmd/coordinator/coordinator.go b/cmd/coordinator/coordinator.go
index f7f1f90..6878bb1 100644
--- a/cmd/coordinator/coordinator.go
+++ b/cmd/coordinator/coordinator.go
@@ -65,9 +65,15 @@
role = flag.String("role", "coordinator", "Which role this binary should run as. Valid options: coordinator, watcher")
masterKeyFile = flag.String("masterkey", "", "Path to builder master key. Else fetched using GCE project attribute 'builder-master-key'.")
- mode = flag.String("mode", "", "Valid modes are 'dev', 'prod', or '' for auto-detect. dev means localhost development, not be confused with staging on go-dashboard-dev, which is still the 'prod' mode.")
- buildEnvName = flag.String("env", "", "The build environment configuration to use. Not required if running on GCE, but will override GCE default config if set.")
- devEnableGCE = flag.Bool("dev_gce", false, "Whether or not to enable the GCE pool when in dev mode. The pool is enabled by default in prod mode.")
+ // TODO(bradfitz): remove this list and just query it from the compute API:
+ // http://godoc.org/google.golang.org/api/compute/v1#RegionsService.Get
+ // and Region.Zones: http://godoc.org/google.golang.org/api/compute/v1#Region
+ cleanZones = flag.String("zones", "us-central1-a,us-central1-b,us-central1-f", "Comma-separated list of zones to periodically clean of stale build VMs (ones that failed to shut themselves down)")
+
+ mode = flag.String("mode", "", "Valid modes are 'dev', 'prod', or '' for auto-detect. dev means localhost development, not to be confused with staging on go-dashboard-dev, which is still the 'prod' mode.")
+ buildEnvName = flag.String("env", "", "The build environment configuration to use. Not required if running on GCE.")
+ devEnableGCE = flag.Bool("dev_gce", false, "Whether or not to enable the GCE pool when in dev mode. The pool is enabled by default in prod mode.")
+ enableDebugProd = flag.Bool("debug_prod", false, "Enable the /dosomework URL to manually schedule a build on a prod coordinator. Enabled by default in dev mode.")
)
// LOCK ORDER:
@@ -375,7 +381,7 @@
if isBuilding(rev) {
return false
}
- if inStaging && numCurrentBuilds() != 0 {
+ if buildEnv.MaxBuilds > 0 && numCurrentBuilds() >= buildEnv.MaxBuilds {
return false
}
if strings.Contains(rev.name, "netbsd") {
diff --git a/cmd/coordinator/debug.go b/cmd/coordinator/debug.go
index 525312f..4e84fcb 100644
--- a/cmd/coordinator/debug.go
+++ b/cmd/coordinator/debug.go
@@ -11,6 +11,7 @@
"fmt"
"log"
"net/http"
+ "strconv"
"strings"
"text/template"
@@ -37,59 +38,80 @@
}
mode := strings.TrimPrefix(r.URL.Path, "/dosomework/")
- log.Printf("looking for work: %q", mode)
+
+ count, err := strconv.Atoi(r.FormValue("count"))
+ if err != nil {
+ count = 1
+ }
+
+ // Cap number of jobs that can be scheduled from debug UI. If
+ // buildEnv.MaxBuilds is zero, there is no cap.
+ if buildEnv.MaxBuilds > 0 && count > buildEnv.MaxBuilds {
+ count = buildEnv.MaxBuilds
+ }
+ log.Printf("looking for %v work items for %q", count, mode)
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
fmt.Fprintf(w, "looking for work for %s...\n", mode)
if f, ok := w.(http.Flusher); ok {
f.Flush()
}
- rev, err := latestBuildableGoRev()
+ revs, err := latestBuildableGoRev(count)
if err != nil {
fmt.Fprintf(w, "cannot find revision: %v", err)
return
}
- fmt.Fprintf(w, "found work: %s\n", rev)
- work <- builderRev{name: mode, rev: rev}
+ fmt.Fprintf(w, "found work: %v\n", revs)
+ for _, rev := range revs {
+ work <- builderRev{name: mode, rev: rev}
+ }
}
}
-func latestBuildableGoRev() (string, error) {
+// latestBuildableGoRev returns up to count of the most recent buildable
+// revisions. If there are not enough buildable revisions, unbuildable
+// revisions are used to make up the count.
+func latestBuildableGoRev(count int) ([]string, error) {
var bs types.BuildStatus
+ var revisions []string
res, err := http.Get("https://build.golang.org/?mode=json")
if err != nil {
- return "", err
+ return nil, err
}
defer res.Body.Close()
if err := json.NewDecoder(res.Body).Decode(&bs); err != nil {
- return "", err
+ return nil, err
}
if res.StatusCode != 200 {
- return "", fmt.Errorf("unexpected build.golang.org http status %v", res.Status)
+ return nil, fmt.Errorf("unexpected build.golang.org http status %v", res.Status)
}
- // Find first "ok" revision.
+ // Find up to count "ok" revisions.
for _, br := range bs.Revisions {
if br.Repo == "go" {
- ok := false
for _, res := range br.Results {
if res == "ok" {
- ok = true
+ revisions = append(revisions, br.Revision)
break
}
}
- if !ok {
- continue
- }
- return br.Revision, nil
+ }
+ if len(revisions) == count {
+ return revisions, nil
}
}
- // No "ok" revisions, return the first go revision.
+
+ // If there weren't enough "ok" revisions, add enough "not ok"
+ // revisions to satisfy count.
for _, br := range bs.Revisions {
if br.Repo == "go" {
- return br.Revision, nil
+ revisions = append(revisions, br.Revision)
+ if len(revisions) == count {
+ return revisions, nil
+ }
}
}
- return "", errors.New("no revisions on build.golang.org")
+ return nil, errors.New("no revisions on build.golang.org")
}
var tmplDoSomeWork = template.Must(template.New("").Parse(`
@@ -98,6 +120,6 @@
{{range .}}
<form action="/dosomework/{{.}}" method="POST"><button>{{.}}</button></form><br\>
{{end}}
-<form action="/dosomework/linux-amd64-kube" method="POST"><button>linux-amd64-kube</button></form><br\>
+<form action="/dosomework/linux-amd64-kube" method="POST"><input type="text" name="count" id="count" value="1"></input><button>linux-amd64-kube</button></form><br\>
</body></html>
`))
diff --git a/cmd/coordinator/kube.go b/cmd/coordinator/kube.go
index bb6ed6e..2fedd6c 100644
--- a/cmd/coordinator/kube.go
+++ b/cmd/coordinator/kube.go
@@ -45,6 +45,7 @@
initKubeCalled bool
registryPrefix = "gcr.io"
kubeCluster *container.Cluster
+ nodeCount int
)
const (
@@ -53,6 +54,15 @@
memoryUsedMetric = "custom.cloudmonitoring.googleapis.com/cluster/memory_used" // % of available memory in the cluster that is scheduled
serviceLabelKey = "cloud.googleapis.com/service" // allow selection of custom metric based on service name
clusterNameLabelKey = "custom.cloudmonitoring.googleapis.com/cluster_name" // allow selection of custom metric based on cluster name
+ // This is a conservative estimate of the amount of time required for a
+ // build to run, exclusive of scheduling time. It is used to calculate
+ // a worst-case estimate of the time for a build to complete, freeing
+ // up available resources for the next build to run. This would ideally
+ // be adjusted based on observed values during the lifetime of the
+ // coordinator process.
+ timePerBuild = 10 * time.Minute // estimated time to complete a build
+ timeToSchedule = 90 * time.Second // estimated time for pod to enter running state when cluster resources are available
+ clusterCPUOverhead = 2 // Number of CPUs per node required by the system
)
// initGCE must be called before initKube
@@ -137,22 +147,45 @@
return nil
}
-var kubePool = &kubeBuildletPool{
- cpuCapacity: api.NewQuantity(0, api.DecimalSI),
- cpuUsage: api.NewQuantity(0, api.DecimalSI),
- memoryCapacity: api.NewQuantity(0, api.BinarySI),
- memoryUsage: api.NewQuantity(0, api.BinarySI),
-}
-
// kubeBuildletPool is the Kubernetes buildlet pool.
type kubeBuildletPool struct {
mu sync.Mutex // guards all following
- pods map[string]time.Time // pod instance name -> creationTime
- cpuCapacity *api.Quantity // cpu capacity as reported by the Kubernetes api
- memoryCapacity *api.Quantity
- cpuUsage *api.Quantity
- memoryUsage *api.Quantity
+ pods map[string]podHistory // pod instance name -> podHistory
+ clusterResources *kubeResource // cpu and memory resources of the Kubernetes cluster
+ pendingResources *kubeResource // cpu and memory resources waiting to be scheduled
+ runningResources *kubeResource // cpu and memory resources already running (periodically updated from API)
+}
+
+var kubePool = &kubeBuildletPool{
+ clusterResources: &kubeResource{
+ cpu: api.NewQuantity(0, api.DecimalSI),
+ memory: api.NewQuantity(0, api.BinarySI),
+ },
+ pendingResources: &kubeResource{
+ cpu: api.NewQuantity(0, api.DecimalSI),
+ memory: api.NewQuantity(0, api.BinarySI),
+ },
+ runningResources: &kubeResource{
+ cpu: api.NewQuantity(0, api.DecimalSI),
+ memory: api.NewQuantity(0, api.BinarySI),
+ },
+}
+
+type kubeResource struct {
+ cpu *api.Quantity
+ memory *api.Quantity
+}
+
+type podHistory struct {
+ requestedAt time.Time
+ estReadyAt time.Time
+ actualReadyAt time.Time
+ deletedAt time.Time
+}
+
+func (p podHistory) String() string {
+ return fmt.Sprintf("requested at %v, estimated ready at %v, actual ready at %v, deleted at %v", p.requestedAt, p.estReadyAt, p.actualReadyAt, p.deletedAt)
}
func tryCreateMetrics() {
@@ -205,7 +238,21 @@
ctx := context.Background()
for {
p.pollCapacity(ctx)
- time.Sleep(30 * time.Second)
+ time.Sleep(15 * time.Second)
+ }
+}
+
+// estTimeToStartPod returns a worst-case estimate of the amount of time
+// before a pod would enter the running state if it were scheduled now.
+func (p *kubeBuildletPool) estTimeToStartPod(ctx context.Context) time.Duration {
+ p.pollCapacity(ctx)
+ scheduledCores := p.pendingResources.cpu.Value() + p.runningResources.cpu.Value() // Running or pending cores
+ clusterCores := p.clusterResources.cpu.Value() - int64(clusterCPUOverhead*nodeCount) // Cores in cluster less system requirements
+
+ if buildlet.BuildletCPU.Value()+scheduledCores < clusterCores {
+ return timeToSchedule
+ } else {
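+ // The cluster is saturated: scale the worst-case per-build time by the
+ // ratio of requested cores to usable cores as a rough estimate of when
+ // capacity will free up.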
+ return time.Second * time.Duration(float64(scheduledCores)/float64(clusterCores)*timePerBuild.Seconds())
}
}
@@ -222,40 +269,67 @@
}
p.mu.Lock()
- defer p.mu.Unlock()
+ // Calculate the total provisioned, pending, and running CPU and memory
+ // in the cluster
+ provisioned := &kubeResource{
+ cpu: api.NewQuantity(0, api.DecimalSI),
+ memory: api.NewQuantity(0, api.BinarySI),
+ }
+ running := &kubeResource{
+ cpu: api.NewQuantity(0, api.DecimalSI),
+ memory: api.NewQuantity(0, api.BinarySI),
+ }
+ pending := &kubeResource{
+ cpu: api.NewQuantity(0, api.DecimalSI),
+ memory: api.NewQuantity(0, api.BinarySI),
+ }
- // Calculate the total CPU and memory used by pods in the cluster
- var sumCPUUsed = api.NewQuantity(0, api.DecimalSI)
- var sumMemoryUsed = api.NewQuantity(0, api.BinarySI)
- for _, p := range pods {
- for _, c := range p.Spec.Containers {
- sumCPUUsed.Add(c.Resources.Requests[api.ResourceCPU])
- sumMemoryUsed.Add(c.Resources.Requests[api.ResourceMemory])
+ // Resources used by running and pending pods
+ var resourceCounter *kubeResource
+ for _, pod := range pods {
+ switch pod.Status.Phase {
+ case api.PodPending:
+ resourceCounter = pending
+ case api.PodRunning:
+ resourceCounter = running
+ default:
+ // Skip pods that are neither pending nor running (e.g. succeeded
+ // or failed); they no longer consume schedulable resources.
+ continue
+ }
+ for _, c := range pod.Spec.Containers {
+ // The Kubernetes API can occasionally return a response
+ // with an empty Requests map. Check to be sure...
+ if _, ok := c.Resources.Requests[api.ResourceCPU]; ok {
+ resourceCounter.cpu.Add(c.Resources.Requests[api.ResourceCPU])
+ }
+ if _, ok := c.Resources.Requests[api.ResourceMemory]; ok {
+ resourceCounter.memory.Add(c.Resources.Requests[api.ResourceMemory])
+ }
}
}
- p.cpuUsage = sumCPUUsed
- p.memoryUsage = sumMemoryUsed
+ p.runningResources = running
+ p.pendingResources = pending
- // Calculate the total CPU and memory capacity of the cluster
- var sumCPUCapacity = api.NewQuantity(0, api.DecimalSI)
- var sumMemoryCapacity = api.NewQuantity(0, api.BinarySI)
+ // Resources provisioned to the cluster
+ nodeCount = len(nodes)
for _, n := range nodes {
- sumCPUCapacity.Add(n.Status.Capacity[api.ResourceCPU])
- sumMemoryCapacity.Add(n.Status.Capacity[api.ResourceMemory])
+ provisioned.cpu.Add(n.Status.Capacity[api.ResourceCPU])
+ provisioned.memory.Add(n.Status.Capacity[api.ResourceMemory])
}
- p.cpuCapacity = sumCPUCapacity
- p.memoryCapacity = sumMemoryCapacity
+ p.clusterResources = provisioned
+ p.mu.Unlock()
- // Calculate the % of CPU and memory consumed
- pctCPUConsumed := float64(p.cpuUsage.Value()) / float64(p.cpuCapacity.Value())
- pctMemoryConsumed := float64(p.memoryUsage.Value()) / float64(p.memoryCapacity.Value())
+ // Estimate the time it would take for a build pod to be scheduled
+ // in the current resource environment
+
+ // Calculate requested CPU and memory (both running and pending pods) vs
+ // provisioned capacity in the cluster.
+ pctCPUWanted := float64(p.pendingResources.cpu.Value()+p.runningResources.cpu.Value()) / float64(p.clusterResources.cpu.Value())
+ pctMemoryWanted := float64(p.pendingResources.memory.Value()+p.runningResources.memory.Value()) / float64(p.clusterResources.memory.Value())
t := time.Now().Format(time.RFC3339)
wtr := monitoring.WriteTimeseriesRequest{
Timeseries: []*monitoring.TimeseriesPoint{
{
Point: &monitoring.Point{
- DoubleValue: &pctCPUConsumed,
+ DoubleValue: &pctCPUWanted,
Start: t,
End: t,
},
@@ -270,7 +344,7 @@
},
{
Point: &monitoring.Point{
- DoubleValue: &pctMemoryConsumed,
+ DoubleValue: &pctMemoryWanted,
Start: t,
End: t,
},
@@ -311,6 +385,10 @@
podName := "buildlet-" + typ + "-rn" + randHex(7)
+ // Get an estimate for when the pod will be started/running and set
+ // the context timeout based on that
+ estReady := p.estTimeToStartPod(ctx)
+ ctx, _ = context.WithTimeout(ctx, estReady)
var needDelete bool
lg.logEventTime("creating_kube_pod", podName)
@@ -321,10 +399,15 @@
ImageRegistry: registryPrefix,
Description: fmt.Sprintf("Go Builder for %s at %s", typ),
DeleteIn: deleteIn,
+ OnPodCreating: func() {
+ lg.logEventTime("pod_creating")
+ p.setPodUsed(podName, true)
+ p.updatePodHistory(podName, podHistory{requestedAt: time.Now(), estReadyAt: time.Now().Add(estReady)})
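+ // Mark the pod for cleanup right away: even if RunPod later returns
+ // an error, the create request has been issued and the pod may exist.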
+ needDelete = true
+ },
OnPodCreated: func() {
lg.logEventTime("pod_created")
- p.setPodUsed(podName, true)
- needDelete = true
+ p.updatePodHistory(podName, podHistory{actualReadyAt: time.Now()})
},
OnGotPodInfo: func() {
lg.logEventTime("got_pod_info", "waiting_for_buildlet...")
@@ -335,7 +418,9 @@
if needDelete {
log.Printf("Deleting failed pod %q", podName)
- kubeClient.DeletePod(ctx, podName)
+ if err := kubeClient.DeletePod(context.Background(), podName); err != nil {
+ log.Printf("Error deleting pod %q: %v", podName, err)
+ }
p.setPodUsed(podName, false)
}
return nil, err
@@ -348,7 +433,7 @@
// running the buildlet.
go func() {
<-ctx.Done()
- log.Printf("Deleting pod %q after build context cancel received ", podName)
+ log.Printf("Deleting pod %q after build context completed", podName)
// Giving DeletePod a new context here as the build ctx has been canceled
kubeClient.DeletePod(context.Background(), podName)
p.setPodUsed(podName, false)
@@ -377,31 +462,52 @@
func (p *kubeBuildletPool) capacityString() string {
p.mu.Lock()
defer p.mu.Unlock()
- return fmt.Sprintf("%v/%v CPUs; %v/%v Memory",
- p.cpuUsage, p.cpuCapacity,
- p.memoryUsage, p.memoryCapacity)
+ return fmt.Sprintf("<ul><li>%v CPUs running, %v CPUs pending, %v total CPUs in cluster</li><li>%v memory running, %v memory pending, %v total memory in cluster</li></ul>",
+ p.runningResources.cpu, p.pendingResources.cpu, p.clusterResources.cpu,
+ p.runningResources.memory, p.pendingResources.memory, p.clusterResources.memory)
}
func (p *kubeBuildletPool) setPodUsed(podName string, used bool) {
p.mu.Lock()
defer p.mu.Unlock()
if p.pods == nil {
- p.pods = make(map[string]time.Time)
+ p.pods = make(map[string]podHistory)
}
if used {
- p.pods[podName] = time.Now()
- // Track cpu and memory usage
- p.cpuUsage.Add(buildlet.BuildletCPU)
- p.memoryUsage.Add(buildlet.BuildletMemory)
+ p.pods[podName] = podHistory{requestedAt: time.Now()}
} else {
+ p.pods[podName] = podHistory{deletedAt: time.Now()}
+ // TODO(evanbrown): log this podHistory data for analytics purposes before deleting
delete(p.pods, podName)
- // Track cpu and memory usage
- p.cpuUsage.Sub(buildlet.BuildletCPU)
- p.memoryUsage.Sub(buildlet.BuildletMemory)
}
}
+func (p *kubeBuildletPool) updatePodHistory(podName string, updatedHistory podHistory) error {
+ p.mu.Lock()
+ defer p.mu.Unlock()
+
+ ph, ok := p.pods[podName]
+ if !ok {
+ return fmt.Errorf("pod %q does not exist", podName)
+ }
+
+ if !updatedHistory.actualReadyAt.IsZero() {
+ ph.actualReadyAt = updatedHistory.actualReadyAt
+ }
+ if !updatedHistory.estReadyAt.IsZero() {
+ ph.estReadyAt = updatedHistory.estReadyAt
+ }
+ if !updatedHistory.requestedAt.IsZero() {
+ ph.requestedAt = updatedHistory.requestedAt
+ }
+ if !updatedHistory.deletedAt.IsZero() {
+ ph.deletedAt = updatedHistory.deletedAt
+ }
+ p.pods[podName] = ph
+ return nil
+}
+
func (p *kubeBuildletPool) podUsed(podName string) bool {
p.mu.Lock()
defer p.mu.Unlock()
@@ -412,10 +518,10 @@
func (p *kubeBuildletPool) podsActive() (ret []resourceTime) {
p.mu.Lock()
defer p.mu.Unlock()
- for name, create := range p.pods {
+ for name, ph := range p.pods {
ret = append(ret, resourceTime{
name: name,
- creation: create,
+ creation: ph.requestedAt,
})
}
sort.Sort(byCreationTime(ret))
diff --git a/kubernetes/client.go b/kubernetes/client.go
index 5767465..16c2c1f 100644
--- a/kubernetes/client.go
+++ b/kubernetes/client.go
@@ -53,8 +53,7 @@
// Run creates a new pod resource in the default pod namespace with
// the given pod API specification.
// It returns the pod status once it has entered the Running phase.
-// An error is returned if the pod can not be created, if it does
-// does not enter the running phase within 2 minutes, or if ctx.Done
+// An error is returned if the pod cannot be created, or if ctx.Done
// is closed.
func (c *Client) RunPod(ctx context.Context, pod *api.Pod) (*api.Pod, error) {
var podJSON bytes.Buffer
@@ -82,17 +81,27 @@
if err := json.Unmarshal(body, &podResult); err != nil {
return nil, fmt.Errorf("failed to decode pod resources: %v", err)
}
- ctx, cancel := context.WithTimeout(ctx, 2*time.Minute)
- defer cancel()
- createdPod, err := c.AwaitPodNotPending(ctx, podResult.Name, podResult.ObjectMeta.ResourceVersion)
- if err != nil {
- // The pod did not leave the pending state. We should try to manually delete it before
- // returning an error.
- c.DeletePod(context.Background(), podResult.Name)
- return nil, fmt.Errorf("timed out waiting for pod %q to leave pending state: %v", pod.Name, err)
+ retryWait := 1
+ retryMax := retryWait << 3 // retry 3 times
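+ // Retry with exponential backoff: under heavy load the API server can
+ // sporadically close connections before the pod leaves the pending state.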
+ for {
+ createdPod, err := c.AwaitPodNotPending(ctx, podResult.Name, podResult.ObjectMeta.ResourceVersion)
+ if err != nil {
+ if err == context.Canceled {
+ return nil, err
+ }
+ if retryWait < retryMax { // retry
+ time.Sleep(time.Duration(retryWait) * time.Second)
+ retryWait = retryWait << 1
+ continue
+ }
+ // The pod did not leave the pending state. We should try to manually delete it before
+ // returning an error.
+ c.DeletePod(context.Background(), podResult.Name)
+ return nil, fmt.Errorf("waiting for pod %q to leave pending state: %v", pod.Name, err)
+ }
+ return createdPod, nil
}
- return createdPod, nil
}
// GetPods returns all pods in the cluster, regardless of status.
@@ -219,11 +228,14 @@
return
}
res, err := ctxhttp.Do(ctx, c.httpClient, req)
- defer res.Body.Close()
if err != nil {
- statusChan <- PodStatusResult{Err: fmt.Errorf("failed to make request: GET %q: %v", getURL, err)}
+ if err != context.Canceled {
+ err = fmt.Errorf("failed to make request: GET %q: %v", getURL, err)
+ }
+ statusChan <- PodStatusResult{Err: err} // send the raw error when the context was canceled
return
}
+ defer res.Body.Close()
var wps watchPodStatus
reader := bufio.NewReader(res.Body)