internal, internal/coordinator/pool: add a shared polling function
This adds a polling function that is tested and can be reused by
callers in the x/build repository. This was suggested in the review of
CL 247907. The polling function replaces the pollers used in the EC2 buildlet pool.
For golang/go#36841
Change-Id: I120ceb83e2740f0bdc5ee2423e0edd3ad727bf4b
Reviewed-on: https://go-review.googlesource.com/c/build/+/255358
Trust: Carlos Amedee <carlos@golang.org>
Run-TryBot: Carlos Amedee <carlos@golang.org>
TryBot-Result: Go Bot <gobot@golang.org>
Reviewed-by: Bryan C. Mills <bcmills@google.com>
Reviewed-by: Alexander Rakoczy <alex@golang.org>
diff --git a/internal/coordinator/pool/ec2.go b/internal/coordinator/pool/ec2.go
index 26b44b7..38ccef2 100644
--- a/internal/coordinator/pool/ec2.go
+++ b/internal/coordinator/pool/ec2.go
@@ -20,6 +20,7 @@
"golang.org/x/build/buildenv"
"golang.org/x/build/buildlet"
"golang.org/x/build/dashboard"
+ "golang.org/x/build/internal"
"golang.org/x/build/internal/cloud"
"golang.org/x/build/internal/spanlog"
)
@@ -72,10 +73,6 @@
// EC2Buildlet manages a pool of AWS EC2 buildlets.
type EC2Buildlet struct {
- once sync.Once
- // done channel closing will signal the pollers to discontinue polling
- done chan struct{}
-
// awsClient is the client used to interact with AWS services.
awsClient awsClient
// buildEnv contains the build enviornment settings.
@@ -93,8 +90,12 @@
isRemoteBuildlet IsRemoteBuildletFunc
// ledger tracks instances and their resource allocations.
ledger *ledger
+ // cancelPoll will signal to the pollers to discontinue polling.
+ cancelPoll context.CancelFunc
// vmDeleteTimeout contains the timeout used to determine if a VM should be deleted.
vmDeleteTimeout time.Duration
+ // pollWait waits for all pollers to terminate polling.
+ pollWait sync.WaitGroup
}
// ec2BuildletClient represents an EC2 buildlet client in the buildlet package.
@@ -111,11 +112,12 @@
if fn == nil {
return nil, errors.New("remote buildlet check function is not set")
}
+ ctx, cancel := context.WithCancel(context.Background())
b := &EC2Buildlet{
awsClient: client,
buildEnv: buildEnv,
buildletClient: buildlet.NewEC2Client(client),
- done: make(chan struct{}),
+ cancelPoll: cancel,
hosts: hosts,
isRemoteBuildlet: fn,
ledger: newLedger(),
@@ -124,14 +126,34 @@
for _, opt := range opts {
opt(b)
}
- if err := b.retrieveAndSetQuota(); err != nil {
+ if err := b.retrieveAndSetQuota(ctx); err != nil {
return nil, fmt.Errorf("unable to create EC2 pool: %w", err)
}
if err := b.retrieveAndSetInstanceTypes(); err != nil {
return nil, fmt.Errorf("unable to create EC2 pool: %w", err)
}
- go b.cleanupUnusedVMs()
- go b.pollQuota()
+
+ b.pollWait.Add(1)
+ // polls for the EC2 quota data and sets the quota data in
+ // the ledger. When the context has been cancelled, the polling will stop.
+ go func() {
+ go internal.PeriodicallyDo(ctx, time.Hour, func(ctx context.Context, _ time.Time) {
+ log.Printf("retrieveing EC2 quota")
+ _ = b.retrieveAndSetQuota(ctx)
+ })
+ b.pollWait.Done()
+ }()
+
+ b.pollWait.Add(1)
+ // poll queries for VMs which are not tracked in the ledger and
+ // deletes them. When the context has been cancelled, the polling will stop.
+ go func() {
+ go internal.PeriodicallyDo(ctx, 2*time.Minute, func(ctx context.Context, _ time.Time) {
+ log.Printf("cleaning up unused EC2 instances")
+ b.destroyUntrackedInstances(ctx)
+ })
+ b.pollWait.Done()
+ }()
// TODO(golang.org/issues/38337) remove once a package level variable is no longer
// required by the main package.
@@ -256,45 +278,24 @@
// Close stops the pollers used by the EC2Buildlet pool from running.
func (eb *EC2Buildlet) Close() {
- eb.once.Do(func() {
- close(eb.done)
- })
+ eb.cancelPoll()
+ eb.pollWait.Wait()
}
// retrieveAndSetQuota queries EC2 for account relevant quotas and sets the quota in the ledger.
-func (eb *EC2Buildlet) retrieveAndSetQuota() error {
- ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+func (eb *EC2Buildlet) retrieveAndSetQuota(ctx context.Context) error {
+ ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
cpuQuota, err := eb.awsClient.Quota(ctx, cloud.QuotaServiceEC2, cloud.QuotaCodeCPUOnDemand)
if err != nil {
- log.Printf("unable to query for cpu quota: %s", err)
+ log.Printf("unable to query for EC2 cpu quota: %s", err)
return err
}
eb.ledger.SetCPULimit(cpuQuota)
return nil
}
-// pollQuota repeatedly polls for the EC2 quota data and sets the quota data in
-// the ledger. It stops polling once the done channel has been closed.
-func (eb *EC2Buildlet) pollQuota() {
- t := time.NewTicker(time.Hour)
- defer t.Stop()
- for {
- select {
- case <-t.C:
- err := eb.retrieveAndSetQuota()
- if err != nil {
- log.Printf("polling for EC2 quota failed: %s", err)
- }
- case <-eb.done:
- // closing the done channel signals the end of the polling loop.
- log.Printf("stopped polling for EC2 quota")
- return
- }
- }
-}
-
// retrieveAndSetInstanceTypes retrieves the ARM64 instance types from the EC2
// service and sets them in the ledger.
func (eb *EC2Buildlet) retrieveAndSetInstanceTypes() error {
@@ -303,35 +304,17 @@
its, err := eb.awsClient.InstanceTypesARM(ctx)
if err != nil {
- return fmt.Errorf("unable to retrieve instance types: %w", err)
+ return fmt.Errorf("unable to retrieve EC2 instance types: %w", err)
}
eb.ledger.UpdateInstanceTypes(its)
log.Printf("ec2 buildlet pool instance types updated")
return nil
}
-// cleanupUnusedVMs periodically queries for VMs which are not tracked in the ledger and
-// deletes them. If the done channel has been closed then the polling will exit.
-func (eb *EC2Buildlet) cleanupUnusedVMs() {
- t := time.NewTicker(2 * time.Minute)
- defer t.Stop()
- for {
- select {
- case <-t.C:
- log.Printf("cleaning up unused EC2 instances")
- eb.destroyUntrackedInstances()
- case <-eb.done:
- // closing the done channel signals the end of the polling loop.
- log.Printf("stopped cleaning up unused EC2 instances")
- return
- }
- }
-}
-
// destroyUntrackedInstances searches for VMs which exist but are not being tracked in the
// ledger and deletes them.
-func (eb *EC2Buildlet) destroyUntrackedInstances() {
- ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+func (eb *EC2Buildlet) destroyUntrackedInstances(ctx context.Context) {
+ ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
insts, err := eb.awsClient.RunningInstances(ctx)
diff --git a/internal/coordinator/pool/ec2_test.go b/internal/coordinator/pool/ec2_test.go
index 1436c81..dc501e8 100644
--- a/internal/coordinator/pool/ec2_test.go
+++ b/internal/coordinator/pool/ec2_test.go
@@ -462,23 +462,13 @@
}
func TestEC2BuildletClose(t *testing.T) {
+ cancelled := false
pool := &EC2Buildlet{
- done: make(chan struct{}),
+ cancelPoll: func() { cancelled = true },
}
- defer func() {
- if r := recover(); r != nil {
- t.Errorf("EC2Buildlet.Close() paniced=%s", r)
- }
- }()
pool.Close()
- pool.Close()
- select {
- case _, ok := <-pool.done:
- if ok {
- t.Error("EC2Buildlet.done not closed; read from channel")
- }
- default:
- t.Error("EC2Buildlet.done not closed: waiting for read")
+ if !cancelled {
+ t.Error("EC2Buildlet.pollCancel not called")
}
}
@@ -487,7 +477,7 @@
awsClient: cloud.NewFakeAWSClient(),
ledger: newLedger(),
}
- err := pool.retrieveAndSetQuota()
+ err := pool.retrieveAndSetQuota(context.Background())
if err != nil {
t.Errorf("EC2Buildlet.retrieveAndSetQuota(ctx) = %s; want nil", err)
}
@@ -556,7 +546,7 @@
},
},
}
- pool.destroyUntrackedInstances()
+ pool.destroyUntrackedInstances(context.Background())
wantInstCount := 3
gotInsts, err := awsC.RunningInstances(context.Background())
if err != nil || len(gotInsts) != wantInstCount {
diff --git a/internal/internal.go b/internal/internal.go
new file mode 100644
index 0000000..c7df7b9
--- /dev/null
+++ b/internal/internal.go
@@ -0,0 +1,24 @@
+// Copyright 2020 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package internal
+
+import (
+ "context"
+ "time"
+)
+
+// PeriodicallyDo calls f every period until the provided context is cancelled.
+func PeriodicallyDo(ctx context.Context, period time.Duration, f func(context.Context, time.Time)) {
+ ticker := time.NewTicker(period)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case now := <-ticker.C:
+ f(ctx, now)
+ }
+ }
+}
diff --git a/internal/internal_test.go b/internal/internal_test.go
new file mode 100644
index 0000000..adff1c5
--- /dev/null
+++ b/internal/internal_test.go
@@ -0,0 +1,41 @@
+// Copyright 2020 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package internal
+
+import (
+ "context"
+ "testing"
+ "time"
+)
+
+func TestPeriodicallyDo(t *testing.T) {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ didWork := make(chan time.Time, 2)
+ done := make(chan interface{})
+ go func() {
+ PeriodicallyDo(ctx, time.Millisecond, func(ctx context.Context, t time.Time) {
+ select {
+ case didWork <- t:
+ case <-ctx.Done():
+ }
+ })
+ close(done)
+ }()
+ select {
+ case <-time.After(5 * time.Second):
+ t.Error("PeriodicallyDo() never called f, wanted at least one call")
+ case <-didWork:
+ // PeriodicallyDo called f successfully.
+ }
+ select {
+ case <-done:
+ t.Errorf("PeriodicallyDo() finished early, wanted it to still be looping")
+ case <-didWork:
+ cancel()
+ }
+ <-done
+}