internal/jsonrpc2: add telemetry to the rpc system

This uses the opencensus compatibility layer to track all the JSON-RPC calls
in and out.

Change-Id: Ib719879a8d6855b6e6479a4f1b01fe823b548110
Reviewed-on: https://go-review.googlesource.com/c/tools/+/183248
Run-TryBot: Ian Cottrell <iancottrell@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Rebecca Stambler <rstambler@golang.org>
diff --git a/internal/jsonrpc2/jsonrpc2.go b/internal/jsonrpc2/jsonrpc2.go
index 3796b44..db4a73b 100644
--- a/internal/jsonrpc2/jsonrpc2.go
+++ b/internal/jsonrpc2/jsonrpc2.go
@@ -11,10 +11,14 @@
 	"context"
 	"encoding/json"
 	"fmt"
-	"runtime/trace"
 	"sync"
 	"sync/atomic"
 	"time"
+
+	"golang.org/x/tools/internal/lsp/telemetry"
+	"golang.org/x/tools/internal/lsp/telemetry/stats"
+	"golang.org/x/tools/internal/lsp/telemetry/tag"
+	"golang.org/x/tools/internal/lsp/telemetry/trace"
 )
 
 // Conn is a JSON RPC 2 client server connection.
@@ -35,9 +39,10 @@
 }
 
 type queueEntry struct {
-	ctx context.Context
-	c   *Conn
-	r   *Request
+	ctx  context.Context
+	c    *Conn
+	r    *Request
+	size int64
 }
 
 // Handler is an option you can pass to NewConn to handle incoming requests.
@@ -56,6 +61,64 @@
 // instead.
 type Canceler func(context.Context, *Conn, *Request)
 
