trace: add experimental flight recorder API

This change adds an experimental flight recorder API that lives outside
the Go runtime. It implements the same API as the proposed API, but
has a few caveats because it exists outside the runtime.

Firstly, because the conceptual circular buffer lives outside the
runtime, this flight recorder has slightly more overhead than a
runtime-internal implementation would. Specifically, all the trace data
needs to be copied out of the runtime and gently processed, and this
process needs to happen continuously. Peak memory use is also going to
be higher because of this copying.

Secondly, the flight recorder needs to flush the runtime's buffers twice
in a row in order to obtain the snapshot it wants. This is because the
signal in the trace that a generation is done is either that the trace
stream ends or a batch with a new generation value appears. Flushing
twice in a row ensures that the generation we actually wanted done is
complete, at the cost of an additional flush. The overhead of this
should be minimal in practice, but it does mean that the actual flush
operation will have a substantially longer latency than with a
runtime-internal implementation. This is OK because that latency doesn't
actually affect any properties of the resulting snapshot; it's purely
latency to the caller. This problem could have been avoided with an
explicit in-band signal that a generation has been flushed, which we may
want to consider adding in the future.

For #63185.

Change-Id: I7a94e2cddcfbf19a4140b398c188c3d59f8b9c9e
Reviewed-on: https://go-review.googlesource.com/c/exp/+/550257
Auto-Submit: Michael Knyszek <mknyszek@google.com>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Reviewed-by: Michael Pratt <mpratt@google.com>
diff --git a/trace/flightrecorder.go b/trace/flightrecorder.go
new file mode 100644
index 0000000..e189b80
--- /dev/null
+++ b/trace/flightrecorder.go
@@ -0,0 +1,364 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+//go:build go1.22
+
+package trace
+
+import (
+	"bufio"
+	"encoding/binary"
+	"fmt"
+	"io"
+	"math/bits"
+	"runtime/trace"
+	"slices"
+	"sync"
+	"time"
+	_ "unsafe" // for go:linkname
+
+	"golang.org/x/exp/trace/internal/event/go122"
+)
+
+// FlightRecorder represents a flight recording configuration.
+//
+// Flight recording holds execution trace data in a circular buffer representing
+// the most recent execution data.
+//
+// Only one flight recording may be active at any given time.
+type FlightRecorder struct {
+	// State for coordinating with the recorder goroutine.
+	fromTracer   *io.PipeReader
+	toRecorder   *io.PipeWriter
+	recorderWait sync.WaitGroup
+	err          error
+
+	// State specific to the recorder goroutine.
+	header [16]byte
+	active rawGeneration
+	ringMu sync.Mutex
+	ring   []rawGeneration
+
+	// Externally-set options.
+	targetSize   int
+	targetPeriod time.Duration
+
+	enabled bool       // whether the flight recorder is enabled.
+	writing sync.Mutex // protects concurrent calls to WriteTo
+
+	// The values of targetSize and targetPeriod we've committed to since the last Start.
+	wantSize int
+	wantDur  time.Duration
+}
+
+// NewFlightRecorder creates a new flight recording configuration.
+func NewFlightRecorder() *FlightRecorder {
+	return &FlightRecorder{
+		// These are just some optimistic, reasonable defaults.
+		//
+		// In reality we're also bound by whatever the runtime defaults are, because
+		// we currently have no way to change them.
+		//
+		// TODO(mknyszek): Consider adding a function that allows mutating one or
+		// both of these values' equivalents in the runtime.
+		targetSize:   10 << 20, // 10 MiB.
+		targetPeriod: 10 * time.Second,
+	}
+}
+
+// SetPeriod sets the approximate time duration that the flight recorder's circular buffer
+// represents.
+//
+// Note that SetPeriod does not make any guarantees on the amount of time the trace
+// produced by WriteTo will represent.
+// This is just a hint to the runtime to enable some control the resulting trace.
+//
+// The initial period is implementation defined, but can be assumed to be on the order
+// of seconds.
+//
+// Adjustments to this value will not apply to an active flight recorder, and will not apply
+// if tracing is already enabled via trace.Start. All tracing must be stopped and started
+// again to change this value.
+func (r *FlightRecorder) SetPeriod(d time.Duration) {
+	r.targetPeriod = d
+}
+
+// SetSize sets the approximate size of the flight recorder's circular buffer.
+//
+// This generally takes precedence over the duration passed to SetPeriod.
+// However, it does not make any guarantees on the size of the data WriteTo will write.
+// This is just a hint to the runtime to enable some control over the memory overheads
+// of tracing.
+//
+// The initial size is implementation defined.
+//
+// Adjustments to this value will not apply to an active flight recorder, and will not apply
+// if tracing is already enabled via trace.Start. All tracing must be stopped and started
+// again to change this value.
+func (r *FlightRecorder) SetSize(bytes int) {
+	r.targetSize = bytes
+}
+
+// Start begins flight recording. Only one flight recorder or one call to [runtime/trace.Start]
+// may be active at any given time. Returns an error if starting the flight recorder would
+// violate this rule.
+func (r *FlightRecorder) Start() error {
+	if r.enabled {
+		return fmt.Errorf("cannot enable a enabled flight recorder")
+	}
+
+	r.wantSize = r.targetSize
+	r.wantDur = r.targetPeriod
+	r.err = nil
+	r.fromTracer, r.toRecorder = io.Pipe()
+
+	// Start tracing, sending data to the recorder goroutine (not yet started) via an io.Pipe.
+	if err := trace.Start(r.toRecorder); err != nil {
+		return err
+	}
+
+	// Start recorder goroutine.
+	r.recorderWait.Add(1)
+	go func() {
+		defer r.recorderWait.Done()
+
+		// Read in the header so we can tack it on to the front
+		// of whatever WriteTo emits later.
+		_, err := io.ReadFull(r.fromTracer, r.header[:])
+		if err != nil {
+			r.err = err
+			return
+		}
+
+		// Process the rest of the trace.
+		rd := bufio.NewReader(r.fromTracer)
+		for {
+			b, gen, err := readBatch(rd)
+			if err == io.EOF || err == io.ErrClosedPipe {
+				break
+			}
+			if err != nil {
+				r.err = err
+				return
+			}
+
+			// Check if we're entering a new generation.
+			if r.active.gen != 0 && r.active.gen+1 == gen {
+				r.ringMu.Lock()
+
+				// Validate r.active.freq before we use it. It's required for a generation
+				// to not be considered broken, and without it, we can't correctly handle
+				// SetPeriod.
+				if r.active.freq == 0 {
+					r.err = fmt.Errorf("broken trace: failed to find frequency event in generation %d", r.active.gen)
+					return
+				}
+
+				// Get the current trace clock time.
+				now := traceTimeNow(r.active.freq)
+
+				// Add the current generation to the ring. Make sure we always have at least one
+				// complete generation by putting the active generation onto the new list, regardless
+				// of whatever our settings are.
+				//
+				// N.B. Let's completely replace the ring here, so that WriteTo can just make a copy
+				// and not worry about aliasing. This creates allocations, but at a very low rate.
+				newRing := []rawGeneration{r.active}
+				size := r.active.size
+				for i := len(r.ring) - 1; i >= 0; i-- {
+					// Stop adding older generations if the new ring already exceeds the thresholds.
+					// This ensures we keep generations that cross a threshold, but not any that lie
+					// entirely outside it.
+					if size > r.wantSize || now.Sub(newRing[len(newRing)-1].minTraceTime()) > r.wantDur {
+						break
+					}
+					size += r.ring[i].size
+					newRing = append(newRing, r.ring[i])
+				}
+				slices.Reverse(newRing)
+				r.ring = newRing
+				r.ringMu.Unlock()
+
+				// Start a new active generation.
+				r.active = rawGeneration{}
+			}
+
+			// Obtain the frequency if this is a frequency batch.
+			if b.isFreqBatch() {
+				freq, err := parseFreq(b)
+				if err != nil {
+					r.err = err
+					return
+				}
+				r.active.freq = freq
+			}
+
+			// Append the batch to the current generation.
+			if r.active.gen == 0 {
+				r.active.gen = gen
+			}
+			if r.active.minTime == 0 || r.active.minTime > b.time {
+				r.active.minTime = b.time
+			}
+			r.active.size += 1
+			r.active.size += uvarintSize(gen)
+			r.active.size += uvarintSize(uint64(b.m))
+			r.active.size += uvarintSize(uint64(b.time))
+			r.active.size += uvarintSize(uint64(len(b.data)))
+			r.active.size += len(b.data)
+			r.active.batches = append(r.active.batches, b)
+		}
+	}()
+
+	r.enabled = true
+	return nil
+}
+
+// Stop ends flight recording. It waits until any concurrent [FlightRecorder.WriteTo] calls exit.
+// Returns an error if the flight recorder is inactive.
+func (r *FlightRecorder) Stop() error {
+	if !r.enabled {
+		return fmt.Errorf("cannot disable a disabled flight recorder")
+	}
+	r.enabled = false
+	trace.Stop()
+
+	// Close the write side of the pipe. This is safe because tracing has stopped, so no more will
+	// be written to the pipe.
+	r.fromTracer.Close()
+
+	// Wait for the reader to exit.
+	r.recorderWait.Wait()
+
+	// Reset all state. No need to lock because the reader has already exited.
+	r.active = rawGeneration{}
+	r.ring = nil
+	r.toRecorder.Close()
+	r.fromTracer.Close()
+	return r.err
+}
+
+// Enabled returns true if the flight recorder is active. Specifically, it will return true if
+// Start did not return an error, and Stop has not yet been called.
+// It is safe to call from multiple goroutines simultaneously.
+func (r *FlightRecorder) Enabled() bool {
+	return r.enabled
+}
+
+// ErrSnapshotActive indicates that a call to WriteTo was made while one was already in progress.
+// If the caller of WriteTo sees this error, they should use the result from the other call to WriteTo.
+var ErrSnapshotActive = fmt.Errorf("call to WriteTo for trace.FlightRecorder already in progress")
+
+// WriteTo takes a snapshots of the circular buffer's contents and writes the execution data to w.
+// Returns the number of bytes written and an error.
+// An error is returned upon failure to write to w or if the flight recorder is inactive.
+// Only one goroutine may execute WriteTo at a time, but it is safe to call from multiple goroutines.
+// If a goroutine calls WriteTo while another goroutine is currently executing it, WriteTo will return
+// ErrSnapshotActive to that goroutine.
+func (r *FlightRecorder) WriteTo(w io.Writer) (total int, err error) {
+	if !r.enabled {
+		return 0, fmt.Errorf("cannot snapshot a disabled flight recorder")
+	}
+	if !r.writing.TryLock() {
+		return 0, ErrSnapshotActive
+	}
+	defer r.writing.Unlock()
+
+	// Force a global buffer flush twice.
+	//
+	// This is pretty unfortunate, but because the signal that a generation is done is that a new
+	// generation appears in the trace *or* the trace stream ends, the recorder goroutine will
+	// have no idea when to add a generation to the ring if we just flush once. If we flush twice,
+	// at least the first one will end up on the ring, which is the one we wanted anyway.
+	//
+	// In a runtime-internal implementation this is a non-issue. The runtime is fully aware
+	// of what generations are complete, so only one flush is necessary.
+	runtime_traceAdvance(false)
+	runtime_traceAdvance(false)
+
+	// Now that everything has been flushed and written, grab whatever we have.
+	//
+	// N.B. traceAdvance blocks until the tracer goroutine has actually written everything
+	// out, which means the generation we just flushed must have been already been observed
+	// by the recorder goroutine. Because we flushed twice, the first flush is guaranteed to
+	// have been both completed *and* processed by the recorder goroutine.
+	r.ringMu.Lock()
+	gens := r.ring
+	r.ringMu.Unlock()
+
+	// Write the header.
+	total, err = w.Write(r.header[:])
+	if err != nil {
+		return total, err
+	}
+
+	// Helper for writing varints.
+	var varintBuf [binary.MaxVarintLen64]byte
+	writeUvarint := func(u uint64) error {
+		v := binary.PutUvarint(varintBuf[:], u)
+		n, err := w.Write(varintBuf[:v])
+		total += n
+		return err
+	}
+
+	// Write all the data.
+	for _, gen := range gens {
+		for _, batch := range gen.batches {
+			// Rewrite the batch header event with four arguments: gen, M ID, timestamp, and data length.
+			n, err := w.Write([]byte{byte(go122.EvEventBatch)})
+			total += n
+			if err != nil {
+				return total, err
+			}
+			if err := writeUvarint(gen.gen); err != nil {
+				return total, err
+			}
+			if err := writeUvarint(uint64(batch.m)); err != nil {
+				return total, err
+			}
+			if err := writeUvarint(uint64(batch.time)); err != nil {
+				return total, err
+			}
+			if err := writeUvarint(uint64(len(batch.data))); err != nil {
+				return total, err
+			}
+
+			// Write batch data.
+			n, err = w.Write(batch.data)
+			total += n
+			if err != nil {
+				return total, err
+			}
+		}
+	}
+	return total, nil
+}
+
+type rawGeneration struct {
+	gen     uint64
+	size    int
+	minTime timestamp
+	freq    frequency
+	batches []batch
+}
+
+func (r *rawGeneration) minTraceTime() Time {
+	return r.freq.mul(r.minTime)
+}
+
+func traceTimeNow(freq frequency) Time {
+	// TODO(mknyszek): It's unfortunate that we have to rely on runtime-internal details
+	// like this. This would be better off in the runtime.
+	return freq.mul(timestamp(runtime_traceClockNow()))
+}
+
+func uvarintSize(x uint64) int {
+	return 1 + bits.Len64(x)/7
+}
+
+//go:linkname runtime_traceAdvance runtime.traceAdvance
+func runtime_traceAdvance(stopTrace bool)
+
+//go:linkname runtime_traceClockNow runtime.traceClockNow
+func runtime_traceClockNow() int64
diff --git a/trace/flightrecorder_test.go b/trace/flightrecorder_test.go
new file mode 100644
index 0000000..42b437c
--- /dev/null
+++ b/trace/flightrecorder_test.go
@@ -0,0 +1,295 @@
+// Copyright 2023 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+//go:build go1.22
+
+package trace_test
+
+import (
+	"bytes"
+	"context"
+	"io"
+	"runtime/trace"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"golang.org/x/exp/trace/internal/testtrace"
+
+	. "golang.org/x/exp/trace"
+)
+
+func TestFlightRecorderDoubleStart(t *testing.T) {
+	fr := NewFlightRecorder()
+
+	if err := fr.Start(); err != nil {
+		t.Fatalf("unexpected error on Start: %v", err)
+	}
+	if err := fr.Start(); err == nil {
+		t.Fatalf("expected error from double Start: %v", err)
+	}
+	if err := fr.Stop(); err != nil {
+		t.Fatalf("unexpected error on Stop: %v", err)
+	}
+}
+
+func TestFlightRecorderDoubleStop(t *testing.T) {
+	fr := NewFlightRecorder()
+
+	if err := fr.Start(); err != nil {
+		t.Fatalf("unexpected error on Start: %v", err)
+	}
+	if err := fr.Stop(); err != nil {
+		t.Fatalf("unexpected error on Stop: %v", err)
+	}
+	if err := fr.Stop(); err == nil {
+		t.Fatalf("expected error from double Stop: %v", err)
+	}
+}
+
+func TestFlightRecorderEnabled(t *testing.T) {
+	fr := NewFlightRecorder()
+
+	if fr.Enabled() {
+		t.Fatal("flight recorder is enabled, but never started")
+	}
+	if err := fr.Start(); err != nil {
+		t.Fatalf("unexpected error on Start: %v", err)
+	}
+	if !fr.Enabled() {
+		t.Fatal("flight recorder is not enabled, but started")
+	}
+	if err := fr.Stop(); err != nil {
+		t.Fatalf("unexpected error on Stop: %v", err)
+	}
+	if fr.Enabled() {
+		t.Fatal("flight recorder is enabled, but stopped")
+	}
+}
+
+func TestFlightRecorderWriteToDisabled(t *testing.T) {
+	var buf bytes.Buffer
+
+	fr := NewFlightRecorder()
+	if n, err := fr.WriteTo(&buf); err == nil {
+		t.Fatalf("successfully wrote %d bytes from disabled flight recorder", n)
+	}
+	if err := fr.Start(); err != nil {
+		t.Fatalf("unexpected error on Start: %v", err)
+	}
+	if err := fr.Stop(); err != nil {
+		t.Fatalf("unexpected error on Stop: %v", err)
+	}
+	if n, err := fr.WriteTo(&buf); err == nil {
+		t.Fatalf("successfully wrote %d bytes from disabled flight recorder", n)
+	}
+}
+
+func TestFlightRecorderConcurrentWriteTo(t *testing.T) {
+	fr := NewFlightRecorder()
+	if err := fr.Start(); err != nil {
+		t.Fatalf("unexpected error on Start: %v", err)
+	}
+
+	// Start two goroutines to write snapshots.
+	//
+	// Most of the time one will fail and one will succeed, but we don't require this.
+	// Due to a variety of factors, it's definitely possible for them both to succeed.
+	// However, at least one should succeed.
+	var bufs [2]bytes.Buffer
+	var wg sync.WaitGroup
+	var successes atomic.Uint32
+	for i := range bufs {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+
+			n, err := fr.WriteTo(&bufs[i])
+			if err == ErrSnapshotActive {
+				if n != 0 {
+					t.Errorf("(goroutine %d) WriteTo bytes written is non-zero for early bail out: %d", i, n)
+				}
+				return
+			}
+			if err != nil {
+				t.Errorf("(goroutine %d) failed to write snapshot for unexpected reason: %v", i, err)
+			}
+			successes.Add(1)
+
+			if n == 0 {
+				t.Errorf("(goroutine %d) wrote invalid trace of zero bytes in size", i)
+			}
+			if n != bufs[i].Len() {
+				t.Errorf("(goroutine %d) trace length doesn't match WriteTo result: got %d, want %d", i, n, bufs[i].Len())
+			}
+		}()
+	}
+	wg.Wait()
+
+	// Stop tracing.
+	if err := fr.Stop(); err != nil {
+		t.Fatalf("unexpected error on Stop: %v", err)
+	}
+
+	// Make sure at least one succeeded to write.
+	if successes.Load() == 0 {
+		t.Fatal("expected at least one success to write a snapshot, got zero")
+	}
+
+	// Validate the traces that came out.
+	for i := range bufs {
+		buf := &bufs[i]
+		if buf.Len() == 0 {
+			continue
+		}
+		testReader(t, buf, testtrace.ExpectSuccess())
+	}
+}
+
+func TestFlightRecorder(t *testing.T) {
+	testFlightRecorder(t, NewFlightRecorder(), func(snapshot func()) {
+		snapshot()
+	})
+}
+
+func TestFlightRecorderStartStop(t *testing.T) {
+	fr := NewFlightRecorder()
+	for i := 0; i < 5; i++ {
+		testFlightRecorder(t, fr, func(snapshot func()) {
+			snapshot()
+		})
+	}
+}
+
+func TestFlightRecorderLog(t *testing.T) {
+	tr := testFlightRecorder(t, NewFlightRecorder(), func(snapshot func()) {
+		trace.Log(context.Background(), "message", "hello")
+		snapshot()
+	})
+
+	// Prepare to read the trace snapshot.
+	r, err := NewReader(bytes.NewReader(tr))
+	if err != nil {
+		t.Fatalf("unexpected error creating trace reader: %v", err)
+	}
+
+	// Find the log message in the trace.
+	found := false
+	for {
+		ev, err := r.ReadEvent()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			t.Fatalf("unexpected error reading trace: %v", err)
+		}
+		if !found && ev.Kind() == EventLog {
+			log := ev.Log()
+			found = log.Category == "message" && log.Message == "hello"
+		}
+	}
+	if !found {
+		t.Errorf("failed to find expected log message (%q, %q) in snapshot", "message", "hello")
+	}
+}
+
+func TestFlightRecorderOneGeneration(t *testing.T) {
+	test := func(t *testing.T, fr *FlightRecorder) {
+		tr := testFlightRecorder(t, fr, func(snapshot func()) {
+			// Sleep to let a few generations pass.
+			time.Sleep(3 * time.Second)
+			snapshot()
+		})
+
+		// Prepare to read the trace snapshot.
+		r, err := NewReader(bytes.NewReader(tr))
+		if err != nil {
+			t.Fatalf("unexpected error creating trace reader: %v", err)
+		}
+
+		// Make sure there's only exactly one Sync event.
+		sync := 0
+		for {
+			ev, err := r.ReadEvent()
+			if err == io.EOF {
+				break
+			}
+			if err != nil {
+				t.Fatalf("unexpected error reading trace: %v", err)
+			}
+			if ev.Kind() == EventSync {
+				sync++
+			}
+		}
+		if sync != 1 {
+			t.Errorf("expected one sync event, found %d", sync)
+		}
+	}
+	t.Run("SetPeriod", func(t *testing.T) {
+		// Set the period to 0 so that we're always throwing away old generations.
+		// This should always result in exactly one generation.
+		// Note: this is always going to result in taking the second generation
+		// flushed, which is the much less useful one. That's OK, because in practice
+		// SetPeriod shouldn't ever be called with a value this low.
+		fr := NewFlightRecorder()
+		fr.SetPeriod(0)
+		test(t, fr)
+	})
+	t.Run("SetSize", func(t *testing.T) {
+		// Set the size to 0 so that we're always throwing away old generations.
+		// This should always result in exactly one generation.
+		// Note: this is always going to result in taking the second generation
+		// flushed, which is the much less useful one. That's OK, because in practice
+		// SetPeriod shouldn't ever be called with a value this low.
+		fr := NewFlightRecorder()
+		fr.SetSize(0)
+		test(t, fr)
+	})
+}
+
+type flightRecorderTestFunc func(snapshot func())
+
+func testFlightRecorder(t *testing.T, fr *FlightRecorder, f flightRecorderTestFunc) []byte {
+	if trace.IsEnabled() {
+		t.Skip("cannot run flight recorder tests when tracing is enabled")
+	}
+
+	// Start the flight recorder.
+	if err := fr.Start(); err != nil {
+		t.Fatalf("unexpected error on Start: %v", err)
+	}
+
+	// Set up snapshot callback.
+	var buf bytes.Buffer
+	callback := func() {
+		n, err := fr.WriteTo(&buf)
+		if err != nil {
+			t.Errorf("unexpected failure during flight recording: %v", err)
+			return
+		}
+		if n < 16 {
+			t.Errorf("expected a trace size of at least 16 bytes, got %d", n)
+		}
+		if n != buf.Len() {
+			t.Errorf("WriteTo result doesn't match trace size: got %d, want %d", n, buf.Len())
+		}
+	}
+
+	// Call the test function.
+	f(callback)
+
+	// Stop the flight recorder.
+	if err := fr.Stop(); err != nil {
+		t.Fatalf("unexpected error on Stop: %v", err)
+	}
+
+	// Get the trace bytes; we don't want to use the Buffer as a Reader directly
+	// since we may want to consume this data more than once.
+	traceBytes := buf.Bytes()
+
+	// Parse the trace to make sure it's not broken.
+	testReader(t, bytes.NewReader(traceBytes), testtrace.ExpectSuccess())
+	return traceBytes
+}