internal/pool: move the kubernetes buildlet pool into a pool package
This is a step in a series of changes which will move everything buildlet
pool related into a pool package.
Updates golang/go#36841
Change-Id: I8efb1f94c7b929be559004d9f455bca0370c7800
Reviewed-on: https://go-review.googlesource.com/c/build/+/227768
Run-TryBot: Carlos Amedee <carlos@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Alexander Rakoczy <alex@golang.org>
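
As a rough sketch (based only on the hunks below), coordinator code is now
expected to call into the pool package roughly like this; monitorGitMirror
and the surrounding setup are assumed from the existing coordinator:

    // Sketch only: initialize Kubernetes support via the pool package and
    // record any failure so later callers can check pool.KubeErr.
    if err := pool.InitKube(monitorGitMirror); err != nil {
        pool.KubeSetErr(err)
        log.Printf("Kube support disabled due to error initializing Kubernetes: %v", err)
    }
    // Only start the pod cleanup loop if Kubernetes initialized cleanly.
    if pool.KubeErr() == nil {
        go pool.KubePool().CleanUpOldPodsLoop(context.Background())
    }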
diff --git a/cmd/coordinator/coordinator.go b/cmd/coordinator/coordinator.go
index 6dc50a6..d203b59 100644
--- a/cmd/coordinator/coordinator.go
+++ b/cmd/coordinator/coordinator.go
@@ -283,9 +283,9 @@
}
// TODO(evanbrown: disable kubePool if init fails)
- err = initKube()
+ err = pool.InitKube(monitorGitMirror)
if err != nil {
- kubeErr = err
+ pool.KubeSetErr(err)
log.Printf("Kube support disabled due to error initializing Kubernetes: %v", err)
}
@@ -346,8 +346,8 @@
pool.GetGCEBuildletPool().SetEnabled(*devEnableGCE)
} else {
go pool.GetGCEBuildletPool().CleanUpOldVMs()
- if kubeErr == nil {
- go kubePool.cleanUpOldPodsLoop(context.Background())
+ if pool.KubeErr() == nil {
+ go pool.KubePool().CleanUpOldPodsLoop(context.Background())
}
if pool.GCEInStaging() {
@@ -1628,10 +1628,10 @@
case conf.IsVM():
return pool.GetGCEBuildletPool()
case conf.IsContainer():
- if pool.GCEBuildEnv().PreferContainersOnCOS || kubeErr != nil {
+ if pool.GCEBuildEnv().PreferContainersOnCOS || pool.KubeErr() != nil {
return pool.GetGCEBuildletPool() // it also knows how to do containers.
} else {
- return kubePool
+ return pool.KubePool()
}
case conf.IsReverse:
return reversePool
@@ -1820,7 +1820,7 @@
}
func (st *buildStatus) getCrossCompileConfig() *dashboard.CrossCompileConfig {
- if kubeErr != nil {
+ if pool.KubeErr() != nil {
return nil
}
config := st.conf.CrossCompileConfig
diff --git a/cmd/coordinator/status.go b/cmd/coordinator/status.go
index 063113c..0261e08 100644
--- a/cmd/coordinator/status.go
+++ b/cmd/coordinator/status.go
@@ -184,7 +184,7 @@
func gitMirrorErrors() (errs []string) {
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
- pods, err := goKubeClient.GetPods(ctx)
+ pods, err := pool.KubeGoClient().GetPods(ctx)
if err != nil {
log.Println("gitMirrorErrors: goKubeClient.GetPods:", err)
return []string{"failed to get pods; can't query gitmirror status"}
@@ -662,7 +662,7 @@
data.GCEPoolStatus = template.HTML(buf.String())
buf.Reset()
- kubePool.WriteHTMLStatus(&buf)
+ pool.KubePool().WriteHTMLStatus(&buf)
data.KubePoolStatus = template.HTML(buf.String())
buf.Reset()
diff --git a/cmd/coordinator/kube.go b/internal/coordinator/pool/kube.go
similarity index 85%
rename from cmd/coordinator/kube.go
rename to internal/coordinator/pool/kube.go
index c901d79..bef007a 100644
--- a/cmd/coordinator/kube.go
+++ b/internal/coordinator/pool/kube.go
@@ -5,7 +5,7 @@
// +build go1.13
// +build linux darwin
-package main
+package pool
import (
"context"
@@ -22,7 +22,6 @@
"golang.org/x/build/buildlet"
"golang.org/x/build/dashboard"
- "golang.org/x/build/internal/coordinator/pool"
"golang.org/x/build/internal/sourcecache"
"golang.org/x/build/kubernetes"
"golang.org/x/build/kubernetes/api"
@@ -34,6 +33,15 @@
This file implements the Kubernetes-based buildlet pool.
*/
+const (
+ // podDeleteTimeout is how long before we delete a pod.
+ // In practice this need only be as long as the slowest
+ // builder (plan9 currently), because on startup this program
+ // already deletes all buildlets it doesn't know about
+ // (i.e. ones from a previous instance of the coordinator).
+ podDeleteTimeout = 45 * time.Minute
+)
+
// Initialized by initKube:
var (
buildletsKubeClient *kubernetes.Client // for "buildlets" cluster
@@ -43,14 +51,17 @@
kubeCluster *container.Cluster
)
-// initGCE must be called before initKube
-func initKube() error {
- if pool.GCEBuildEnv().KubeBuild.MaxNodes == 0 {
+// MonitorGitMirrorFunc defines a function used to monitor gitmirror.
+type MonitorGitMirrorFunc func()
+
+// InitGCE must be called before InitKube.
+func InitKube(monitorGitMirror MonitorGitMirrorFunc) error {
+ if GCEBuildEnv().KubeBuild.MaxNodes == 0 {
return errors.New("Kubernetes builders disabled due to KubeBuild.MaxNodes == 0")
}
// projectID was set by initGCE
- registryPrefix += "/" + pool.GCEBuildEnv().ProjectName
+ registryPrefix += "/" + GCEBuildEnv().ProjectName
if !hasCloudPlatformScope() {
return errors.New("coordinator not running with access to the Cloud Platform scope.")
}
@@ -59,19 +70,19 @@
defer cancel() // ctx is only used for discovery and connect; not retained.
var err error
buildletsKubeClient, err = gke.NewClient(ctx,
- pool.GCEBuildEnv().KubeBuild.Name,
- gke.OptZone(pool.GCEBuildEnv().ControlZone),
- gke.OptProject(pool.GCEBuildEnv().ProjectName),
- gke.OptTokenSource(pool.GCPCredentials().TokenSource))
+ GCEBuildEnv().KubeBuild.Name,
+ gke.OptZone(GCEBuildEnv().ControlZone),
+ gke.OptProject(GCEBuildEnv().ProjectName),
+ gke.OptTokenSource(GCPCredentials().TokenSource))
if err != nil {
return err
}
goKubeClient, err = gke.NewClient(ctx,
- pool.GCEBuildEnv().KubeTools.Name,
- gke.OptZone(pool.GCEBuildEnv().ControlZone),
- gke.OptProject(pool.GCEBuildEnv().ProjectName),
- gke.OptTokenSource(pool.GCPCredentials().TokenSource))
+ GCEBuildEnv().KubeTools.Name,
+ gke.OptZone(GCEBuildEnv().ControlZone),
+ gke.OptProject(GCEBuildEnv().ProjectName),
+ gke.OptTokenSource(GCPCredentials().TokenSource))
if err != nil {
return err
}
@@ -85,6 +96,26 @@
return nil
}
+// KubeSetErr sets the kube error to the passed-in value.
+func KubeSetErr(err error) {
+ kubeErr = err
+}
+
+// KubeErr retrieves the kube error value.
+func KubeErr() error {
+ return kubeErr
+}
+
+// KubePool returns the kube buildlet pool.
+func KubePool() *kubeBuildletPool {
+ return kubePool
+}
+
+// KubeGoClient retrieves a kube client for the go cluster.
+func KubeGoClient() *kubernetes.Client {
+ return goKubeClient
+}
+
// kubeBuildletPool is the Kubernetes buildlet pool.
type kubeBuildletPool struct {
mu sync.Mutex // guards all following
@@ -136,12 +167,12 @@
func (p *kubeBuildletPool) pollCapacity(ctx context.Context) {
nodes, err := buildletsKubeClient.GetNodes(ctx)
if err != nil {
- log.Printf("failed to retrieve nodes to calculate cluster capacity for %s/%s: %v", pool.GCEBuildEnv().ProjectName, pool.GCEBuildEnv().Region(), err)
+ log.Printf("failed to retrieve nodes to calculate cluster capacity for %s/%s: %v", GCEBuildEnv().ProjectName, GCEBuildEnv().Region(), err)
return
}
pods, err := buildletsKubeClient.GetPods(ctx)
if err != nil {
- log.Printf("failed to retrieve pods to calculate cluster capacity for %s/%s: %v", pool.GCEBuildEnv().ProjectName, pool.GCEBuildEnv().Region(), err)
+ log.Printf("failed to retrieve pods to calculate cluster capacity for %s/%s: %v", GCEBuildEnv().ProjectName, GCEBuildEnv().Region(), err)
return
}
@@ -210,7 +241,7 @@
}
-func (p *kubeBuildletPool) GetBuildlet(ctx context.Context, hostType string, lg pool.Logger) (*buildlet.Client, error) {
+func (p *kubeBuildletPool) GetBuildlet(ctx context.Context, hostType string, lg Logger) (*buildlet.Client, error) {
hconf, ok := dashboard.Hosts[hostType]
if !ok || !hconf.IsContainer() {
return nil, fmt.Errorf("kubepool: invalid host type %q", hostType)
@@ -222,7 +253,7 @@
panic("expect non-nil buildletsKubeClient")
}
- deleteIn, ok := ctx.Value(pool.BuildletTimeoutOpt{}).(time.Duration)
+ deleteIn, ok := ctx.Value(BuildletTimeoutOpt{}).(time.Duration)
if !ok {
deleteIn = podDeleteTimeout
}
@@ -237,7 +268,7 @@
log.Printf("Creating Kubernetes pod %q for %s", podName, hostType)
bc, err := buildlet.StartPod(ctx, buildletsKubeClient, podName, hostType, buildlet.PodOpts{
- ProjectID: pool.GCEBuildEnv().ProjectName,
+ ProjectID: GCEBuildEnv().ProjectName,
ImageRegistry: registryPrefix,
Description: fmt.Sprintf("Go Builder for %s", hostType),
DeleteIn: deleteIn,
@@ -354,16 +385,16 @@
return ok
}
-func (p *kubeBuildletPool) podsActive() (ret []pool.ResourceTime) {
+func (p *kubeBuildletPool) podsActive() (ret []ResourceTime) {
p.mu.Lock()
defer p.mu.Unlock()
for name, ph := range p.pods {
- ret = append(ret, pool.ResourceTime{
+ ret = append(ret, ResourceTime{
Name: name,
Creation: ph.requestedAt,
})
}
- sort.Sort(pool.ByCreationTime(ret))
+ sort.Sort(ByCreationTime(ret))
return ret
}
@@ -376,7 +407,7 @@
return fmt.Sprintf("Kubernetes pool capacity: %d/%d", inUse, total)
}
-// cleanUpOldPods loops forever and periodically enumerates pods
+// CleanUpOldPodsLoop loops forever and periodically enumerates pods
// and deletes those which have expired.
//
// A Pod is considered expired if it has a "delete-at" metadata
@@ -389,7 +420,7 @@
// stranded and wasting resources forever, we instead set the
// "delete-at" metadata attribute on them when created to some time
// that's well beyond their expected lifetime.
-func (p *kubeBuildletPool) cleanUpOldPodsLoop(ctx context.Context) {
+func (p *kubeBuildletPool) CleanUpOldPodsLoop(ctx context.Context) {
if buildletsKubeClient == nil {
log.Printf("cleanUpOldPods: no buildletsKubeClient configured; aborting.")
return
@@ -438,7 +469,7 @@
}
if err == nil && time.Now().Unix() > unixDeadline {
stats.DeletedOld++
- log.Printf("cleanUpOldPods: Deleting expired pod %q in zone %q ...", pod.Name, pool.GCEBuildEnv().ControlZone)
+ log.Printf("cleanUpOldPods: Deleting expired pod %q in zone %q ...", pod.Name, GCEBuildEnv().ControlZone)
err = buildletsKubeClient.DeletePod(ctx, pod.Name)
if err != nil {
log.Printf("cleanUpOldPods: problem deleting old pod %q: %v", pod.Name, err)
@@ -468,5 +499,5 @@
}
func hasCloudPlatformScope() bool {
- return pool.HasScope(container.CloudPlatformScope)
+ return HasScope(container.CloudPlatformScope)
}