| // Copyright 2018 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package jsonrpc2 |
| |
| import ( |
| "context" |
| "encoding/json" |
| "fmt" |
| "io" |
| "sync/atomic" |
| |
| "golang.org/x/exp/event" |
| errors "golang.org/x/xerrors" |
| ) |
| |
| // Binder builds a connection configuration. |
| // This may be used in servers to generate a new configuration per connection. |
| // ConnectionOptions itself implements Binder returning itself unmodified, to |
| // allow for the simple cases where no per connection information is needed. |
| type Binder interface { |
| // Bind is invoked when creating a new connection. |
| // The connection is not ready to use when Bind is called. |
| Bind(context.Context, *Connection) (ConnectionOptions, error) |
| } |
| |
| // ConnectionOptions holds the options for new connections. |
| type ConnectionOptions struct { |
| // Framer allows control over the message framing and encoding. |
| // If nil, HeaderFramer will be used. |
| Framer Framer |
| // Preempter allows registration of a pre-queue message handler. |
| // If nil, no messages will be preempted. |
| Preempter Preempter |
| // Handler is used as the queued message handler for inbound messages. |
| // If nil, all responses will be ErrNotHandled. |
| Handler Handler |
| } |
| |
| // Connection manages the jsonrpc2 protocol, connecting responses back to their |
| // calls. |
| // Connection is bidirectional; it does not have a designated server or client |
| // end. |
| type Connection struct { |
| seq int64 // must only be accessed using atomic operations |
| closer io.Closer |
| writerBox chan Writer |
| outgoingBox chan map[ID]chan<- *Response |
| incomingBox chan map[ID]*incoming |
| async async |
| } |
| |
| type AsyncCall struct { |
| id ID |
| response chan *Response // the channel a response will be delivered on |
| resultBox chan asyncResult |
| endBuilder event.Builder // close the tracing span when all processing for the message is complete |
| } |
| |
| type asyncResult struct { |
| result []byte |
| err error |
| } |
| |
| // incoming is used to track an incoming request as it is being handled |
| type incoming struct { |
| request *Request // the request being processed |
| baseCtx context.Context // a base context for the message processing |
| endBuilder event.Builder // builder's End method called when all processing for the message is complete |
| handleCtx context.Context // the context for handling the message, child of baseCtx |
| cancel func() // a function that cancels the handling context |
| } |
| |
| // Bind returns the options unmodified. |
| func (o ConnectionOptions) Bind(context.Context, *Connection) (ConnectionOptions, error) { |
| return o, nil |
| } |
| |
| // newConnection creates a new connection and runs it. |
| // This is used by the Dial and Serve functions to build the actual connection. |
| func newConnection(ctx context.Context, rwc io.ReadWriteCloser, binder Binder) (*Connection, error) { |
| c := &Connection{ |
| closer: rwc, |
| writerBox: make(chan Writer, 1), |
| outgoingBox: make(chan map[ID]chan<- *Response, 1), |
| incomingBox: make(chan map[ID]*incoming, 1), |
| } |
| |
| options, err := binder.Bind(ctx, c) |
| if err != nil { |
| return nil, err |
| } |
| if options.Framer == nil { |
| options.Framer = HeaderFramer() |
| } |
| if options.Preempter == nil { |
| options.Preempter = defaultHandler{} |
| } |
| if options.Handler == nil { |
| options.Handler = defaultHandler{} |
| } |
| c.outgoingBox <- make(map[ID]chan<- *Response) |
| c.incomingBox <- make(map[ID]*incoming) |
| c.async.init() |
| // the goroutines started here will continue until the underlying stream is closed |
| reader := options.Framer.Reader(rwc) |
| readToQueue := make(chan *incoming) |
| queueToDeliver := make(chan *incoming) |
| go c.readIncoming(ctx, reader, readToQueue) |
| go c.manageQueue(ctx, options.Preempter, readToQueue, queueToDeliver) |
| go c.deliverMessages(ctx, options.Handler, queueToDeliver) |
| // releaseing the writer must be the last thing we do in case any requests |
| // are blocked waiting for the connection to be ready |
| c.writerBox <- options.Framer.Writer(rwc) |
| return c, nil |
| } |
| |
| // Notify invokes the target method but does not wait for a response. |
| // The params will be marshaled to JSON before sending over the wire, and will |
| // be handed to the method invoked. |
| func (c *Connection) Notify(ctx context.Context, method string, params interface{}) error { |
| notify, err := NewNotification(method, params) |
| if err != nil { |
| return errors.Errorf("marshaling notify parameters: %v", err) |
| } |
| b := event.To(ctx).With(Method.Of(method)) |
| b.Clone().Metric(Started.Record(1)) |
| ctx, endBuilder := b.With(RPCDirection.Of(Outbound)).Start(method) |
| err = c.write(ctx, notify) |
| event.To(ctx).With(Error.Of(err)).Metric(Finished.Record(1)) |
| endBuilder.End() |
| return err |
| } |
| |
| // Call invokes the target method and returns an object that can be used to await the response. |
| // The params will be marshaled to JSON before sending over the wire, and will |
| // be handed to the method invoked. |
| // You do not have to wait for the response, it can just be ignored if not needed. |
| // If sending the call failed, the response will be ready and have the error in it. |
| func (c *Connection) Call(ctx context.Context, method string, params interface{}) *AsyncCall { |
| result := &AsyncCall{ |
| id: Int64ID(atomic.AddInt64(&c.seq, 1)), |
| resultBox: make(chan asyncResult, 1), |
| } |
| // generate a new request identifier |
| call, err := NewCall(result.id, method, params) |
| if err != nil { |
| //set the result to failed |
| result.resultBox <- asyncResult{err: errors.Errorf("marshaling call parameters: %w", err)} |
| return result |
| } |
| b := event.To(ctx).With(Method.Of(method)) |
| b.Clone().Metric(Started.Record(1)) |
| ctx, endBuilder := b.Clone().With(RPCDirection.Of(Outbound)).With(RPCID.Of(fmt.Sprintf("%q", result.id))).Start(method) |
| result.endBuilder = endBuilder |
| // We have to add ourselves to the pending map before we send, otherwise we |
| // are racing the response. |
| // rchan is buffered in case the response arrives without a listener. |
| result.response = make(chan *Response, 1) |
| pending := <-c.outgoingBox |
| pending[result.id] = result.response |
| c.outgoingBox <- pending |
| // now we are ready to send |
| if err := c.write(ctx, call); err != nil { |
| // sending failed, we will never get a response, so deliver a fake one |
| r, _ := NewResponse(result.id, nil, err) |
| c.incomingResponse(r) |
| } |
| return result |
| } |
| |
| // ID used for this call. |
| // This can be used to cancel the call if needed. |
| func (a *AsyncCall) ID() ID { return a.id } |
| |
| // IsReady can be used to check if the result is already prepared. |
| // This is guaranteed to return true on a result for which Await has already |
| // returned, or a call that failed to send in the first place. |
| func (a *AsyncCall) IsReady() bool { |
| select { |
| case r := <-a.resultBox: |
| a.resultBox <- r |
| return true |
| default: |
| return false |
| } |
| } |
| |
| // Await the results of a Call. |
| // The response will be unmarshaled from JSON into the result. |
| func (a *AsyncCall) Await(ctx context.Context, result interface{}) error { |
| status := "NONE" |
| defer a.endBuilder.With(StatusCode.Of(status)).End() |
| var r asyncResult |
| select { |
| case response := <-a.response: |
| // response just arrived, prepare the result |
| switch { |
| case response.Error != nil: |
| r.err = response.Error |
| status = "ERROR" |
| default: |
| r.result = response.Result |
| status = "OK" |
| } |
| case r = <-a.resultBox: |
| // result already available |
| case <-ctx.Done(): |
| status = "CANCELLED" |
| return ctx.Err() |
| } |
| // refill the box for the next caller |
| a.resultBox <- r |
| // and unpack the result |
| if r.err != nil { |
| return r.err |
| } |
| if result == nil || len(r.result) == 0 { |
| return nil |
| } |
| return json.Unmarshal(r.result, result) |
| } |
| |
| // Respond deliverers a response to an incoming Call. |
| // It is an error to not call this exactly once for any message for which a |
| // handler has previously returned ErrAsyncResponse. It is also an error to |
| // call this for any other message. |
| func (c *Connection) Respond(id ID, result interface{}, rerr error) error { |
| pending := <-c.incomingBox |
| defer func() { c.incomingBox <- pending }() |
| entry, found := pending[id] |
| if !found { |
| return nil |
| } |
| delete(pending, id) |
| return c.respond(entry, result, rerr) |
| } |
| |
| // Cancel is used to cancel an inbound message by ID, it does not cancel |
| // outgoing messages. |
| // This is only used inside a message handler that is layering a |
| // cancellation protocol on top of JSON RPC 2. |
| // It will not complain if the ID is not a currently active message, and it will |
| // not cause any messages that have not arrived yet with that ID to be |
| // cancelled. |
| func (c *Connection) Cancel(id ID) { |
| pending := <-c.incomingBox |
| defer func() { c.incomingBox <- pending }() |
| if entry, found := pending[id]; found && entry.cancel != nil { |
| entry.cancel() |
| entry.cancel = nil |
| } |
| } |
| |
| // Wait blocks until the connection is fully closed, but does not close it. |
| func (c *Connection) Wait() error { |
| return c.async.wait() |
| } |
| |
| // Close can be used to close the underlying stream, and then wait for the connection to |
| // fully shut down. |
| // This does not cancel in flight requests, but waits for them to gracefully complete. |
| func (c *Connection) Close() error { |
| // close the underlying stream |
| if err := c.closer.Close(); err != nil && !isClosingError(err) { |
| return err |
| } |
| // and then wait for it to cause the connection to close |
| if err := c.Wait(); err != nil && !isClosingError(err) { |
| return err |
| } |
| return nil |
| } |
| |
| // readIncoming collects inbound messages from the reader and delivers them, either responding |
| // to outgoing calls or feeding requests to the queue. |
| func (c *Connection) readIncoming(ctx context.Context, reader Reader, toQueue chan<- *incoming) { |
| defer close(toQueue) |
| for { |
| // get the next message |
| // no lock is needed, this is the only reader |
| msg, n, err := reader.Read(ctx) |
| if err != nil { |
| // The stream failed, we cannot continue |
| c.async.setError(err) |
| return |
| } |
| switch msg := msg.(type) { |
| case *Request: |
| entry := &incoming{ |
| request: msg, |
| } |
| // add a span to the context for this request |
| b := event.To(ctx).With(Method.Of(msg.Method)).With(RPCDirection.Of(Inbound)) |
| if msg.IsCall() { |
| b = b.With(RPCID.Of(fmt.Sprintf("%q", msg.ID))) |
| } |
| entry.baseCtx, entry.endBuilder = b.Start(msg.Method) |
| b = event.To(entry.baseCtx).With(Method.Of(msg.Method)) |
| b.Clone().Metric(Started.Record(1)) |
| b.Metric(ReceivedBytes.Record(n)) |
| // in theory notifications cannot be cancelled, but we build them a cancel context anyway |
| entry.handleCtx, entry.cancel = context.WithCancel(entry.baseCtx) |
| // if the request is a call, add it to the incoming map so it can be |
| // cancelled by id |
| if msg.IsCall() { |
| pending := <-c.incomingBox |
| c.incomingBox <- pending |
| pending[msg.ID] = entry |
| } |
| // send the message to the incoming queue |
| toQueue <- entry |
| case *Response: |
| // If method is not set, this should be a response, in which case we must |
| // have an id to send the response back to the caller. |
| c.incomingResponse(msg) |
| } |
| } |
| } |
| |
| func (c *Connection) incomingResponse(msg *Response) { |
| pending := <-c.outgoingBox |
| response, ok := pending[msg.ID] |
| if ok { |
| delete(pending, msg.ID) |
| } |
| c.outgoingBox <- pending |
| if response != nil { |
| response <- msg |
| } |
| } |
| |
| // manageQueue reads incoming requests, attempts to proccess them with the preempter, or queue them |
| // up for normal handling. |
| func (c *Connection) manageQueue(ctx context.Context, preempter Preempter, fromRead <-chan *incoming, toDeliver chan<- *incoming) { |
| defer close(toDeliver) |
| q := []*incoming{} |
| ok := true |
| for { |
| var nextReq *incoming |
| if len(q) == 0 { |
| // no messages in the queue |
| // if we were closing, then we are done |
| if !ok { |
| return |
| } |
| // not closing, but nothing in the queue, so just block waiting for a read |
| nextReq, ok = <-fromRead |
| } else { |
| // we have a non empty queue, so pick whichever of reading or delivering |
| // that we can make progress on |
| select { |
| case nextReq, ok = <-fromRead: |
| case toDeliver <- q[0]: |
| //TODO: this causes a lot of shuffling, should we use a growing ring buffer? compaction? |
| q = q[1:] |
| } |
| } |
| if nextReq != nil { |
| // TODO: should we allow to limit the queue size? |
| var result interface{} |
| rerr := nextReq.handleCtx.Err() |
| if rerr == nil { |
| // only preempt if not already cancelled |
| result, rerr = preempter.Preempt(nextReq.handleCtx, nextReq.request) |
| } |
| switch { |
| case rerr == ErrNotHandled: |
| // message not handled, add it to the queue for the main handler |
| q = append(q, nextReq) |
| case rerr == ErrAsyncResponse: |
| // message handled but the response will come later |
| default: |
| // anything else means the message is fully handled |
| c.reply(nextReq, result, rerr) |
| } |
| } |
| } |
| } |
| |
| func (c *Connection) deliverMessages(ctx context.Context, handler Handler, fromQueue <-chan *incoming) { |
| defer c.async.done() |
| for entry := range fromQueue { |
| // cancel any messages in the queue that we have a pending cancel for |
| var result interface{} |
| rerr := entry.handleCtx.Err() |
| if rerr == nil { |
| // only deliver if not already cancelled |
| result, rerr = handler.Handle(entry.handleCtx, entry.request) |
| } |
| switch { |
| case rerr == ErrNotHandled: |
| // message not handled, report it back to the caller as an error |
| c.reply(entry, nil, errors.Errorf("%w: %q", ErrMethodNotFound, entry.request.Method)) |
| case rerr == ErrAsyncResponse: |
| // message handled but the response will come later |
| default: |
| c.reply(entry, result, rerr) |
| } |
| } |
| } |
| |
| // reply is used to reply to an incoming request that has just been handled |
| func (c *Connection) reply(entry *incoming, result interface{}, rerr error) { |
| if entry.request.IsCall() { |
| // we have a call finishing, remove it from the incoming map |
| pending := <-c.incomingBox |
| defer func() { c.incomingBox <- pending }() |
| delete(pending, entry.request.ID) |
| } |
| if err := c.respond(entry, result, rerr); err != nil { |
| // no way to propagate this error |
| //TODO: should we do more than just log it? |
| event.To(entry.baseCtx).With(Error.Of(err)).Log("jsonrpc2 message delivery failed") |
| } |
| } |
| |
| // respond sends a response. |
| // This is the code shared between reply and SendResponse. |
| func (c *Connection) respond(entry *incoming, result interface{}, rerr error) error { |
| var err error |
| if entry.request.IsCall() { |
| // send the response |
| if result == nil && rerr == nil { |
| // call with no response, send an error anyway |
| rerr = errors.Errorf("%w: %q produced no response", ErrInternal, entry.request.Method) |
| } |
| var response *Response |
| response, err = NewResponse(entry.request.ID, result, rerr) |
| if err == nil { |
| // we write the response with the base context, in case the message was cancelled |
| err = c.write(entry.baseCtx, response) |
| } |
| } else { |
| switch { |
| case rerr != nil: |
| // notification failed |
| err = errors.Errorf("%w: %q notification failed: %v", ErrInternal, entry.request.Method, rerr) |
| rerr = nil |
| case result != nil: |
| //notification produced a response, which is an error |
| err = errors.Errorf("%w: %q produced unwanted response", ErrInternal, entry.request.Method) |
| default: |
| // normal notification finish |
| } |
| } |
| var status string |
| switch { |
| case rerr != nil || err != nil: |
| status = "ERROR" |
| default: |
| status = "OK" |
| } |
| // and just to be clean, invoke and clear the cancel if needed |
| if entry.cancel != nil { |
| entry.cancel() |
| entry.cancel = nil |
| } |
| // mark the entire request processing as done |
| entry.endBuilder.With(StatusCode.Of(status)).End() |
| return err |
| } |
| |
| // write is used by all things that write outgoing messages, including replies. |
| // it makes sure that writes are atomic |
| func (c *Connection) write(ctx context.Context, msg Message) error { |
| writer := <-c.writerBox |
| defer func() { c.writerBox <- writer }() |
| n, err := writer.Write(ctx, msg) |
| // TODO: get a method label in here somehow. |
| event.To(ctx).Metric(SentBytes.Record(n)) |
| return err |
| } |