internal/upload: don't skip all uploads on the first report failure
A single failure of createReport should not prevent the upload of other
reports. Fix this by logging and proceeding when there is a failure.
To test this failure, and to generally make it easier to exercise upload
bugs, make the following testing improvements:
- Add regtest.RunProgAsOf, which is like RunProg but sets CounterTime
(newly exposed) to a time in the past.
- Add regtest.NewIncProgram for the common use case of a program that
just increments counters and exits.
- Export CreateTestUploadConfig and CreateTestUploadServer.
- Have CreateTestUploadServer implement its own cleanup.
- Add a testWriter to echo upload logs to t.Log.
Together, these helpers make it relatively easy to write an ad-hoc
upload test using only the public counter and upload APIs.
For golang/go#65970
Change-Id: I9f54ad22a1f69cc6162ebe5628ad3287b89bbee1
Reviewed-on: https://go-review.googlesource.com/c/telemetry/+/584400
Auto-Submit: Robert Findley <rfindley@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Hyang-Ah Hana Kim <hyangah@gmail.com>
diff --git a/internal/counter/counter_test.go b/internal/counter/counter_test.go
index e3d6ff5..9e41f26 100644
--- a/internal/counter/counter_test.go
+++ b/internal/counter/counter_test.go
@@ -225,7 +225,7 @@
t.Logf("GOOS %s GOARCH %s", runtime.GOOS, runtime.GOARCH)
setup(t)
defer restore()
- now := counterTime().UTC()
+ now := CounterTime().UTC()
year, month, day := now.Date()
// preserve time location as done in (*file).filename.
testStartTime := time.Date(year, month, day, 0, 0, 0, 0, time.UTC)
@@ -314,7 +314,7 @@
setup(t)
// get all the 49 combinations of today and when the week ends
for i := 0; i < 7; i++ {
- counterTime = future(i)
+ CounterTime = future(i)
for index := range "0123456" {
os.WriteFile(filepath.Join(telemetry.Default.LocalDir(), "weekends"), []byte{byte(index + '0')}, 0666)
var f file
@@ -511,7 +511,7 @@
}
func restore() {
- counterTime = time.Now().UTC
+ CounterTime = func() time.Time { return time.Now().UTC() }
}
func (f *file) New(name string) *Counter {
diff --git a/internal/counter/file.go b/internal/counter/file.go
index 12181b2..43297f9 100644
--- a/internal/counter/file.go
+++ b/internal/counter/file.go
@@ -256,9 +256,9 @@
func nop() {}
-// counterTime returns the current UTC time.
+// CounterTime returns the current UTC time.
// Mutable for testing.
-var counterTime = func() time.Time {
+var CounterTime = func() time.Time {
return time.Now().UTC()
}
@@ -280,7 +280,7 @@
previous.close()
}
- name, expire, err := f.filename(counterTime())
+ name, expire, err := f.filename(CounterTime())
if err != nil {
// This could be mode == "off" (when rotate is called for the first time)
ret := nop
diff --git a/internal/counter/rotate_test.go b/internal/counter/rotate_test.go
index 9e8ae49..0175df7 100644
--- a/internal/counter/rotate_test.go
+++ b/internal/counter/rotate_test.go
@@ -70,7 +70,7 @@
}
// move into the future and rotate the file, remapping it
now := getnow()
- counterTime = func() time.Time { return now.Add(7 * 24 * time.Hour) }
+ CounterTime = func() time.Time { return now.Add(7 * 24 * time.Hour) }
f.rotate()
// c has value 0 in the new file
@@ -101,7 +101,7 @@
// simulate failure to remap
oldmap := memmap
- counterTime = func() time.Time { return now.Add(14 * 24 * time.Hour) }
+ CounterTime = func() time.Time { return now.Add(14 * 24 * time.Hour) }
memmap = func(*os.File, *mmap.Data) (mmap.Data, error) { return mmap.Data{}, fmt.Errorf("too bad") }
f.rotate()
memmap = oldmap
@@ -135,7 +135,7 @@
// return the current date according to counterTime()
func getnow() time.Time {
- year, month, day := counterTime().Date()
+ year, month, day := CounterTime().Date()
now := time.Date(year, month, day, 0, 0, 0, 0, time.UTC)
return now
}
@@ -187,7 +187,7 @@
}
fd.Close()
}
- counterTime = func() time.Time { return now.Add(7 * 24 * time.Hour) }
+ CounterTime = func() time.Time { return now.Add(7 * 24 * time.Hour) }
f.rotate()
fi, err := os.ReadDir(telemetry.Default.LocalDir())
if err != nil || len(fi) != 3 {
diff --git a/internal/regtest/regtest.go b/internal/regtest/regtest.go
index 51198eb..d0d6a7d 100644
--- a/internal/regtest/regtest.go
+++ b/internal/regtest/regtest.go
@@ -8,6 +8,7 @@
import (
"fmt"
+ "log"
"os"
"os/exec"
"path/filepath"
@@ -15,18 +16,23 @@
"runtime/debug"
"strings"
"testing"
+ "time"
+ "golang.org/x/telemetry/counter"
"golang.org/x/telemetry/counter/countertest"
+ internalcounter "golang.org/x/telemetry/internal/counter"
"golang.org/x/telemetry/internal/telemetry"
)
const (
telemetryDirEnvVar = "_COUNTERTEST_RUN_TELEMETRY_DIR"
+ asofEnvVar = "_COUNTERTEST_ASOF"
entryPointEnvVar = "_COUNTERTEST_ENTRYPOINT"
)
var (
telemetryDirEnvVarValue = os.Getenv(telemetryDirEnvVar)
+ asofEnvVarValue = os.Getenv(asofEnvVar)
entryPointEnvVarValue = os.Getenv(entryPointEnvVar)
)
@@ -46,6 +52,16 @@
if telemetryDirEnvVarValue != "" && entryPointEnvVarValue == name {
// We are running the separate process that was spawned by RunProg.
fmt.Fprintf(os.Stderr, "running program %q\n", name)
+ if asofEnvVarValue != "" { // set by RunProgAsOf in the parent test process
+ asof, err := time.Parse("2006-01-02", asofEnvVarValue)
+ if err != nil {
+ log.Fatalf("error parsing asof time %q: %v", asofEnvVarValue, err) // report the raw input; asof is the zero time on failure
+ }
+ fmt.Fprintf(os.Stderr, "setting counter time to %s\n", asofEnvVarValue)
+ internalcounter.CounterTime = func() time.Time {
+ return asof
+ }
+ }
countertest.Open(telemetryDirEnvVarValue)
os.Exit(fn())
}
@@ -62,6 +78,17 @@
return Program(name)
}
+// NewIncProgram registers, via NewProgram, a basic program that increments each
+// of the given counters once and exits with status 0.
+func NewIncProgram(t *testing.T, name string, counters ...string) Program {
+ return NewProgram(t, name, func() int {
+ for _, c := range counters {
+ counter.Inc(c)
+ }
+ return 0
+ })
+}
+
// registeredPrograms stores all registered program names to detect duplicate registrations.
var registeredPrograms = make(map[string]map[string]bool) // test name -> program name -> exist
@@ -70,6 +97,12 @@
// but all the programs must be registered with NewProgram before the first
// call to RunProg.
func RunProg(t *testing.T, telemetryDir string, prog Program) ([]byte, error) {
+ return RunProgAsOf(t, telemetryDir, time.Time{}, prog)
+}
+
+// RunProgAsOf is like RunProg, but executes the program as of a specific
+// counter time.
+func RunProgAsOf(t *testing.T, telemetryDir string, asof time.Time, prog Program) ([]byte, error) {
if telemetryDirEnvVarValue != "" {
fmt.Fprintf(os.Stderr, "unknown program %q\n %s %s", prog, telemetryDirEnvVarValue, entryPointEnvVarValue)
os.Exit(2)
@@ -83,6 +116,9 @@
// Spawn a subprocess to run the 'prog' by setting telemetryDirEnvVar.
cmd := exec.Command(testBin, "-test.run", fmt.Sprintf("^%s$", testName))
cmd.Env = append(os.Environ(), telemetryDirEnvVar+"="+telemetryDir, entryPointEnvVar+"="+string(prog))
+ if !asof.IsZero() {
+ cmd.Env = append(cmd.Env, asofEnvVar+"="+asof.Format("2006-01-02"))
+ }
return cmd.CombinedOutput()
}
diff --git a/internal/upload/dates_test.go b/internal/upload/dates_test.go
index 1b94244..4dba177 100644
--- a/internal/upload/dates_test.go
+++ b/internal/upload/dates_test.go
@@ -43,12 +43,11 @@
if out, err := regtest.RunProg(t, telemetryDir, prog); err != nil {
t.Fatalf("failed to run program: %s", out)
}
- uc := createTestUploadConfig(t, []string{"knownCounter"}, []string{"aStack"})
+ uc := CreateTestUploadConfig(t, []string{"knownCounter"}, []string{"aStack"})
env := configtest.LocalProxyEnv(t, uc, "v1.2.3")
// Start upload server
- srv, uploaded := createTestUploadServer(t)
- defer srv.Close()
+ srv, uploaded := CreateTestUploadServer(t)
// make it impossible to write a log by creating a non-directory with the log's name
logName := filepath.Join(telemetryDir, "debug")
@@ -75,7 +74,9 @@
// and scheduled to get expired in the future.
// Let's pretend telemetry was enabled a year ago by mutating the mode file,
// we are in the future, and test if the count files are successfully uploaded.
- uploader.dir.SetModeAsOf("on", uploader.startTime.Add(-365*24*time.Hour).UTC())
+ if err := uploader.dir.SetModeAsOf("on", uploader.startTime.Add(-365*24*time.Hour).UTC()); err != nil {
+ t.Fatal(err)
+ }
uploadedContent, fname := subtest(t, uploader) // TODO(hyangah) : inline
if want, got := [][]byte{uploadedContent}, uploaded(); !reflect.DeepEqual(want, got) {
@@ -88,10 +89,11 @@
}
}
-// createTestUploadServer creates a test server that records the uploaded data.
-func createTestUploadServer(t *testing.T) (*httptest.Server, func() [][]byte) {
+// CreateTestUploadServer creates a test server that records the uploaded data.
+// The server is closed as part of cleaning up t.
+func CreateTestUploadServer(t *testing.T) (*httptest.Server, func() [][]byte) {
s := &uploadQueue{}
- return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
buf, err := io.ReadAll(r.Body)
if err != nil {
t.Errorf("invalid request received: %v", err)
@@ -99,7 +101,9 @@
return
}
s.Append(buf)
- })), s.Get
+ }))
+ t.Cleanup(srv.Close)
+ return srv, s.Get
}
// failingUploadServer creates a test server that returns errors
@@ -128,7 +132,7 @@
if out, err := regtest.RunProg(t, telemetryDir, prog); err != nil {
t.Fatalf("failed to run program: %s", out)
}
- uc := createTestUploadConfig(t, []string{"knownCounter"}, []string{"aStack"})
+ uc := CreateTestUploadConfig(t, []string{"knownCounter"}, []string{"aStack"})
env := configtest.LocalProxyEnv(t, uc, "v1.2.3")
// Start upload server
@@ -159,7 +163,9 @@
// and scheduled to get expired in the future.
// Let's pretend telemetry was enabled a year ago by mutating the mode file,
// we are in the future, and test if the count files are successfully uploaded.
- dir.SetModeAsOf("on", uploader.startTime.Add(-365*24*time.Hour).UTC())
+ if err := dir.SetModeAsOf("on", uploader.startTime.Add(-365*24*time.Hour).UTC()); err != nil {
+ t.Fatal(err)
+ }
_, fname := subtest(t, uploader)
// check that fname does not exist
lname := filepath.Join(uploader.dir.LocalDir(), fname)
@@ -189,7 +195,7 @@
return s.data
}
-func createTestUploadConfig(t *testing.T, counterNames, stackCounterNames []string) *telemetry.UploadConfig {
+func CreateTestUploadConfig(t *testing.T, counterNames, stackCounterNames []string) *telemetry.UploadConfig {
goVersion, progVersion, progName := regtest.ProgInfo(t)
GOOS, GOARCH := runtime.GOOS, runtime.GOARCH
programConfig := &telemetry.ProgramConfig{
@@ -228,7 +234,7 @@
t.Fatalf("failed to run program: %s", out)
}
cs := readCountFileInfo(t, filepath.Join(telemetryDir, "local"))
- uc := createTestUploadConfig(t, nil, []string{"aStack"})
+ uc := CreateTestUploadConfig(t, nil, []string{"aStack"})
env := configtest.LocalProxyEnv(t, uc, "v1.2.3")
const today = "2020-01-24"
@@ -327,8 +333,7 @@
t.Run(tx.name, func(t *testing.T) {
telemetryDir := t.TempDir()
- srv, uploaded := createTestUploadServer(t)
- defer srv.Close()
+ srv, uploaded := CreateTestUploadServer(t)
dbg := filepath.Join(telemetryDir, "debug")
if err := os.MkdirAll(dbg, 0777); err != nil {
@@ -343,7 +348,9 @@
t.Fatal(err)
}
defer uploader.Close()
- uploader.dir.SetModeAsOf("on", telemetryEnableTime)
+ if err := uploader.dir.SetModeAsOf("on", telemetryEnableTime); err != nil {
+ t.Fatal(err)
+ }
uploader.startTime = mustParseDate(tx.today)
wantUploadCount := doTest(t, uploader, &tx, cs)
diff --git a/internal/upload/first_test.go b/internal/upload/first_test.go
index fcb5f3d..9ee7268 100644
--- a/internal/upload/first_test.go
+++ b/internal/upload/first_test.go
@@ -15,8 +15,7 @@
// In practice this test runs last, so is somewhat superfluous,
// but it also checks that uploads and reads from the channel are matched
func TestSimpleServer(t *testing.T) {
- srv, uploaded := createTestUploadServer(t)
- defer srv.Close()
+ srv, uploaded := CreateTestUploadServer(t)
url := srv.URL
resp, err := http.Post(url, "text/plain", strings.NewReader("hello"))
diff --git a/internal/upload/reports.go b/internal/upload/reports.go
index 262cc30..e97f43f 100644
--- a/internal/upload/reports.go
+++ b/internal/upload/reports.go
@@ -55,7 +55,8 @@
}
fname, err := u.createReport(earliest[expiry], expiry, files, lastWeek)
if err != nil {
- return nil, err
+ u.logger.Printf("Failed to create report for %s: %v", expiry, err)
+ continue
}
if fname != "" {
u.logger.Printf("Ready to upload: %s", filepath.Base(fname))
@@ -157,12 +158,9 @@
succeeded = true
}
}
- // TODO(rfindley): There's a bug here: we return an error if a count file
- // parses, but has no counter.
- //
- // Furthermore, this error causes us to bail out, and return no reports. We
- // should only fail the report containing the count file.
if !succeeded {
+ // TODO(rfindley): this isn't right: a count file is not unparseable just
+ // because it has no counters
return "", fmt.Errorf("all %d count files for %s were unparseable", len(countFiles), expiryDate)
}
// 1. generate the local report
diff --git a/internal/upload/run_test.go b/internal/upload/run_test.go
new file mode 100644
index 0000000..6690588
--- /dev/null
+++ b/internal/upload/run_test.go
@@ -0,0 +1,141 @@
+// Copyright 2024 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package upload_test
+
+import (
+ "strings"
+ "testing"
+ "time"
+
+ "golang.org/x/telemetry/internal/configtest"
+ "golang.org/x/telemetry/internal/regtest"
+ "golang.org/x/telemetry/internal/telemetry"
+ "golang.org/x/telemetry/internal/testenv"
+ "golang.org/x/telemetry/internal/upload"
+)
+
+// createUploader sets up an upload environment for the provided test, with a
+// fake proxy allowing the given counters, and a fake upload server.
+//
+// The returned Uploader is ready to upload the given directory.
+// The second return is a function to fetch all uploaded reports.
+//
+// For convenience, createUploader also sets the mode in telemetryDir to "on",
+// back-dated by 365 days. Callers that want to run the upload with a
+// different mode can reset as necessary.
+//
+// All associated resources are cleaned up with t.Cleanup.
+func createUploader(t *testing.T, telemetryDir string, counters, stackCounters []string) (*upload.Uploader, func() [][]byte) {
+ t.Helper()
+
+ if err := telemetry.NewDir(telemetryDir).SetModeAsOf("on", time.Now().Add(-365*24*time.Hour)); err != nil {
+ t.Fatal(err)
+ }
+
+ srv, uploaded := upload.CreateTestUploadServer(t)
+ uc := upload.CreateTestUploadConfig(t, counters, stackCounters)
+ env := configtest.LocalProxyEnv(t, uc, "v1.2.3")
+
+ uploader, err := upload.NewUploader(upload.RunConfig{
+ TelemetryDir: telemetryDir,
+ UploadURL: srv.URL,
+ LogWriter: testWriter{t},
+ Env: env,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+ t.Cleanup(func() { uploader.Close() })
+ return uploader, uploaded
+}
+
+// testWriter is an io.Writer that forwards every write to t.Log.
+type testWriter struct {
+ t *testing.T
+}
+
+func (w testWriter) Write(p []byte) (n int, err error) {
+ w.t.Log(strings.TrimSuffix(string(p), "\n")) // drop the single trailing newline added by logging
+ return len(p), nil // always report the full write, even though a newline may have been trimmed
+}
+
+func TestUploader_MultipleUploads(t *testing.T) {
+ // This test checks that Uploader.Run produces multiple reports when counters
+ // span more than a week.
+
+ testenv.SkipIfUnsupportedPlatform(t)
+
+ // This program is run at two different dates.
+ prog := regtest.NewIncProgram(t, "prog", "counter1")
+
+ // Create two counter files to upload, a week apart (15 and 8 days ago).
+ telemetryDir := t.TempDir()
+ asof1 := time.Now().Add(-15 * 24 * time.Hour)
+ if out, err := regtest.RunProgAsOf(t, telemetryDir, asof1, prog); err != nil {
+ t.Fatalf("failed to run program: %s", out)
+ }
+ asof2 := time.Now().Add(-8 * 24 * time.Hour)
+ if out, err := regtest.RunProgAsOf(t, telemetryDir, asof2, prog); err != nil {
+ t.Fatalf("failed to run program: %s", out)
+ }
+
+ uploader, getUploads := createUploader(t, telemetryDir, []string{"counter1", "counter2"}, nil) // counter2 is allowed but never incremented by prog
+ if err := uploader.Run(); err != nil {
+ t.Fatal(err)
+ }
+
+ uploads := getUploads()
+ if got, want := len(uploads), 2; got != want {
+ t.Fatalf("got %d uploads, want %d", got, want)
+ }
+ for _, upload := range uploads {
+ report := string(upload)
+ if !strings.Contains(report, "counter1") {
+ t.Errorf("Didn't get an upload for counter1. Report:\n%s", report)
+ }
+ }
+}
+
+func TestUploader_EmptyUpload(t *testing.T) {
+ // This test verifies that an empty counter file does not cause uploads of
+ // another week's reports to fail.
+
+ testenv.SkipIfUnsupportedPlatform(t)
+
+ // prog1 runs in week 1, and increments no counter.
+ prog1 := regtest.NewIncProgram(t, "prog1")
+ // prog2 runs in week 2.
+ prog2 := regtest.NewIncProgram(t, "prog2", "week2")
+
+ telemetryDir := t.TempDir()
+
+ // Create two counter files to upload, at least a week apart.
+ // Week 1 has no counters, which in the past caused both uploads to fail.
+ asof1 := time.Now().Add(-15 * 24 * time.Hour)
+ if out, err := regtest.RunProgAsOf(t, telemetryDir, asof1, prog1); err != nil {
+ t.Fatalf("failed to run program: %s", out)
+ }
+ asof2 := time.Now().Add(-8 * 24 * time.Hour)
+ if out, err := regtest.RunProgAsOf(t, telemetryDir, asof2, prog2); err != nil {
+ t.Fatalf("failed to run program: %s", out)
+ }
+
+ uploader, getUploads := createUploader(t, telemetryDir, []string{"week1", "week2"}, nil)
+ if err := uploader.Run(); err != nil {
+ t.Fatal(err)
+ }
+
+ // Check that we got one upload, for week 2.
+ uploads := getUploads()
+ if got, want := len(uploads), 1; got != want {
+ t.Fatalf("got %d uploads, want %d", got, want)
+ }
+ for _, upload := range uploads {
+ report := string(upload)
+ if !strings.Contains(report, "week2") {
+ t.Errorf("Didn't get an upload for week2. Report:\n%s", report)
+ }
+ }
+}
diff --git a/start_test.go b/start_test.go
index 623462c..66da491 100644
--- a/start_test.go
+++ b/start_test.go
@@ -106,7 +106,9 @@
countertest.Open(mustGetEnv(telemetryDirEnv))
counter.Inc("teststart/counter")
- it.Default.SetModeAsOf("on", time.Now().Add(-8*24*time.Hour))
+ if err := it.Default.SetModeAsOf("on", time.Now().Add(-8*24*time.Hour)); err != nil {
+ log.Fatalf("setting mode: %v", err)
+ }
res := telemetry.Start(telemetry.Config{
// No need to set TelemetryDir since the Default dir is already set by countertest.Open.