cmd/trace/v2: tolerate traces with broken tails

This change modifies cmd/trace/v2 to tolerate traces with
incomplete/broken generations at the tail. These broken tails can be
created if a program crashes while a trace is being produced. Although
the runtime tries to flush the trace on some panics, it may still
produce some extra trace data that is incomplete.

This change modifies cmd/trace/v2 to still work on any complete
generations, even if there are incomplete/broken generations at the tail
end of the trace. Basically, the tool now just tracks when the last good
generation ended (via Sync events) and truncates the trace to that point
when it encounters an error.

This change also revamps the text output of the tool to emit regular
progress notifications as well as warnings as to how much of the trace
data was lost.

Fixes #65316.

Change-Id: I877d39993bc02a81eebe647db9c2be17635bcec8
Reviewed-on: https://go-review.googlesource.com/c/go/+/580135
Reviewed-by: Carlos Amedee <carlos@golang.org>
LUCI-TryBot-Result: Go LUCI <golang-scoped@luci-project-accounts.iam.gserviceaccount.com>
Auto-Submit: Michael Knyszek <mknyszek@google.com>
diff --git a/src/cmd/trace/v2/jsontrace_test.go b/src/cmd/trace/v2/jsontrace_test.go
index 65ce041..e1a5366 100644
--- a/src/cmd/trace/v2/jsontrace_test.go
+++ b/src/cmd/trace/v2/jsontrace_test.go
@@ -283,7 +283,7 @@
 	}
 
 	// Parse the test trace.
-	parsed, err := parseTrace(&trace)
+	parsed, err := parseTrace(&trace, int64(trace.Len()))
 	if err != nil {
 		t.Fatalf("failed to parse trace: %v", err)
 	}
diff --git a/src/cmd/trace/v2/main.go b/src/cmd/trace/v2/main.go
index f2a54ee..93e9fa7 100644
--- a/src/cmd/trace/v2/main.go
+++ b/src/cmd/trace/v2/main.go
@@ -14,6 +14,8 @@
 	"net"
 	"net/http"
 	"os"
+	"sync/atomic"
+	"time"
 
 	"internal/trace/v2/raw"
 
@@ -28,9 +30,16 @@
 	}
 	defer tracef.Close()
 
+	// Get the size of the trace file.
+	fi, err := tracef.Stat()
+	if err != nil {
+		return fmt.Errorf("failed to stat trace file: %v", err)
+	}
+	traceSize := fi.Size()
+
 	// Handle requests for profiles.
 	if pprof != "" {
-		parsed, err := parseTrace(tracef)
+		parsed, err := parseTrace(tracef, traceSize)
 		if err != nil {
 			return err
 		}
@@ -72,7 +81,7 @@
 	addr := "http://" + ln.Addr().String()
 
 	log.Print("Preparing trace for viewer...")
-	parsed, err := parseTrace(tracef)
+	parsed, err := parseTraceInteractive(tracef, traceSize)
 	if err != nil {
 		return err
 	}
@@ -80,6 +89,15 @@
 	// We might double-close, but that's fine; we ignore the error.
 	tracef.Close()
 
+	// Print a nice message for a partial trace.
+	if parsed.err != nil {
+		log.Printf("Encountered error, but able to proceed. Error: %v", parsed.err)
+
+		lost := parsed.size - parsed.valid
+		pct := float64(lost) / float64(parsed.size) * 100
+		log.Printf("Lost %.2f%% of the latest trace data due to error (%s of %s)", pct, byteCount(lost), byteCount(parsed.size))
+	}
+
 	log.Print("Splitting trace for viewer...")
 	ranges, err := splitTrace(parsed)
 	if err != nil {
@@ -140,29 +158,79 @@
 	return fmt.Errorf("failed to start http server: %w", err)
 }
 
-type parsedTrace struct {
-	events  []tracev2.Event
-	summary *trace.Summary
+func parseTraceInteractive(tr io.Reader, size int64) (parsed *parsedTrace, err error) {
+	done := make(chan struct{})
+	cr := countingReader{r: tr}
+	go func() {
+		parsed, err = parseTrace(&cr, size)
+		done <- struct{}{}
+	}()
+	ticker := time.NewTicker(5 * time.Second)
+progressLoop:
+	for {
+		select {
+		case <-ticker.C:
+		case <-done:
+			ticker.Stop()
+			break progressLoop
+		}
+		progress := cr.bytesRead.Load()
+		pct := float64(progress) / float64(size) * 100
+		log.Printf("%s of %s (%.1f%%) processed...", byteCount(progress), byteCount(size), pct)
+	}
+	return
 }
 
-func parseTrace(tr io.Reader) (*parsedTrace, error) {
-	r, err := tracev2.NewReader(tr)
+type parsedTrace struct {
+	events      []tracev2.Event
+	summary     *trace.Summary
+	size, valid int64
+	err         error
+}
+
+func parseTrace(rr io.Reader, size int64) (*parsedTrace, error) {
+	// Set up the reader.
+	cr := countingReader{r: rr}
+	r, err := tracev2.NewReader(&cr)
 	if err != nil {
 		return nil, fmt.Errorf("failed to create trace reader: %w", err)
 	}
+
+	// Set up state.
 	s := trace.NewSummarizer()
 	t := new(parsedTrace)
+	var validBytes int64
+	var validEvents int
 	for {
 		ev, err := r.ReadEvent()
 		if err == io.EOF {
+			validBytes = cr.bytesRead.Load()
+			validEvents = len(t.events)
 			break
-		} else if err != nil {
-			return nil, fmt.Errorf("failed to read event: %w", err)
+		}
+		if err != nil {
+			t.err = err
+			break
 		}
 		t.events = append(t.events, ev)
 		s.Event(&t.events[len(t.events)-1])
+
+		if ev.Kind() == tracev2.EventSync {
+			validBytes = cr.bytesRead.Load()
+			validEvents = len(t.events)
+		}
 	}
+
+	// Check to make sure we got at least one good generation.
+	if validEvents == 0 {
+		return nil, fmt.Errorf("failed to parse any useful part of the trace: %v", t.err)
+	}
+
+	// Finish off the parsedTrace.
 	t.summary = s.Finalize()
+	t.valid = validBytes
+	t.size = size
+	t.events = t.events[:validEvents]
 	return t, nil
 }
 
@@ -217,3 +285,39 @@
 		fmt.Println(ev.String())
 	}
 }
+
+type countingReader struct {
+	r         io.Reader
+	bytesRead atomic.Int64
+}
+
+func (c *countingReader) Read(buf []byte) (n int, err error) {
+	n, err = c.r.Read(buf)
+	c.bytesRead.Add(int64(n))
+	return n, err
+}
+
+type byteCount int64
+
+func (b byteCount) String() string {
+	var suffix string
+	var divisor int64
+	switch {
+	case b < 1<<10:
+		suffix = "B"
+		divisor = 1
+	case b < 1<<20:
+		suffix = "KiB"
+		divisor = 1 << 10
+	case b < 1<<30:
+		suffix = "MiB"
+		divisor = 1 << 20
+	case b < 1<<40:
+		suffix = "GiB"
+		divisor = 1 << 30
+	}
+	if divisor == 1 {
+		return fmt.Sprintf("%d %s", b, suffix)
+	}
+	return fmt.Sprintf("%.1f %s", float64(b)/float64(divisor), suffix)
+}