internal/upload: don't skip all uploads on the first report failure

A single failure of createReport should not prevent the upload of other
reports. Fix this by logging and proceeding when there is a failure.

To test this failure, and to generally make it easier to exercise upload
bugs, make the following testing improvements:

- Add regtest.RunProgAsOf, which is like RunProg but sets CounterTime
  (newly exposed) to a time in the past.
- Add regtest.NewIncProgram for the common use case of a program that
  just increments counters and exits.
- Export CreateTestUploadConfig and CreateTestUploadServer.
- Have CreateTestUploadServer implement its own cleanup.
- Add a testWriter to echo upload logs to t.Log.

Together, these helpers make it relatively easy to write an ad-hoc
upload test using only the public counter and upload APIs.

For golang/go#65970

Change-Id: I9f54ad22a1f69cc6162ebe5628ad3287b89bbee1
Reviewed-on: https://go-review.googlesource.com/c/telemetry/+/584400
Auto-Submit: Robert Findley <rfindley@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Hyang-Ah Hana Kim <hyangah@gmail.com>
diff --git a/internal/counter/counter_test.go b/internal/counter/counter_test.go
index e3d6ff5..9e41f26 100644
--- a/internal/counter/counter_test.go
+++ b/internal/counter/counter_test.go
@@ -225,7 +225,7 @@
 	t.Logf("GOOS %s GOARCH %s", runtime.GOOS, runtime.GOARCH)
 	setup(t)
 	defer restore()
-	now := counterTime().UTC()
+	now := CounterTime().UTC()
 	year, month, day := now.Date()
 	// preserve time location as done in (*file).filename.
 	testStartTime := time.Date(year, month, day, 0, 0, 0, 0, time.UTC)
@@ -314,7 +314,7 @@
 	setup(t)
 	// get all the 49 combinations of today and when the week ends
 	for i := 0; i < 7; i++ {
-		counterTime = future(i)
+		CounterTime = future(i)
 		for index := range "0123456" {
 			os.WriteFile(filepath.Join(telemetry.Default.LocalDir(), "weekends"), []byte{byte(index + '0')}, 0666)
 			var f file
@@ -511,7 +511,7 @@
 }
 
 func restore() {
-	counterTime = time.Now().UTC
+	CounterTime = func() time.Time { return time.Now().UTC() }
 }
 
 func (f *file) New(name string) *Counter {
diff --git a/internal/counter/file.go b/internal/counter/file.go
index 12181b2..43297f9 100644
--- a/internal/counter/file.go
+++ b/internal/counter/file.go
@@ -256,9 +256,9 @@
 
 func nop() {}
 
-// counterTime returns the current UTC time.
+// CounterTime returns the current UTC time.
 // Mutable for testing.
-var counterTime = func() time.Time {
+var CounterTime = func() time.Time {
 	return time.Now().UTC()
 }
 
@@ -280,7 +280,7 @@
 		previous.close()
 	}
 
-	name, expire, err := f.filename(counterTime())
+	name, expire, err := f.filename(CounterTime())
 	if err != nil {
 		// This could be mode == "off" (when rotate is called for the first time)
 		ret := nop
diff --git a/internal/counter/rotate_test.go b/internal/counter/rotate_test.go
index 9e8ae49..0175df7 100644
--- a/internal/counter/rotate_test.go
+++ b/internal/counter/rotate_test.go
@@ -70,7 +70,7 @@
 	}
 	// move into the future and rotate the file, remapping it
 	now := getnow()
-	counterTime = func() time.Time { return now.Add(7 * 24 * time.Hour) }
+	CounterTime = func() time.Time { return now.Add(7 * 24 * time.Hour) }
 	f.rotate()
 
 	// c has value 0 in the new file
@@ -101,7 +101,7 @@
 
 	// simulate failure to remap
 	oldmap := memmap
-	counterTime = func() time.Time { return now.Add(14 * 24 * time.Hour) }
+	CounterTime = func() time.Time { return now.Add(14 * 24 * time.Hour) }
 	memmap = func(*os.File, *mmap.Data) (mmap.Data, error) { return mmap.Data{}, fmt.Errorf("too bad") }
 	f.rotate()
 	memmap = oldmap
@@ -135,7 +135,7 @@
 
 // return the current date according to counterTime()
 func getnow() time.Time {
-	year, month, day := counterTime().Date()
+	year, month, day := CounterTime().Date()
 	now := time.Date(year, month, day, 0, 0, 0, 0, time.UTC)
 	return now
 }
@@ -187,7 +187,7 @@
 		}
 		fd.Close()
 	}
