sandbox: instrument number of running containers This change adds a metric to count the number of running containers, according to Docker. Updates golang/go#25224 Updates golang/go#38530 Change-Id: Id989986928dff594cb1de0903a56dcffed8220c4 Reviewed-on: https://go-review.googlesource.com/c/playground/+/229680 Run-TryBot: Alexander Rakoczy <alex@golang.org> TryBot-Result: Gobot Gobot <gobot@golang.org> Reviewed-by: Carlos Amedee <carlos@golang.org>
diff --git a/internal/internal.go b/internal/internal.go index 1540e93..274bf2c 100644 --- a/internal/internal.go +++ b/internal/internal.go
@@ -1,3 +1,7 @@ +// Copyright 2020 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + package internal import ( @@ -64,3 +68,17 @@ } return waitErr } + +// PeriodicallyDo calls f every period until the provided context is cancelled. +func PeriodicallyDo(ctx context.Context, period time.Duration, f func(context.Context, time.Time)) { + ticker := time.NewTicker(period) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case now := <-ticker.C: + f(ctx, now) + } + } +}
diff --git a/internal/internal_test.go b/internal/internal_test.go new file mode 100644 index 0000000..337bbbc --- /dev/null +++ b/internal/internal_test.go
@@ -0,0 +1,48 @@ +// Copyright 2020 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + +package internal + +import ( + "context" + "testing" + "time" +) + +func TestPeriodicallyDo(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + didWork := make(chan time.Time, 2) + done := make(chan interface{}) + go func() { + PeriodicallyDo(ctx, 100*time.Millisecond, func(ctx context.Context, t time.Time) { + select { + case didWork <- t: + default: + // No need to assert that we can't send, we just care that we sent. + } + }) + close(done) + }() + + select { + case <-time.After(5 * time.Second): + t.Error("PeriodicallyDo() never called f, wanted at least one call") + case <-didWork: + // PeriodicallyDo called f successfully. + } + + select { + case <-done: + t.Errorf("PeriodicallyDo() finished early, wanted it to still be looping") + case <-didWork: + cancel() + } + + select { + case <-time.After(time.Second): + t.Fatal("PeriodicallyDo() never returned, wanted return after context cancellation") + case <-done: + // PeriodicallyDo successfully returned. + } +}
diff --git a/sandbox/metrics.go b/sandbox/metrics.go index 9ec454d..3d80066 100644 --- a/sandbox/metrics.go +++ b/sandbox/metrics.go
@@ -13,6 +13,7 @@ "contrib.go.opencensus.io/exporter/prometheus" "contrib.go.opencensus.io/exporter/stackdriver" "go.opencensus.io/plugin/ochttp" + "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" mrpb "google.golang.org/genproto/googleapis/api/monitoredres" @@ -22,6 +23,32 @@ // * The views are prefixed with go-playground-sandbox. // * ochttp.KeyServerRoute is added as a tag to label metrics per-route. var ( + mContainers = stats.Int64("go-playground/sandbox/container_count", "number of sandbox containers", stats.UnitDimensionless) + mUnwantedContainers = stats.Int64("go-playground/sandbox/unwanted_container_count", "number of sandbox containers that are unexpectedly running", stats.UnitDimensionless) + mMaxContainers = stats.Int64("go-playground/sandbox/max_container_count", "target number of sandbox containers", stats.UnitDimensionless) + + containerCount = &view.View{ + Name: "go-playground/sandbox/container_count", + Description: "Number of running sandbox containers", + TagKeys: nil, + Measure: mContainers, + Aggregation: view.LastValue(), + } + unwantedContainerCount = &view.View{ + Name: "go-playground/sandbox/unwanted_container_count", + Description: "Number of running sandbox containers that are not being tracked by the sandbox", + TagKeys: nil, + Measure: mUnwantedContainers, + Aggregation: view.LastValue(), + } + maxContainerCount = &view.View{ + Name: "go-playground/sandbox/max_container_count", + Description: "Maximum number of containers to create", + TagKeys: nil, + Measure: mMaxContainers, + Aggregation: view.LastValue(), + } + ServerRequestCountView = &view.View{ Name: "go-playground-sandbox/http/server/request_count", Description: "Count of HTTP requests started", @@ -72,6 +99,9 @@ // When the sandbox is not running on GCE, it will host metrics through a prometheus HTTP handler. 
func newMetricService() (*metricService, error) { err := view.Register( + containerCount, + unwantedContainerCount, + maxContainerCount, ServerRequestCountView, ServerRequestBytesView, ServerResponseBytesView,
diff --git a/sandbox/sandbox.go b/sandbox/sandbox.go index f2a49e7..59ba51a 100644 --- a/sandbox/sandbox.go +++ b/sandbox/sandbox.go
@@ -11,6 +11,7 @@ package main import ( + "bufio" "bytes" "context" "crypto/rand" @@ -31,6 +32,7 @@ "time" "go.opencensus.io/plugin/ochttp" + "go.opencensus.io/stats" "go.opencensus.io/trace" "golang.org/x/playground/internal" "golang.org/x/playground/sandbox/sandboxtypes" @@ -146,6 +148,9 @@ mux.Handle("/run", ochttp.WithRouteTag(http.HandlerFunc(runHandler), "/run")) go makeWorkers() + go internal.PeriodicallyDo(context.Background(), 10*time.Second, func(ctx context.Context, _ time.Time) { + countDockerContainers(ctx) + }) trace.ApplyConfig(trace.Config{DefaultSampler: trace.NeverSample()}) httpServer = &http.Server{ @@ -155,6 +160,70 @@ log.Fatal(httpServer.ListenAndServe()) } +// dockerContainer is the structure of each line output from docker ps. +type dockerContainer struct { + // ID is the docker container ID. + ID string `json:"ID"` + // Image is the docker image name. + Image string `json:"Image"` + // Names is the docker container name. + Names string `json:"Names"` +} + +// countDockerContainers records the metric for the current number of docker containers. +// It also records the count of any unwanted containers. +func countDockerContainers(ctx context.Context) { + cs, err := listDockerContainers(ctx) + if err != nil { + log.Printf("Error counting docker containers: %v", err) + } + stats.Record(ctx, mContainers.M(int64(len(cs)))) + var unwantedCount int64 + for _, c := range cs { + if c.Names != "" && !isContainerWanted(c.Names) { + unwantedCount++ + } + } + stats.Record(ctx, mUnwantedContainers.M(unwantedCount)) +} + +// listDockerContainers returns the current running play_run containers reported by docker. 
+func listDockerContainers(ctx context.Context) ([]dockerContainer, error) { + out := new(bytes.Buffer) + cmd := exec.Command("docker", "ps", "--quiet", "--filter", "name=play_run_", "--format", "{{json .}}") + cmd.Stdout, cmd.Stderr = out, out + if err := cmd.Start(); err != nil { + return nil, fmt.Errorf("listDockerContainers: cmd.Start() failed: %w", err) + } + ctx, cancel := context.WithTimeout(ctx, time.Second) + defer cancel() + if err := internal.WaitOrStop(ctx, cmd, os.Interrupt, 250*time.Millisecond); err != nil { + return nil, fmt.Errorf("listDockerContainers: internal.WaitOrStop() failed: %w", err) + } + return parseDockerContainers(out.Bytes()) +} + +// parseDockerContainers parses the json formatted docker output from docker ps. +// +// If there is an error scanning the input, or non-JSON output is encountered, an error is returned. +func parseDockerContainers(b []byte) ([]dockerContainer, error) { + // Parse the output to ensure it is well-formatted in the structure we expect. + var containers []dockerContainer + // Each output line is its own JSON object, so unmarshal one line at a time. 
+ scanner := bufio.NewScanner(bytes.NewReader(b)) + for scanner.Scan() { + var do dockerContainer + if err := json.Unmarshal(scanner.Bytes(), &do); err != nil { + return nil, fmt.Errorf("parseDockerContainers: error parsing docker ps output: %w", err) + } + containers = append(containers, do) + } + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("parseDockerContainers: error reading docker ps output: %w", err) + } + return containers, nil +} + func handleSignals() { c := make(chan os.Signal, 1) signal.Notify(c, syscall.SIGINT) @@ -292,8 +361,10 @@ } func makeWorkers() { + ctx := context.Background() + stats.Record(ctx, mMaxContainers.M(int64(*numWorkers))) for { - c, err := startContainer(context.Background()) + c, err := startContainer(ctx) if err != nil { log.Printf("error starting container: %v", err) time.Sleep(5 * time.Second) @@ -332,6 +403,12 @@ } } +func isContainerWanted(name string) bool { + wantedMu.Lock() + defer wantedMu.Unlock() + return containerWanted[name] +} + func getContainer(ctx context.Context) (*Container, error) { select { case c := <-readyContainer:
diff --git a/sandbox/sandbox_test.go b/sandbox/sandbox_test.go index c5ea09a..fac52bb 100644 --- a/sandbox/sandbox_test.go +++ b/sandbox/sandbox_test.go
@@ -10,6 +10,8 @@ "strings" "testing" "testing/iotest" + + "github.com/google/go-cmp/cmp" ) func TestLimitedWriter(t *testing.T) { @@ -182,3 +184,43 @@ t.Errorf("dst2.Bytes() = %q, wanted %q", dst2.Bytes(), " and this is after") } } + +func TestParseDockerContainers(t *testing.T) { + cases := []struct { + desc string + output string + want []dockerContainer + wantErr bool + }{ + { + desc: "normal output (container per line)", + output: `{"Command":"\"/usr/local/bin/play…\"","CreatedAt":"2020-04-23 17:44:02 -0400 EDT","ID":"f7f170fde076","Image":"gcr.io/golang-org/playground-sandbox-gvisor:latest","Labels":"","LocalVolumes":"0","Mounts":"","Names":"play_run_a02cfe67","Networks":"none","Ports":"","RunningFor":"8 seconds ago","Size":"0B","Status":"Up 7 seconds"} +{"Command":"\"/usr/local/bin/play…\"","CreatedAt":"2020-04-23 17:44:02 -0400 EDT","ID":"af872e55a773","Image":"gcr.io/golang-org/playground-sandbox-gvisor:latest","Labels":"","LocalVolumes":"0","Mounts":"","Names":"play_run_0a69c3e8","Networks":"none","Ports":"","RunningFor":"8 seconds ago","Size":"0B","Status":"Up 7 seconds"}`, + want: []dockerContainer{ + {ID: "f7f170fde076", Image: "gcr.io/golang-org/playground-sandbox-gvisor:latest", Names: "play_run_a02cfe67"}, + {ID: "af872e55a773", Image: "gcr.io/golang-org/playground-sandbox-gvisor:latest", Names: "play_run_0a69c3e8"}, + }, + wantErr: false, + }, + { + desc: "empty output", + wantErr: false, + }, + { + desc: "malformatted output", + output: `xyzzy{}`, + wantErr: true, + }, + } + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + cs, err := parseDockerContainers([]byte(tc.output)) + if (err != nil) != tc.wantErr { + t.Errorf("parseDockerContainers(_) = %v, %v, wantErr: %v", cs, err, tc.wantErr) + } + if diff := cmp.Diff(tc.want, cs); diff != "" { + t.Errorf("parseDockerContainers() mismatch (-want +got):\n%s", diff) + } + }) + } +}