| // Copyright 2016 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package trace |
| |
| import ( |
| "fmt" |
| "sort" |
| ) |
| |
| type eventBatch struct { |
| events []*Event |
| selected bool |
| } |
| |
| type orderEvent struct { |
| ev *Event |
| batch int |
| g uint64 |
| init gState |
| next gState |
| } |
| |
// gStatus is the scheduling status tracked for a goroutine while events are
// being ordered; its values are gDead, gRunnable, gRunning and gWaiting.
type gStatus int
| |
| type gState struct { |
| seq uint64 |
| status gStatus |
| } |
| |
| const ( |
| gDead gStatus = iota |
| gRunnable |
| gRunning |
| gWaiting |
| |
| unordered = ^uint64(0) |
| garbage = ^uint64(0) - 1 |
| noseq = ^uint64(0) |
| seqinc = ^uint64(0) - 1 |
| ) |
| |
| // order1007 merges a set of per-P event batches into a single, consistent stream. |
| // The high level idea is as follows. Events within an individual batch are in |
| // correct order, because they are emitted by a single P. So we need to produce |
| // a correct interleaving of the batches. To do this we take first unmerged event |
| // from each batch (frontier). Then choose subset that is "ready" to be merged, |
| // that is, events for which all dependencies are already merged. Then we choose |
| // event with the lowest timestamp from the subset, merge it and repeat. |
| // This approach ensures that we form a consistent stream even if timestamps are |
| // incorrect (condition observed on some machines). |
| func order1007(m map[int][]*Event) (events []*Event, err error) { |
| pending := 0 |
| // The ordering of CPU profile sample events in the data stream is based on |
| // when each run of the signal handler was able to acquire the spinlock, |
| // with original timestamps corresponding to when ReadTrace pulled the data |
| // off of the profBuf queue. Re-sort them by the timestamp we captured |
| // inside the signal handler. |
| sort.Stable(eventList(m[ProfileP])) |
| var batches []*eventBatch |
| for _, v := range m { |
| pending += len(v) |
| batches = append(batches, &eventBatch{v, false}) |
| } |
| gs := make(map[uint64]gState) |
| var frontier []orderEvent |
| for ; pending != 0; pending-- { |
| for i, b := range batches { |
| if b.selected || len(b.events) == 0 { |
| continue |
| } |
| ev := b.events[0] |
| g, init, next := stateTransition(ev) |
| if !transitionReady(g, gs[g], init) { |
| continue |
| } |
| frontier = append(frontier, orderEvent{ev, i, g, init, next}) |
| b.events = b.events[1:] |
| b.selected = true |
| // Get rid of "Local" events, they are intended merely for ordering. |
| switch ev.Type { |
| case EvGoStartLocal: |
| ev.Type = EvGoStart |
| case EvGoUnblockLocal: |
| ev.Type = EvGoUnblock |
| case EvGoSysExitLocal: |
| ev.Type = EvGoSysExit |
| } |
| } |
| if len(frontier) == 0 { |
| return nil, fmt.Errorf("no consistent ordering of events possible") |
| } |
| sort.Sort(orderEventList(frontier)) |
| f := frontier[0] |
| frontier[0] = frontier[len(frontier)-1] |
| frontier = frontier[:len(frontier)-1] |
| events = append(events, f.ev) |
| transition(gs, f.g, f.init, f.next) |
| if !batches[f.batch].selected { |
| panic("frontier batch is not selected") |
| } |
| batches[f.batch].selected = false |
| } |
| |
| // At this point we have a consistent stream of events. |
| // Make sure time stamps respect the ordering. |
| // The tests will skip (not fail) the test case if they see this error. |
| if !sort.IsSorted(eventList(events)) { |
| return nil, ErrTimeOrder |
| } |
| |
| // The last part is giving correct timestamps to EvGoSysExit events. |
| // The problem with EvGoSysExit is that actual syscall exit timestamp (ev.Args[2]) |
| // is potentially acquired long before event emission. So far we've used |
| // timestamp of event emission (ev.Ts). |
| // We could not set ev.Ts = ev.Args[2] earlier, because it would produce |
| // seemingly broken timestamps (misplaced event). |
| // We also can't simply update the timestamp and resort events, because |
| // if timestamps are broken we will misplace the event and later report |
| // logically broken trace (instead of reporting broken timestamps). |
| lastSysBlock := make(map[uint64]int64) |
| for _, ev := range events { |
| switch ev.Type { |
| case EvGoSysBlock, EvGoInSyscall: |
| lastSysBlock[ev.G] = ev.Ts |
| case EvGoSysExit: |
| ts := int64(ev.Args[2]) |
| if ts == 0 { |
| continue |
| } |
| block := lastSysBlock[ev.G] |
| if block == 0 { |
| return nil, fmt.Errorf("stray syscall exit") |
| } |
| if ts < block { |
| return nil, ErrTimeOrder |
| } |
| ev.Ts = ts |
| } |
| } |
| sort.Stable(eventList(events)) |
| |
| return |
| } |
| |
| // stateTransition returns goroutine state (sequence and status) when the event |
| // becomes ready for merging (init) and the goroutine state after the event (next). |
| func stateTransition(ev *Event) (g uint64, init, next gState) { |
| switch ev.Type { |
| case EvGoCreate: |
| g = ev.Args[0] |
| init = gState{0, gDead} |
| next = gState{1, gRunnable} |
| case EvGoWaiting, EvGoInSyscall: |
| g = ev.G |
| init = gState{1, gRunnable} |
| next = gState{2, gWaiting} |
| case EvGoStart, EvGoStartLabel: |
| g = ev.G |
| init = gState{ev.Args[1], gRunnable} |
| next = gState{ev.Args[1] + 1, gRunning} |
| case EvGoStartLocal: |
| // noseq means that this event is ready for merging as soon as |
| // frontier reaches it (EvGoStartLocal is emitted on the same P |
| // as the corresponding EvGoCreate/EvGoUnblock, and thus the latter |
| // is already merged). |
| // seqinc is a stub for cases when event increments g sequence, |
| // but since we don't know current seq we also don't know next seq. |
| g = ev.G |
| init = gState{noseq, gRunnable} |
| next = gState{seqinc, gRunning} |
| case EvGoBlock, EvGoBlockSend, EvGoBlockRecv, EvGoBlockSelect, |
| EvGoBlockSync, EvGoBlockCond, EvGoBlockNet, EvGoSleep, |
| EvGoSysBlock, EvGoBlockGC: |
| g = ev.G |
| init = gState{noseq, gRunning} |
| next = gState{noseq, gWaiting} |
| case EvGoSched, EvGoPreempt: |
| g = ev.G |
| init = gState{noseq, gRunning} |
| next = gState{noseq, gRunnable} |
| case EvGoUnblock, EvGoSysExit: |
| g = ev.Args[0] |
| init = gState{ev.Args[1], gWaiting} |
| next = gState{ev.Args[1] + 1, gRunnable} |
| case EvGoUnblockLocal, EvGoSysExitLocal: |
| g = ev.Args[0] |
| init = gState{noseq, gWaiting} |
| next = gState{seqinc, gRunnable} |
| case EvGCStart: |
| g = garbage |
| init = gState{ev.Args[0], gDead} |
| next = gState{ev.Args[0] + 1, gDead} |
| default: |
| // no ordering requirements |
| g = unordered |
| } |
| return |
| } |
| |
| func transitionReady(g uint64, curr, init gState) bool { |
| return g == unordered || (init.seq == noseq || init.seq == curr.seq) && init.status == curr.status |
| } |
| |
| func transition(gs map[uint64]gState, g uint64, init, next gState) { |
| if g == unordered { |
| return |
| } |
| curr := gs[g] |
| if !transitionReady(g, curr, init) { |
| panic("event sequences are broken") |
| } |
| switch next.seq { |
| case noseq: |
| next.seq = curr.seq |
| case seqinc: |
| next.seq = curr.seq + 1 |
| } |
| gs[g] = next |
| } |
| |
| // order1005 merges a set of per-P event batches into a single, consistent stream. |
| func order1005(m map[int][]*Event) (events []*Event, err error) { |
| for _, batch := range m { |
| events = append(events, batch...) |
| } |
| for _, ev := range events { |
| if ev.Type == EvGoSysExit { |
| // EvGoSysExit emission is delayed until the thread has a P. |
| // Give it the real sequence number and time stamp. |
| ev.seq = int64(ev.Args[1]) |
| if ev.Args[2] != 0 { |
| ev.Ts = int64(ev.Args[2]) |
| } |
| } |
| } |
| sort.Sort(eventSeqList(events)) |
| if !sort.IsSorted(eventList(events)) { |
| return nil, ErrTimeOrder |
| } |
| return |
| } |
| |
| type orderEventList []orderEvent |
| |
| func (l orderEventList) Len() int { |
| return len(l) |
| } |
| |
| func (l orderEventList) Less(i, j int) bool { |
| return l[i].ev.Ts < l[j].ev.Ts |
| } |
| |
| func (l orderEventList) Swap(i, j int) { |
| l[i], l[j] = l[j], l[i] |
| } |
| |
| type eventList []*Event |
| |
| func (l eventList) Len() int { |
| return len(l) |
| } |
| |
| func (l eventList) Less(i, j int) bool { |
| return l[i].Ts < l[j].Ts |
| } |
| |
| func (l eventList) Swap(i, j int) { |
| l[i], l[j] = l[j], l[i] |
| } |
| |
| type eventSeqList []*Event |
| |
| func (l eventSeqList) Len() int { |
| return len(l) |
| } |
| |
| func (l eventSeqList) Less(i, j int) bool { |
| return l[i].seq < l[j].seq |
| } |
| |
| func (l eventSeqList) Swap(i, j int) { |
| l[i], l[j] = l[j], l[i] |
| } |