start,internal/upload: add two tests for concurrent upload

Add two tests, one for telemetry.Start and another for upload.Run,
which execute the upload concurrently.

The test for telemetry.Start succeeds, thanks to the concurrency safety
provided by exclusive acquisition of the upload.token file. The test for
upload.Run results in incorrect upload counts and occasional invalid
report JSON (due to write shearing), so it is skipped for now. Despite the
upload.token guard, upload.Run should be more concurrency safe, since there
is still a race condition when the upload.token is released. A subsequent
CL will add more safeguards.

For golang/go#65970

Change-Id: Ic7e57b1ee794a58340901289930250bf5114fdf6
Reviewed-on: https://go-review.googlesource.com/c/telemetry/+/586141
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Hyang-Ah Hana Kim <hyangah@gmail.com>
diff --git a/internal/upload/run_test.go b/internal/upload/run_test.go
index 5898ce7..e80eca7 100644
--- a/internal/upload/run_test.go
+++ b/internal/upload/run_test.go
@@ -15,6 +15,7 @@
 	"regexp"
 	"runtime"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -468,6 +469,7 @@
 		})
 	}
 }
+
 func TestRun_DebugLog(t *testing.T) {
 	// This test verifies that the uploader honors the telemetry mode, as well as
 	// its asof date.
@@ -540,6 +542,60 @@
 	}
 }
 
