internal/lsp: rewrite the rpc debug page
Now it only uses the telemetry messages directly rather than the metric system.
This is much faster and more direct, and removes a blocker on improving the
metrics support.
Also fixes the fact that received and sent were the wrong way round before,
probably as an artifact of the old protocol logging code, and also removes
the bytes histogram which was a lie (it was a histogram of write sizes that
was presented as a histogram of message sizes)
fixes golang/go#38168
Change-Id: Ib1c3459c0ff1cf0c6087a828981e80c1ce1c5c1b
Reviewed-on: https://go-review.googlesource.com/c/tools/+/227139
Run-TryBot: Ian Cottrell <iancottrell@google.com>
Reviewed-by: Robert Findley <rfindley@google.com>
diff --git a/internal/jsonrpc2/jsonrpc2.go b/internal/jsonrpc2/jsonrpc2.go
index 08234d2..1f2fd4c 100644
--- a/internal/jsonrpc2/jsonrpc2.go
+++ b/internal/jsonrpc2/jsonrpc2.go
@@ -112,7 +112,7 @@
event.Record(ctx, tag.Started.Of(1))
n, err := c.stream.Write(ctx, data)
- event.Record(ctx, tag.ReceivedBytes.Of(n))
+ event.Record(ctx, tag.SentBytes.Of(n))
return err
}
@@ -161,7 +161,7 @@
}()
// now we are ready to send
n, err := c.stream.Write(ctx, data)
- event.Record(ctx, tag.ReceivedBytes.Of(n))
+ event.Record(ctx, tag.SentBytes.Of(n))
if err != nil {
// sending failed, we will never get a response, so don't leave it pending
return err
@@ -239,7 +239,7 @@
return err
}
n, err := r.conn.stream.Write(ctx, data)
- event.Record(ctx, tag.ReceivedBytes.Of(n))
+ event.Record(ctx, tag.SentBytes.Of(n))
if err != nil {
// TODO(iancottrell): if a stream write fails, we really need to shut down
@@ -302,7 +302,7 @@
)
event.Record(reqCtx,
tag.Started.Of(1),
- tag.SentBytes.Of(n))
+ tag.ReceivedBytes.Of(n))
req := &Request{
conn: c,
diff --git a/internal/lsp/debug/rpc.go b/internal/lsp/debug/rpc.go
index 823ee9d..4632661 100644
--- a/internal/lsp/debug/rpc.go
+++ b/internal/lsp/debug/rpc.go
@@ -16,7 +16,6 @@
"golang.org/x/tools/internal/lsp/debug/tag"
"golang.org/x/tools/internal/telemetry/event"
"golang.org/x/tools/internal/telemetry/export"
- "golang.org/x/tools/internal/telemetry/export/metric"
)
var rpcTmpl = template.Must(template.Must(baseTemplate.Clone()).Parse(`
@@ -32,10 +31,10 @@
<b>{{.Method}}</b> {{.Started}} <a href="/trace/{{.Method}}">traces</a> ({{.InProgress}} in progress)
<br>
<i>Latency</i> {{with .Latency}}{{.Mean}} ({{.Min}}<{{.Max}}){{end}}
- <i>By bucket</i> 0s {{range .Latency.Values}}<b>{{.Count}}</b> {{.Limit}} {{end}}
+ <i>By bucket</i> 0s {{range .Latency.Values}}{{if gt .Count 0}}<b>{{.Count}}</b> {{.Limit}} {{end}}{{end}}
<br>
- <i>Received</i> {{with .Received}}{{.Mean}} ({{.Min}}<{{.Max}}){{end}}
- <i>Sent</i> {{with .Sent}}{{.Mean}} ({{.Min}}<{{.Max}}){{end}}
+ <i>Received</i> {{.Received}} (avg. {{.ReceivedMean}})
+ <i>Sent</i> {{.Sent}} (avg. {{.SentMean}})
<br>
<i>Result codes</i> {{range .Codes}}{{.Key}}={{.Count}} {{end}}
</P>
@@ -45,25 +44,24 @@
type rpcs struct {
mu sync.Mutex
- Inbound []*rpcStats
- Outbound []*rpcStats
+ Inbound []*rpcStats // stats for incoming lsp rpcs sorted by method name
+ Outbound []*rpcStats // stats for outgoing lsp rpcs sorted by method name
}
type rpcStats struct {
- Method string
- Started int64
- Completed int64
- InProgress int64
- Latency rpcTimeHistogram
- Received rpcBytesHistogram
- Sent rpcBytesHistogram
- Codes []*rpcCodeBucket
+ Method string
+ Started int64
+ Completed int64
+
+ Latency rpcTimeHistogram
+ Received byteUnits
+ Sent byteUnits
+ Codes []*rpcCodeBucket
}
type rpcTimeHistogram struct {
Sum timeUnits
Count int64
- Mean timeUnits
Min timeUnits
Max timeUnits
Values []rpcTimeBucket
@@ -74,140 +72,142 @@
Count int64
}
-type rpcBytesHistogram struct {
- Sum byteUnits
- Count int64
- Mean byteUnits
- Min byteUnits
- Max byteUnits
- Values []rpcBytesBucket
-}
-
-type rpcBytesBucket struct {
- Limit byteUnits
- Count int64
-}
-
type rpcCodeBucket struct {
Key string
Count int64
}
func (r *rpcs) ProcessEvent(ctx context.Context, ev event.Event, tagMap event.TagMap) context.Context {
- switch {
- case ev.IsEndSpan():
- // calculate latency if this was an rpc span
- span := export.GetSpan(ctx)
- if span == nil {
- return ctx
- }
- // is this a finished rpc span, if so it will have a status code record
- for _, ev := range span.Events() {
- code := tag.StatusCode.Get(ev.Map())
- if code != "" {
- elapsedTime := span.Finish().At.Sub(span.Start().At)
- latencyMillis := float64(elapsedTime) / float64(time.Millisecond)
- statsCtx := event.Label1(ctx, tag.StatusCode.Of(code))
- event.Record1(statsCtx, tag.Latency.Of(latencyMillis))
- }
- }
- return ctx
- case ev.IsRecord():
- // fall through to the metrics handling logic
- default:
- // ignore all other event types
- return ctx
- }
r.mu.Lock()
defer r.mu.Unlock()
- //TODO(38168): we should just deal with the events here and not use metrics
- metrics := metric.Entries.Get(tagMap).([]metric.Data)
- for _, data := range metrics {
- for i, group := range data.Groups() {
- set := &r.Inbound
- groupTags := event.NewTagMap(group...)
- if tag.RPCDirection.Get(groupTags) == tag.Outbound {
- set = &r.Outbound
- }
- method := tag.Method.Get(groupTags)
- index := sort.Search(len(*set), func(i int) bool {
- return (*set)[i].Method >= method
- })
- if index >= len(*set) || (*set)[index].Method != method {
- old := *set
- *set = make([]*rpcStats, len(old)+1)
- copy(*set, old[:index])
- copy((*set)[index+1:], old[index:])
- (*set)[index] = &rpcStats{Method: method}
- }
- stats := (*set)[index]
- switch data.Handle() {
- case started.Name:
- stats.Started = data.(*metric.Int64Data).Rows[i]
- case completed.Name:
- status := tag.StatusCode.Get(groupTags)
- var b *rpcCodeBucket
- for c, entry := range stats.Codes {
- if entry.Key == status {
- b = stats.Codes[c]
- break
- }
- }
- if b == nil {
- b = &rpcCodeBucket{Key: status}
- stats.Codes = append(stats.Codes, b)
- sort.Slice(stats.Codes, func(i int, j int) bool {
- return stats.Codes[i].Key < stats.Codes[j].Key
- })
- }
- b.Count = data.(*metric.Int64Data).Rows[i]
- case latency.Name:
- data := data.(*metric.HistogramFloat64Data)
- row := data.Rows[i]
- stats.Latency.Count = row.Count
- stats.Latency.Sum = timeUnits(row.Sum)
- stats.Latency.Min = timeUnits(row.Min)
- stats.Latency.Max = timeUnits(row.Max)
- stats.Latency.Mean = timeUnits(row.Sum) / timeUnits(row.Count)
- stats.Latency.Values = make([]rpcTimeBucket, len(data.Info.Buckets))
- last := int64(0)
- for i, b := range data.Info.Buckets {
- stats.Latency.Values[i].Limit = timeUnits(b)
- stats.Latency.Values[i].Count = row.Values[i] - last
- last = row.Values[i]
- }
- case sentBytes.Name:
- data := data.(*metric.HistogramInt64Data)
- row := data.Rows[i]
- stats.Sent.Count = row.Count
- stats.Sent.Sum = byteUnits(row.Sum)
- stats.Sent.Min = byteUnits(row.Min)
- stats.Sent.Max = byteUnits(row.Max)
- stats.Sent.Mean = byteUnits(row.Sum) / byteUnits(row.Count)
- case receivedBytes.Name:
- data := data.(*metric.HistogramInt64Data)
- row := data.Rows[i]
- stats.Received.Count = row.Count
- stats.Received.Sum = byteUnits(row.Sum)
- stats.Sent.Min = byteUnits(row.Min)
- stats.Sent.Max = byteUnits(row.Max)
- stats.Received.Mean = byteUnits(row.Sum) / byteUnits(row.Count)
- }
+ switch {
+ case ev.IsStartSpan():
+ if _, stats := r.getRPCSpan(ctx, ev); stats != nil {
+ stats.Started++
}
- }
-
- for _, set := range [][]*rpcStats{r.Inbound, r.Outbound} {
- for _, stats := range set {
- stats.Completed = 0
- for _, b := range stats.Codes {
- stats.Completed += b.Count
+ case ev.IsEndSpan():
+ span, stats := r.getRPCSpan(ctx, ev)
+ if stats != nil {
+ endRPC(ctx, ev, span, stats)
+ }
+ case ev.IsRecord():
+ sent := byteUnits(tag.SentBytes.Get(tagMap))
+ rec := byteUnits(tag.ReceivedBytes.Get(tagMap))
+ if sent != 0 || rec != 0 {
+ if _, stats := r.getRPCSpan(ctx, ev); stats != nil {
+ stats.Sent += sent
+ stats.Received += rec
}
- stats.InProgress = stats.Started - stats.Completed
}
}
return ctx
}
+func endRPC(ctx context.Context, ev event.Event, span *export.Span, stats *rpcStats) {
+ // update the basic counts
+ stats.Completed++
+
+ // get and record the status code
+ if status := getStatusCode(span); status != "" {
+ var b *rpcCodeBucket
+ for c, entry := range stats.Codes {
+ if entry.Key == status {
+ b = stats.Codes[c]
+ break
+ }
+ }
+ if b == nil {
+ b = &rpcCodeBucket{Key: status}
+ stats.Codes = append(stats.Codes, b)
+ sort.Slice(stats.Codes, func(i int, j int) bool {
+ return stats.Codes[i].Key < stats.Codes[j].Key
+ })
+ }
+ b.Count++
+ }
+
+ // calculate latency if this was an rpc span
+ elapsedTime := span.Finish().At.Sub(span.Start().At)
+ latencyMillis := timeUnits(elapsedTime) / timeUnits(time.Millisecond)
+ if stats.Latency.Count == 0 {
+ stats.Latency.Min = latencyMillis
+ stats.Latency.Max = latencyMillis
+ } else {
+ if stats.Latency.Min > latencyMillis {
+ stats.Latency.Min = latencyMillis
+ }
+ if stats.Latency.Max < latencyMillis {
+ stats.Latency.Max = latencyMillis
+ }
+ }
+ stats.Latency.Count++
+ stats.Latency.Sum += latencyMillis
+ for i := range stats.Latency.Values {
+ if stats.Latency.Values[i].Limit > latencyMillis {
+ stats.Latency.Values[i].Count++
+ break
+ }
+ }
+}
+
+func (r *rpcs) getRPCSpan(ctx context.Context, ev event.Event) (*export.Span, *rpcStats) {
+ // get the span
+ span := export.GetSpan(ctx)
+ if span == nil {
+ return nil, nil
+ }
+ // use the span start event look up the correct stats block
+ // we do this because it prevents us matching a sub span
+ startMap := span.Start().Map()
+ return span, r.getRPCStats(startMap)
+}
+
+func (r *rpcs) getRPCStats(tagMap event.TagMap) *rpcStats {
+ method := tag.Method.Get(tagMap)
+ if method == "" {
+ return nil
+ }
+ set := &r.Inbound
+ if tag.RPCDirection.Get(tagMap) != tag.Inbound {
+ set = &r.Outbound
+ }
+ // get the record for this method
+ index := sort.Search(len(*set), func(i int) bool {
+ return (*set)[i].Method >= method
+ })
+
+ if index < len(*set) && (*set)[index].Method == method {
+ return (*set)[index]
+ }
+
+ old := *set
+ *set = make([]*rpcStats, len(old)+1)
+ copy(*set, old[:index])
+ copy((*set)[index+1:], old[index:])
+ stats := &rpcStats{Method: method}
+ stats.Latency.Values = make([]rpcTimeBucket, len(millisecondsDistribution))
+ for i, m := range millisecondsDistribution {
+ stats.Latency.Values[i].Limit = timeUnits(m)
+ }
+ (*set)[index] = stats
+ return stats
+}
+
+func (s *rpcStats) InProgress() int64 { return s.Started - s.Completed }
+func (s *rpcStats) SentMean() byteUnits { return s.Sent / byteUnits(s.Started) }
+func (s *rpcStats) ReceivedMean() byteUnits { return s.Received / byteUnits(s.Started) }
+
+func (h *rpcTimeHistogram) Mean() timeUnits { return h.Sum / timeUnits(h.Count) }
+
+func getStatusCode(span *export.Span) string {
+ for _, ev := range span.Events() {
+ if status := tag.StatusCode.Get(ev.Map()); status != "" {
+ return status
+ }
+ }
+ return ""
+}
+
func (r *rpcs) getData(req *http.Request) interface{} {
return r
}