internal, internal/coordinator/pool: add a shared polling function

This adds a polling function that is tested and can be reused by
callers in the x/build repository. This was suggested in the review of
CL 247907. The polling function replaces the pollers used in the EC2 buildlet pool.

For golang/go#36841

Change-Id: I120ceb83e2740f0bdc5ee2423e0edd3ad727bf4b
Reviewed-on: https://go-review.googlesource.com/c/build/+/255358
Trust: Carlos Amedee <carlos@golang.org>
Run-TryBot: Carlos Amedee <carlos@golang.org>
TryBot-Result: Go Bot <gobot@golang.org>
Reviewed-by: Bryan C. Mills <bcmills@google.com>
Reviewed-by: Alexander Rakoczy <alex@golang.org>
diff --git a/internal/coordinator/pool/ec2.go b/internal/coordinator/pool/ec2.go
index 26b44b7..38ccef2 100644
--- a/internal/coordinator/pool/ec2.go
+++ b/internal/coordinator/pool/ec2.go
@@ -20,6 +20,7 @@
 	"golang.org/x/build/buildenv"
 	"golang.org/x/build/buildlet"
 	"golang.org/x/build/dashboard"
+	"golang.org/x/build/internal"
 	"golang.org/x/build/internal/cloud"
 	"golang.org/x/build/internal/spanlog"
 )
@@ -72,10 +73,6 @@
 
 // EC2Buildlet manages a pool of AWS EC2 buildlets.
 type EC2Buildlet struct {
-	once sync.Once
-	// done channel closing will signal the pollers to discontinue polling
-	done chan struct{}
-
 	// awsClient is the client used to interact with AWS services.
 	awsClient awsClient
 	// buildEnv contains the build enviornment settings.
@@ -93,8 +90,12 @@
 	isRemoteBuildlet IsRemoteBuildletFunc
 	// ledger tracks instances and their resource allocations.
 	ledger *ledger
+	// cancelPoll will signal to the pollers to discontinue polling.
+	cancelPoll context.CancelFunc
 	// vmDeleteTimeout contains the timeout used to determine if a VM should be deleted.
 	vmDeleteTimeout time.Duration
+	// pollWait waits for all pollers to terminate polling.
+	pollWait sync.WaitGroup
 }
 
 // ec2BuildletClient represents an EC2 buildlet client in the buildlet package.
@@ -111,11 +112,12 @@
 	if fn == nil {
 		return nil, errors.New("remote buildlet check function is not set")
 	}
