internal/upload: make upload.Run concurrency safe
Fix broken writes by writing upload-related files exclusively. Prevent
duplicate uploads using a lock file.
For golang/go#65970
Change-Id: I548134f597b2dbf5232de54027adb3daa4bad53d
Reviewed-on: https://go-review.googlesource.com/c/telemetry/+/587197
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Hyang-Ah Hana Kim <hyangah@gmail.com>
diff --git a/internal/upload/reports.go b/internal/upload/reports.go
index 4175789..d1091f4 100644
--- a/internal/upload/reports.go
+++ b/internal/upload/reports.go
@@ -248,10 +248,10 @@
// write the uploadable file
var errUpload, errLocal error
if uploadOK {
- errUpload = os.WriteFile(uploadFileName, uploadContents, 0644)
+ _, errUpload = exclusiveWrite(uploadFileName, uploadContents)
}
// write the local file
- errLocal = os.WriteFile(localFileName, localContents, 0644)
+ _, errLocal = exclusiveWrite(localFileName, localContents)
/* Wrote the files */
// even though these errors won't occur, what should happen
@@ -270,6 +270,31 @@
return "", nil
}
+// exclusiveWrite attempts to create filename exclusively, and if successful,
+// writes content to the resulting file handle.
+//
+// It returns a boolean indicating whether the exclusive handle was acquired,
+// and an error indicating whether the operation succeeded.
+// If the file already exists, exclusiveWrite returns (false, nil).
+func exclusiveWrite(filename string, content []byte) (_ bool, rerr error) {
+ f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0644)
+ if err != nil {
+ if os.IsExist(err) {
+ return false, nil
+ }
+ return false, err
+ }
+ defer func() {
+ if err := f.Close(); err != nil && rerr == nil {
+ rerr = err
+ }
+ }()
+ if _, err := f.Write(content); err != nil {
+ return false, err
+ }
+ return true, nil
+}
+
// return an existing ProgremReport, or create anew
func findProgReport(meta map[string]string, report *telemetry.Report) *telemetry.ProgramReport {
for _, prog := range report.Programs {
diff --git a/internal/upload/run_test.go b/internal/upload/run_test.go
index e80eca7..d8bbc5e 100644
--- a/internal/upload/run_test.go
+++ b/internal/upload/run_test.go
@@ -543,8 +543,6 @@
}
func TestRun_Concurrent(t *testing.T) {
- t.Skip("Run is not concurrency safe")
-
testenv.SkipIfUnsupportedPlatform(t)
prog := regtest.NewIncProgram(t, "prog1", "counter")
diff --git a/internal/upload/upload.go b/internal/upload/upload.go
index 0e75d09..2a3bf70 100644
--- a/internal/upload/upload.go
+++ b/internal/upload/upload.go
@@ -61,11 +61,31 @@
// try to upload the report, 'true' if successful
func (u *uploader) uploadReportContents(fname string, buf []byte) bool {
- b := bytes.NewReader(buf)
fdate := strings.TrimSuffix(filepath.Base(fname), ".json")
fdate = fdate[len(fdate)-len("2006-01-02"):]
- endpoint := u.uploadServerURL + "/" + fdate
+ newname := filepath.Join(u.dir.UploadDir(), fdate+".json")
+ if _, err := os.Stat(newname); err == nil {
+ // Another process uploaded but failed to clean up (or hasn't yet cleaned
+ // up). Ensure that cleanup occurs.
+ _ = os.Remove(fname)
+ return false
+ }
+
+ // Lock the upload, to prevent duplicate uploads.
+ {
+ lockname := newname + ".lock"
+ lockfile, err := os.OpenFile(lockname, os.O_CREATE|os.O_EXCL, 0666)
+ if err != nil {
+ u.logger.Printf("Failed to acquire lock %s: %v", lockname, err)
+ return false
+ }
+ _ = lockfile.Close()
+ defer os.Remove(lockname)
+ }
+
+ endpoint := u.uploadServerURL + "/" + fdate
+ b := bytes.NewReader(buf)
resp, err := http.Post(endpoint, "application/json", b)
if err != nil {
u.logger.Printf("Error upload %s to %s: %v", filepath.Base(fname), endpoint, err)
@@ -85,7 +105,6 @@
return false
}
// Store a copy of the uploaded report in the uploaded directory.
- newname := filepath.Join(u.dir.UploadDir(), fdate+".json")
if err := os.WriteFile(newname, buf, 0644); err == nil {
os.Remove(fname) // if it exists
}