| // Copyright 2018 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package jsonrpc2 |
| |
| import ( |
| "context" |
| "encoding/json" |
| "fmt" |
| "io" |
| "sync" |
| "sync/atomic" |
| |
| "golang.org/x/tools/internal/event" |
| "golang.org/x/tools/internal/event/label" |
| "golang.org/x/tools/internal/event/tag" |
| ) |
| |
| // Binder builds a connection configuration. |
| // This may be used in servers to generate a new configuration per connection. |
| // ConnectionOptions itself implements Binder returning itself unmodified, to |
| // allow for the simple cases where no per connection information is needed. |
| type Binder interface { |
| // Bind returns the ConnectionOptions to use when establishing the passed-in |
| // Connection. |
| // The connection is not ready to use when Bind is called. |
| Bind(context.Context, *Connection) (ConnectionOptions, error) |
| } |
| |
| // A BinderFunc implements the Binder interface for a standalone Bind function. |
| type BinderFunc func(context.Context, *Connection) (ConnectionOptions, error) |
| |
| func (f BinderFunc) Bind(ctx context.Context, c *Connection) (ConnectionOptions, error) { |
| return f(ctx, c) |
| } |
| |
| var _ Binder = BinderFunc(nil) |
| |
| // ConnectionOptions holds the options for new connections. |
| type ConnectionOptions struct { |
| // Framer allows control over the message framing and encoding. |
| // If nil, HeaderFramer will be used. |
| Framer Framer |
| // Preempter allows registration of a pre-queue message handler. |
| // If nil, no messages will be preempted. |
| Preempter Preempter |
| // Handler is used as the queued message handler for inbound messages. |
| // If nil, all responses will be ErrNotHandled. |
| Handler Handler |
| } |
| |
| // Connection manages the jsonrpc2 protocol, connecting responses back to their |
| // calls. |
| // Connection is bidirectional; it does not have a designated server or client |
| // end. |
| type Connection struct { |
| seq int64 // must only be accessed using atomic operations |
| |
| closeOnce sync.Once |
| closer io.Closer |
| |
| writer chan Writer |
| outgoing chan map[ID]chan<- *Response |
| incoming chan map[ID]*incoming |
| async *async |
| } |
| |
| type AsyncCall struct { |
| id ID |
| response chan *Response // the channel a response will be delivered on |
| result chan asyncResult |
| endSpan func() // close the tracing span when all processing for the message is complete |
| } |
| |
| type asyncResult struct { |
| result []byte |
| err error |
| } |
| |
| // incoming is used to track an incoming request as it is being handled |
| type incoming struct { |
| request *Request // the request being processed |
| baseCtx context.Context // a base context for the message processing |
| done func() // a function called when all processing for the message is complete |
| handleCtx context.Context // the context for handling the message, child of baseCtx |
| cancel func() // a function that cancels the handling context |
| } |
| |
| // Bind returns the options unmodified. |
| func (o ConnectionOptions) Bind(context.Context, *Connection) (ConnectionOptions, error) { |
| return o, nil |
| } |
| |
| // newConnection creates a new connection and runs it. |
| // This is used by the Dial and Serve functions to build the actual connection. |
| func newConnection(ctx context.Context, rwc io.ReadWriteCloser, binder Binder) (*Connection, error) { |
| c := &Connection{ |
| closer: rwc, |
| writer: make(chan Writer, 1), |
| outgoing: make(chan map[ID]chan<- *Response, 1), |
| incoming: make(chan map[ID]*incoming, 1), |
| async: newAsync(), |
| } |
| |
| options, err := binder.Bind(ctx, c) |
| if err != nil { |
| return nil, err |
| } |
| if options.Framer == nil { |
| options.Framer = HeaderFramer() |
| } |
| if options.Preempter == nil { |
| options.Preempter = defaultHandler{} |
| } |
| if options.Handler == nil { |
| options.Handler = defaultHandler{} |
| } |
| c.outgoing <- make(map[ID]chan<- *Response) |
| c.incoming <- make(map[ID]*incoming) |
| // the goroutines started here will continue until the underlying stream is closed |
| reader := options.Framer.Reader(rwc) |
| readToQueue := make(chan *incoming) |
| queueToDeliver := make(chan *incoming) |
| go c.readIncoming(ctx, reader, readToQueue) |
| go c.manageQueue(ctx, options.Preempter, readToQueue, queueToDeliver) |
| go c.deliverMessages(ctx, options.Handler, queueToDeliver) |
| |
| // releaseing the writer must be the last thing we do in case any requests |
| // are blocked waiting for the connection to be ready |
| c.writer <- options.Framer.Writer(rwc) |
| return c, nil |
| } |
| |
| // Notify invokes the target method but does not wait for a response. |
| // The params will be marshaled to JSON before sending over the wire, and will |
| // be handed to the method invoked. |
| func (c *Connection) Notify(ctx context.Context, method string, params interface{}) error { |
| notify, err := NewNotification(method, params) |
| if err != nil { |
| return fmt.Errorf("marshaling notify parameters: %v", err) |
| } |
| ctx, done := event.Start(ctx, method, |
| tag.Method.Of(method), |
| tag.RPCDirection.Of(tag.Outbound), |
| ) |
| event.Metric(ctx, tag.Started.Of(1)) |
| err = c.write(ctx, notify) |
| switch { |
| case err != nil: |
| event.Label(ctx, tag.StatusCode.Of("ERROR")) |
| default: |
| event.Label(ctx, tag.StatusCode.Of("OK")) |
| } |
| done() |
| return err |
| } |
| |
| // Call invokes the target method and returns an object that can be used to await the response. |
| // The params will be marshaled to JSON before sending over the wire, and will |
| // be handed to the method invoked. |
| // You do not have to wait for the response, it can just be ignored if not needed. |
| // If sending the call failed, the response will be ready and have the error in it. |
| func (c *Connection) Call(ctx context.Context, method string, params interface{}) *AsyncCall { |
| result := &AsyncCall{ |
| id: Int64ID(atomic.AddInt64(&c.seq, 1)), |
| result: make(chan asyncResult, 1), |
| } |
| // generate a new request identifier |
| call, err := NewCall(result.id, method, params) |
| if err != nil { |
| //set the result to failed |
| result.result <- asyncResult{err: fmt.Errorf("marshaling call parameters: %w", err)} |
| return result |
| } |
| ctx, endSpan := event.Start(ctx, method, |
| tag.Method.Of(method), |
| tag.RPCDirection.Of(tag.Outbound), |
| tag.RPCID.Of(fmt.Sprintf("%q", result.id)), |
| ) |
| result.endSpan = endSpan |
| event.Metric(ctx, tag.Started.Of(1)) |
| // We have to add ourselves to the pending map before we send, otherwise we |
| // are racing the response. |
| // rchan is buffered in case the response arrives without a listener. |
| result.response = make(chan *Response, 1) |
| outgoing, ok := <-c.outgoing |
| if !ok { |
| // If the call failed due to (say) an I/O error or broken pipe, attribute it |
| // as such. (If the error is nil, then the connection must have been shut |
| // down cleanly.) |
| err := c.async.wait() |
| if err == nil { |
| err = ErrClientClosing |
| } |
| |
| resp, respErr := NewResponse(result.id, nil, err) |
| if respErr != nil { |
| panic(fmt.Errorf("unexpected error from NewResponse: %w", respErr)) |
| } |
| result.response <- resp |
| return result |
| } |
| outgoing[result.id] = result.response |
| c.outgoing <- outgoing |
| // now we are ready to send |
| if err := c.write(ctx, call); err != nil { |
| // sending failed, we will never get a response, so deliver a fake one |
| r, _ := NewResponse(result.id, nil, err) |
| c.incomingResponse(r) |
| } |
| return result |
| } |
| |
| // ID used for this call. |
| // This can be used to cancel the call if needed. |
| func (a *AsyncCall) ID() ID { return a.id } |
| |
| // IsReady can be used to check if the result is already prepared. |
| // This is guaranteed to return true on a result for which Await has already |
| // returned, or a call that failed to send in the first place. |
| func (a *AsyncCall) IsReady() bool { |
| select { |
| case r := <-a.result: |
| a.result <- r |
| return true |
| default: |
| return false |
| } |
| } |
| |
| // Await the results of a Call. |
| // The response will be unmarshaled from JSON into the result. |
| func (a *AsyncCall) Await(ctx context.Context, result interface{}) error { |
| defer a.endSpan() |
| var r asyncResult |
| select { |
| case response := <-a.response: |
| // response just arrived, prepare the result |
| switch { |
| case response.Error != nil: |
| r.err = response.Error |
| event.Label(ctx, tag.StatusCode.Of("ERROR")) |
| default: |
| r.result = response.Result |
| event.Label(ctx, tag.StatusCode.Of("OK")) |
| } |
| case r = <-a.result: |
| // result already available |
| case <-ctx.Done(): |
| event.Label(ctx, tag.StatusCode.Of("CANCELLED")) |
| return ctx.Err() |
| } |
| // refill the box for the next caller |
| a.result <- r |
| // and unpack the result |
| if r.err != nil { |
| return r.err |
| } |
| if result == nil || len(r.result) == 0 { |
| return nil |
| } |
| return json.Unmarshal(r.result, result) |
| } |
| |
| // Respond delivers a response to an incoming Call. |
| // |
| // Respond must be called exactly once for any message for which a handler |
| // returns ErrAsyncResponse. It must not be called for any other message. |
| func (c *Connection) Respond(id ID, result interface{}, rerr error) error { |
| pending := <-c.incoming |
| defer func() { c.incoming <- pending }() |
| entry, found := pending[id] |
| if !found { |
| return nil |
| } |
| delete(pending, id) |
| return c.respond(entry, result, rerr) |
| } |
| |
| // Cancel is used to cancel an inbound message by ID, it does not cancel |
| // outgoing messages. |
| // This is only used inside a message handler that is layering a |
| // cancellation protocol on top of JSON RPC 2. |
| // It will not complain if the ID is not a currently active message, and it will |
| // not cause any messages that have not arrived yet with that ID to be |
| // cancelled. |
| func (c *Connection) Cancel(id ID) { |
| pending := <-c.incoming |
| defer func() { c.incoming <- pending }() |
| if entry, found := pending[id]; found && entry.cancel != nil { |
| entry.cancel() |
| entry.cancel = nil |
| } |
| } |
| |
| // Wait blocks until the connection is fully closed, but does not close it. |
| func (c *Connection) Wait() error { |
| return c.async.wait() |
| } |
| |
| // Close can be used to close the underlying stream, and then wait for the connection to |
| // fully shut down. |
| // This does not cancel in flight requests, but waits for them to gracefully complete. |
| func (c *Connection) Close() error { |
| // close the underlying stream |
| c.closeOnce.Do(func() { |
| if err := c.closer.Close(); err != nil { |
| c.async.setError(err) |
| } |
| }) |
| // and then wait for it to cause the connection to close |
| return c.Wait() |
| } |
| |
| // readIncoming collects inbound messages from the reader and delivers them, either responding |
| // to outgoing calls or feeding requests to the queue. |
| func (c *Connection) readIncoming(ctx context.Context, reader Reader, toQueue chan<- *incoming) (err error) { |
| defer func() { |
| // Retire any outgoing requests that were still in flight. |
| // With the Reader no longer being processed, they necessarily cannot receive a response. |
| outgoing := <-c.outgoing |
| close(c.outgoing) // Prevent new outgoing requests, which would deadlock. |
| for id, response := range outgoing { |
| response <- &Response{ID: id, Error: err} |
| } |
| |
| close(toQueue) |
| }() |
| |
| for { |
| // get the next message |
| // no lock is needed, this is the only reader |
| msg, n, err := reader.Read(ctx) |
| if err != nil { |
| // The stream failed, we cannot continue |
| if !isClosingError(err) { |
| c.async.setError(err) |
| } |
| return err |
| } |
| switch msg := msg.(type) { |
| case *Request: |
| entry := &incoming{ |
| request: msg, |
| } |
| // add a span to the context for this request |
| labels := append(make([]label.Label, 0, 3), // make space for the id if present |
| tag.Method.Of(msg.Method), |
| tag.RPCDirection.Of(tag.Inbound), |
| ) |
| if msg.IsCall() { |
| labels = append(labels, tag.RPCID.Of(fmt.Sprintf("%q", msg.ID))) |
| } |
| entry.baseCtx, entry.done = event.Start(ctx, msg.Method, labels...) |
| event.Metric(entry.baseCtx, |
| tag.Started.Of(1), |
| tag.ReceivedBytes.Of(n)) |
| // in theory notifications cannot be cancelled, but we build them a cancel context anyway |
| entry.handleCtx, entry.cancel = context.WithCancel(entry.baseCtx) |
| // if the request is a call, add it to the incoming map so it can be |
| // cancelled by id |
| if msg.IsCall() { |
| pending := <-c.incoming |
| pending[msg.ID] = entry |
| c.incoming <- pending |
| } |
| // send the message to the incoming queue |
| toQueue <- entry |
| case *Response: |
| // If method is not set, this should be a response, in which case we must |
| // have an id to send the response back to the caller. |
| c.incomingResponse(msg) |
| } |
| } |
| } |
| |
| func (c *Connection) incomingResponse(msg *Response) { |
| var response chan<- *Response |
| if outgoing, ok := <-c.outgoing; ok { |
| response = outgoing[msg.ID] |
| delete(outgoing, msg.ID) |
| c.outgoing <- outgoing |
| } |
| if response != nil { |
| response <- msg |
| } |
| } |
| |
| // manageQueue reads incoming requests, attempts to process them with the preempter, or queue them |
| // up for normal handling. |
| func (c *Connection) manageQueue(ctx context.Context, preempter Preempter, fromRead <-chan *incoming, toDeliver chan<- *incoming) { |
| defer close(toDeliver) |
| q := []*incoming{} |
| ok := true |
| for { |
| var nextReq *incoming |
| if len(q) == 0 { |
| // no messages in the queue |
| // if we were closing, then we are done |
| if !ok { |
| return |
| } |
| // not closing, but nothing in the queue, so just block waiting for a read |
| nextReq, ok = <-fromRead |
| } else { |
| // we have a non empty queue, so pick whichever of reading or delivering |
| // that we can make progress on |
| select { |
| case nextReq, ok = <-fromRead: |
| case toDeliver <- q[0]: |
| //TODO: this causes a lot of shuffling, should we use a growing ring buffer? compaction? |
| q = q[1:] |
| } |
| } |
| if nextReq != nil { |
| // TODO: should we allow to limit the queue size? |
| var result interface{} |
| rerr := nextReq.handleCtx.Err() |
| if rerr == nil { |
| // only preempt if not already cancelled |
| result, rerr = preempter.Preempt(nextReq.handleCtx, nextReq.request) |
| } |
| switch { |
| case rerr == ErrNotHandled: |
| // message not handled, add it to the queue for the main handler |
| q = append(q, nextReq) |
| case rerr == ErrAsyncResponse: |
| // message handled but the response will come later |
| default: |
| // anything else means the message is fully handled |
| c.reply(nextReq, result, rerr) |
| } |
| } |
| } |
| } |
| |
| func (c *Connection) deliverMessages(ctx context.Context, handler Handler, fromQueue <-chan *incoming) { |
| defer func() { |
| // Close the underlying ReadWriteCloser if not already closed. We're about |
| // to mark the Connection as done, so we'd better actually be done! 😅 |
| // |
| // TODO(bcmills): This is actually a bit premature, since we may have |
| // asynchronous handlers still in flight at this point, but it's at least no |
| // more premature than calling c.async.done at this point (which we were |
| // already doing). This will get a proper fix in https://go.dev/cl/388134. |
| c.closeOnce.Do(func() { |
| if err := c.closer.Close(); err != nil { |
| c.async.setError(err) |
| } |
| }) |
| |
| c.async.done() |
| }() |
| |
| for entry := range fromQueue { |
| // cancel any messages in the queue that we have a pending cancel for |
| var result interface{} |
| rerr := entry.handleCtx.Err() |
| if rerr == nil { |
| // only deliver if not already cancelled |
| result, rerr = handler.Handle(entry.handleCtx, entry.request) |
| } |
| switch { |
| case rerr == ErrNotHandled: |
| // message not handled, report it back to the caller as an error |
| c.reply(entry, nil, fmt.Errorf("%w: %q", ErrMethodNotFound, entry.request.Method)) |
| case rerr == ErrAsyncResponse: |
| // message handled but the response will come later |
| default: |
| c.reply(entry, result, rerr) |
| } |
| } |
| } |
| |
| // reply is used to reply to an incoming request that has just been handled |
| func (c *Connection) reply(entry *incoming, result interface{}, rerr error) { |
| if entry.request.IsCall() { |
| // we have a call finishing, remove it from the incoming map |
| pending := <-c.incoming |
| defer func() { c.incoming <- pending }() |
| delete(pending, entry.request.ID) |
| } |
| if err := c.respond(entry, result, rerr); err != nil { |
| // no way to propagate this error |
| //TODO: should we do more than just log it? |
| event.Error(entry.baseCtx, "jsonrpc2 message delivery failed", err) |
| } |
| } |
| |
| // respond sends a response. |
| // This is the code shared between reply and SendResponse. |
| func (c *Connection) respond(entry *incoming, result interface{}, rerr error) error { |
| var err error |
| if entry.request.IsCall() { |
| // send the response |
| if result == nil && rerr == nil { |
| // call with no response, send an error anyway |
| rerr = fmt.Errorf("%w: %q produced no response", ErrInternal, entry.request.Method) |
| } |
| var response *Response |
| response, err = NewResponse(entry.request.ID, result, rerr) |
| if err == nil { |
| // we write the response with the base context, in case the message was cancelled |
| err = c.write(entry.baseCtx, response) |
| } |
| } else { |
| switch { |
| case rerr != nil: |
| // notification failed |
| err = fmt.Errorf("%w: %q notification failed: %v", ErrInternal, entry.request.Method, rerr) |
| rerr = nil |
| case result != nil: |
| //notification produced a response, which is an error |
| err = fmt.Errorf("%w: %q produced unwanted response", ErrInternal, entry.request.Method) |
| default: |
| // normal notification finish |
| } |
| } |
| switch { |
| case rerr != nil || err != nil: |
| event.Label(entry.baseCtx, tag.StatusCode.Of("ERROR")) |
| default: |
| event.Label(entry.baseCtx, tag.StatusCode.Of("OK")) |
| } |
| // and just to be clean, invoke and clear the cancel if needed |
| if entry.cancel != nil { |
| entry.cancel() |
| entry.cancel = nil |
| } |
| // mark the entire request processing as done |
| entry.done() |
| return err |
| } |
| |
| // write is used by all things that write outgoing messages, including replies. |
| // it makes sure that writes are atomic |
| func (c *Connection) write(ctx context.Context, msg Message) error { |
| writer := <-c.writer |
| defer func() { c.writer <- writer }() |
| n, err := writer.Write(ctx, msg) |
| event.Metric(ctx, tag.SentBytes.Of(n)) |
| return err |
| } |