blob: 07a6e13ffef985423a5aef4bb97ad93806cb5f1c [file] [log] [blame]
// Copyright 2016 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package trace
import (
"fmt"
"sort"
)
type eventBatch struct {
events []*Event
selected bool
}
type orderEvent struct {
ev *Event
batch int
g uint64
init gState
next gState
}
type gStatus int
type gState struct {
seq uint64
status gStatus
}
const (
gDead gStatus = iota
gRunnable
gRunning
gWaiting
unordered = ^uint64(0)
garbage = ^uint64(0) - 1
noseq = ^uint64(0)
seqinc = ^uint64(0) - 1
)
// order1007 merges a set of per-P event batches into a single, consistent stream.
// The high level idea is as follows. Events within an individual batch are in
// correct order, because they are emitted by a single P. So we need to produce
// a correct interleaving of the batches. To do this we take first unmerged event
// from each batch (frontier). Then choose subset that is "ready" to be merged,
// that is, events for which all dependencies are already merged. Then we choose
// event with the lowest timestamp from the subset, merge it and repeat.
// This approach ensures that we form a consistent stream even if timestamps are
// incorrect (condition observed on some machines).
func order1007(m map[int][]*Event) (events []*Event, err error) {
pending := 0
// The ordering of CPU profile sample events in the data stream is based on
// when each run of the signal handler was able to acquire the spinlock,
// with original timestamps corresponding to when ReadTrace pulled the data
// off of the profBuf queue. Re-sort them by the timestamp we captured
// inside the signal handler.
sort.Stable(eventList(m[ProfileP]))
var batches []*eventBatch
for _, v := range m {
pending += len(v)
batches = append(batches, &eventBatch{v, false})
}
gs := make(map[uint64]gState)
var frontier []orderEvent
for ; pending != 0; pending-- {
for i, b := range batches {
if b.selected || len(b.events) == 0 {
continue
}
ev := b.events[0]
g, init, next := stateTransition(ev)
if !transitionReady(g, gs[g], init) {
continue
}
frontier = append(frontier, orderEvent{ev, i, g, init, next})
b.events = b.events[1:]
b.selected = true
// Get rid of "Local" events, they are intended merely for ordering.
switch ev.Type {
case EvGoStartLocal:
ev.Type = EvGoStart
case EvGoUnblockLocal:
ev.Type = EvGoUnblock
case EvGoSysExitLocal:
ev.Type = EvGoSysExit
}
}
if len(frontier) == 0 {
return nil, fmt.Errorf("no consistent ordering of events possible")
}
sort.Sort(orderEventList(frontier))
f := frontier[0]
frontier[0] = frontier[len(frontier)-1]
frontier = frontier[:len(frontier)-1]
events = append(events, f.ev)
transition(gs, f.g, f.init, f.next)
if !batches[f.batch].selected {
panic("frontier batch is not selected")
}
batches[f.batch].selected = false
}
// At this point we have a consistent stream of events.
// Make sure time stamps respect the ordering.
// The tests will skip (not fail) the test case if they see this error.
if !sort.IsSorted(eventList(events)) {
return nil, ErrTimeOrder
}
// The last part is giving correct timestamps to EvGoSysExit events.
// The problem with EvGoSysExit is that actual syscall exit timestamp (ev.Args[2])
// is potentially acquired long before event emission. So far we've used
// timestamp of event emission (ev.Ts).
// We could not set ev.Ts = ev.Args[2] earlier, because it would produce
// seemingly broken timestamps (misplaced event).
// We also can't simply update the timestamp and resort events, because
// if timestamps are broken we will misplace the event and later report
// logically broken trace (instead of reporting broken timestamps).
lastSysBlock := make(map[uint64]int64)
for _, ev := range events {
switch ev.Type {
case EvGoSysBlock, EvGoInSyscall:
lastSysBlock[ev.G] = ev.Ts
case EvGoSysExit:
ts := int64(ev.Args[2])
if ts == 0 {
continue
}
block := lastSysBlock[ev.G]
if block == 0 {
return nil, fmt.Errorf("stray syscall exit")
}
if ts < block {
return nil, ErrTimeOrder
}
ev.Ts = ts
}
}
sort.Stable(eventList(events))
return
}
// stateTransition returns goroutine state (sequence and status) when the event
// becomes ready for merging (init) and the goroutine state after the event (next).
func stateTransition(ev *Event) (g uint64, init, next gState) {
switch ev.Type {
case EvGoCreate:
g = ev.Args[0]
init = gState{0, gDead}
next = gState{1, gRunnable}
case EvGoWaiting, EvGoInSyscall:
g = ev.G
init = gState{1, gRunnable}
next = gState{2, gWaiting}
case EvGoStart, EvGoStartLabel:
g = ev.G
init = gState{ev.Args[1], gRunnable}
next = gState{ev.Args[1] + 1, gRunning}
case EvGoStartLocal:
// noseq means that this event is ready for merging as soon as
// frontier reaches it (EvGoStartLocal is emitted on the same P
// as the corresponding EvGoCreate/EvGoUnblock, and thus the latter
// is already merged).
// seqinc is a stub for cases when event increments g sequence,
// but since we don't know current seq we also don't know next seq.
g = ev.G
init = gState{noseq, gRunnable}
next = gState{seqinc, gRunning}
case EvGoBlock, EvGoBlockSend, EvGoBlockRecv, EvGoBlockSelect,
EvGoBlockSync, EvGoBlockCond, EvGoBlockNet, EvGoSleep,
EvGoSysBlock, EvGoBlockGC:
g = ev.G
init = gState{noseq, gRunning}
next = gState{noseq, gWaiting}
case EvGoSched, EvGoPreempt:
g = ev.G
init = gState{noseq, gRunning}
next = gState{noseq, gRunnable}
case EvGoUnblock, EvGoSysExit:
g = ev.Args[0]
init = gState{ev.Args[1], gWaiting}
next = gState{ev.Args[1] + 1, gRunnable}
case EvGoUnblockLocal, EvGoSysExitLocal:
g = ev.Args[0]
init = gState{noseq, gWaiting}
next = gState{seqinc, gRunnable}
case EvGCStart:
g = garbage
init = gState{ev.Args[0], gDead}
next = gState{ev.Args[0] + 1, gDead}
default:
// no ordering requirements
g = unordered
}
return
}
func transitionReady(g uint64, curr, init gState) bool {
return g == unordered || (init.seq == noseq || init.seq == curr.seq) && init.status == curr.status
}
func transition(gs map[uint64]gState, g uint64, init, next gState) {
if g == unordered {
return
}
curr := gs[g]
if !transitionReady(g, curr, init) {
panic("event sequences are broken")
}
switch next.seq {
case noseq:
next.seq = curr.seq
case seqinc:
next.seq = curr.seq + 1
}
gs[g] = next
}
// order1005 merges a set of per-P event batches into a single, consistent stream.
func order1005(m map[int][]*Event) (events []*Event, err error) {
for _, batch := range m {
events = append(events, batch...)
}
for _, ev := range events {
if ev.Type == EvGoSysExit {
// EvGoSysExit emission is delayed until the thread has a P.
// Give it the real sequence number and time stamp.
ev.seq = int64(ev.Args[1])
if ev.Args[2] != 0 {
ev.Ts = int64(ev.Args[2])
}
}
}
sort.Sort(eventSeqList(events))
if !sort.IsSorted(eventList(events)) {
return nil, ErrTimeOrder
}
return
}
type orderEventList []orderEvent
func (l orderEventList) Len() int {
return len(l)
}
func (l orderEventList) Less(i, j int) bool {
return l[i].ev.Ts < l[j].ev.Ts
}
func (l orderEventList) Swap(i, j int) {
l[i], l[j] = l[j], l[i]
}
type eventList []*Event
func (l eventList) Len() int {
return len(l)
}
func (l eventList) Less(i, j int) bool {
return l[i].Ts < l[j].Ts
}
func (l eventList) Swap(i, j int) {
l[i], l[j] = l[j], l[i]
}
type eventSeqList []*Event
func (l eventSeqList) Len() int {
return len(l)
}
func (l eventSeqList) Less(i, j int) bool {
return l[i].seq < l[j].seq
}
func (l eventSeqList) Swap(i, j int) {
l[i], l[j] = l[j], l[i]
}