| // Copyright 2023 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package trace |
| |
| import ( |
| "cmp" |
| "encoding/binary" |
| "fmt" |
| |
| "internal/trace/v2/event" |
| "internal/trace/v2/event/go122" |
| ) |
| |
| type batchCursor struct { |
| m ThreadID |
| lastTs Time |
| idx int // next index into []batch |
| dataOff int // next index into batch.data |
| ev baseEvent // last read event |
| } |
| |
| func (b *batchCursor) nextEvent(batches []batch, freq frequency) (ok bool, err error) { |
| // Batches should generally always have at least one event, |
| // but let's be defensive about that and accept empty batches. |
| for b.idx < len(batches) && len(batches[b.idx].data) == b.dataOff { |
| b.idx++ |
| b.dataOff = 0 |
| b.lastTs = 0 |
| } |
| // Have we reached the end of the batches? |
| if b.idx == len(batches) { |
| return false, nil |
| } |
| // Initialize lastTs if it hasn't been yet. |
| if b.lastTs == 0 { |
| b.lastTs = freq.mul(batches[b.idx].time) |
| } |
| // Read an event out. |
| n, tsdiff, err := readTimedBaseEvent(batches[b.idx].data[b.dataOff:], &b.ev) |
| if err != nil { |
| return false, err |
| } |
| // Complete the timestamp from the cursor's last timestamp. |
| b.ev.time = freq.mul(tsdiff) + b.lastTs |
| |
| // Move the cursor's timestamp forward. |
| b.lastTs = b.ev.time |
| |
| // Move the cursor forward. |
| b.dataOff += n |
| return true, nil |
| } |
| |
| func (b *batchCursor) compare(a *batchCursor) int { |
| return cmp.Compare(b.ev.time, a.ev.time) |
| } |
| |
| // readTimedBaseEvent reads out the raw event data from b |
| // into e. It does not try to interpret the arguments |
| // but it does validate that the event is a regular |
| // event with a timestamp (vs. a structural event). |
| // |
| // It requires that the event its reading be timed, which must |
| // be the case for every event in a plain EventBatch. |
| func readTimedBaseEvent(b []byte, e *baseEvent) (int, timestamp, error) { |
| // Get the event type. |
| typ := event.Type(b[0]) |
| specs := go122.Specs() |
| if int(typ) >= len(specs) { |
| return 0, 0, fmt.Errorf("found invalid event type: %v", typ) |
| } |
| e.typ = typ |
| |
| // Get spec. |
| spec := &specs[typ] |
| if len(spec.Args) == 0 || !spec.IsTimedEvent { |
| return 0, 0, fmt.Errorf("found event without a timestamp: type=%v", typ) |
| } |
| n := 1 |
| |
| // Read timestamp diff. |
| ts, nb := binary.Uvarint(b[n:]) |
| if nb <= 0 { |
| return 0, 0, fmt.Errorf("found invalid uvarint for timestamp") |
| } |
| n += nb |
| |
| // Read the rest of the arguments. |
| for i := 0; i < len(spec.Args)-1; i++ { |
| arg, nb := binary.Uvarint(b[n:]) |
| if nb <= 0 { |
| return 0, 0, fmt.Errorf("found invalid uvarint") |
| } |
| e.args[i] = arg |
| n += nb |
| } |
| return n, timestamp(ts), nil |
| } |
| |
| func heapInsert(heap []*batchCursor, bc *batchCursor) []*batchCursor { |
| // Add the cursor to the end of the heap. |
| heap = append(heap, bc) |
| |
| // Sift the new entry up to the right place. |
| heapSiftUp(heap, len(heap)-1) |
| return heap |
| } |
| |
| func heapUpdate(heap []*batchCursor, i int) { |
| // Try to sift up. |
| if heapSiftUp(heap, i) != i { |
| return |
| } |
| // Try to sift down, if sifting up failed. |
| heapSiftDown(heap, i) |
| } |
| |
| func heapRemove(heap []*batchCursor, i int) []*batchCursor { |
| // Sift index i up to the root, ignoring actual values. |
| for i > 0 { |
| heap[(i-1)/2], heap[i] = heap[i], heap[(i-1)/2] |
| i = (i - 1) / 2 |
| } |
| // Swap the root with the last element, then remove it. |
| heap[0], heap[len(heap)-1] = heap[len(heap)-1], heap[0] |
| heap = heap[:len(heap)-1] |
| // Sift the root down. |
| heapSiftDown(heap, 0) |
| return heap |
| } |
| |
| func heapSiftUp(heap []*batchCursor, i int) int { |
| for i > 0 && heap[(i-1)/2].ev.time > heap[i].ev.time { |
| heap[(i-1)/2], heap[i] = heap[i], heap[(i-1)/2] |
| i = (i - 1) / 2 |
| } |
| return i |
| } |
| |
| func heapSiftDown(heap []*batchCursor, i int) int { |
| for { |
| m := min3(heap, i, 2*i+1, 2*i+2) |
| if m == i { |
| // Heap invariant already applies. |
| break |
| } |
| heap[i], heap[m] = heap[m], heap[i] |
| i = m |
| } |
| return i |
| } |
| |
| func min3(b []*batchCursor, i0, i1, i2 int) int { |
| minIdx := i0 |
| minT := maxTime |
| if i0 < len(b) { |
| minT = b[i0].ev.time |
| } |
| if i1 < len(b) { |
| if t := b[i1].ev.time; t < minT { |
| minT = t |
| minIdx = i1 |
| } |
| } |
| if i2 < len(b) { |
| if t := b[i2].ev.time; t < minT { |
| minT = t |
| minIdx = i2 |
| } |
| } |
| return minIdx |
| } |