+func TestRun_Concurrent(t *testing.T) {
+	t.Skip("Run is not concurrency safe")
+
+	testenv.SkipIfUnsupportedPlatform(t)
+
+	prog := regtest.NewIncProgram(t, "prog1", "counter")
+
+	telemetryDir := t.TempDir()
+	now := time.Now()
+
+	// Seed two weeks of uploads.
+	// These should *all* be uploaded as they will be neither too old,
+	// nor too new.
+	incCount := 0
+	for i := -21; i < -7; i++ {
+		incCount++
+		asof := now.Add(time.Duration(i) * 24 * time.Hour)
+		if out, err := regtest.RunProgAsOf(t, telemetryDir, asof, prog); err != nil {
+			t.Fatalf("failed to run program: %s", out)
+		}
+	}
+
+	cfg, getUploads := runConfig(t, telemetryDir, []string{"counter"}, nil)
+
+	var wg sync.WaitGroup
+	for i := 0; i < 5; i++ {
+		i := i
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			if err := upload.Run(cfg); err != nil {
+				t.Errorf("upload.Run #%d failed: %v", i, err)
+			}
+		}()
+	}
+	wg.Wait()
+
+	uploads := getUploads()
+	uploadedCount := 0
+	for i, upload := range uploads {
+		var got telemetry.Report
+		if err := json.Unmarshal(upload, &got); err != nil {
+			t.Fatal(err)
+		}
+		if got, want := len(got.Programs), 1; got != want {
+			t.Fatalf("got %d programs in upload #%d, want %d", got, i, want)
+		}
+		uploadedCount += int(got.Programs[0].Counters["counter"])
+	}
+	if uploadedCount != incCount {
+		t.Errorf("uploaded %d total observations, want %d", uploadedCount, incCount)
+	}
+}
+
 func getDebugLogs(t *testing.T, debugDir string) []string {
 	t.Helper()
 	if stat, err := os.Stat(debugDir); err != nil || !stat.IsDir() {
diff --git a/start_test.go b/start_test.go
index b74ed89..2a32d98 100644
--- a/start_test.go
+++ b/start_test.go
@@ -12,6 +12,7 @@
 	"os"
 	"os/exec"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -19,6 +20,7 @@
 	"golang.org/x/telemetry/counter"
 	"golang.org/x/telemetry/counter/countertest"
 	"golang.org/x/telemetry/internal/configtest"
+	ic "golang.org/x/telemetry/internal/counter"
 	"golang.org/x/telemetry/internal/regtest"
 	it "golang.org/x/telemetry/internal/telemetry"
 	"golang.org/x/telemetry/internal/testenv"
@@ -30,18 +32,85 @@
 	runStartEnv     = "X_TELEMETRY_TEST_START"
 	telemetryDirEnv = "X_TELEMETRY_TEST_START_TELEMETRY_DIR"
 	uploadURLEnv    = "X_TELEMETRY_TEST_START_UPLOAD_URL"
+	asofEnv         = "X_TELEMETRY_TEST_START_ASOF"
 )
 
 func TestMain(m *testing.M) {
 	// TestStart can't use internal/regtest, because Start itself also uses
 	// fork+exec to start a subprocess, which does not interact well with the
 	// fork+exec trick used by regtest.RunProg.
-	if os.Getenv(runStartEnv) != "" {
-		os.Exit(runStart())
+	if prog := os.Getenv(runStartEnv); prog != "" {
+		os.Exit(runProg(prog))
 	}
 	os.Exit(m.Run())
 }
 
+// runProg runs the named test program ("setmode", "inc", or "start"), using
+// the fake time and telemetry dir from the environment. See the switch below.
+func runProg(prog string) int {
+
+	mustGetEnv := func(envvar string) string {
+		v := os.Getenv(envvar)
+		if v == "" {
+			log.Fatalf("missing required environment var %q", envvar)
+		}
+		return v
+	}
+
+	// Get the fake time used by all programs.
+	asof, err := time.Parse("2006-01-02", mustGetEnv(asofEnv))
+	if err != nil {
+		log.Fatalf("parsing %s: %v", asofEnv, err)
+	}
+
+	// Set global state.
+	ic.CounterTime = func() time.Time { return asof } // must be done before Open
+	countertest.Open(mustGetEnv(telemetryDirEnv))
+
+	switch prog {
+	case "setmode":
+		// Use the modified time above for the asof time.
+		if err := it.Default.SetModeAsOf("on", asof); err != nil {
+			log.Fatalf("setting mode: %v", err)
+		}
+	case "inc":
+		// (CounterTime is already set above)
+		counter.Inc("teststart/counter")
+
+	case "start":
+		res := telemetry.Start(telemetry.Config{
+			// No need to set TelemetryDir since the Default dir is already set by countertest.Open.
+			Upload:          true,
+			UploadURL:       mustGetEnv(uploadURLEnv),
+			UploadStartTime: asof,
+		})
+		res.Wait()
+	default:
+		log.Fatalf("unknown program %q", prog)
+	}
+	return 0
+}
+
+func execProg(t *testing.T, telemetryDir, prog string, asof time.Time, env ...string) {
+	// Run the runProg function above, via a fork+exec trick (see TestMain).
+	exe, err := os.Executable()
+	if err != nil {
+		t.Error(err)
+		return
+	}
+	cmd := exec.Command(exe, "** TestStart **") // this unused arg is just for ps(1)
+	cmd.Stderr = os.Stderr
+	cmd.Env = os.Environ()
+	cmd.Env = append(cmd.Env, asofEnv+"="+asof.Format("2006-01-02"))
+	cmd.Env = append(cmd.Env, telemetryDirEnv+"="+telemetryDir)
+	cmd.Env = append(cmd.Env, runStartEnv+"="+prog) // see TestMain
+	cmd.Env = append(cmd.Env, env...)
+	out, err := cmd.Output()
+	if err != nil {
+		t.Errorf("program failed unexpectedly (%v)\n%s", err, out)
+	}
+}
+
 func TestStart(t *testing.T) {
 	testenv.SkipIfUnsupportedPlatform(t)
 	testenv.MustHaveExec(t)
@@ -49,7 +118,7 @@
 	// This test sets up a telemetry environment, and then runs a test program
 	// that increments a counter, and uploads via telemetry.Start.
 
-	t.Setenv(telemetryDirEnv, t.TempDir())
+	telemetryDir := t.TempDir()
 
 	uploaded := false
 	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
@@ -63,53 +132,71 @@
 			}
 		}
 	}))
-	t.Setenv(uploadURLEnv, server.URL)
+	uploadEnv := []string{uploadURLEnv + "=" + server.URL}
 
 	uc := regtest.CreateTestUploadConfig(t, []string{"teststart/counter"}, nil)
