internal/upload: make upload.Run concurrency safe

Fix broken writes by writing upload-related files exclusively. Prevent
duplicate uploads using a lock file.

For golang/go#65970

Change-Id: I548134f597b2dbf5232de54027adb3daa4bad53d
Reviewed-on: https://go-review.googlesource.com/c/telemetry/+/587197
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Hyang-Ah Hana Kim <hyangah@gmail.com>
diff --git a/internal/upload/reports.go b/internal/upload/reports.go
index 4175789..d1091f4 100644
--- a/internal/upload/reports.go
+++ b/internal/upload/reports.go
@@ -248,10 +248,10 @@
 	// write the uploadable file
 	var errUpload, errLocal error
 	if uploadOK {
-		errUpload = os.WriteFile(uploadFileName, uploadContents, 0644)
+		_, errUpload = exclusiveWrite(uploadFileName, uploadContents)
 	}
 	// write the local file
-	errLocal = os.WriteFile(localFileName, localContents, 0644)
+	_, errLocal = exclusiveWrite(localFileName, localContents)
 	/*  Wrote the files */
 
 	// even though these errors won't occur, what should happen
@@ -270,6 +270,31 @@
 	return "", nil
 }
 
+// exclusiveWrite attempts to create filename exclusively, and if successful,
+// writes content to the resulting file handle.
+//
+// It returns a boolean indicating whether the exclusive handle was acquired,
+// and an error indicating whether the operation succeeded.
+// If the file already exists, exclusiveWrite returns (false, nil).
+func exclusiveWrite(filename string, content []byte) (_ bool, rerr error) {
+	f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0644)
+	if err != nil {
+		if os.IsExist(err) {
+			return false, nil
+		}
+		return false, err
+	}
+	defer func() {
+		if err := f.Close(); err != nil && rerr == nil {
+			rerr = err
+		}
+	}()
+	if _, err := f.Write(content); err != nil {
+		return false, err
+	}
+	return true, nil
+}
+
 // return an existing ProgremReport, or create anew
 func findProgReport(meta map[string]string, report *telemetry.Report) *telemetry.ProgramReport {
 	for _, prog := range report.Programs {
diff --git a/internal/upload/run_test.go b/internal/upload/run_test.go
index e80eca7..d8bbc5e 100644
--- a/internal/upload/run_test.go
+++ b/internal/upload/run_test.go
@@ -543,8 +543,6 @@
 }
 
 func TestRun_Concurrent(t *testing.T) {
-	t.Skip("Run is not concurrency safe")
-
 	testenv.SkipIfUnsupportedPlatform(t)
 
 	prog := regtest.NewIncProgram(t, "prog1", "counter")
diff --git a/internal/upload/upload.go b/internal/upload/upload.go
index 0e75d09..2a3bf70 100644
--- a/internal/upload/upload.go
+++ b/internal/upload/upload.go
@@ -61,11 +61,31 @@
 
 // try to upload the report, 'true' if successful
 func (u *uploader) uploadReportContents(fname string, buf []byte) bool {
-	b := bytes.NewReader(buf)
 	fdate := strings.TrimSuffix(filepath.Base(fname), ".json")
 	fdate = fdate[len(fdate)-len("2006-01-02"):]
-	endpoint := u.uploadServerURL + "/" + fdate
 
+	newname := filepath.Join(u.dir.UploadDir(), fdate+".json")
+	if _, err := os.Stat(newname); err == nil {
+		// Another process uploaded but failed to clean up (or hasn't yet cleaned
+		// up). Ensure that cleanup occurs.
+		_ = os.Remove(fname)
+		return false
+	}
+
+	// Lock the upload, to prevent duplicate uploads.
+	{
+		lockname := newname + ".lock"
+		lockfile, err := os.OpenFile(lockname, os.O_CREATE|os.O_EXCL, 0666)
+		if err != nil {
+			u.logger.Printf("Failed to acquire lock %s: %v", lockname, err)
+			return false
+		}
+		_ = lockfile.Close()
+		defer os.Remove(lockname)
+	}
+
+	endpoint := u.uploadServerURL + "/" + fdate
+	b := bytes.NewReader(buf)
 	resp, err := http.Post(endpoint, "application/json", b)
 	if err != nil {
 		u.logger.Printf("Error upload %s to %s: %v", filepath.Base(fname), endpoint, err)
@@ -85,7 +105,6 @@
 		return false
 	}
 	// Store a copy of the uploaded report in the uploaded directory.
-	newname := filepath.Join(u.dir.UploadDir(), fdate+".json")
 	if err := os.WriteFile(newname, buf, 0644); err == nil {
 		os.Remove(fname) // if it exists
 	}