// Copyright 2023 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package trace

import (
"bufio"
"errors"
"fmt"
"io"
"slices"
"strings"
"internal/trace/internal/tracev1"
"internal/trace/tracev2"
"internal/trace/version"
)

// Reader reads a byte stream, validates it, and produces trace events.
//
// Provided the trace is non-empty, the Reader always produces a Sync
// event as the first event and a Sync event as the last event.
// (There may be any number of Sync events in between, too.)
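//
// A minimal usage sketch (illustrative only: the file name "trace.out" is
// an assumption, and error handling is abbreviated):
//
//	f, err := os.Open("trace.out")
//	if err != nil {
//		log.Fatal(err)
//	}
//	defer f.Close()
//	r, err := trace.NewReader(f)
//	if err != nil {
//		log.Fatal(err)
//	}
//	for {
//		ev, err := r.ReadEvent()
//		if err == io.EOF {
//			break
//		}
//		if err != nil {
//			log.Fatal(err)
//		}
//		fmt.Println(ev.String()) // first and last events are Sync events
//	}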
type Reader struct {
version version.Version
r *bufio.Reader
lastTs Time
gen *generation
frontier []*batchCursor
cpuSamples []cpuSample
order ordering
syncs int
readGenErr error
done bool
// Spill state.
//
	// Traces before Go 1.26 had no explicit end-of-generation signal, so
	// the first batch of the next generation had to be parsed to identify
	// a new generation. That batch is then "spilled" so that we don't lose
	// track of it when parsing the next generation.
	//
	// This is unnecessary as of Go 1.26, which emits an explicit
	// end-of-generation signal.
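	//
	// As an illustrative example (hypothetical generation numbers): while
	// parsing generation N, the reader encounters a batch stamped with
	// generation N+1. That batch is stashed in spill and becomes the
	// first batch consumed when generation N+1 is parsed.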
spill *spilledBatch
spillErr error // error from reading spill
spillErrSync bool // whether we emitted a Sync before reporting spillErr
v1Events *traceV1Converter
}

// NewReader creates a new trace reader.
func NewReader(r io.Reader) (*Reader, error) {
br := bufio.NewReader(r)
v, err := version.ReadHeader(br)
if err != nil {
return nil, err
}
switch v {
case version.Go111, version.Go119, version.Go121:
tr, err := tracev1.Parse(br, v)
if err != nil {
return nil, err
}
return &Reader{
v1Events: convertV1Trace(tr),
}, nil
case version.Go122, version.Go123, version.Go125, version.Go126:
return &Reader{
version: v,
r: br,
order: ordering{
traceVer: v,
mStates: make(map[ThreadID]*mState),
pStates: make(map[ProcID]*pState),
gStates: make(map[GoID]*gState),
activeTasks: make(map[TaskID]taskState),
},
}, nil
default:
return nil, fmt.Errorf("unknown or unsupported version go 1.%d", v)
}
}

// ReadEvent reads a single event from the stream.
//
// If the stream has been exhausted, it returns an invalid event and io.EOF.
func (r *Reader) ReadEvent() (e Event, err error) {
// Return only io.EOF if we're done.
if r.done {
return Event{}, io.EOF
}
// Handle v1 execution traces.
if r.v1Events != nil {
if r.syncs == 0 {
// Always emit a sync event first, if we have any events at all.
ev, ok := r.v1Events.events.Peek()
if ok {
r.syncs++
return syncEvent(r.v1Events.evt, Time(ev.Ts-1), r.syncs), nil
}
}
ev, err := r.v1Events.next()
if err == io.EOF {
// Always emit a sync event at the end.
r.done = true
r.syncs++
return syncEvent(nil, r.v1Events.lastTs+1, r.syncs), nil
} else if err != nil {
return Event{}, err
}
return ev, nil
	}

	// Trace v2 parsing algorithm.
//
// (1) Read in all the batches for the next generation from the stream.
// (a) Use the size field in the header to quickly find all batches.
// (2) Parse out the strings, stacks, CPU samples, and timestamp conversion data.
// (3) Group each event batch by M, sorted by timestamp. (batchCursor contains the groups.)
// (4) Organize batchCursors in a min-heap, ordered by the timestamp of the next event for each M.
// (5) Try to advance the next event for the M at the top of the min-heap.
// (a) On success, select that M.
// (b) On failure, sort the min-heap and try to advance other Ms. Select the first M that advances.
// (c) If there's nothing left to advance, goto (1).
// (6) Select the latest event for the selected M and get it ready to be returned.
// (7) Read the next event for the selected M and update the min-heap.
// (8) Return the selected event, goto (5) on the next call.
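	//
	// As a worked illustration of steps (4) and (5) (hypothetical values,
	// not from a real trace): suppose M1's next event has timestamp 10,
	// M2's has 12, and M3's has 11. The min-heap yields M1 first. If M1's
	// event cannot be ordered yet (say it depends on state another M must
	// establish first), the heap is sorted and M3 (t=11) is tried next,
	// then M2 (t=12).
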
// Set us up to track the last timestamp and fix up
// the timestamp of any event that comes through.
defer func() {
if err != nil {
return
}
if err = e.validateTableIDs(); err != nil {
return
}
if e.base.time <= r.lastTs {
e.base.time = r.lastTs + 1
}
r.lastTs = e.base.time
}()
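	// For example (an illustrative sketch, not real trace data): raw event
	// timestamps 10, 10, 9 come out of the fix-up above as 10, 11, 12, so
	// the stream the caller sees is strictly increasing.
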
// Consume any events in the ordering first.
if ev, ok := r.order.Next(); ok {
return ev, nil
}
// Check if we need to refresh the generation.
if len(r.frontier) == 0 && len(r.cpuSamples) == 0 {
if r.version < version.Go126 {
return r.nextGenWithSpill()
}
if r.readGenErr != nil {
return Event{}, r.readGenErr
}
gen, err := readGeneration(r.r, r.version)
if err != nil {
// Before returning an error, emit the sync event
// for the current generation and queue up the error
// for the next call.
r.readGenErr = err
r.gen = nil
r.syncs++
return syncEvent(nil, r.lastTs, r.syncs), nil
}
return r.installGen(gen)
}
tryAdvance := func(i int) (bool, error) {
bc := r.frontier[i]
if ok, err := r.order.Advance(&bc.ev, r.gen.evTable, bc.m, r.gen.gen); !ok || err != nil {
return ok, err
}
// Refresh the cursor's event.
ok, err := bc.nextEvent(r.gen.batches[bc.m], r.gen.freq)
if err != nil {
return false, err
}
if ok {
// If we successfully refreshed, update the heap.
heapUpdate(r.frontier, i)
} else {
// There's nothing else to read. Delete this cursor from the frontier.
r.frontier = heapRemove(r.frontier, i)
}
return true, nil
}
// Inject a CPU sample if it comes next.
if len(r.cpuSamples) != 0 {
if len(r.frontier) == 0 || r.cpuSamples[0].time < r.frontier[0].ev.time {
e := r.cpuSamples[0].asEvent(r.gen.evTable)
r.cpuSamples = r.cpuSamples[1:]
return e, nil
}
}
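	// (Illustrative example with hypothetical values: if the next CPU
	// sample is at t=100 and the frontier head's next event is at t=105,
	// the sample above is emitted first.)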
	// Try to advance the head of the frontier, which should have the
	// minimum timestamp. This should be by far the most common case.
if len(r.frontier) == 0 {
return Event{}, fmt.Errorf("broken trace: frontier is empty:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
}
if ok, err := tryAdvance(0); err != nil {
return Event{}, err
} else if !ok {
// Try to advance the rest of the frontier, in timestamp order.
//
// To do this, sort the min-heap. A sorted min-heap is still a
// min-heap, but now we can iterate over the rest and try to
// advance in order. This path should be rare.
slices.SortFunc(r.frontier, (*batchCursor).compare)
success := false
for i := 1; i < len(r.frontier); i++ {
if ok, err = tryAdvance(i); err != nil {
return Event{}, err
} else if ok {
success = true
break
}
}
if !success {
return Event{}, fmt.Errorf("broken trace: failed to advance: frontier:\n[gen=%d]\n\n%s\n%s\n", r.gen.gen, dumpFrontier(r.frontier), dumpOrdering(&r.order))
}
}
// Pick off the next event on the queue. At this point, one must exist.
ev, ok := r.order.Next()
if !ok {
panic("invariant violation: advance successful, but queue is empty")
}
return ev, nil
}

// nextGenWithSpill reads the next generation while also handling any
// spilled batches, then installs it via installGen.
func (r *Reader) nextGenWithSpill() (Event, error) {
if r.version >= version.Go126 {
return Event{}, errors.New("internal error: nextGenWithSpill called for Go 1.26+ trace")
}
if r.spillErr != nil {
if r.spillErrSync {
return Event{}, r.spillErr
}
r.spillErrSync = true
r.syncs++
return syncEvent(nil, r.lastTs, r.syncs), nil
}
	if r.gen != nil && r.spill == nil {
		// We have a generation from the last read, there's nothing
		// left in the frontier, and there's no spilled batch, which
		// indicates that there's no further generation. We're done:
		// emit the final sync event.
r.done = true
r.syncs++
return syncEvent(nil, r.lastTs, r.syncs), nil
}
// Read the next generation.
var gen *generation
gen, r.spill, r.spillErr = readGenerationWithSpill(r.r, r.spill, r.version)
if gen == nil {
r.gen = nil
r.spillErrSync = true
r.syncs++
return syncEvent(nil, r.lastTs, r.syncs), nil
}
return r.installGen(gen)
}

// installGen installs the new generation into the Reader and returns
// a Sync event for the new generation.
func (r *Reader) installGen(gen *generation) (Event, error) {
if gen == nil {
// Emit the final sync event.
r.gen = nil
r.done = true
r.syncs++
return syncEvent(nil, r.lastTs, r.syncs), nil
}
r.gen = gen
// Reset CPU samples cursor.
r.cpuSamples = r.gen.cpuSamples
// Reset frontier.
for _, m := range r.gen.batchMs {
batches := r.gen.batches[m]
bc := &batchCursor{m: m}
ok, err := bc.nextEvent(batches, r.gen.freq)
if err != nil {
return Event{}, err
}
if !ok {
// Turns out there aren't actually any events in these batches.
continue
}
r.frontier = heapInsert(r.frontier, bc)
}
r.syncs++
// Always emit a sync event at the beginning of the generation.
return syncEvent(r.gen.evTable, r.gen.freq.mul(r.gen.minTs), r.syncs), nil
}

// dumpFrontier returns a human-readable summary of each batchCursor in the
// frontier, for debugging broken traces.
func dumpFrontier(frontier []*batchCursor) string {
var sb strings.Builder
for _, bc := range frontier {
spec := tracev2.Specs()[bc.ev.typ]
fmt.Fprintf(&sb, "M %d [%s time=%d", bc.m, spec.Name, bc.ev.time)
for i, arg := range spec.Args[1:] {
fmt.Fprintf(&sb, " %s=%d", arg, bc.ev.args[i])
}
fmt.Fprintf(&sb, "]\n")
}
return sb.String()
}
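
// An illustrative example of dumpFrontier output (event names, argument
// names, and values are hypothetical, not taken from a real trace):
//
//	M 5 [GoStart time=123456 g=42 g_seq=7]
//	M 8 [ProcStart time=123460 p=3 p_seq=2]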