-	env := configtest.LocalProxyEnv(t, uc, "v1.2.3")
-	for _, e := range env {
-		kv := strings.SplitN(e, "=", 2)
-		t.Setenv(kv[0], kv[1])
-	}
+	uploadEnv = append(uploadEnv, configtest.LocalProxyEnv(t, uc, "v1.2.3")...)
 
-	// Run the runStart function below, via a fork+exec trick.
-	exe, err := os.Executable()
-	if err != nil {
-		t.Fatal(err)
-	}
-	cmd := exec.Command(exe, "** TestStart **")      // this unused arg is just for ps(1)
-	cmd.Env = append(os.Environ(), runStartEnv+"=1") // see TestMain
-	out, err := cmd.CombinedOutput()
-	if err != nil {
-		t.Fatalf("program failed unexpectedly (%v)\n%s", err, out)
-	}
+	// Script programs.
+	now := time.Now()
+	execProg(t, telemetryDir, "setmode", now.Add(-30*24*time.Hour)) // back-date telemetry acceptance
+	execProg(t, telemetryDir, "inc", now.Add(-8*24*time.Hour))      // increment the counter
+	execProg(t, telemetryDir, "start", now, uploadEnv...)           // run start
 
 	if !uploaded {
 		t.Fatalf("no upload occurred on %v", os.Getpid())
 	}
 }
 
-func runStart() int {
-	mustGetEnv := func(envvar string) string {
-		v := os.Getenv(envvar)
-		if v == "" {
-			log.Fatalf("missing required environment var %q", envvar)
+func TestConcurrentStart(t *testing.T) {
+	testenv.SkipIfUnsupportedPlatform(t)
+	testenv.MustHaveExec(t)
+
+	telemetryDir := t.TempDir()
+
+	var uploadMu sync.Mutex
+	uploads := map[string]int{} // date -> uploads
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		key := r.URL.Path
+		if idx := strings.LastIndex(r.URL.Path, "/"); idx >= 0 {
+			key = r.URL.Path[idx+len("/"):]
 		}
-		return v
+		uploadMu.Lock()
+		uploads[key]++
+		uploadMu.Unlock()
+	}))
+	uploadEnv := []string{uploadURLEnv + "=" + server.URL}
+
+	uc := regtest.CreateTestUploadConfig(t, []string{"teststart/counter"}, nil)
+	uploadEnv = append(uploadEnv, configtest.LocalProxyEnv(t, uc, "v1.2.3")...)
+
+	now := time.Now()
+	execProg(t, telemetryDir, "setmode", now.Add(-365*24*time.Hour)) // back-date telemetry acceptance
+	execProg(t, telemetryDir, "inc", now.Add(-8*24*time.Hour))       // increment the counter
+
+	// Populate three weeks of counters to upload.
+	for i := -28; i < -7; i++ { // days -28 through -8, relative to now
+		execProg(t, telemetryDir, "inc", now.Add(time.Duration(i)*24*time.Hour))
 	}
 
-	countertest.Open(mustGetEnv(telemetryDirEnv))
-	counter.Inc("teststart/counter")
-	if err := it.Default.SetModeAsOf("on", time.Now().Add(-8*24*time.Hour)); err != nil {
-		log.Fatalf("setting mode: %v", err)
+	// Run start concurrently.
+	var wg sync.WaitGroup
+	for i := 0; i < 10; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			execProg(t, telemetryDir, "start", now, uploadEnv...)
+		}()
 	}
+	wg.Wait()
 
-	res := telemetry.Start(telemetry.Config{
-		// No need to set TelemetryDir since the Default dir is already set by countertest.Open.
-		Upload:          true,
-		UploadURL:       mustGetEnv(uploadURLEnv),
-		UploadStartTime: time.Now().Add(8 * 24 * time.Hour),
-	})
-	res.Wait()
-	return 0
+	// Expect exactly three weeks to be uploaded.
+	if got, want := len(uploads), 3; got != want {
+		t.Errorf("got %d report dates, want %d", got, want)
+	}
+	for asof, n := range uploads {
+		if n != 1 {
+			t.Errorf("got %d reports for %s, want 1", n, asof)
+		}
+	}
 }