sandbox: instrument number of running containers

This change adds a metric to count the number of running containers,
according to Docker.

Updates golang/go#25224
Updates golang/go#38530

Change-Id: Id989986928dff594cb1de0903a56dcffed8220c4
Reviewed-on: https://go-review.googlesource.com/c/playground/+/229680
Run-TryBot: Alexander Rakoczy <alex@golang.org>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Carlos Amedee <carlos@golang.org>
diff --git a/internal/internal.go b/internal/internal.go
index 1540e93..274bf2c 100644
--- a/internal/internal.go
+++ b/internal/internal.go
@@ -1,3 +1,7 @@
+// Copyright 2020 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
 package internal
 
 import (
@@ -64,3 +68,17 @@
 	}
 	return waitErr
 }
+
+// PeriodicallyDo calls f every period until the provided context is cancelled.
+func PeriodicallyDo(ctx context.Context, period time.Duration, f func(context.Context, time.Time)) {
+	ticker := time.NewTicker(period)
+	defer ticker.Stop()
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case now := <-ticker.C:
+			f(ctx, now)
+		}
+	}
+}
diff --git a/internal/internal_test.go b/internal/internal_test.go
new file mode 100644
index 0000000..337bbbc
--- /dev/null
+++ b/internal/internal_test.go
@@ -0,0 +1,48 @@
+// Copyright 2020 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package internal
+
+import (
+	"context"
+	"testing"
+	"time"
+)
+
+func TestPeriodicallyDo(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	didWork := make(chan time.Time, 2)
+	done := make(chan interface{})
+	go func() {
+		PeriodicallyDo(ctx, 100*time.Millisecond, func(ctx context.Context, t time.Time) {
+			select {
+			case didWork <- t:
+			default:
+				// No need to assert that we can't send, we just care that we sent.
+			}
+		})
+		close(done)
+	}()
+
+	select {
+	case <-time.After(5 * time.Second):
+		t.Error("PeriodicallyDo() never called f, wanted at least one call")
+	case <-didWork:
+		// PeriodicallyDo called f successfully.
+	}
+
+	select {
+	case <-done:
+		t.Errorf("PeriodicallyDo() finished early, wanted it to still be looping")
+	case <-didWork:
+		cancel()
+	}
+
+	select {
+	case <-time.After(time.Second):
+		t.Fatal("PeriodicallyDo() never returned, wanted return after context cancellation")
+	case <-done:
+		// PeriodicallyDo successfully returned.
+	}
+}
diff --git a/sandbox/metrics.go b/sandbox/metrics.go
index 9ec454d..3d80066 100644
--- a/sandbox/metrics.go
+++ b/sandbox/metrics.go
@@ -13,6 +13,7 @@
 	"contrib.go.opencensus.io/exporter/prometheus"
 	"contrib.go.opencensus.io/exporter/stackdriver"
 	"go.opencensus.io/plugin/ochttp"
+	"go.opencensus.io/stats"
 	"go.opencensus.io/stats/view"
 	"go.opencensus.io/tag"
 	mrpb "google.golang.org/genproto/googleapis/api/monitoredres"
