|  | // Copyright 2018 The Go Authors. All rights reserved. | 
|  | // Use of this source code is governed by a BSD-style | 
|  | // license that can be found in the LICENSE file. | 
|  |  | 
|  | package jsonrpc2 | 
|  |  | 
|  | import ( | 
|  | "context" | 
|  | "encoding/json" | 
|  | "fmt" | 
|  | "io" | 
|  | "sync/atomic" | 
|  |  | 
|  | "golang.org/x/exp/event" | 
|  | errors "golang.org/x/xerrors" | 
|  | ) | 
|  |  | 
|  | // Binder builds a connection configuration. | 
|  | // This may be used in servers to generate a new configuration per connection. | 
|  | // ConnectionOptions itself implements Binder returning itself unmodified, to | 
|  | // allow for the simple cases where no per connection information is needed. | 
|  | type Binder interface { | 
|  | // Bind is invoked when creating a new connection. | 
|  | // The connection is not ready to use when Bind is called. | 
|  | Bind(context.Context, *Connection) (ConnectionOptions, error) | 
|  | } | 
|  |  | 
|  | // ConnectionOptions holds the options for new connections. | 
|  | type ConnectionOptions struct { | 
|  | // Framer allows control over the message framing and encoding. | 
|  | // If nil, HeaderFramer will be used. | 
|  | Framer Framer | 
|  | // Preempter allows registration of a pre-queue message handler. | 
|  | // If nil, no messages will be preempted. | 
|  | Preempter Preempter | 
|  | // Handler is used as the queued message handler for inbound messages. | 
|  | // If nil, all responses will be ErrNotHandled. | 
|  | Handler Handler | 
|  | } | 
|  |  | 
|  | // Connection manages the jsonrpc2 protocol, connecting responses back to their | 
|  | // calls. | 
|  | // Connection is bidirectional; it does not have a designated server or client | 
|  | // end. | 
|  | type Connection struct { | 
|  | seq         int64 // must only be accessed using atomic operations | 
|  | closer      io.Closer | 
|  | writerBox   chan Writer | 
|  | outgoingBox chan map[ID]chan<- *Response | 
|  | incomingBox chan map[ID]*incoming | 
|  | async       async | 
|  | } | 
|  |  | 
|  | type AsyncCall struct { | 
|  | id        ID | 
|  | response  chan *Response // the channel a response will be delivered on | 
|  | resultBox chan asyncResult | 
|  | ctx       context.Context | 
|  | } | 
|  |  | 
|  | type asyncResult struct { | 
|  | result []byte | 
|  | err    error | 
|  | } | 
|  |  | 
|  | // incoming is used to track an incoming request as it is being handled | 
|  | type incoming struct { | 
|  | request   *Request        // the request being processed | 
|  | baseCtx   context.Context // a base context for the message processing | 
|  | handleCtx context.Context // the context for handling the message, child of baseCtx | 
|  | cancel    func()          // a function that cancels the handling context | 
|  | } | 
|  |  | 
|  | // Bind returns the options unmodified. | 
|  | func (o ConnectionOptions) Bind(context.Context, *Connection) (ConnectionOptions, error) { | 
|  | return o, nil | 
|  | } | 
|  |  | 
|  | // newConnection creates a new connection and runs it. | 
|  | // This is used by the Dial and Serve functions to build the actual connection. | 
|  | func newConnection(ctx context.Context, rwc io.ReadWriteCloser, binder Binder) (*Connection, error) { | 
|  | c := &Connection{ | 
|  | closer:      rwc, | 
|  | writerBox:   make(chan Writer, 1), | 
|  | outgoingBox: make(chan map[ID]chan<- *Response, 1), | 
|  | incomingBox: make(chan map[ID]*incoming, 1), | 
|  | } | 
|  | c.async.init() | 
|  |  | 
|  | options, err := binder.Bind(ctx, c) | 
|  | if err != nil { | 
|  | return nil, err | 
|  | } | 
|  | if options.Framer == nil { | 
|  | options.Framer = HeaderFramer() | 
|  | } | 
|  | if options.Preempter == nil { | 
|  | options.Preempter = defaultHandler{} | 
|  | } | 
|  | if options.Handler == nil { | 
|  | options.Handler = defaultHandler{} | 
|  | } | 
|  | c.outgoingBox <- make(map[ID]chan<- *Response) | 
|  | c.incomingBox <- make(map[ID]*incoming) | 
|  | // the goroutines started here will continue until the underlying stream is closed | 
|  | reader := options.Framer.Reader(rwc) | 
|  | readToQueue := make(chan *incoming) | 
|  | queueToDeliver := make(chan *incoming) | 
|  | go c.readIncoming(ctx, reader, readToQueue) | 
|  | go c.manageQueue(ctx, options.Preempter, readToQueue, queueToDeliver) | 
|  | go c.deliverMessages(ctx, options.Handler, queueToDeliver) | 
|  | // releaseing the writer must be the last thing we do in case any requests | 
|  | // are blocked waiting for the connection to be ready | 
|  | c.writerBox <- options.Framer.Writer(rwc) | 
|  | return c, nil | 
|  | } | 
|  |  | 
|  | // Notify invokes the target method but does not wait for a response. | 
|  | // The params will be marshaled to JSON before sending over the wire, and will | 
|  | // be handed to the method invoked. | 
|  | func (c *Connection) Notify(ctx context.Context, method string, params interface{}) error { | 
|  | notify, err := NewNotification(method, params) | 
|  | if err != nil { | 
|  | return errors.Errorf("marshaling notify parameters: %v", err) | 
|  | } | 
|  | ctx = event.Start(ctx, method, RPCDirection(Outbound)) | 
|  | Started.Record(ctx, 1, Method(method)) | 
|  | var errLabel event.Label | 
|  | if err = c.write(ctx, notify); err != nil { | 
|  | errLabel = event.Value("error", err) | 
|  | } | 
|  | Finished.Record(ctx, 1, errLabel) | 
|  | event.End(ctx) | 
|  | return err | 
|  | } | 
|  |  | 
|  | // Call invokes the target method and returns an object that can be used to await the response. | 
|  | // The params will be marshaled to JSON before sending over the wire, and will | 
|  | // be handed to the method invoked. | 
|  | // You do not have to wait for the response, it can just be ignored if not needed. | 
|  | // If sending the call failed, the response will be ready and have the error in it. | 
|  | func (c *Connection) Call(ctx context.Context, method string, params interface{}) *AsyncCall { | 
|  | result := &AsyncCall{ | 
|  | id:        Int64ID(atomic.AddInt64(&c.seq, 1)), | 
|  | resultBox: make(chan asyncResult, 1), | 
|  | } | 
|  | // TODO: rewrite this using the new target/prototype stuff | 
|  | ctx = event.Start(ctx, method, | 
|  | Method(method), RPCDirection(Outbound), RPCID(fmt.Sprintf("%q", result.id))) | 
|  | Started.Record(ctx, 1, Method(method)) | 
|  | result.ctx = ctx | 
|  | // generate a new request identifier | 
|  | call, err := NewCall(result.id, method, params) | 
|  | if err != nil { | 
|  | // set the result to failed | 
|  | result.resultBox <- asyncResult{err: errors.Errorf("marshaling call parameters: %w", err)} | 
|  | return result | 
|  | } | 
|  | // We have to add ourselves to the pending map before we send, otherwise we | 
|  | // are racing the response. | 
|  | // rchan is buffered in case the response arrives without a listener. | 
|  | result.response = make(chan *Response, 1) | 
|  | pending := <-c.outgoingBox | 
|  | pending[result.id] = result.response | 
|  | c.outgoingBox <- pending | 
|  | // now we are ready to send | 
|  | if err := c.write(ctx, call); err != nil { | 
|  | // sending failed, we will never get a response, so deliver a fake one | 
|  | r, _ := NewResponse(result.id, nil, err) | 
|  | c.incomingResponse(r) | 
|  | } | 
|  | return result | 
|  | } | 
|  |  | 
|  | // ID used for this call. | 
|  | // This can be used to cancel the call if needed. | 
|  | func (a *AsyncCall) ID() ID { return a.id } | 
|  |  | 
|  | // IsReady can be used to check if the result is already prepared. | 
|  | // This is guaranteed to return true on a result for which Await has already | 
|  | // returned, or a call that failed to send in the first place. | 
|  | func (a *AsyncCall) IsReady() bool { | 
|  | select { | 
|  | case r := <-a.resultBox: | 
|  | a.resultBox <- r | 
|  | return true | 
|  | default: | 
|  | return false | 
|  | } | 
|  | } | 
|  |  | 
|  | // Await the results of a Call. | 
|  | // The response will be unmarshaled from JSON into the result. | 
|  | func (a *AsyncCall) Await(ctx context.Context, result interface{}) error { | 
|  | status := "NONE" | 
|  | defer event.End(a.ctx, StatusCode(status)) | 
|  | var r asyncResult | 
|  | select { | 
|  | case response := <-a.response: | 
|  | // response just arrived, prepare the result | 
|  | switch { | 
|  | case response.Error != nil: | 
|  | r.err = response.Error | 
|  | status = "ERROR" | 
|  | default: | 
|  | r.result = response.Result | 
|  | status = "OK" | 
|  | } | 
|  | case r = <-a.resultBox: | 
|  | // result already available | 
|  | case <-ctx.Done(): | 
|  | status = "CANCELLED" | 
|  | return ctx.Err() | 
|  | } | 
|  | // refill the box for the next caller | 
|  | a.resultBox <- r | 
|  | // and unpack the result | 
|  | if r.err != nil { | 
|  | return r.err | 
|  | } | 
|  | if result == nil || len(r.result) == 0 { | 
|  | return nil | 
|  | } | 
|  | return json.Unmarshal(r.result, result) | 
|  | } | 
|  |  | 
|  | // Respond deliverers a response to an incoming Call. | 
|  | // It is an error to not call this exactly once for any message for which a | 
|  | // handler has previously returned ErrAsyncResponse. It is also an error to | 
|  | // call this for any other message. | 
|  | func (c *Connection) Respond(id ID, result interface{}, rerr error) error { | 
|  | pending := <-c.incomingBox | 
|  | defer func() { c.incomingBox <- pending }() | 
|  | entry, found := pending[id] | 
|  | if !found { | 
|  | return nil | 
|  | } | 
|  | delete(pending, id) | 
|  | return c.respond(entry, result, rerr) | 
|  | } | 
|  |  | 
|  | // Cancel is used to cancel an inbound message by ID, it does not cancel | 
|  | // outgoing messages. | 
|  | // This is only used inside a message handler that is layering a | 
|  | // cancellation protocol on top of JSON RPC 2. | 
|  | // It will not complain if the ID is not a currently active message, and it will | 
|  | // not cause any messages that have not arrived yet with that ID to be | 
|  | // cancelled. | 
|  | func (c *Connection) Cancel(id ID) { | 
|  | pending := <-c.incomingBox | 
|  | defer func() { c.incomingBox <- pending }() | 
|  | if entry, found := pending[id]; found && entry.cancel != nil { | 
|  | entry.cancel() | 
|  | entry.cancel = nil | 
|  | } | 
|  | } | 
|  |  | 
|  | // Wait blocks until the connection is fully closed, but does not close it. | 
|  | func (c *Connection) Wait() error { | 
|  | return c.async.wait() | 
|  | } | 
|  |  | 
|  | // Close can be used to close the underlying stream, and then wait for the connection to | 
|  | // fully shut down. | 
|  | // This does not cancel in flight requests, but waits for them to gracefully complete. | 
|  | func (c *Connection) Close() error { | 
|  | // close the underlying stream | 
|  | if err := c.closer.Close(); err != nil && !isClosingError(err) { | 
|  | return err | 
|  | } | 
|  | // and then wait for it to cause the connection to close | 
|  | if err := c.Wait(); err != nil && !isClosingError(err) { | 
|  | return err | 
|  | } | 
|  | return nil | 
|  | } | 
|  |  | 
|  | // readIncoming collects inbound messages from the reader and delivers them, either responding | 
|  | // to outgoing calls or feeding requests to the queue. | 
|  | func (c *Connection) readIncoming(ctx context.Context, reader Reader, toQueue chan<- *incoming) { | 
|  | defer close(toQueue) | 
|  | for { | 
|  | // get the next message | 
|  | // no lock is needed, this is the only reader | 
|  | msg, n, err := reader.Read(ctx) | 
|  | if err != nil { | 
|  | // The stream failed, we cannot continue | 
|  | c.async.setError(err) | 
|  | return | 
|  | } | 
|  | switch msg := msg.(type) { | 
|  | case *Request: | 
|  | entry := &incoming{ | 
|  | request: msg, | 
|  | } | 
|  | // add a span to the context for this request | 
|  | var idLabel event.Label | 
|  | if msg.IsCall() { | 
|  | idLabel = RPCID(fmt.Sprintf("%q", msg.ID)) | 
|  | } | 
|  | entry.baseCtx = event.Start(ctx, msg.Method, | 
|  | Method(msg.Method), RPCDirection(Inbound), idLabel) | 
|  | Started.Record(entry.baseCtx, 1, Method(msg.Method)) | 
|  | ReceivedBytes.Record(entry.baseCtx, n, Method(msg.Method)) | 
|  | // in theory notifications cannot be cancelled, but we build them a cancel context anyway | 
|  | entry.handleCtx, entry.cancel = context.WithCancel(entry.baseCtx) | 
|  | // if the request is a call, add it to the incoming map so it can be | 
|  | // cancelled by id | 
|  | if msg.IsCall() { | 
|  | pending := <-c.incomingBox | 
|  | pending[msg.ID] = entry | 
|  | c.incomingBox <- pending | 
|  | } | 
|  | // send the message to the incoming queue | 
|  | toQueue <- entry | 
|  | case *Response: | 
|  | // If method is not set, this should be a response, in which case we must | 
|  | // have an id to send the response back to the caller. | 
|  | c.incomingResponse(msg) | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | func (c *Connection) incomingResponse(msg *Response) { | 
|  | pending := <-c.outgoingBox | 
|  | response, ok := pending[msg.ID] | 
|  | if ok { | 
|  | delete(pending, msg.ID) | 
|  | } | 
|  | c.outgoingBox <- pending | 
|  | if response != nil { | 
|  | response <- msg | 
|  | } | 
|  | } | 
|  |  | 
|  | // manageQueue reads incoming requests, attempts to process them with the preempter, or queue them | 
|  | // up for normal handling. | 
|  | func (c *Connection) manageQueue(ctx context.Context, preempter Preempter, fromRead <-chan *incoming, toDeliver chan<- *incoming) { | 
|  | defer close(toDeliver) | 
|  | q := []*incoming{} | 
|  | ok := true | 
|  | for { | 
|  | var nextReq *incoming | 
|  | if len(q) == 0 { | 
|  | // no messages in the queue | 
|  | // if we were closing, then we are done | 
|  | if !ok { | 
|  | return | 
|  | } | 
|  | // not closing, but nothing in the queue, so just block waiting for a read | 
|  | nextReq, ok = <-fromRead | 
|  | } else { | 
|  | // we have a non empty queue, so pick whichever of reading or delivering | 
|  | // that we can make progress on | 
|  | select { | 
|  | case nextReq, ok = <-fromRead: | 
|  | case toDeliver <- q[0]: | 
|  | // TODO: this causes a lot of shuffling, should we use a growing ring buffer? compaction? | 
|  | q = q[1:] | 
|  | } | 
|  | } | 
|  | if nextReq != nil { | 
|  | // TODO: should we allow to limit the queue size? | 
|  | var result interface{} | 
|  | rerr := nextReq.handleCtx.Err() | 
|  | if rerr == nil { | 
|  | // only preempt if not already cancelled | 
|  | result, rerr = preempter.Preempt(nextReq.handleCtx, nextReq.request) | 
|  | } | 
|  | switch { | 
|  | case rerr == ErrNotHandled: | 
|  | // message not handled, add it to the queue for the main handler | 
|  | q = append(q, nextReq) | 
|  | case rerr == ErrAsyncResponse: | 
|  | // message handled but the response will come later | 
|  | default: | 
|  | // anything else means the message is fully handled | 
|  | c.reply(nextReq, result, rerr) | 
|  | } | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | func (c *Connection) deliverMessages(ctx context.Context, handler Handler, fromQueue <-chan *incoming) { | 
|  | defer c.async.done() | 
|  | for entry := range fromQueue { | 
|  | // cancel any messages in the queue that we have a pending cancel for | 
|  | var result interface{} | 
|  | rerr := entry.handleCtx.Err() | 
|  | if rerr == nil { | 
|  | // only deliver if not already cancelled | 
|  | result, rerr = handler.Handle(entry.handleCtx, entry.request) | 
|  | } | 
|  | switch { | 
|  | case rerr == ErrNotHandled: | 
|  | // message not handled, report it back to the caller as an error | 
|  | c.reply(entry, nil, errors.Errorf("%w: %q", ErrMethodNotFound, entry.request.Method)) | 
|  | case rerr == ErrAsyncResponse: | 
|  | // message handled but the response will come later | 
|  | default: | 
|  | c.reply(entry, result, rerr) | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | // reply is used to reply to an incoming request that has just been handled | 
|  | func (c *Connection) reply(entry *incoming, result interface{}, rerr error) { | 
|  | if entry.request.IsCall() { | 
|  | // we have a call finishing, remove it from the incoming map | 
|  | pending := <-c.incomingBox | 
|  | defer func() { c.incomingBox <- pending }() | 
|  | delete(pending, entry.request.ID) | 
|  | } | 
|  | if err := c.respond(entry, result, rerr); err != nil { | 
|  | // no way to propagate this error | 
|  | // TODO: should we do more than just log it? | 
|  | event.Error(entry.baseCtx, "jsonrpc2 message delivery failed", err) | 
|  | } | 
|  | } | 
|  |  | 
|  | // respond sends a response. | 
|  | // This is the code shared between reply and SendResponse. | 
|  | func (c *Connection) respond(entry *incoming, result interface{}, rerr error) error { | 
|  | var err error | 
|  | if entry.request.IsCall() { | 
|  | // send the response | 
|  | if result == nil && rerr == nil { | 
|  | // call with no response, send an error anyway | 
|  | rerr = errors.Errorf("%w: %q produced no response", ErrInternal, entry.request.Method) | 
|  | } | 
|  | var response *Response | 
|  | response, err = NewResponse(entry.request.ID, result, rerr) | 
|  | if err == nil { | 
|  | // we write the response with the base context, in case the message was cancelled | 
|  | err = c.write(entry.baseCtx, response) | 
|  | } | 
|  | } else { | 
|  | switch { | 
|  | case rerr != nil: | 
|  | // notification failed | 
|  | err = errors.Errorf("%w: %q notification failed: %v", ErrInternal, entry.request.Method, rerr) | 
|  | rerr = nil | 
|  | case result != nil: | 
|  | // notification produced a response, which is an error | 
|  | err = errors.Errorf("%w: %q produced unwanted response", ErrInternal, entry.request.Method) | 
|  | default: | 
|  | // normal notification finish | 
|  | } | 
|  | } | 
|  | var status string | 
|  | switch { | 
|  | case rerr != nil || err != nil: | 
|  | status = "ERROR" | 
|  | default: | 
|  | status = "OK" | 
|  | } | 
|  | // and just to be clean, invoke and clear the cancel if needed | 
|  | if entry.cancel != nil { | 
|  | entry.cancel() | 
|  | entry.cancel = nil | 
|  | } | 
|  | // mark the entire request processing as done | 
|  | event.End(entry.baseCtx, StatusCode(status)) | 
|  | return err | 
|  | } | 
|  |  | 
|  | // write is used by all things that write outgoing messages, including replies. | 
|  | // it makes sure that writes are atomic | 
|  | func (c *Connection) write(ctx context.Context, msg Message) error { | 
|  | writer := <-c.writerBox | 
|  | defer func() { c.writerBox <- writer }() | 
|  | n, err := writer.Write(ctx, msg) | 
|  | // TODO: get a method label in here somehow. | 
|  | SentBytes.Record(ctx, n) | 
|  | return err | 
|  | } |