+type rpcStats struct {
+	server   bool
+	method   string
+	ctx      context.Context
+	span     trace.Span
+	start    time.Time
+	received int64
+	sent     int64
+}
+
+type statsKeyType string
+
+const rpcStatsKey = statsKeyType("rpcStatsKey")
+
+func start(ctx context.Context, server bool, method string, id *ID) (context.Context, *rpcStats) {
+	s := &rpcStats{
+		server: server,
+		method: method,
+		ctx:    ctx,
+		start:  time.Now(),
+	}
+	s.ctx = context.WithValue(s.ctx, rpcStatsKey, s)
+	tags := make([]tag.Mutator, 0, 4)
+	tags = append(tags, tag.Upsert(telemetry.KeyMethod, method))
+	mode := telemetry.Outbound
+	spanKind := trace.SpanKindClient
+	if server {
+		spanKind = trace.SpanKindServer
+		mode = telemetry.Inbound
+	}
+	tags = append(tags, tag.Upsert(telemetry.KeyRPCDirection, mode))
+	if id != nil {
+		tags = append(tags, tag.Upsert(telemetry.KeyRPCID, id.String()))
+	}
+	s.ctx, s.span = trace.StartSpan(ctx, method, trace.WithSpanKind(spanKind))
+	s.ctx, _ = tag.New(s.ctx, tags...)
+	stats.Record(s.ctx, telemetry.Started.M(1))
+	return s.ctx, s
+}
+
+func (s *rpcStats) end(ctx context.Context, err *error) {
+	if err != nil && *err != nil {
+		ctx, _ = tag.New(ctx, tag.Upsert(telemetry.KeyStatus, "ERROR"))
+	} else {
+		ctx, _ = tag.New(ctx, tag.Upsert(telemetry.KeyStatus, "OK"))
+	}
+	elapsedTime := time.Since(s.start)
+	latencyMillis := float64(elapsedTime) / float64(time.Millisecond)
+
+	stats.Record(ctx,
+		telemetry.ReceivedBytes.M(s.received),
+		telemetry.SentBytes.M(s.sent),
+		telemetry.Latency.M(latencyMillis),
+	)
+
+	s.span.End()
+}
+
 // NewErrorf builds a Error struct for the suppied message and code.
 // If args is not empty, message and args will be passed to Sprintf.
 func NewErrorf(code int64, format string, args ...interface{}) *Error {
@@ -103,9 +166,10 @@
 // Notify is called to send a notification request over the connection.
 // It will return as soon as the notification has been sent, as no response is
 // possible.
-func (c *Conn) Notify(ctx context.Context, method string, params interface{}) error {
-	ctx, task := trace.NewTask(ctx, "jsonrpc2.Notify "+method)
-	defer task.End()
+func (c *Conn) Notify(ctx context.Context, method string, params interface{}) (err error) {
+	ctx, rpcStats := start(ctx, false, method, nil)
+	defer rpcStats.end(ctx, &err)
+
 	jsonParams, err := marshalToRaw(params)
 	if err != nil {
 		return fmt.Errorf("marshalling notify parameters: %v", err)
@@ -119,22 +183,23 @@
 		return fmt.Errorf("marshalling notify request: %v", err)
 	}
 	c.Logger(Send, nil, -1, request.Method, request.Params, nil)
-	return c.stream.Write(ctx, data)
+	n, err := c.stream.Write(ctx, data)
+	rpcStats.sent += n
+	return err
 }
 
 // Call sends a request over the connection and then waits for a response.
 // If the response is not an error, it will be decoded into result.
 // result must be of a type you an pass to json.Unmarshal.
-func (c *Conn) Call(ctx context.Context, method string, params, result interface{}) error {
-	ctx, task := trace.NewTask(ctx, "jsonrpc2.Call "+method)
-	defer task.End()
+func (c *Conn) Call(ctx context.Context, method string, params, result interface{}) (err error) {
+	// generate a new request identifier
+	id := ID{Number: atomic.AddInt64(&c.seq, 1)}
+	ctx, rpcStats := start(ctx, false, method, &id)
+	defer rpcStats.end(ctx, &err)
 	jsonParams, err := marshalToRaw(params)
 	if err != nil {
 		return fmt.Errorf("marshalling call parameters: %v", err)
 	}
-	// generate a new request identifier
-	id := ID{Number: atomic.AddInt64(&c.seq, 1)}
-	trace.Logf(ctx, "jsonrpc2", "request id %v", id)
 	request := &Request{
 		ID:     &id,
 		Method: method,
@@ -160,7 +225,9 @@
 	// now we are ready to send
 	before := time.Now()
 	c.Logger(Send, request.ID, -1, request.Method, request.Params, nil)
-	if err := c.stream.Write(ctx, data); err != nil {
+	n, err := c.stream.Write(ctx, data)
+	rpcStats.sent += n
+	if err != nil {
 		// sending failed, we will never get a response, so don't leave it pending
 		return err
 	}
@@ -192,8 +259,9 @@
 // You must call this exactly once for any given request.
 // If err is set then result will be ignored.
 func (c *Conn) Reply(ctx context.Context, req *Request, result interface{}, err error) error {
-	ctx, task := trace.NewTask(ctx, "jsonrpc2.Reply "+req.Method)
-	defer task.End()
+	ctx, st := trace.StartSpan(ctx, req.Method+":reply", trace.WithSpanKind(trace.SpanKindClient))
+	defer st.End()
+
 	if req.IsNotify() {
 		return fmt.Errorf("reply not invoked with a valid call")
 	}
@@ -228,7 +296,17 @@
 		return err
 	}
 	c.Logger(Send, response.ID, elapsed, req.Method, response.Result, response.Error)
-	if err = c.stream.Write(ctx, data); err != nil {
+	n, err := c.stream.Write(ctx, data)
+
+	v := ctx.Value(rpcStatsKey)
+	if v != nil {
+		s := v.(*rpcStats)
+		s.sent += n
+	} else {
+		//panic("no stats available in reply")
+	}
+
+	if err != nil {
 		// TODO(iancottrell): if a stream write fails, we really need to shut down
 		// the whole stream
 		return err
@@ -253,8 +331,8 @@
 	Error      *Error           `json:"error,omitempty"`
 }
 
-func (c *Conn) deliver(ctx context.Context, q chan queueEntry, request *Request) bool {
-	e := queueEntry{ctx: ctx, c: c, r: request}
+func (c *Conn) deliver(ctx context.Context, q chan queueEntry, request *Request, size int64) bool {
+	e := queueEntry{ctx: ctx, c: c, r: request, size: size}
 	if !c.RejectIfOverloaded {
 		q <- e
 		return true
@@ -281,17 +359,15 @@
 			if e.ctx.Err() != nil {
 				continue
 			}
-			ctx, task := trace.NewTask(e.ctx, "jsonrpc2.Handle "+e.r.Method)
-			if !e.r.IsNotify() {
-				trace.Logf(ctx, "jsonrpc2", "request id %v", e.r.ID)
-			}
+			ctx, rpcStats := start(ctx, true, e.r.Method, e.r.ID)
+			rpcStats.received += e.size
 			c.Handler(ctx, e.c, e.r)
-			task.End()
+			rpcStats.end(ctx, nil)
 		}
 	}()
 	for {
 		// get the data for a message
-		data, err := c.stream.Read(ctx)
+		data, n, err := c.stream.Read(ctx)
 		if err != nil {
 			// the stream failed, we cannot continue
 			return err
@@ -316,7 +392,7 @@
 			if request.IsNotify() {
 				c.Logger(Receive, request.ID, -1, request.Method, request.Params, nil)
 				// we have a Notify, add to the processor queue
-				c.deliver(ctx, q, request)
+				c.deliver(ctx, q, request, n)
 				//TODO: log when we drop a message?
 			} else {
 				// we have a Call, add to the processor queue
@@ -329,7 +405,7 @@
 				}
 				c.handlingMu.Unlock()
 				c.Logger(Receive, request.ID, -1, request.Method, request.Params, nil)
-				if !c.deliver(reqCtx, q, request) {
+				if !c.deliver(reqCtx, q, request, n) {
 					// queue is full, reject the message by directly replying
 					c.Reply(ctx, request, nil, NewErrorf(CodeServerOverloaded, "no room in queue"))
 				}
diff --git a/internal/jsonrpc2/stream.go b/internal/jsonrpc2/stream.go
index fe28c55..f850c27 100644
--- a/internal/jsonrpc2/stream.go
+++ b/internal/jsonrpc2/stream.go
@@ -22,10 +22,10 @@
 type Stream interface {
 	// Read gets the next message from the stream.
 	// It is never called concurrently.
-	Read(context.Context) ([]byte, error)
+	Read(context.Context) ([]byte, int64, error)
 	// Write sends a message to the stream.
 	// It must be safe for concurrent use.
-	Write(context.Context, []byte) error
+	Write(context.Context, []byte) (int64, error)
 }
 
 // NewStream returns a Stream built on top of an io.Reader and io.Writer
@@ -44,29 +44,29 @@
 	out   io.Writer
 }
 
-func (s *plainStream) Read(ctx context.Context) ([]byte, error) {
+func (s *plainStream) Read(ctx context.Context) ([]byte, int64, error) {
 	select {
 	case <-ctx.Done():
-		return nil, ctx.Err()
+		return nil, 0, ctx.Err()
 	default:
 	}
 	var raw json.RawMessage
 	if err := s.in.Decode(&raw); err != nil {
-		return nil, err
+		return nil, 0, err
 	}
-	return raw, nil
+	return raw, int64(len(raw)), nil
 }
 
-func (s *plainStream) Write(ctx context.Context, data []byte) error {
+func (s *plainStream) Write(ctx context.Context, data []byte) (int64, error) {
 	select {
 	case <-ctx.Done():
-		return ctx.Err()
+		return 0, ctx.Err()
 	default:
 	}
 	s.outMu.Lock()
-	_, err := s.out.Write(data)
+	n, err := s.out.Write(data)
 	s.outMu.Unlock()
-	return err
+	return int64(n), err
 }
 
 // NewHeaderStream returns a Stream built on top of an io.Reader and io.Writer
@@ -85,18 +85,19 @@
 	out   io.Writer
 }
 
-func (s *headerStream) Read(ctx context.Context) ([]byte, error) {
+func (s *headerStream) Read(ctx context.Context) ([]byte, int64, error) {
 	select {
 	case <-ctx.Done():
-		return nil, ctx.Err()
+		return nil, 0, ctx.Err()
 	default:
 	}
-	var length int64
+	var total, length int64
 	// read the header, stop on the first empty line
 	for {
 		line, err := s.in.ReadString('\n')
+		total += int64(len(line))
 		if err != nil {
-			return nil, fmt.Errorf("failed reading header line %q", err)
+			return nil, total, fmt.Errorf("failed reading header line %q", err)
 		}
 		line = strings.TrimSpace(line)
 		// check we have a header line
@@ -105,42 +106,45 @@
 		}
 		colon := strings.IndexRune(line, ':')
 		if colon < 0 {
-			return nil, fmt.Errorf("invalid header line %q", line)
+			return nil, total, fmt.Errorf("invalid header line %q", line)
 		}
 		name, value := line[:colon], strings.TrimSpace(line[colon+1:])
 		switch name {
 		case "Content-Length":
 			if length, err = strconv.ParseInt(value, 10, 32); err != nil {
-				return nil, fmt.Errorf("failed parsing Content-Length: %v", value)
+				return nil, total, fmt.Errorf("failed parsing Content-Length: %v", value)
 			}
 			if length <= 0 {
-				return nil, fmt.Errorf("invalid Content-Length: %v", length)
+				return nil, total, fmt.Errorf("invalid Content-Length: %v", length)
 			}
 		default:
 			// ignoring unknown headers
 		}
 	}
 	if length == 0 {
-		return nil, fmt.Errorf("missing Content-Length header")
+		return nil, total, fmt.Errorf("missing Content-Length header")
 	}
 	data := make([]byte, length)
 	if _, err := io.ReadFull(s.in, data); err != nil {
-		return nil, err
+		return nil, total, err
 	}
-	return data, nil
+	total += length
+	return data, total, nil
 }
 
-func (s *headerStream) Write(ctx context.Context, data []byte) error {
+func (s *headerStream) Write(ctx context.Context, data []byte) (int64, error) {
 	select {
 	case <-ctx.Done():
-		return ctx.Err()
+		return 0, ctx.Err()
 	default:
 	}
 	s.outMu.Lock()
-	_, err := fmt.Fprintf(s.out, "Content-Length: %v\r\n\r\n", len(data))
+	defer s.outMu.Unlock()
+	n, err := fmt.Fprintf(s.out, "Content-Length: %v\r\n\r\n", len(data))
+	total := int64(n)
 	if err == nil {
-		_, err = s.out.Write(data)
+		n, err = s.out.Write(data)
+		total += int64(n)
 	}
-	s.outMu.Unlock()
-	return err
+	return total, err
 }