@@ -22,6 +23,32 @@
 //  * The views are prefixed with go-playground-sandbox.
 //  * ochttp.KeyServerRoute is added as a tag to label metrics per-route.
 var (
+	mContainers         = stats.Int64("go-playground/sandbox/container_count", "number of sandbox containers", stats.UnitDimensionless)
+	mUnwantedContainers = stats.Int64("go-playground/sandbox/unwanted_container_count", "number of sandbox containers that are unexpectedly running", stats.UnitDimensionless)
+	mMaxContainers      = stats.Int64("go-playground/sandbox/max_container_count", "target number of sandbox containers", stats.UnitDimensionless)
+
+	containerCount = &view.View{
+		Name:        "go-playground/sandbox/container_count",
+		Description: "Number of running sandbox containers",
+		TagKeys:     nil,
+		Measure:     mContainers,
+		Aggregation: view.LastValue(),
+	}
+	unwantedContainerCount = &view.View{
+		Name:        "go-playground/sandbox/unwanted_container_count",
+		Description: "Number of running sandbox containers that are not being tracked by the sandbox",
+		TagKeys:     nil,
+		Measure:     mUnwantedContainers,
+		Aggregation: view.LastValue(),
+	}
+	maxContainerCount = &view.View{
+		Name:        "go-playground/sandbox/max_container_count",
+		Description: "Maximum number of containers to create",
+		TagKeys:     nil,
+		Measure:     mMaxContainers,
+		Aggregation: view.LastValue(),
+	}
+
 	ServerRequestCountView = &view.View{
 		Name:        "go-playground-sandbox/http/server/request_count",
 		Description: "Count of HTTP requests started",
@@ -72,6 +99,9 @@
 // When the sandbox is not running on GCE, it will host metrics through a prometheus HTTP handler.
 func newMetricService() (*metricService, error) {
 	err := view.Register(
+		containerCount,
+		unwantedContainerCount,
+		maxContainerCount,
 		ServerRequestCountView,
 		ServerRequestBytesView,
 		ServerResponseBytesView,
diff --git a/sandbox/sandbox.go b/sandbox/sandbox.go
index f2a49e7..59ba51a 100644
--- a/sandbox/sandbox.go
+++ b/sandbox/sandbox.go
@@ -11,6 +11,7 @@
 package main
 
 import (
+	"bufio"
 	"bytes"
 	"context"
 	"crypto/rand"
@@ -31,6 +32,7 @@
 	"time"
 
 	"go.opencensus.io/plugin/ochttp"
+	"go.opencensus.io/stats"
 	"go.opencensus.io/trace"
 	"golang.org/x/playground/internal"
 	"golang.org/x/playground/sandbox/sandboxtypes"
@@ -146,6 +148,9 @@
 	mux.Handle("/run", ochttp.WithRouteTag(http.HandlerFunc(runHandler), "/run"))
 
 	go makeWorkers()
+	go internal.PeriodicallyDo(context.Background(), 10*time.Second, func(ctx context.Context, _ time.Time) {
+		countDockerContainers(ctx)
+	})
 
 	trace.ApplyConfig(trace.Config{DefaultSampler: trace.NeverSample()})
 	httpServer = &http.Server{
@@ -155,6 +160,70 @@
 	log.Fatal(httpServer.ListenAndServe())
 }
 
+// dockerContainer is the structure of each line output from docker ps.
+type dockerContainer struct {
+	// ID is the docker container ID.
+	ID string `json:"ID"`
+	// Image is the docker image name.
+	Image string `json:"Image"`
+	// Names is the docker container name.
+	Names string `json:"Names"`
+}
+
+// countDockerContainers records the metric for the current number of docker containers.
+// It also records the count of any unwanted containers.
+func countDockerContainers(ctx context.Context) {
+	cs, err := listDockerContainers(ctx)
+	if err != nil {
+		log.Printf("Error counting docker containers: %v", err)
+	}
+	stats.Record(ctx, mContainers.M(int64(len(cs))))
+	var unwantedCount int64
+	for _, c := range cs {
+		if c.Names != "" && !isContainerWanted(c.Names) {
+			unwantedCount++
+		}
+	}
+	stats.Record(ctx, mUnwantedContainers.M(unwantedCount))
+}
+
+// listDockerContainers returns the current running play_run containers reported by docker.
+func listDockerContainers(ctx context.Context) ([]dockerContainer, error) {
+	out := new(bytes.Buffer)
+	cmd := exec.Command("docker", "ps", "--quiet", "--filter", "name=play_run_", "--format", "{{json .}}")
+	cmd.Stdout, cmd.Stderr = out, out
+	if err := cmd.Start(); err != nil {
+		return nil, fmt.Errorf("listDockerContainers: cmd.Start() failed: %w", err)
+	}
+	ctx, cancel := context.WithTimeout(ctx, time.Second)
+	defer cancel()
+	if err := internal.WaitOrStop(ctx, cmd, os.Interrupt, 250*time.Millisecond); err != nil {
+		return nil, fmt.Errorf("listDockerContainers: internal.WaitOrStop() failed: %w", err)
+	}
+	return parseDockerContainers(out.Bytes())
+}
+
+// parseDockerContainers parses the json formatted docker output from docker ps.
+//
+// If there is an error scanning the input, or non-JSON output is encountered, an error is returned.
+func parseDockerContainers(b []byte) ([]dockerContainer, error) {
+	// Parse the output to ensure it is well-formatted in the structure we expect.
+	var containers []dockerContainer
+	// Each output line is it's own JSON object, so unmarshal one line at a time.
+	scanner := bufio.NewScanner(bytes.NewReader(b))
+	for scanner.Scan() {
+		var do dockerContainer
+		if err := json.Unmarshal(scanner.Bytes(), &do); err != nil {
+			return nil, fmt.Errorf("parseDockerContainers: error parsing docker ps output: %w", err)
+		}
+		containers = append(containers, do)
+	}
+	if err := scanner.Err(); err != nil {
+		return nil, fmt.Errorf("parseDockerContainers: error reading docker ps output: %w", err)
+	}
+	return containers, nil
+}
+
 func handleSignals() {
 	c := make(chan os.Signal, 1)
 	signal.Notify(c, syscall.SIGINT)
@@ -292,8 +361,10 @@
 }
 
 func makeWorkers() {
+	ctx := context.Background()
+	stats.Record(ctx, mMaxContainers.M(int64(*numWorkers)))
 	for {
-		c, err := startContainer(context.Background())
+		c, err := startContainer(ctx)
 		if err != nil {
 			log.Printf("error starting container: %v", err)
 			time.Sleep(5 * time.Second)
@@ -332,6 +403,12 @@
 	}
 }
 
+func isContainerWanted(name string) bool {
+	wantedMu.Lock()
+	defer wantedMu.Unlock()
+	return containerWanted[name]
+}
+
 func getContainer(ctx context.Context) (*Container, error) {
 	select {
 	case c := <-readyContainer:
diff --git a/sandbox/sandbox_test.go b/sandbox/sandbox_test.go
index c5ea09a..fac52bb 100644
--- a/sandbox/sandbox_test.go
+++ b/sandbox/sandbox_test.go
@@ -10,6 +10,8 @@
 	"strings"
 	"testing"
 	"testing/iotest"
+
+	"github.com/google/go-cmp/cmp"
 )
 
 func TestLimitedWriter(t *testing.T) {
@@ -182,3 +184,43 @@
 		t.Errorf("dst2.Bytes() = %q, wanted %q", dst2.Bytes(), " and this is after")
 	}
 }
+
+func TestParseDockerContainers(t *testing.T) {
+	cases := []struct {
+		desc    string
+		output  string
+		want    []dockerContainer
+		wantErr bool
+	}{
+		{
+			desc: "normal output (container per line)",
+			output: `{"Command":"\"/usr/local/bin/play…\"","CreatedAt":"2020-04-23 17:44:02 -0400 EDT","ID":"f7f170fde076","Image":"gcr.io/golang-org/playground-sandbox-gvisor:latest","Labels":"","LocalVolumes":"0","Mounts":"","Names":"play_run_a02cfe67","Networks":"none","Ports":"","RunningFor":"8 seconds ago","Size":"0B","Status":"Up 7 seconds"}
+{"Command":"\"/usr/local/bin/play…\"","CreatedAt":"2020-04-23 17:44:02 -0400 EDT","ID":"af872e55a773","Image":"gcr.io/golang-org/playground-sandbox-gvisor:latest","Labels":"","LocalVolumes":"0","Mounts":"","Names":"play_run_0a69c3e8","Networks":"none","Ports":"","RunningFor":"8 seconds ago","Size":"0B","Status":"Up 7 seconds"}`,
+			want: []dockerContainer{
+				{ID: "f7f170fde076", Image: "gcr.io/golang-org/playground-sandbox-gvisor:latest", Names: "play_run_a02cfe67"},
+				{ID: "af872e55a773", Image: "gcr.io/golang-org/playground-sandbox-gvisor:latest", Names: "play_run_0a69c3e8"},
+			},
+			wantErr: false,
+		},
+		{
+			desc:    "empty output",
+			wantErr: false,
+		},
+		{
+			desc:    "malformatted output",
+			output:  `xyzzy{}`,
+			wantErr: true,
+		},
+	}
+	for _, tc := range cases {
+		t.Run(tc.desc, func(t *testing.T) {
+			cs, err := parseDockerContainers([]byte(tc.output))
+			if (err != nil) != tc.wantErr {
+				t.Errorf("parseDockerContainers(_) = %v, %v, wantErr: %v", cs, err, tc.wantErr)
+			}
+			if diff := cmp.Diff(tc.want, cs); diff != "" {
+				t.Errorf("parseDockerContainers() mismatch (-want +got):\n%s", diff)
+			}
+		})
+	}
+}