internal/lsp: rewrite the rpc debug page

Now it only uses the telemetry messages directly rather than the metric system.
This is much faster and more direct, and removes a blocker on improving the
metrics support.
Also fixes the fact that received and sent were the wrong way round before,
probably as an artifact of the old protocol logging code, and also removes
the bytes histogram which was a lie (it was a histogram of write sizes that
was presented as a histogram of message sizes).

fixes golang/go#38168

Change-Id: Ib1c3459c0ff1cf0c6087a828981e80c1ce1c5c1b
Reviewed-on: https://go-review.googlesource.com/c/tools/+/227139
Run-TryBot: Ian Cottrell <iancottrell@google.com>
Reviewed-by: Robert Findley <rfindley@google.com>
diff --git a/internal/jsonrpc2/jsonrpc2.go b/internal/jsonrpc2/jsonrpc2.go
index 08234d2..1f2fd4c 100644
--- a/internal/jsonrpc2/jsonrpc2.go
+++ b/internal/jsonrpc2/jsonrpc2.go
@@ -112,7 +112,7 @@
 
 	event.Record(ctx, tag.Started.Of(1))
 	n, err := c.stream.Write(ctx, data)
-	event.Record(ctx, tag.ReceivedBytes.Of(n))
+	event.Record(ctx, tag.SentBytes.Of(n))
 	return err
 }
 
@@ -161,7 +161,7 @@
 	}()
 	// now we are ready to send
 	n, err := c.stream.Write(ctx, data)
-	event.Record(ctx, tag.ReceivedBytes.Of(n))
+	event.Record(ctx, tag.SentBytes.Of(n))
 	if err != nil {
 		// sending failed, we will never get a response, so don't leave it pending
 		return err
@@ -239,7 +239,7 @@
 		return err
 	}
 	n, err := r.conn.stream.Write(ctx, data)