-	counterTime = func() time.Time { return now.Add(7 * 24 * time.Hour) }
+	CounterTime = func() time.Time { return now.Add(7 * 24 * time.Hour) }
 	f.rotate()
 	fi, err := os.ReadDir(telemetry.Default.LocalDir())
 	if err != nil || len(fi) != 3 {
diff --git a/internal/regtest/regtest.go b/internal/regtest/regtest.go
index 51198eb..d0d6a7d 100644
--- a/internal/regtest/regtest.go
+++ b/internal/regtest/regtest.go
@@ -8,6 +8,7 @@
 
 import (
 	"fmt"
+	"log"
 	"os"
 	"os/exec"
 	"path/filepath"
@@ -15,18 +16,23 @@
 	"runtime/debug"
 	"strings"
 	"testing"
+	"time"
 
+	"golang.org/x/telemetry/counter"
 	"golang.org/x/telemetry/counter/countertest"
+	internalcounter "golang.org/x/telemetry/internal/counter"
 	"golang.org/x/telemetry/internal/telemetry"
 )
 
 const (
 	telemetryDirEnvVar = "_COUNTERTEST_RUN_TELEMETRY_DIR"
+	asofEnvVar         = "_COUNTERTEST_ASOF"
 	entryPointEnvVar   = "_COUNTERTEST_ENTRYPOINT"
 )
 
 var (
 	telemetryDirEnvVarValue = os.Getenv(telemetryDirEnvVar)
+	asofEnvVarValue         = os.Getenv(asofEnvVar)
 	entryPointEnvVarValue   = os.Getenv(entryPointEnvVar)
 )
 
@@ -46,6 +52,16 @@
 	if telemetryDirEnvVarValue != "" && entryPointEnvVarValue == name {
 		// We are running the separate process that was spawned by RunProg.
 		fmt.Fprintf(os.Stderr, "running program %q\n", name)
+		if asofEnvVarValue != "" {
+			asof, err := time.Parse("2006-01-02", asofEnvVarValue)
+			if err != nil {
+				log.Fatalf("error parsing asof time %q: %v", asof, err)
+			}
+			fmt.Fprintf(os.Stderr, "setting counter time to %s\n", name)
+			internalcounter.CounterTime = func() time.Time {
+				return asof
+			}
+		}
 		countertest.Open(telemetryDirEnvVarValue)
 		os.Exit(fn())
 	}
@@ -62,6 +78,17 @@
 	return Program(name)
 }
 
+// NewIncProgram returns a basic program that increments the given counters and
+// exits with status 0.
+func NewIncProgram(t *testing.T, name string, counters ...string) Program {
+	return NewProgram(t, name, func() int {
+		for _, c := range counters {
+			counter.Inc(c)
+		}
+		return 0
+	})
+}
+
 // registeredPrograms stores all registered program names to detect duplicate registrations.
 var registeredPrograms = make(map[string]map[string]bool) // test name -> program name -> exist
 
