| // Copyright 2018 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package jsonrpc2 |
| |
| import ( |
| "context" |
| "encoding/json" |
| "fmt" |
| "sync" |
| "sync/atomic" |
| |
| "golang.org/x/tools/internal/event" |
| "golang.org/x/tools/internal/event/label" |
| ) |
| |
| // Conn is the common interface to jsonrpc clients and servers. |
| // Conn is bidirectional; it does not have a designated server or client end. |
| // It manages the jsonrpc2 protocol, connecting responses back to their calls. |
| type Conn interface { |
| // Call invokes the target method and waits for a response. |
| // The params will be marshaled to JSON before sending over the wire, and will |
| // be handed to the method invoked. |
| // The response will be unmarshaled from JSON into the result. |
| // The id returned will be unique from this connection, and can be used for |
| // logging or tracking. |
| Call(ctx context.Context, method string, params, result interface{}) (ID, error) |
| |
| // Notify invokes the target method but does not wait for a response. |
| // The params will be marshaled to JSON before sending over the wire, and will |
| // be handed to the method invoked. |
| Notify(ctx context.Context, method string, params interface{}) error |
| |
| // Go starts a goroutine to handle the connection. |
| // It must be called exactly once for each Conn. |
| // It returns immediately. |
| // You must block on Done() to wait for the connection to shut down. |
| // This is a temporary measure, this should be started automatically in the |
| // future. |
| Go(ctx context.Context, handler Handler) |
| |
| // Close closes the connection and it's underlying stream. |
| // It does not wait for the close to complete, use the Done() channel for |
| // that. |
| Close() error |
| |
| // Done returns a channel that will be closed when the processing goroutine |
| // has terminated, which will happen if Close() is called or an underlying |
| // stream is closed. |
| Done() <-chan struct{} |
| |
| // Err returns an error if there was one from within the processing goroutine. |
| // If err returns non nil, the connection will be already closed or closing. |
| Err() error |
| } |
| |
| type conn struct { |
| seq int64 // must only be accessed using atomic operations |
| writeMu sync.Mutex // protects writes to the stream |
| stream Stream |
| pendingMu sync.Mutex // protects the pending map |
| pending map[ID]chan *Response |
| |
| done chan struct{} |
| err atomic.Value |
| } |
| |
| // NewConn creates a new connection object around the supplied stream. |
| func NewConn(s Stream) Conn { |
| conn := &conn{ |
| stream: s, |
| pending: make(map[ID]chan *Response), |
| done: make(chan struct{}), |
| } |
| return conn |
| } |
| |
| func (c *conn) Notify(ctx context.Context, method string, params interface{}) (err error) { |
| notify, err := NewNotification(method, params) |
| if err != nil { |
| return fmt.Errorf("marshaling notify parameters: %v", err) |
| } |
| ctx, done := event.Start(ctx, method, |
| Method.Of(method), |
| RPCDirection.Of(Outbound), |
| ) |
| defer func() { |
| recordStatus(ctx, err) |
| done() |
| }() |
| |
| event.Metric(ctx, Started.Of(1)) |
| n, err := c.write(ctx, notify) |
| event.Metric(ctx, SentBytes.Of(n)) |
| return err |
| } |
| |
| func (c *conn) Call(ctx context.Context, method string, params, result interface{}) (_ ID, err error) { |
| // generate a new request identifier |
| id := ID{number: atomic.AddInt64(&c.seq, 1)} |
| call, err := NewCall(id, method, params) |
| if err != nil { |
| return id, fmt.Errorf("marshaling call parameters: %v", err) |
| } |
| ctx, done := event.Start(ctx, method, |
| Method.Of(method), |
| RPCDirection.Of(Outbound), |
| RPCID.Of(fmt.Sprintf("%q", id)), |
| ) |
| defer func() { |
| recordStatus(ctx, err) |
| done() |
| }() |
| event.Metric(ctx, Started.Of(1)) |
| // We have to add ourselves to the pending map before we send, otherwise we |
| // are racing the response. Also add a buffer to rchan, so that if we get a |
| // wire response between the time this call is cancelled and id is deleted |
| // from c.pending, the send to rchan will not block. |
| rchan := make(chan *Response, 1) |
| c.pendingMu.Lock() |
| c.pending[id] = rchan |
| c.pendingMu.Unlock() |
| defer func() { |
| c.pendingMu.Lock() |
| delete(c.pending, id) |
| c.pendingMu.Unlock() |
| }() |
| // now we are ready to send |
| n, err := c.write(ctx, call) |
| event.Metric(ctx, SentBytes.Of(n)) |
| if err != nil { |
| // sending failed, we will never get a response, so don't leave it pending |
| return id, err |
| } |
| // now wait for the response |
| select { |
| case response := <-rchan: |
| // is it an error response? |
| if response.err != nil { |
| return id, response.err |
| } |
| if result == nil || len(response.result) == 0 { |
| return id, nil |
| } |
| if err := json.Unmarshal(response.result, result); err != nil { |
| return id, fmt.Errorf("unmarshaling result: %v", err) |
| } |
| return id, nil |
| case <-ctx.Done(): |
| return id, ctx.Err() |
| } |
| } |
| |
| func (c *conn) replier(req Request, spanDone func()) Replier { |
| return func(ctx context.Context, result interface{}, err error) error { |
| defer func() { |
| recordStatus(ctx, err) |
| spanDone() |
| }() |
| call, ok := req.(*Call) |
| if !ok { |
| // request was a notify, no need to respond |
| return nil |
| } |
| response, err := NewResponse(call.id, result, err) |
| if err != nil { |
| return err |
| } |
| n, err := c.write(ctx, response) |
| event.Metric(ctx, SentBytes.Of(n)) |
| if err != nil { |
| // TODO(iancottrell): if a stream write fails, we really need to shut down |
| // the whole stream |
| return err |
| } |
| return nil |
| } |
| } |
| |
| func (c *conn) write(ctx context.Context, msg Message) (int64, error) { |
| c.writeMu.Lock() |
| defer c.writeMu.Unlock() |
| return c.stream.Write(ctx, msg) |
| } |
| |
| func (c *conn) Go(ctx context.Context, handler Handler) { |
| go c.run(ctx, handler) |
| } |
| |
| func (c *conn) run(ctx context.Context, handler Handler) { |
| defer close(c.done) |
| for { |
| // get the next message |
| msg, n, err := c.stream.Read(ctx) |
| if err != nil { |
| // The stream failed, we cannot continue. |
| c.fail(err) |
| return |
| } |
| switch msg := msg.(type) { |
| case Request: |
| labels := []label.Label{ |
| Method.Of(msg.Method()), |
| RPCDirection.Of(Inbound), |
| {}, // reserved for ID if present |
| } |
| if call, ok := msg.(*Call); ok { |
| labels[len(labels)-1] = RPCID.Of(fmt.Sprintf("%q", call.ID())) |
| } else { |
| labels = labels[:len(labels)-1] |
| } |
| reqCtx, spanDone := event.Start(ctx, msg.Method(), labels...) |
| event.Metric(reqCtx, |
| Started.Of(1), |
| ReceivedBytes.Of(n)) |
| if err := handler(reqCtx, c.replier(msg, spanDone), msg); err != nil { |
| // delivery failed, not much we can do |
| event.Error(reqCtx, "jsonrpc2 message delivery failed", err) |
| } |
| case *Response: |
| // If method is not set, this should be a response, in which case we must |
| // have an id to send the response back to the caller. |
| c.pendingMu.Lock() |
| rchan, ok := c.pending[msg.id] |
| c.pendingMu.Unlock() |
| if ok { |
| rchan <- msg |
| } |
| } |
| } |
| } |
| |
| func (c *conn) Close() error { |
| return c.stream.Close() |
| } |
| |
| func (c *conn) Done() <-chan struct{} { |
| return c.done |
| } |
| |
| func (c *conn) Err() error { |
| if err := c.err.Load(); err != nil { |
| return err.(error) |
| } |
| return nil |
| } |
| |
| // fail sets a failure condition on the stream and closes it. |
| func (c *conn) fail(err error) { |
| c.err.Store(err) |
| c.stream.Close() |
| } |
| |
| func recordStatus(ctx context.Context, err error) { |
| if err != nil { |
| event.Label(ctx, StatusCode.Of("ERROR")) |
| } else { |
| event.Label(ctx, StatusCode.Of("OK")) |
| } |
| } |