-	event.Record(ctx, tag.ReceivedBytes.Of(n))
+	event.Record(ctx, tag.SentBytes.Of(n))
 
 	if err != nil {
 		// TODO(iancottrell): if a stream write fails, we really need to shut down
@@ -302,7 +302,7 @@
 			)
 			event.Record(reqCtx,
 				tag.Started.Of(1),
-				tag.SentBytes.Of(n))
+				tag.ReceivedBytes.Of(n))
 
 			req := &Request{
 				conn: c,
diff --git a/internal/lsp/debug/rpc.go b/internal/lsp/debug/rpc.go
index 823ee9d..4632661 100644
--- a/internal/lsp/debug/rpc.go
+++ b/internal/lsp/debug/rpc.go
@@ -16,7 +16,6 @@
 	"golang.org/x/tools/internal/lsp/debug/tag"
 	"golang.org/x/tools/internal/telemetry/event"
 	"golang.org/x/tools/internal/telemetry/export"
-	"golang.org/x/tools/internal/telemetry/export/metric"
 )
 
 var rpcTmpl = template.Must(template.Must(baseTemplate.Clone()).Parse(`
@@ -32,10 +31,10 @@
 		<b>{{.Method}}</b> {{.Started}} <a href="/trace/{{.Method}}">traces</a> ({{.InProgress}} in progress)
 		<br>
 		<i>Latency</i> {{with .Latency}}{{.Mean}} ({{.Min}}<{{.Max}}){{end}}
-		<i>By bucket</i> 0s {{range .Latency.Values}}<b>{{.Count}}</b> {{.Limit}} {{end}}
+		<i>By bucket</i> 0s {{range .Latency.Values}}{{if gt .Count 0}}<b>{{.Count}}</b> {{.Limit}} {{end}}{{end}}
 		<br>
-		<i>Received</i> {{with .Received}}{{.Mean}} ({{.Min}}<{{.Max}}){{end}}
-		<i>Sent</i> {{with .Sent}}{{.Mean}} ({{.Min}}<{{.Max}}){{end}}
+		<i>Received</i> {{.Received}} (avg. {{.ReceivedMean}})
+		<i>Sent</i> {{.Sent}} (avg. {{.SentMean}})
 		<br>
 		<i>Result codes</i> {{range .Codes}}{{.Key}}={{.Count}} {{end}}
 		</P>
@@ -45,25 +44,24 @@
 
 type rpcs struct {
 	mu       sync.Mutex
-	Inbound  []*rpcStats
-	Outbound []*rpcStats
+	Inbound  []*rpcStats // stats for incoming lsp rpcs sorted by method name
+	Outbound []*rpcStats // stats for outgoing lsp rpcs sorted by method name
 }
 
 type rpcStats struct {
-	Method     string
-	Started    int64
-	Completed  int64
-	InProgress int64
-	Latency    rpcTimeHistogram
-	Received   rpcBytesHistogram
-	Sent       rpcBytesHistogram
-	Codes      []*rpcCodeBucket
+	Method    string
+	Started   int64
+	Completed int64
+
+	Latency  rpcTimeHistogram
+	Received byteUnits
+	Sent     byteUnits
+	Codes    []*rpcCodeBucket
 }
 
 type rpcTimeHistogram struct {
 	Sum    timeUnits
 	Count  int64
-	Mean   timeUnits
 	Min    timeUnits
 	Max    timeUnits
 	Values []rpcTimeBucket
@@ -74,140 +72,142 @@
 	Count int64
 }
 
-type rpcBytesHistogram struct {
-	Sum    byteUnits
-	Count  int64
-	Mean   byteUnits
-	Min    byteUnits
-	Max    byteUnits
-	Values []rpcBytesBucket
-}
-
-type rpcBytesBucket struct {
-	Limit byteUnits
-	Count int64
-}
-
 type rpcCodeBucket struct {
 	Key   string
 	Count int64
 }
 
 func (r *rpcs) ProcessEvent(ctx context.Context, ev event.Event, tagMap event.TagMap) context.Context {
-	switch {
-	case ev.IsEndSpan():
-		// calculate latency if this was an rpc span
-		span := export.GetSpan(ctx)
-		if span == nil {
-			return ctx
-		}
-		// is this a finished rpc span, if so it will have a status code record
-		for _, ev := range span.Events() {
-			code := tag.StatusCode.Get(ev.Map())
-			if code != "" {
-				elapsedTime := span.Finish().At.Sub(span.Start().At)
-				latencyMillis := float64(elapsedTime) / float64(time.Millisecond)
-				statsCtx := event.Label1(ctx, tag.StatusCode.Of(code))
-				event.Record1(statsCtx, tag.Latency.Of(latencyMillis))
-			}
-		}
-		return ctx
-	case ev.IsRecord():
-		// fall through to the metrics handling logic
-	default:
-		// ignore all other event types
-		return ctx
-	}
 	r.mu.Lock()
 	defer r.mu.Unlock()
-	//TODO(38168): we should just deal with the events here and not use metrics
-	metrics := metric.Entries.Get(tagMap).([]metric.Data)
-	for _, data := range metrics {
-		for i, group := range data.Groups() {
-			set := &r.Inbound
-			groupTags := event.NewTagMap(group...)
-			if tag.RPCDirection.Get(groupTags) == tag.Outbound {
-				set = &r.Outbound
-			}
-			method := tag.Method.Get(groupTags)
-			index := sort.Search(len(*set), func(i int) bool {
-				return (*set)[i].Method >= method
-			})
-			if index >= len(*set) || (*set)[index].Method != method {
-				old := *set
-				*set = make([]*rpcStats, len(old)+1)
-				copy(*set, old[:index])
-				copy((*set)[index+1:], old[index:])
-				(*set)[index] = &rpcStats{Method: method}
-			}
-			stats := (*set)[index]
-			switch data.Handle() {
-			case started.Name:
-				stats.Started = data.(*metric.Int64Data).Rows[i]
-			case completed.Name:
-				status := tag.StatusCode.Get(groupTags)
-				var b *rpcCodeBucket
-				for c, entry := range stats.Codes {
-					if entry.Key == status {
-						b = stats.Codes[c]
-						break
-					}
-				}
-				if b == nil {
-					b = &rpcCodeBucket{Key: status}
-					stats.Codes = append(stats.Codes, b)
-					sort.Slice(stats.Codes, func(i int, j int) bool {
-						return stats.Codes[i].Key < stats.Codes[j].Key
-					})
-				}
-				b.Count = data.(*metric.Int64Data).Rows[i]
-			case latency.Name:
-				data := data.(*metric.HistogramFloat64Data)
-				row := data.Rows[i]
-				stats.Latency.Count = row.Count
-				stats.Latency.Sum = timeUnits(row.Sum)
-				stats.Latency.Min = timeUnits(row.Min)
-				stats.Latency.Max = timeUnits(row.Max)
-				stats.Latency.Mean = timeUnits(row.Sum) / timeUnits(row.Count)
-				stats.Latency.Values = make([]rpcTimeBucket, len(data.Info.Buckets))
-				last := int64(0)
-				for i, b := range data.Info.Buckets {
-					stats.Latency.Values[i].Limit = timeUnits(b)
-					stats.Latency.Values[i].Count = row.Values[i] - last
-					last = row.Values[i]
-				}
-			case sentBytes.Name:
-				data := data.(*metric.HistogramInt64Data)
-				row := data.Rows[i]
-				stats.Sent.Count = row.Count
-				stats.Sent.Sum = byteUnits(row.Sum)
-				stats.Sent.Min = byteUnits(row.Min)
-				stats.Sent.Max = byteUnits(row.Max)
-				stats.Sent.Mean = byteUnits(row.Sum) / byteUnits(row.Count)
-			case receivedBytes.Name:
-				data := data.(*metric.HistogramInt64Data)
-				row := data.Rows[i]
-				stats.Received.Count = row.Count
-				stats.Received.Sum = byteUnits(row.Sum)
-				stats.Sent.Min = byteUnits(row.Min)
-				stats.Sent.Max = byteUnits(row.Max)
-				stats.Received.Mean = byteUnits(row.Sum) / byteUnits(row.Count)
-			}
+	switch {
+	case ev.IsStartSpan():
+		if _, stats := r.getRPCSpan(ctx, ev); stats != nil {
+			stats.Started++
 		}
-	}
-
-	for _, set := range [][]*rpcStats{r.Inbound, r.Outbound} {
-		for _, stats := range set {
-			stats.Completed = 0
-			for _, b := range stats.Codes {
-				stats.Completed += b.Count
+	case ev.IsEndSpan():
+		span, stats := r.getRPCSpan(ctx, ev)
+		if stats != nil {
+			endRPC(ctx, ev, span, stats)
+		}
+	case ev.IsRecord():
+		sent := byteUnits(tag.SentBytes.Get(tagMap))
+		rec := byteUnits(tag.ReceivedBytes.Get(tagMap))
+		if sent != 0 || rec != 0 {
+			if _, stats := r.getRPCSpan(ctx, ev); stats != nil {
+				stats.Sent += sent
+				stats.Received += rec
 			}
-			stats.InProgress = stats.Started - stats.Completed
 		}
 	}
 	return ctx
 }
 
+func endRPC(ctx context.Context, ev event.Event, span *export.Span, stats *rpcStats) {
+	// update the basic counts
+	stats.Completed++
+
+	// get and record the status code
+	if status := getStatusCode(span); status != "" {
+		var b *rpcCodeBucket
+		for c, entry := range stats.Codes {
+			if entry.Key == status {
+				b = stats.Codes[c]
+				break
+			}
+		}
+		if b == nil {
+			b = &rpcCodeBucket{Key: status}
+			stats.Codes = append(stats.Codes, b)
+			sort.Slice(stats.Codes, func(i int, j int) bool {
+				return stats.Codes[i].Key < stats.Codes[j].Key
+			})
+		}
+		b.Count++
+	}
+
+	// calculate latency if this was an rpc span
+	elapsedTime := span.Finish().At.Sub(span.Start().At)
+	latencyMillis := timeUnits(elapsedTime) / timeUnits(time.Millisecond)
+	if stats.Latency.Count == 0 {
+		stats.Latency.Min = latencyMillis
+		stats.Latency.Max = latencyMillis
+	} else {
+		if stats.Latency.Min > latencyMillis {
+			stats.Latency.Min = latencyMillis
+		}
+		if stats.Latency.Max < latencyMillis {
+			stats.Latency.Max = latencyMillis
+		}
+	}
+	stats.Latency.Count++
+	stats.Latency.Sum += latencyMillis
+	for i := range stats.Latency.Values {
+		if stats.Latency.Values[i].Limit > latencyMillis {
+			stats.Latency.Values[i].Count++
+			break
+		}
+	}
+}
+
+func (r *rpcs) getRPCSpan(ctx context.Context, ev event.Event) (*export.Span, *rpcStats) {
+	// get the span
+	span := export.GetSpan(ctx)
+	if span == nil {
+		return nil, nil
+	}
+	// use the span start event look up the correct stats block
+	// we do this because it prevents us matching a sub span
+	startMap := span.Start().Map()
+	return span, r.getRPCStats(startMap)
+}
+
+func (r *rpcs) getRPCStats(tagMap event.TagMap) *rpcStats {
+	method := tag.Method.Get(tagMap)
+	if method == "" {
+		return nil
+	}
+	set := &r.Inbound
+	if tag.RPCDirection.Get(tagMap) != tag.Inbound {
+		set = &r.Outbound
+	}
+	// get the record for this method
+	index := sort.Search(len(*set), func(i int) bool {
+		return (*set)[i].Method >= method
+	})
+
+	if index < len(*set) && (*set)[index].Method == method {
+		return (*set)[index]
+	}
+
+	old := *set
+	*set = make([]*rpcStats, len(old)+1)
+	copy(*set, old[:index])
+	copy((*set)[index+1:], old[index:])
+	stats := &rpcStats{Method: method}
+	stats.Latency.Values = make([]rpcTimeBucket, len(millisecondsDistribution))
+	for i, m := range millisecondsDistribution {
+		stats.Latency.Values[i].Limit = timeUnits(m)
+	}
+	(*set)[index] = stats
+	return stats
+}
+
+func (s *rpcStats) InProgress() int64       { return s.Started - s.Completed }
+func (s *rpcStats) SentMean() byteUnits     { return s.Sent / byteUnits(s.Started) }
+func (s *rpcStats) ReceivedMean() byteUnits { return s.Received / byteUnits(s.Started) }
+
+func (h *rpcTimeHistogram) Mean() timeUnits { return h.Sum / timeUnits(h.Count) }
+
+func getStatusCode(span *export.Span) string {
+	for _, ev := range span.Events() {
+		if status := tag.StatusCode.Get(ev.Map()); status != "" {
+			return status
+		}
+	}
+	return ""
+}
+
 func (r *rpcs) getData(req *http.Request) interface{} {
 	return r
 }