@@ -70,6 +97,12 @@
 // but all the programs must be registered with NewProgram before the first
 // call to RunProg.
 func RunProg(t *testing.T, telemetryDir string, prog Program) ([]byte, error) {
+	return RunProgAsOf(t, telemetryDir, time.Time{}, prog)
+}
+
+// RunProgAsOf is like RunProg, but executes the program as of a specific
+// counter time.
+func RunProgAsOf(t *testing.T, telemetryDir string, asof time.Time, prog Program) ([]byte, error) {
 	if telemetryDirEnvVarValue != "" {
 		fmt.Fprintf(os.Stderr, "unknown program %q\n %s %s", prog, telemetryDirEnvVarValue, entryPointEnvVarValue)
 		os.Exit(2)
@@ -83,6 +116,9 @@
 	// Spawn a subprocess to run the 'prog' by setting telemetryDirEnvVar.
 	cmd := exec.Command(testBin, "-test.run", fmt.Sprintf("^%s$", testName))
 	cmd.Env = append(os.Environ(), telemetryDirEnvVar+"="+telemetryDir, entryPointEnvVar+"="+string(prog))
+	if !asof.IsZero() {
+		cmd.Env = append(cmd.Env, asofEnvVar+"="+asof.Format("2006-01-02"))
+	}
 	return cmd.CombinedOutput()
 }
 
diff --git a/internal/upload/dates_test.go b/internal/upload/dates_test.go
index 1b94244..4dba177 100644
--- a/internal/upload/dates_test.go
+++ b/internal/upload/dates_test.go
@@ -43,12 +43,11 @@
 	if out, err := regtest.RunProg(t, telemetryDir, prog); err != nil {
 		t.Fatalf("failed to run program: %s", out)
 	}
-	uc := createTestUploadConfig(t, []string{"knownCounter"}, []string{"aStack"})
+	uc := CreateTestUploadConfig(t, []string{"knownCounter"}, []string{"aStack"})
 	env := configtest.LocalProxyEnv(t, uc, "v1.2.3")
 
 	// Start upload server
-	srv, uploaded := createTestUploadServer(t)
-	defer srv.Close()
+	srv, uploaded := CreateTestUploadServer(t)
 
 	// make it impossible to write a log by creating a non-directory with the log's name
 	logName := filepath.Join(telemetryDir, "debug")
@@ -75,7 +74,9 @@
 	// and scheduled to get expired in the future.
 	// Let's pretend telemetry was enabled a year ago by mutating the mode file,
 	// we are in the future, and test if the count files are successfully uploaded.
-	uploader.dir.SetModeAsOf("on", uploader.startTime.Add(-365*24*time.Hour).UTC())
+	if err := uploader.dir.SetModeAsOf("on", uploader.startTime.Add(-365*24*time.Hour).UTC()); err != nil {
+		t.Fatal(err)
+	}
 	uploadedContent, fname := subtest(t, uploader) // TODO(hyangah) : inline
 
 	if want, got := [][]byte{uploadedContent}, uploaded(); !reflect.DeepEqual(want, got) {
@@ -88,10 +89,11 @@
 	}
 }
 
-// createTestUploadServer creates a test server that records the uploaded data.
-func createTestUploadServer(t *testing.T) (*httptest.Server, func() [][]byte) {
+// CreateTestUploadServer creates a test server that records the uploaded data.
+// The server is closed as part of cleaning up t.
+func CreateTestUploadServer(t *testing.T) (*httptest.Server, func() [][]byte) {
 	s := &uploadQueue{}
-	return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		buf, err := io.ReadAll(r.Body)
 		if err != nil {
 			t.Errorf("invalid request received: %v", err)
@@ -99,7 +101,9 @@
 			return
 		}
 		s.Append(buf)
-	})), s.Get
+	}))
+	t.Cleanup(srv.Close)
+	return srv, s.Get
 }
 
 // failingUploadServer creates a test server that returns errors
@@ -128,7 +132,7 @@
 	if out, err := regtest.RunProg(t, telemetryDir, prog); err != nil {
 		t.Fatalf("failed to run program: %s", out)
 	}
-	uc := createTestUploadConfig(t, []string{"knownCounter"}, []string{"aStack"})
+	uc := CreateTestUploadConfig(t, []string{"knownCounter"}, []string{"aStack"})
 	env := configtest.LocalProxyEnv(t, uc, "v1.2.3")
 
 	// Start upload server
@@ -159,7 +163,9 @@
 	// and scheduled to get expired in the future.
 	// Let's pretend telemetry was enabled a year ago by mutating the mode file,
 	// we are in the future, and test if the count files are successfully uploaded.
-	dir.SetModeAsOf("on", uploader.startTime.Add(-365*24*time.Hour).UTC())
+	if err := dir.SetModeAsOf("on", uploader.startTime.Add(-365*24*time.Hour).UTC()); err != nil {
+		t.Fatal(err)
+	}
 	_, fname := subtest(t, uploader)
 	// check that fname does not exist
 	lname := filepath.Join(uploader.dir.LocalDir(), fname)
@@ -189,7 +195,7 @@
 	return s.data
 }
 