+	ctx, cancel := context.WithCancel(context.Background())
 	b := &EC2Buildlet{
 		awsClient:        client,
 		buildEnv:         buildEnv,
 		buildletClient:   buildlet.NewEC2Client(client),
-		done:             make(chan struct{}),
+		cancelPoll:       cancel,
 		hosts:            hosts,
 		isRemoteBuildlet: fn,
 		ledger:           newLedger(),
@@ -124,14 +126,34 @@
 	for _, opt := range opts {
 		opt(b)
 	}
-	if err := b.retrieveAndSetQuota(); err != nil {
+	if err := b.retrieveAndSetQuota(ctx); err != nil {
 		return nil, fmt.Errorf("unable to create EC2 pool: %w", err)
 	}
 	if err := b.retrieveAndSetInstanceTypes(); err != nil {
 		return nil, fmt.Errorf("unable to create EC2 pool: %w", err)
 	}
-	go b.cleanupUnusedVMs()
-	go b.pollQuota()
+
+	b.pollWait.Add(1)
+	// polls for the EC2 quota data and sets the quota data in
+	// the ledger. When the context has been cancelled, the polling will stop.
+	go func() {
+		go internal.PeriodicallyDo(ctx, time.Hour, func(ctx context.Context, _ time.Time) {
+			log.Printf("retrieveing EC2 quota")
+			_ = b.retrieveAndSetQuota(ctx)
+		})
+		b.pollWait.Done()
+	}()
+
+	b.pollWait.Add(1)
+	// poll queries for VMs which are not tracked in the ledger and
+	// deletes them. When the context has been cancelled, the polling will stop.
+	go func() {
+		go internal.PeriodicallyDo(ctx, 2*time.Minute, func(ctx context.Context, _ time.Time) {
+			log.Printf("cleaning up unused EC2 instances")
+			b.destroyUntrackedInstances(ctx)
+		})
+		b.pollWait.Done()
+	}()
 
 	// TODO(golang.org/issues/38337) remove once a package level variable is no longer
 	// required by the main package.
@@ -256,45 +278,24 @@
 
 // Close stops the pollers used by the EC2Buildlet pool from running.
 func (eb *EC2Buildlet) Close() {
-	eb.once.Do(func() {
-		close(eb.done)
-	})
+	eb.cancelPoll()
+	eb.pollWait.Wait()
 }
 
 // retrieveAndSetQuota queries EC2 for account relevant quotas and sets the quota in the ledger.
-func (eb *EC2Buildlet) retrieveAndSetQuota() error {
-	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+func (eb *EC2Buildlet) retrieveAndSetQuota(ctx context.Context) error {
+	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
 	defer cancel()
 
 	cpuQuota, err := eb.awsClient.Quota(ctx, cloud.QuotaServiceEC2, cloud.QuotaCodeCPUOnDemand)
 	if err != nil {
-		log.Printf("unable to query for cpu quota: %s", err)
+		log.Printf("unable to query for EC2 cpu quota: %s", err)
 		return err
 	}
 	eb.ledger.SetCPULimit(cpuQuota)
 	return nil
 }
 
-// pollQuota repeatedly polls for the EC2 quota data and sets the quota data in
-// the ledger. It stops polling once the done channel has been closed.
-func (eb *EC2Buildlet) pollQuota() {
-	t := time.NewTicker(time.Hour)
-	defer t.Stop()
-	for {
-		select {
-		case <-t.C:
-			err := eb.retrieveAndSetQuota()
-			if err != nil {
-				log.Printf("polling for EC2 quota failed: %s", err)
-			}
-		case <-eb.done:
-			// closing the done channel signals the end of the polling loop.
-			log.Printf("stopped polling for EC2 quota")
-			return
-		}
-	}
-}
-
 // retrieveAndSetInstanceTypes retrieves the ARM64 instance types from the EC2
 // service and sets them in the ledger.
 func (eb *EC2Buildlet) retrieveAndSetInstanceTypes() error {
@@ -303,35 +304,17 @@
 
 	its, err := eb.awsClient.InstanceTypesARM(ctx)
 	if err != nil {
-		return fmt.Errorf("unable to retrieve instance types: %w", err)
+		return fmt.Errorf("unable to retrieve EC2 instance types: %w", err)
 	}
 	eb.ledger.UpdateInstanceTypes(its)
 	log.Printf("ec2 buildlet pool instance types updated")
 	return nil
 }
 
-// cleanupUnusedVMs periodically queries for VMs which are not tracked in the ledger and
-// deletes them. If the done channel has been closed then the polling will exit.
-func (eb *EC2Buildlet) cleanupUnusedVMs() {
-	t := time.NewTicker(2 * time.Minute)
-	defer t.Stop()
-	for {
-		select {
-		case <-t.C:
-			log.Printf("cleaning up unused EC2 instances")
-			eb.destroyUntrackedInstances()
-		case <-eb.done:
-			// closing the done channel signals the end of the polling loop.
-			log.Printf("stopped cleaning up unused EC2 instances")
-			return
-		}
-	}
-}
-
 // destroyUntrackedInstances searches for VMs which exist but are not being tracked in the
 // ledger and deletes them.
-func (eb *EC2Buildlet) destroyUntrackedInstances() {
-	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+func (eb *EC2Buildlet) destroyUntrackedInstances(ctx context.Context) {
+	ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
 	defer cancel()
 
 	insts, err := eb.awsClient.RunningInstances(ctx)
diff --git a/internal/coordinator/pool/ec2_test.go b/internal/coordinator/pool/ec2_test.go
index 1436c81..dc501e8 100644
--- a/internal/coordinator/pool/ec2_test.go
+++ b/internal/coordinator/pool/ec2_test.go
@@ -462,23 +462,13 @@
 }
 
 func TestEC2BuildletClose(t *testing.T) {
+	cancelled := false
 	pool := &EC2Buildlet{
-		done: make(chan struct{}),
+		cancelPoll: func() { cancelled = true },
 	}
-	defer func() {
-		if r := recover(); r != nil {
-			t.Errorf("EC2Buildlet.Close() paniced=%s", r)
-		}
-	}()
 	pool.Close()
-	pool.Close()
-	select {
-	case _, ok := <-pool.done:
-		if ok {
-			t.Error("EC2Buildlet.done not closed; read from channel")
-		}
-	default:
-		t.Error("EC2Buildlet.done not closed: waiting for read")
+	if !cancelled {
+		t.Error("EC2Buildlet.pollCancel not called")
 	}
 }
 
@@ -487,7 +477,7 @@
 		awsClient: cloud.NewFakeAWSClient(),
 		ledger:    newLedger(),
 	}
-	err := pool.retrieveAndSetQuota()
+	err := pool.retrieveAndSetQuota(context.Background())
 	if err != nil {
 		t.Errorf("EC2Buildlet.retrieveAndSetQuota(ctx) = %s; want nil", err)
 	}
@@ -556,7 +546,7 @@
 			},
 		},
 	}
-	pool.destroyUntrackedInstances()
+	pool.destroyUntrackedInstances(context.Background())
 	wantInstCount := 3
 	gotInsts, err := awsC.RunningInstances(context.Background())
 	if err != nil || len(gotInsts) != wantInstCount {
diff --git a/internal/internal.go b/internal/internal.go
new file mode 100644
index 0000000..c7df7b9
--- /dev/null
+++ b/internal/internal.go
@@ -0,0 +1,24 @@
+// Copyright 2020 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package internal
+
+import (
+	"context"
+	"time"
+)
+
+// PeriodicallyDo calls f every period until the provided context is cancelled.
+func PeriodicallyDo(ctx context.Context, period time.Duration, f func(context.Context, time.Time)) {
+	ticker := time.NewTicker(period)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case now := <-ticker.C:
+			f(ctx, now)
+		}
+	}
+}
diff --git a/internal/internal_test.go b/internal/internal_test.go
new file mode 100644
index 0000000..adff1c5
--- /dev/null
+++ b/internal/internal_test.go
@@ -0,0 +1,41 @@
+// Copyright 2020 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package internal
+
+import (
+	"context"
+	"testing"
+	"time"
+)
+
+func TestPeriodicallyDo(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	didWork := make(chan time.Time, 2)
+	done := make(chan interface{})
+	go func() {
+		PeriodicallyDo(ctx, time.Millisecond, func(ctx context.Context, t time.Time) {
+			select {
+			case didWork <- t:
+			case <-ctx.Done():
+			}
+		})
+		close(done)
+	}()
+	select {
+	case <-time.After(5 * time.Second):
+		t.Error("PeriodicallyDo() never called f, wanted at least one call")
+	case <-didWork:
+		// PeriodicallyDo called f successfully.
+	}
+	select {
+	case <-done:
+		t.Errorf("PeriodicallyDo() finished early, wanted it to still be looping")
+	case <-didWork:
+		cancel()
+	}
+	<-done
+}