sweet: new diagnostics framework

This new framework does several things. Primarily, it consolidates the
management of diagnostics files (profiles and traces). It's designed
to be flexible enough for the myriad needs of different benchmarks.

It has a concept of "committed" and "uncommitted" diagnostic files.
This will be particularly important for servers, where we may have to
truncate diagnostic data being gathered from an HTTP endpoint.
Uncommitted diagnostics are discarded at the end of the benchmark.

Meanwhile, the concept of a committed diagnostic also lets us
consolidate the profile merging logic. With this new framework, you
simply ask for multiple files of the same diagnostic type, and when
everything is committed at the end, it will do any necessary profile
merging.

For diagnostics fetched over HTTP in particular, this new framework
lets us fix a significant drawback to runtime trace collection.
Previously, we had to collect traces in many small increments (we used
1 second), which makes the resulting traces basically unusable. As of
Go 1.23, traces can be truncated, but the framework had no ability to
collect a truncatable diagnostic. With this CL, HTTP collection
understands which diagnostic types can be truncated, and switches to
collecting a single long-running diagnostic and cutting it off at the
end of the benchmark.

We'll push this through each benchmark and then remove the old
framework.

Change-Id: Ic9a189123f248aea13f92ef8961e9873c7cf9a6d
Reviewed-on: https://go-review.googlesource.com/c/benchmarks/+/600062
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Michael Knyszek <mknyszek@google.com>
diff --git a/sweet/benchmarks/internal/driver/diagnostics.go b/sweet/benchmarks/internal/driver/diagnostics.go
new file mode 100644
index 0000000..c910a87
--- /dev/null
+++ b/sweet/benchmarks/internal/driver/diagnostics.go
@@ -0,0 +1,304 @@
+// Copyright 2024 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package driver
+
+import (
+	"errors"
+	"fmt"
+	"os"
+	"path/filepath"
+	"strings"
+	"sync"
+
+	"github.com/google/pprof/profile"
+	"golang.org/x/benchmarks/sweet/common/diagnostics"
+	sprofile "golang.org/x/benchmarks/sweet/common/profile"
+)
+
+// There are three ways of gathering diagnostic profiles, in order of
+// preference:
+//
+// - In-process: The process coordinating the benchmark and running the
+// benchmarked code are the same, and driver.B takes care of diagnostic
+// collection.
+//
+// - Subprocess self collection: The benchmarked code is running in a subprocess
+// that has the ability to collect diagnostics using a command line flag. For
+// this, we use [Diagnostics.Create] and pass the name of the file to the
+// subprocess.
+//
+// - Subprocess HTTP collection: The benchmarked code is an HTTP server with
+// net/http/pprof endpoints. We use [Diagnostics] in conjunction with
+// [server.FetchDiagnostic].
+//
+// TODO: Can we better consolidate the last case into Diagnostics?
+
+type Diagnostics struct {
+	name string
+
+	once      sync.Once
+	tmpDir    string
+	tmpDirErr error
+}
+
+func NewDiagnostics(name string) *Diagnostics {
+	return &Diagnostics{name: name}
+}
+
+func safeFileName(name string) string {
+	// The following characters are disallowed by either VFAT, NTFS, APFS, or
+	// most Unix file systems:
+	//
+	// 0x00–0x1F 0x7F " * / : < > ? \ |
+	//
+	// We use % for escaping, so we also escape it.
+
+	const bad = (1<<0x20 - 1) | 1<<'"' | 1<<'*' | 1<<'/' | 1<<':' | 1<<'<' | 1<<'>' | 1<<'?' | 1<<'\\' | 1<<'|' | 1<<'%'
+	const badLo uint64 = bad & 0xFFFFFFFFFFFFFFFF
+	const badHi uint64 = bad >> 64
+
+	var buf strings.Builder
+	for i := 0; i < len(name); i++ {
+		ch := name[i]
+		if ch >= 0x7F || (badLo>>ch)&1 != 0 || (ch >= 64 && (badHi>>(ch-64))&1 != 0) {
+			fmt.Fprintf(&buf, "%%%02x", ch)
+		} else {
+			buf.WriteByte(ch)
+		}
+	}
+	return buf.String()
+}
+
+// Commit combines all individually committed diagnostic files into the final
+// output files. If there are multiple diagnostic files with the same type and
+// name, it merges them into a single file. If b != nil, it adds metrics for
+// diagnostic file sizes to b.
+func (d *Diagnostics) Commit(b *B) error {
+	// Commit is usually used in a defer, so log the error.
+	err := d.commit1(b)
+	if err != nil {
+		fmt.Fprintln(os.Stderr, err)
+	}
+	return err
+}
+
+func (d *Diagnostics) commit1(b *B) error {
+	if d.tmpDir == "" {
+		// No diagnostics were created.
+		return nil
+	}
+
+	allEntries, err := os.ReadDir(d.tmpDir)
+	if err != nil {
+		return err
+	}
+
+	// Bucket the file names.
+	type mergeKey struct {
+		typ  diagnostics.Type
+		name string
+	}
+	toMerge := make(map[mergeKey][]string)
+	var toDelete []string
+	for _, entry := range allEntries {
+		fileName := entry.Name()
+		path := filepath.Join(d.tmpDir, fileName)
+
+		typ, name, committed := parseDiagnosticPath(fileName)
+
+		if !committed {
+			// Uncommitted. Delete this one.
+			toDelete = append(toDelete, path)
+			continue
+		}
+		info, err := entry.Info()
+		if err != nil {
+			return err
+		} else if info.Size() == 0 {
+			// Skip zero-sized files, otherwise the pprof package
+			// will call it a parsing error.
+			toDelete = append(toDelete, path)
+			continue
+		}
+
+		// Add to the merge list.
+		k := mergeKey{typ, name}
+		toMerge[k] = append(toMerge[k], path)
+	}
+
+	// Process each merge list.
+	var errs []error
+	anyTrace := false
+	var traceBytes int64
+	for k, paths := range toMerge {
+		if err, outPath, deleteInputs := d.merge(k.typ, k.name, paths); err != nil {
+			errs = append(errs, err)
+		} else {
+			if deleteInputs {
+				toDelete = append(toDelete, paths...)
+			}
+			if k.typ == diagnostics.Trace {
+				anyTrace = true
+				if st, err := os.Stat(outPath); err == nil {
+					traceBytes = st.Size()
+				}
+			}
+		}
+	}
+	if b != nil && anyTrace {
+		// Report metric for diagnostic size.
+		b.Report("trace-bytes", uint64(traceBytes))
+	}
+
+	// Delete all of the temporary files.
+	for _, path := range toDelete {
+		errs = append(errs, os.Remove(path))
+	}
+	errs = append(errs, os.Remove(d.tmpDir))
+
+	return errors.Join(errs...)
+}
+
+func (d *Diagnostics) merge(typ diagnostics.Type, subName string, paths []string) (err error, outPath string, deleteInputs bool) {
+	if len(paths) > 1 && !typ.CanMerge() {
+		return fmt.Errorf("found %d > 1 %s files, but this diagnostic cannot be merged", len(paths), typ), "", false
+	}
+
+	// Create the output file.
+	name := d.name
+	if subName != "" {
+		name += "-" + subName
+	}
+	outFile, err := os.CreateTemp(diag.ResultsDir, safeFileName(name)+"-*-"+typ.FileName())
+	if err != nil {
+		return err, "", false
+	}
+	outPath = outFile.Name()
+
+	if len(paths) == 1 {
+		// Simply rename it to the final path.
+		outFile.Close()
+		if err := os.Rename(paths[0], outPath); err != nil {
+			return err, "", false
+		}
+	} else if len(paths) > 1 {
+		defer outFile.Close()
+
+		// Otherwise, merge the profiles.
+		var profiles []*profile.Profile
+		for _, path := range paths {
+			p, err := sprofile.ReadPprof(path)
+			if err != nil {
+				return err, "", false
+			}
+			profiles = append(profiles, p)
+		}
+
+		p, err := profile.Merge(profiles)
+		if err != nil {
+			return fmt.Errorf("error merging profiles: %w", err), "", false
+		}
+
+		err = p.Write(outFile)
+		if err == nil {
+			err = outFile.Close()
+		}
+		if err != nil {
+			return fmt.Errorf("error writing profile %s: %s", outPath, err), "", false
+		}
+
+		// Now we can delete all of the input files.
+		deleteInputs = true
+	}
+
+	return nil, outPath, deleteInputs
+}
+
+type DiagnosticFile struct {
+	*os.File
+}
+
+// getTmpDir returns the directory for storing uncommitted diagnostics files.
+func (d *Diagnostics) getTmpDir() (string, error) {
+	d.once.Do(func() {
+		// Create the uncommitted results directory.
+		d.tmpDir, d.tmpDirErr = os.MkdirTemp(diag.ResultsDir, safeFileName(d.name)+"-*.tmp")
+	})
+	return d.tmpDir, d.tmpDirErr
+}
+
+// Create is shorthand for CreateNamed(typ, "").
+func (d *Diagnostics) Create(typ diagnostics.Type) (*DiagnosticFile, error) {
+	return d.CreateNamed(typ, "")
+}
+
+// CreateNamed returns a new file that a diagnostic can be written to. If this
+// type of diagnostic can be merged, this can be called multiple times with the
+// same type and name and Commit will merge all of the files. The caller must
+// close this file. Diagnostic files are temporary until the caller calls
+// [DiagnosticFile.Commit] to indicate they are ready for merging into the final
+// output.
+func (d *Diagnostics) CreateNamed(typ diagnostics.Type, name string) (*DiagnosticFile, error) {
+	if !DiagnosticEnabled(typ) {
+		return nil, nil
+	}
+
+	tmpDir, err := d.getTmpDir()
+	if err != nil {
+		return nil, err
+	}
+
+	// Construct diagnostic file name. This path must be parsable by
+	// parseDiagnosticPath.
+	if strings.Contains(string(typ), "-") {
+		// To later parse the file name, we assume there's no "-".
+		panic("diagnostic type contains '-'")
+	}
+	pattern := string(typ) + "-*"
+	if name != "" {
+		pattern += "-" + safeFileName(name)
+	}
+	// Mark this as uncommitted.
+	pattern += ".tmp"
+
+	// Create file.
+	f, err := os.CreateTemp(tmpDir, pattern)
+	if err != nil {
+		return nil, err
+	}
+
+	return &DiagnosticFile{f}, nil
+}
+
+func parseDiagnosticPath(fileName string) (typ diagnostics.Type, name string, committed bool) {
+	// Check whether it's committed.
+	committed = !strings.HasSuffix(fileName, ".tmp")
+	fileName = strings.TrimSuffix(fileName, ".tmp")
+
+	// Get the type.
+	typString, rest, _ := strings.Cut(fileName, "-")
+	typ = diagnostics.Type(typString)
+
+	// Drop the CreateTemp junk, leaving only the name. If there's no "-", then
+	// there's no name, so we let this set name to "".
+	_, name, _ = strings.Cut(rest, "-")
+
+	return
+}
+
+// Commit indicates that diagnostic file f is ready to be merged into the final
+// output. For a diagnostic that cannot be truncated, this should only be called
+// when the file has been fully written.
+func (f *DiagnosticFile) Commit() {
+	path := f.Name()
+	if !strings.HasSuffix(path, ".tmp") {
+		panic("temporary diagnostic file does not end in .tmp: " + path)
+	}
+	newPath := strings.TrimSuffix(path, ".tmp")
+	if err := os.Rename(path, newPath); err != nil {
+		// If rename fails, something is *horribly* wrong.
+		panic(fmt.Sprintf("failed to rename %q to %q: %s", path, newPath, err))
+	}
+}
diff --git a/sweet/benchmarks/internal/driver/driver.go b/sweet/benchmarks/internal/driver/driver.go
index 0f8890a..44bd0ed 100644
--- a/sweet/benchmarks/internal/driver/driver.go
+++ b/sweet/benchmarks/internal/driver/driver.go
@@ -558,6 +558,8 @@
 	return ok
 }
 