-func createTestUploadConfig(t *testing.T, counterNames, stackCounterNames []string) *telemetry.UploadConfig {
+func CreateTestUploadConfig(t *testing.T, counterNames, stackCounterNames []string) *telemetry.UploadConfig {
 	goVersion, progVersion, progName := regtest.ProgInfo(t)
 	GOOS, GOARCH := runtime.GOOS, runtime.GOARCH
 	programConfig := &telemetry.ProgramConfig{
@@ -228,7 +234,7 @@
 		t.Fatalf("failed to run program: %s", out)
 	}
 	cs := readCountFileInfo(t, filepath.Join(telemetryDir, "local"))
-	uc := createTestUploadConfig(t, nil, []string{"aStack"})
+	uc := CreateTestUploadConfig(t, nil, []string{"aStack"})
 	env := configtest.LocalProxyEnv(t, uc, "v1.2.3")
 
 	const today = "2020-01-24"
@@ -327,8 +333,7 @@
 		t.Run(tx.name, func(t *testing.T) {
 			telemetryDir := t.TempDir()
 
-			srv, uploaded := createTestUploadServer(t)
-			defer srv.Close()
+			srv, uploaded := CreateTestUploadServer(t)
 
 			dbg := filepath.Join(telemetryDir, "debug")
 			if err := os.MkdirAll(dbg, 0777); err != nil {
@@ -343,7 +348,9 @@
 				t.Fatal(err)
 			}
 			defer uploader.Close()
-			uploader.dir.SetModeAsOf("on", telemetryEnableTime)
+			if err := uploader.dir.SetModeAsOf("on", telemetryEnableTime); err != nil {
+				t.Fatal(err)
+			}
 			uploader.startTime = mustParseDate(tx.today)
 
 			wantUploadCount := doTest(t, uploader, &tx, cs)
diff --git a/internal/upload/first_test.go b/internal/upload/first_test.go
index fcb5f3d..9ee7268 100644
--- a/internal/upload/first_test.go
+++ b/internal/upload/first_test.go
@@ -15,8 +15,7 @@
 // In practice this test runs last, so is somewhat superfluous,
 // but it also checks that uploads and reads from the channel are matched
 func TestSimpleServer(t *testing.T) {
-	srv, uploaded := createTestUploadServer(t)
-	defer srv.Close()
+	srv, uploaded := CreateTestUploadServer(t)
 
 	url := srv.URL
 	resp, err := http.Post(url, "text/plain", strings.NewReader("hello"))
diff --git a/internal/upload/reports.go b/internal/upload/reports.go
index 262cc30..e97f43f 100644
--- a/internal/upload/reports.go
+++ b/internal/upload/reports.go
@@ -55,7 +55,8 @@
 		}
 		fname, err := u.createReport(earliest[expiry], expiry, files, lastWeek)
 		if err != nil {
-			return nil, err
+			u.logger.Printf("Failed to create report for %s: %v", expiry, err)
+			continue
 		}
 		if fname != "" {
 			u.logger.Printf("Ready to upload: %s", filepath.Base(fname))
@@ -157,12 +158,9 @@
 			succeeded = true
 		}
 	}
-	// TODO(rfindley): There's a bug here: we return an error if a count file
-	// parses, but has no counter.
-	//
-	// Furthermore, this error causes us to bail out, and return no reports. We
-	// should only fail the report containing the count file.
 	if !succeeded {
+		// TODO(rfindley): this isn't right: a count file is not unparseable just
+		// because it has no counters
 		return "", fmt.Errorf("all %d count files for %s were unparseable", len(countFiles), expiryDate)
 	}
 	// 1. generate the local report
diff --git a/internal/upload/run_test.go b/internal/upload/run_test.go
new file mode 100644
index 0000000..6690588
--- /dev/null
+++ b/internal/upload/run_test.go
@@ -0,0 +1,141 @@
+// Copyright 2024 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package upload_test
+
+import (
+	"strings"
+	"testing"
+	"time"
+
+	"golang.org/x/telemetry/internal/configtest"
+	"golang.org/x/telemetry/internal/regtest"
+	"golang.org/x/telemetry/internal/telemetry"
+	"golang.org/x/telemetry/internal/testenv"
+	"golang.org/x/telemetry/internal/upload"
+)
+
+// createUploader sets up an upload environment for the provided test, with a
+// fake proxy allowing the given counters, and a fake upload server.
+//
+// The returned Uploader is ready to upload the given directory.
+// The second return is a function to fetch all uploaded reports.
+//
+// For convenience, createUploader also sets the mode in telemetryDir to "on",
+// back-dated to a time in the past. Callers that want to run the upload with a
+// different mode can reset as necessary.
+//
+// All associated resources are cleaned up with t.Clean.
+func createUploader(t *testing.T, telemetryDir string, counters, stackCounters []string) (*upload.Uploader, func() [][]byte) {
+	t.Helper()
+
+	if err := telemetry.NewDir(telemetryDir).SetModeAsOf("on", time.Now().Add(-365*24*time.Hour)); err != nil {
+		t.Fatal(err)
+	}
+
+	srv, uploaded := upload.CreateTestUploadServer(t)
+	uc := upload.CreateTestUploadConfig(t, counters, stackCounters)
+	env := configtest.LocalProxyEnv(t, uc, "v1.2.3")
+
+	uploader, err := upload.NewUploader(upload.RunConfig{
+		TelemetryDir: telemetryDir,
+		UploadURL:    srv.URL,
+		LogWriter:    testWriter{t},
+		Env:          env,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+	t.Cleanup(func() { uploader.Close() })
+	return uploader, uploaded
+}
+
+// testWriter is an io.Writer wrapping t.Log.
+type testWriter struct {
+	t *testing.T
+}
+
+func (w testWriter) Write(p []byte) (n int, err error) {
+	w.t.Log(strings.TrimSuffix(string(p), "\n")) // trim newlines added by logging
+	return len(p), nil
+}
+
+func TestUploader_MultipleUploads(t *testing.T) {
+	// This test checks that Uploader.Run produces multiple reports when counters
+	// span more than a week.
+
+	testenv.SkipIfUnsupportedPlatform(t)
+
+	// This program is run at two different dates.
+	prog := regtest.NewIncProgram(t, "prog", "counter1")
+
+	// Create two counter files to upload, at least a week apart.
+	telemetryDir := t.TempDir()
+	asof1 := time.Now().Add(-15 * 24 * time.Hour)
+	if out, err := regtest.RunProgAsOf(t, telemetryDir, asof1, prog); err != nil {
+		t.Fatalf("failed to run program: %s", out)
+	}
+	asof2 := time.Now().Add(-8 * 24 * time.Hour)
+	if out, err := regtest.RunProgAsOf(t, telemetryDir, asof2, prog); err != nil {
+		t.Fatalf("failed to run program: %s", out)
+	}
+
+	uploader, getUploads := createUploader(t, telemetryDir, []string{"counter1", "counter2"}, nil)
+	if err := uploader.Run(); err != nil {
+		t.Fatal(err)
+	}
+
+	uploads := getUploads()
+	if got, want := len(uploads), 2; got != want {
+		t.Fatalf("got %d uploads, want %d", got, want)
+	}
+	for _, upload := range uploads {
+		report := string(upload)
+		if !strings.Contains(report, "counter1") {
+			t.Errorf("Didn't get an upload for counter1. Report:\n%s", report)
+		}
+	}
+}
+
+func TestUploader_EmptyUpload(t *testing.T) {
+	// This test verifies that an empty counter file does not cause uploads of
+	// another week's reports to fail.
+
+	testenv.SkipIfUnsupportedPlatform(t)
+
+	// prog1 runs in week 1, and increments no counter.
+	prog1 := regtest.NewIncProgram(t, "prog1")
+	// prog2 runs in week 2.
+	prog2 := regtest.NewIncProgram(t, "prog2", "week2")
+
+	telemetryDir := t.TempDir()
+
+	// Create two counter files to upload, at least a week apart.
+	// Week 1 has no counters, which in the past caused the both uploads to fail.
+	asof1 := time.Now().Add(-15 * 24 * time.Hour)
+	if out, err := regtest.RunProgAsOf(t, telemetryDir, asof1, prog1); err != nil {
+		t.Fatalf("failed to run program: %s", out)
+	}
+	asof2 := time.Now().Add(-8 * 24 * time.Hour)
+	if out, err := regtest.RunProgAsOf(t, telemetryDir, asof2, prog2); err != nil {
+		t.Fatalf("failed to run program: %s", out)
+	}
+
+	uploader, getUploads := createUploader(t, telemetryDir, []string{"week1", "week2"}, nil)
+	if err := uploader.Run(); err != nil {
+		t.Fatal(err)
+	}
+
+	// Check that we got one upload, for week 2.
+	uploads := getUploads()
+	if got, want := len(uploads), 1; got != want {
+		t.Fatalf("got %d uploads, want %d", got, want)
+	}
+	for _, upload := range uploads {
+		report := string(upload)
+		if !strings.Contains(report, "week2") {
+			t.Errorf("Didn't get an upload for week2. Report:\n%s", report)
+		}
+	}
+}
diff --git a/start_test.go b/start_test.go
index 623462c..66da491 100644
--- a/start_test.go
+++ b/start_test.go
@@ -106,7 +106,9 @@
 
 	countertest.Open(mustGetEnv(telemetryDirEnv))
 	counter.Inc("teststart/counter")
-	it.Default.SetModeAsOf("on", time.Now().Add(-8*24*time.Hour))
+	if err := it.Default.SetModeAsOf("on", time.Now().Add(-8*24*time.Hour)); err != nil {
+		log.Fatalf("setting mode: %v", err)
+	}
 
 	res := telemetry.Start(telemetry.Config{
 		// No need to set TelemetryDir since the Default dir is already set by countertest.Open.