+// TODO: Delete below here
+
 func WritePprofProfile(prof *profile.Profile, typ diagnostics.Type, pattern string) error {
 	if !typ.IsPprof() {
 		return fmt.Errorf("this type of diagnostic doesn't use the pprof format")
diff --git a/sweet/benchmarks/internal/server/server.go b/sweet/benchmarks/internal/server/server.go
index 75fe10b..42681fe 100644
--- a/sweet/benchmarks/internal/server/server.go
+++ b/sweet/benchmarks/internal/server/server.go
@@ -5,6 +5,8 @@
 package server
 
 import (
+	"context"
+	"errors"
 	"fmt"
 	"io"
 	"net/http"
@@ -16,6 +18,102 @@
 	"golang.org/x/benchmarks/sweet/common/diagnostics"
 )
 
+// FetchDiagnostic reads a profile or trace from the pprof endpoint at host. The
+// returned stop function finalizes the diagnostic file on disk. Because of
+// limitations of net/http/pprof, this cannot actually stop collection on the
+// server side, so stop should only be called when the server is about to be
+// shut down.
+func FetchDiagnostic(host string, diag *driver.Diagnostics, typ diagnostics.Type, name string) (stop func()) {
+	if typ.HTTPEndpoint() == "" {
+		panic("diagnostic " + string(typ) + " has no endpoint")
+	}
+
+	if !driver.DiagnosticEnabled(typ) {
+		return func() {}
+	}
+
+	// If this is a snapshot-type diagnostic, wait until the end to collect it.
+	if typ.IsSnapshot() {
+		return func() {
+			err := collectTo(context.Background(), host, diag, typ, name)
+			if err != nil {
+				fmt.Fprintf(os.Stderr, "failed to read diagnostic %s: %v", typ, err)
+			}
+		}
+	}
+
+	// Otherwise, start collecting it now. If it can be truncated, then we try
+	// to collect it in one long run and cut it off when stop is called.
+	// If it can be merged, we can collect several of them.
+	var wg sync.WaitGroup
+	wg.Add(1)
+	ctx, cancel := context.WithCancel(context.Background())
+	go func() {
+		defer wg.Done()
+
+		for {
+			err := collectTo(ctx, host, diag, typ, name)
+			if err != nil {
+				if !errors.Is(err, context.Canceled) {
+					fmt.Fprintf(os.Stderr, "failed to read diagnostic %s: %v", typ, err)
+				}
+				break
+			}
+			if !typ.CanMerge() {
+				break
+			}
+		}
+	}()
+	return func() {
+		// Stop the loop.
+		cancel()
+		wg.Wait()
+	}
+}
+
+func collectTo(ctx context.Context, host string, diag *driver.Diagnostics, typ diagnostics.Type, name string) error {
+	// Construct the endpoint URL
+	var endpoint string
+	endpoint = fmt.Sprintf("http://%s/%s", host, typ.HTTPEndpoint())
+	if typ.CanMerge() && !typ.CanTruncate() {
+		// Collect in lots of small increments because we won't be able to just
+		// stop it.
+		endpoint += "?seconds=1"
+	} else if typ.CanTruncate() {
+		// Collect a long run that we can cut off.
+		endpoint += "?seconds=999999"
+	}
+
+	// Start profile collection.
+	req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil)
+	if err != nil {
+		return err
+	}
+	resp, err := http.DefaultClient.Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+
+	// Read into a diagnostic file
+	f, err := diag.CreateNamed(typ, name)
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+
+	_, err = io.Copy(f, resp.Body)
+	if err == nil || typ.CanTruncate() {
+		// If we got a complete file, or it's fine to truncate it anyway, commit
+		// the diagnostic file.
+		f.Close()
+		f.Commit()
+	}
+	return err
+}
+
+// TODO: Delete below here
+
 func CollectDiagnostic(host, tmpDir, benchName string, typ diagnostics.Type) (int64, error) {
 	// We attempt to use the benchmark name to create a temp file so replace all
 	// path separators with "_".
diff --git a/sweet/cmd/sweet/run.go b/sweet/cmd/sweet/run.go
index f232ddb..70962f6 100644
--- a/sweet/cmd/sweet/run.go
+++ b/sweet/cmd/sweet/run.go
@@ -447,7 +447,7 @@
 	return newConfigs, nil
 }
 
-var cpuProfileRe = regexp.MustCompile(`^.*\.cpuprofile[0-9]+$`)
+var cpuProfileRe = regexp.MustCompile(`\.cpuprofile[0-9]+|-cpu\.prof$`)
 
 func mergeCPUProfiles(dir string) (string, error) {
 	profiles, err := sprofile.ReadDirPprof(dir, func(name string) bool {
diff --git a/sweet/common/diagnostics/config.go b/sweet/common/diagnostics/config.go
index 944d2fa..906e2f8 100644
--- a/sweet/common/diagnostics/config.go
+++ b/sweet/common/diagnostics/config.go
@@ -93,6 +93,65 @@
 	return t == CPUProfile || t == MemProfile
 }
 
+// HTTPEndpoint returns the net/http/pprof endpoint for this diagnostic type as
+// a host-relative URL, or "" if there is no endpoint.
+func (t Type) HTTPEndpoint() string {
+	switch t {
+	case CPUProfile:
+		return "debug/pprof/profile"
+	case MemProfile:
+		return "debug/pprof/heap"
+	case Trace:
+		return "debug/pprof/trace"
+	}
+	return ""
+}
+
+// FileName returns the typical file name suffix for this diagnostic type.
+func (t Type) FileName() string {
+	switch t {
+	case CPUProfile:
+		return "cpu.prof"
+	case MemProfile:
+		return "mem.prof"
+	case Perf:
+		return "perf.data"
+	case Trace:
+		return "runtime.trace"
+	}
+	panic("unsupported profile type " + string(t))
+}
+
+// IsSnapshot indicates that this diagnostic is a point-in-time snapshot that
+// should be collected at the end of a benchmark.
+func (t Type) IsSnapshot() bool {
+	switch t {
+	case MemProfile:
+		return true
+	}
+	return false
+}
+
+// CanMerge indicates that multiple profiles of this type can be merged into one
+// profile.
+func (t Type) CanMerge() bool {
+	switch t {
+	case CPUProfile, MemProfile:
+		return true
+	}
+	return false
+}
+
+// CanTruncate indicates that a truncated diagnostic file of this type is still
+// meaningful.
+func (t Type) CanTruncate() bool {
+	switch t {
+	case Trace, Perf:
+		return true
+	}
+	return false
+}
+
 // Types returns a slice of all supported types.
 func Types() []Type {
 	return []Type{