| // Copyright 2018 The Go Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package jsonrpc2 |
| |
| import ( |
| "context" |
| "encoding/json" |
| "errors" |
| "fmt" |
| "io" |
| "sync" |
| "sync/atomic" |
| "time" |
| |
| "golang.org/x/tools/internal/event" |
| "golang.org/x/tools/internal/event/keys" |
| "golang.org/x/tools/internal/event/label" |
| "golang.org/x/tools/internal/event/tag" |
| ) |
| |
| // Binder builds a connection configuration. |
| // This may be used in servers to generate a new configuration per connection. |
| // ConnectionOptions itself implements Binder returning itself unmodified, to |
| // allow for the simple cases where no per connection information is needed. |
| type Binder interface { |
| // Bind returns the ConnectionOptions to use when establishing the passed-in |
| // Connection. |
| // |
| // The connection is not ready to use when Bind is called, |
| // but Bind may close it without reading or writing to it. |
| Bind(context.Context, *Connection) ConnectionOptions |
| } |
| |
| // A BinderFunc implements the Binder interface for a standalone Bind function. |
| type BinderFunc func(context.Context, *Connection) ConnectionOptions |
| |
| func (f BinderFunc) Bind(ctx context.Context, c *Connection) ConnectionOptions { |
| return f(ctx, c) |
| } |
| |
| var _ Binder = BinderFunc(nil) |
| |
| // ConnectionOptions holds the options for new connections. |
| type ConnectionOptions struct { |
| // Framer allows control over the message framing and encoding. |
| // If nil, HeaderFramer will be used. |
| Framer Framer |
| // Preempter allows registration of a pre-queue message handler. |
| // If nil, no messages will be preempted. |
| Preempter Preempter |
| // Handler is used as the queued message handler for inbound messages. |
| // If nil, all responses will be ErrNotHandled. |
| Handler Handler |
| // OnInternalError, if non-nil, is called with any internal errors that occur |
| // while serving the connection, such as protocol errors or invariant |
| // violations. (If nil, internal errors result in panics.) |
| OnInternalError func(error) |
| } |
| |
| // Connection manages the jsonrpc2 protocol, connecting responses back to their |
| // calls. |
| // Connection is bidirectional; it does not have a designated server or client |
| // end. |
| type Connection struct { |
| seq int64 // must only be accessed using atomic operations |
| |
| stateMu sync.Mutex |
| state inFlightState // accessed only in updateInFlight |
| done chan struct{} // closed (under stateMu) when state.closed is true and all goroutines have completed |
| |
| writer chan Writer // 1-buffered; stores the writer when not in use |
| |
| handler Handler |
| |
| onInternalError func(error) |
| onDone func() |
| } |
| |
| // inFlightState records the state of the incoming and outgoing calls on a |
| // Connection. |
| type inFlightState struct { |
| connClosing bool // true when the Connection's Close method has been called |
| reading bool // true while the readIncoming goroutine is running |
| readErr error // non-nil when the readIncoming goroutine exits (typically io.EOF) |
| writeErr error // non-nil if a call to the Writer has failed with a non-canceled Context |
| |
| // closer shuts down and cleans up the Reader and Writer state, ideally |
| // interrupting any Read or Write call that is currently blocked. It is closed |
| // when the state is idle and one of: connClosing is true, readErr is non-nil, |
| // or writeErr is non-nil. |
| // |
| // After the closer has been invoked, the closer field is set to nil |
| // and the closeErr field is simultaneously set to its result. |
| closer io.Closer |
| closeErr error // error returned from closer.Close |
| |
| outgoingCalls map[ID]*AsyncCall // calls only |
| outgoingNotifications int // # of notifications awaiting "write" |
| |
| // incoming stores the total number of incoming calls and notifications |
| // that have not yet written or processed a result. |
| incoming int |
| |
| incomingByID map[ID]*incomingRequest // calls only |
| |
| // handlerQueue stores the backlog of calls and notifications that were not |
| // already handled by a preempter. |
| // The queue does not include the request currently being handled (if any). |
| handlerQueue []*incomingRequest |
| handlerRunning bool |
| } |
| |
| // updateInFlight locks the state of the connection's in-flight requests, allows |
| // f to mutate that state, and closes the connection if it is idle and either |
| // is closing or has a read or write error. |
| func (c *Connection) updateInFlight(f func(*inFlightState)) { |
| c.stateMu.Lock() |
| defer c.stateMu.Unlock() |
| |
| s := &c.state |
| |
| f(s) |
| |
| select { |
| case <-c.done: |
| // The connection was already completely done at the start of this call to |
| // updateInFlight, so it must remain so. (The call to f should have noticed |
| // that and avoided making any updates that would cause the state to be |
| // non-idle.) |
| if !s.idle() { |
| panic("jsonrpc2_v2: updateInFlight transitioned to non-idle when already done") |
| } |
| return |
| default: |
| } |
| |
| if s.idle() && s.shuttingDown(ErrUnknown) != nil { |
| if s.closer != nil { |
| s.closeErr = s.closer.Close() |
| s.closer = nil // prevent duplicate Close calls |
| } |
| if s.reading { |
| // The readIncoming goroutine is still running. Our call to Close should |
| // cause it to exit soon, at which point it will make another call to |
| // updateInFlight, set s.reading to false, and mark the Connection done. |
| } else { |
| // The readIncoming goroutine has exited, or never started to begin with. |
| // Since everything else is idle, we're completely done. |
| if c.onDone != nil { |
| c.onDone() |
| } |
| close(c.done) |
| } |
| } |
| } |
| |
| // idle reports whether the connction is in a state with no pending calls or |
| // notifications. |
| // |
| // If idle returns true, the readIncoming goroutine may still be running, |
| // but no other goroutines are doing work on behalf of the connnection. |
| func (s *inFlightState) idle() bool { |
| return len(s.outgoingCalls) == 0 && s.outgoingNotifications == 0 && s.incoming == 0 && !s.handlerRunning |
| } |
| |
| // shuttingDown reports whether the connection is in a state that should |
| // disallow new (incoming and outgoing) calls. It returns either nil or |
| // an error that is or wraps the provided errClosing. |
| func (s *inFlightState) shuttingDown(errClosing error) error { |
| if s.connClosing { |
| // If Close has been called explicitly, it doesn't matter what state the |
| // Reader and Writer are in: we shouldn't be starting new work because the |
| // caller told us not to start new work. |
| return errClosing |
| } |
| if s.readErr != nil { |
| // If the read side of the connection is broken, we cannot read new call |
| // requests, and cannot read responses to our outgoing calls. |
| return fmt.Errorf("%w: %v", errClosing, s.readErr) |
| } |
| if s.writeErr != nil { |
| // If the write side of the connection is broken, we cannot write responses |
| // for incoming calls, and cannot write requests for outgoing calls. |
| return fmt.Errorf("%w: %v", errClosing, s.writeErr) |
| } |
| return nil |
| } |
| |
| // incomingRequest is used to track an incoming request as it is being handled |
| type incomingRequest struct { |
| *Request // the request being processed |
| ctx context.Context |
| cancel context.CancelFunc |
| endSpan func() // called (and set to nil) when the response is sent |
| } |
| |
| // Bind returns the options unmodified. |
| func (o ConnectionOptions) Bind(context.Context, *Connection) ConnectionOptions { |
| return o |
| } |
| |
| // newConnection creates a new connection and runs it. |
| // |
| // This is used by the Dial and Serve functions to build the actual connection. |
| // |
| // The connection is closed automatically (and its resources cleaned up) when |
| // the last request has completed after the underlying ReadWriteCloser breaks, |
| // but it may be stopped earlier by calling Close (for a clean shutdown). |
| func newConnection(bindCtx context.Context, rwc io.ReadWriteCloser, binder Binder, onDone func()) *Connection { |
| // TODO: Should we create a new event span here? |
| // This will propagate cancellation from ctx; should it? |
| ctx := notDone{bindCtx} |
| |
| c := &Connection{ |
| state: inFlightState{closer: rwc}, |
| done: make(chan struct{}), |
| writer: make(chan Writer, 1), |
| onDone: onDone, |
| } |
| // It's tempting to set a finalizer on c to verify that the state has gone |
| // idle when the connection becomes unreachable. Unfortunately, the Binder |
| // interface makes that unsafe: it allows the Handler to close over the |
| // Connection, which could create a reference cycle that would cause the |
| // Connection to become uncollectable. |
| |
| options := binder.Bind(bindCtx, c) |
| framer := options.Framer |
| if framer == nil { |
| framer = HeaderFramer() |
| } |
| c.handler = options.Handler |
| if c.handler == nil { |
| c.handler = defaultHandler{} |
| } |
| c.onInternalError = options.OnInternalError |
| |
| c.writer <- framer.Writer(rwc) |
| reader := framer.Reader(rwc) |
| |
| c.updateInFlight(func(s *inFlightState) { |
| select { |
| case <-c.done: |
| // Bind already closed the connection; don't start a goroutine to read it. |
| return |
| default: |
| } |
| |
| // The goroutine started here will continue until the underlying stream is closed. |
| // |
| // (If the Binder closed the Connection already, this should error out and |
| // return almost immediately.) |
| s.reading = true |
| go c.readIncoming(ctx, reader, options.Preempter) |
| }) |
| return c |
| } |
| |
| // Notify invokes the target method but does not wait for a response. |
| // The params will be marshaled to JSON before sending over the wire, and will |
| // be handed to the method invoked. |
| func (c *Connection) Notify(ctx context.Context, method string, params interface{}) (err error) { |
| ctx, done := event.Start(ctx, method, |
| tag.Method.Of(method), |
| tag.RPCDirection.Of(tag.Outbound), |
| ) |
| attempted := false |
| |
| defer func() { |
| labelStatus(ctx, err) |
| done() |
| if attempted { |
| c.updateInFlight(func(s *inFlightState) { |
| s.outgoingNotifications-- |
| }) |
| } |
| }() |
| |
| c.updateInFlight(func(s *inFlightState) { |
| // If the connection is shutting down, allow outgoing notifications only if |
| // there is at least one call still in flight. The number of calls in flight |
| // cannot increase once shutdown begins, and allowing outgoing notifications |
| // may permit notifications that will cancel in-flight calls. |
| if len(s.outgoingCalls) == 0 && len(s.incomingByID) == 0 { |
| err = s.shuttingDown(ErrClientClosing) |
| if err != nil { |
| return |
| } |
| } |
| s.outgoingNotifications++ |
| attempted = true |
| }) |
| if err != nil { |
| return err |
| } |
| |
| notify, err := NewNotification(method, params) |
| if err != nil { |
| return fmt.Errorf("marshaling notify parameters: %v", err) |
| } |
| |
| event.Metric(ctx, tag.Started.Of(1)) |
| return c.write(ctx, notify) |
| } |
| |
| // Call invokes the target method and returns an object that can be used to await the response. |
| // The params will be marshaled to JSON before sending over the wire, and will |
| // be handed to the method invoked. |
| // You do not have to wait for the response, it can just be ignored if not needed. |
| // If sending the call failed, the response will be ready and have the error in it. |
| func (c *Connection) Call(ctx context.Context, method string, params interface{}) *AsyncCall { |
| // Generate a new request identifier. |
| id := Int64ID(atomic.AddInt64(&c.seq, 1)) |
| ctx, endSpan := event.Start(ctx, method, |
| tag.Method.Of(method), |
| tag.RPCDirection.Of(tag.Outbound), |
| tag.RPCID.Of(fmt.Sprintf("%q", id)), |
| ) |
| |
| ac := &AsyncCall{ |
| id: id, |
| ready: make(chan struct{}), |
| ctx: ctx, |
| endSpan: endSpan, |
| } |
| // When this method returns, either ac is retired, or the request has been |
| // written successfully and the call is awaiting a response (to be provided by |
| // the readIncoming goroutine). |
| |
| call, err := NewCall(ac.id, method, params) |
| if err != nil { |
| ac.retire(&Response{ID: id, Error: fmt.Errorf("marshaling call parameters: %w", err)}) |
| return ac |
| } |
| |
| c.updateInFlight(func(s *inFlightState) { |
| err = s.shuttingDown(ErrClientClosing) |
| if err != nil { |
| return |
| } |
| if s.outgoingCalls == nil { |
| s.outgoingCalls = make(map[ID]*AsyncCall) |
| } |
| s.outgoingCalls[ac.id] = ac |
| }) |
| if err != nil { |
| ac.retire(&Response{ID: id, Error: err}) |
| return ac |
| } |
| |
| event.Metric(ctx, tag.Started.Of(1)) |
| if err := c.write(ctx, call); err != nil { |
| // Sending failed. We will never get a response, so deliver a fake one if it |
| // wasn't already retired by the connection breaking. |
| c.updateInFlight(func(s *inFlightState) { |
| if s.outgoingCalls[ac.id] == ac { |
| delete(s.outgoingCalls, ac.id) |
| ac.retire(&Response{ID: id, Error: err}) |
| } else { |
| // ac was already retired by the readIncoming goroutine: |
| // perhaps our write raced with the Read side of the connection breaking. |
| } |
| }) |
| } |
| return ac |
| } |
| |
| type AsyncCall struct { |
| id ID |
| ready chan struct{} // closed after response has been set and span has been ended |
| response *Response |
| ctx context.Context // for event logging only |
| endSpan func() // close the tracing span when all processing for the message is complete |
| } |
| |
| // ID used for this call. |
| // This can be used to cancel the call if needed. |
| func (ac *AsyncCall) ID() ID { return ac.id } |
| |
| // IsReady can be used to check if the result is already prepared. |
| // This is guaranteed to return true on a result for which Await has already |
| // returned, or a call that failed to send in the first place. |
| func (ac *AsyncCall) IsReady() bool { |
| select { |
| case <-ac.ready: |
| return true |
| default: |
| return false |
| } |
| } |
| |
| // retire processes the response to the call. |
| func (ac *AsyncCall) retire(response *Response) { |
| select { |
| case <-ac.ready: |
| panic(fmt.Sprintf("jsonrpc2: retire called twice for ID %v", ac.id)) |
| default: |
| } |
| |
| ac.response = response |
| labelStatus(ac.ctx, response.Error) |
| ac.endSpan() |
| // Allow the trace context, which may retain a lot of reachable values, |
| // to be garbage-collected. |
| ac.ctx, ac.endSpan = nil, nil |
| |
| close(ac.ready) |
| } |
| |
| // Await waits for (and decodes) the results of a Call. |
| // The response will be unmarshaled from JSON into the result. |
| func (ac *AsyncCall) Await(ctx context.Context, result interface{}) error { |
| select { |
| case <-ctx.Done(): |
| return ctx.Err() |
| case <-ac.ready: |
| } |
| if ac.response.Error != nil { |
| return ac.response.Error |
| } |
| if result == nil { |
| return nil |
| } |
| return json.Unmarshal(ac.response.Result, result) |
| } |
| |
| // Respond delivers a response to an incoming Call. |
| // |
| // Respond must be called exactly once for any message for which a handler |
| // returns ErrAsyncResponse. It must not be called for any other message. |
| func (c *Connection) Respond(id ID, result interface{}, err error) error { |
| var req *incomingRequest |
| c.updateInFlight(func(s *inFlightState) { |
| req = s.incomingByID[id] |
| }) |
| if req == nil { |
| return c.internalErrorf("Request not found for ID %v", id) |
| } |
| |
| if err == ErrAsyncResponse { |
| // Respond is supposed to supply the asynchronous response, so it would be |
| // confusing to call Respond with an error that promises to call Respond |
| // again. |
| err = c.internalErrorf("Respond called with ErrAsyncResponse for %q", req.Method) |
| } |
| return c.processResult("Respond", req, result, err) |
| } |
| |
| // Cancel cancels the Context passed to the Handle call for the inbound message |
| // with the given ID. |
| // |
| // Cancel will not complain if the ID is not a currently active message, and it |
| // will not cause any messages that have not arrived yet with that ID to be |
| // cancelled. |
| func (c *Connection) Cancel(id ID) { |
| var req *incomingRequest |
| c.updateInFlight(func(s *inFlightState) { |
| req = s.incomingByID[id] |
| }) |
| if req != nil { |
| req.cancel() |
| } |
| } |
| |
| // Wait blocks until the connection is fully closed, but does not close it. |
| func (c *Connection) Wait() error { |
| var err error |
| <-c.done |
| c.updateInFlight(func(s *inFlightState) { |
| err = s.closeErr |
| }) |
| return err |
| } |
| |
| // Close stops accepting new requests, waits for in-flight requests and enqueued |
| // Handle calls to complete, and then closes the underlying stream. |
| // |
| // After the start of a Close, notification requests (that lack IDs and do not |
| // receive responses) will continue to be passed to the Preempter, but calls |
| // with IDs will receive immediate responses with ErrServerClosing, and no new |
| // requests (not even notifications!) will be enqueued to the Handler. |
| func (c *Connection) Close() error { |
| // Stop handling new requests, and interrupt the reader (by closing the |
| // connection) as soon as the active requests finish. |
| c.updateInFlight(func(s *inFlightState) { s.connClosing = true }) |
| |
| return c.Wait() |
| } |
| |
| // readIncoming collects inbound messages from the reader and delivers them, either responding |
| // to outgoing calls or feeding requests to the queue. |
| func (c *Connection) readIncoming(ctx context.Context, reader Reader, preempter Preempter) { |
| var err error |
| for { |
| var ( |
| msg Message |
| n int64 |
| ) |
| msg, n, err = reader.Read(ctx) |
| if err != nil { |
| break |
| } |
| |
| switch msg := msg.(type) { |
| case *Request: |
| c.acceptRequest(ctx, msg, n, preempter) |
| |
| case *Response: |
| c.updateInFlight(func(s *inFlightState) { |
| if ac, ok := s.outgoingCalls[msg.ID]; ok { |
| delete(s.outgoingCalls, msg.ID) |
| ac.retire(msg) |
| } else { |
| // TODO: How should we report unexpected responses? |
| } |
| }) |
| |
| default: |
| c.internalErrorf("Read returned an unexpected message of type %T", msg) |
| } |
| } |
| |
| c.updateInFlight(func(s *inFlightState) { |
| s.reading = false |
| s.readErr = err |
| |
| // Retire any outgoing requests that were still in flight: with the Reader no |
| // longer being processed, they necessarily cannot receive a response. |
| for id, ac := range s.outgoingCalls { |
| ac.retire(&Response{ID: id, Error: err}) |
| } |
| s.outgoingCalls = nil |
| }) |
| } |
| |
| // acceptRequest either handles msg synchronously or enqueues it to be handled |
| // asynchronously. |
| func (c *Connection) acceptRequest(ctx context.Context, msg *Request, msgBytes int64, preempter Preempter) { |
| // Add a span to the context for this request. |
| labels := append(make([]label.Label, 0, 3), // Make space for the ID if present. |
| tag.Method.Of(msg.Method), |
| tag.RPCDirection.Of(tag.Inbound), |
| ) |
| if msg.IsCall() { |
| labels = append(labels, tag.RPCID.Of(fmt.Sprintf("%q", msg.ID))) |
| } |
| ctx, endSpan := event.Start(ctx, msg.Method, labels...) |
| event.Metric(ctx, |
| tag.Started.Of(1), |
| tag.ReceivedBytes.Of(msgBytes)) |
| |
| // In theory notifications cannot be cancelled, but we build them a cancel |
| // context anyway. |
| ctx, cancel := context.WithCancel(ctx) |
| req := &incomingRequest{ |
| Request: msg, |
| ctx: ctx, |
| cancel: cancel, |
| endSpan: endSpan, |
| } |
| |
| // If the request is a call, add it to the incoming map so it can be |
| // cancelled (or responded) by ID. |
| var err error |
| c.updateInFlight(func(s *inFlightState) { |
| s.incoming++ |
| |
| if req.IsCall() { |
| if s.incomingByID[req.ID] != nil { |
| err = fmt.Errorf("%w: request ID %v already in use", ErrInvalidRequest, req.ID) |
| req.ID = ID{} // Don't misattribute this error to the existing request. |
| return |
| } |
| |
| if s.incomingByID == nil { |
| s.incomingByID = make(map[ID]*incomingRequest) |
| } |
| s.incomingByID[req.ID] = req |
| |
| // When shutting down, reject all new Call requests, even if they could |
| // theoretically be handled by the preempter. The preempter could return |
| // ErrAsyncResponse, which would increase the amount of work in flight |
| // when we're trying to ensure that it strictly decreases. |
| err = s.shuttingDown(ErrServerClosing) |
| } |
| }) |
| if err != nil { |
| c.processResult("acceptRequest", req, nil, err) |
| return |
| } |
| |
| if preempter != nil { |
| result, err := preempter.Preempt(req.ctx, req.Request) |
| |
| if req.IsCall() && errors.Is(err, ErrAsyncResponse) { |
| // This request will remain in flight until Respond is called for it. |
| return |
| } |
| |
| if !errors.Is(err, ErrNotHandled) { |
| c.processResult("Preempt", req, result, err) |
| return |
| } |
| } |
| |
| c.updateInFlight(func(s *inFlightState) { |
| // If the connection is shutting down, don't enqueue anything to the |
| // handler — not even notifications. That ensures that if the handler |
| // continues to make progress, it will eventually become idle and |
| // close the connection. |
| err = s.shuttingDown(ErrServerClosing) |
| if err != nil { |
| return |
| } |
| |
| // We enqueue requests that have not been preempted to an unbounded slice. |
| // Unfortunately, we cannot in general limit the size of the handler |
| // queue: we have to read every response that comes in on the wire |
| // (because it may be responding to a request issued by, say, an |
| // asynchronous handler), and in order to get to that response we have |
| // to read all of the requests that came in ahead of it. |
| s.handlerQueue = append(s.handlerQueue, req) |
| if !s.handlerRunning { |
| // We start the handleAsync goroutine when it has work to do, and let it |
| // exit when the queue empties. |
| // |
| // Otherwise, in order to synchronize the handler we would need some other |
| // goroutine (probably readIncoming?) to explicitly wait for handleAsync |
| // to finish, and that would complicate error reporting: either the error |
| // report from the goroutine would be blocked on the handler emptying its |
| // queue (which was tried, and introduced a deadlock detected by |
| // TestCloseCallRace), or the error would need to be reported separately |
| // from synchronizing completion. Allowing the handler goroutine to exit |
| // when idle seems simpler than trying to implement either of those |
| // alternatives correctly. |
| s.handlerRunning = true |
| go c.handleAsync() |
| } |
| }) |
| if err != nil { |
| c.processResult("acceptRequest", req, nil, err) |
| } |
| } |
| |
| // handleAsync invokes the handler on the requests in the handler queue |
| // sequentially until the queue is empty. |
| func (c *Connection) handleAsync() { |
| for { |
| var req *incomingRequest |
| c.updateInFlight(func(s *inFlightState) { |
| if len(s.handlerQueue) > 0 { |
| req, s.handlerQueue = s.handlerQueue[0], s.handlerQueue[1:] |
| } else { |
| s.handlerRunning = false |
| } |
| }) |
| if req == nil { |
| return |
| } |
| |
| // Only deliver to the Handler if not already canceled. |
| if err := req.ctx.Err(); err != nil { |
| c.updateInFlight(func(s *inFlightState) { |
| if s.writeErr != nil { |
| // Assume that req.ctx was canceled due to s.writeErr. |
| // TODO(#51365): use a Context API to plumb this through req.ctx. |
| err = fmt.Errorf("%w: %v", ErrServerClosing, s.writeErr) |
| } |
| }) |
| c.processResult("handleAsync", req, nil, err) |
| continue |
| } |
| |
| result, err := c.handler.Handle(req.ctx, req.Request) |
| c.processResult(c.handler, req, result, err) |
| } |
| } |
| |
| // processResult processes the result of a request and, if appropriate, sends a response. |
| func (c *Connection) processResult(from interface{}, req *incomingRequest, result interface{}, err error) error { |
| switch err { |
| case ErrAsyncResponse: |
| if !req.IsCall() { |
| return c.internalErrorf("%#v returned ErrAsyncResponse for a %q Request without an ID", from, req.Method) |
| } |
| return nil // This request is still in flight, so don't record the result yet. |
| case ErrNotHandled, ErrMethodNotFound: |
| // Add detail describing the unhandled method. |
| err = fmt.Errorf("%w: %q", ErrMethodNotFound, req.Method) |
| } |
| |
| if req.endSpan == nil { |
| return c.internalErrorf("%#v produced a duplicate %q Response", from, req.Method) |
| } |
| |
| if result != nil && err != nil { |
| c.internalErrorf("%#v returned a non-nil result with a non-nil error for %s:\n%v\n%#v", from, req.Method, err, result) |
| result = nil // Discard the spurious result and respond with err. |
| } |
| |
| if req.IsCall() { |
| if result == nil && err == nil { |
| err = c.internalErrorf("%#v returned a nil result and nil error for a %q Request that requires a Response", from, req.Method) |
| } |
| |
| response, respErr := NewResponse(req.ID, result, err) |
| |
| // The caller could theoretically reuse the request's ID as soon as we've |
| // sent the response, so ensure that it is removed from the incoming map |
| // before sending. |
| c.updateInFlight(func(s *inFlightState) { |
| delete(s.incomingByID, req.ID) |
| }) |
| if respErr == nil { |
| writeErr := c.write(notDone{req.ctx}, response) |
| if err == nil { |
| err = writeErr |
| } |
| } else { |
| err = c.internalErrorf("%#v returned a malformed result for %q: %w", from, req.Method, respErr) |
| } |
| } else { // req is a notification |
| if result != nil { |
| err = c.internalErrorf("%#v returned a non-nil result for a %q Request without an ID", from, req.Method) |
| } else if err != nil { |
| err = fmt.Errorf("%w: %q notification failed: %v", ErrInternal, req.Method, err) |
| } |
| if err != nil { |
| // TODO: can/should we do anything with this error beyond writing it to the event log? |
| // (Is this the right label to attach to the log?) |
| event.Label(req.ctx, keys.Err.Of(err)) |
| } |
| } |
| |
| labelStatus(req.ctx, err) |
| |
| // Cancel the request and finalize the event span to free any associated resources. |
| req.cancel() |
| req.endSpan() |
| req.endSpan = nil |
| c.updateInFlight(func(s *inFlightState) { |
| if s.incoming == 0 { |
| panic("jsonrpc2_v2: processResult called when incoming count is already zero") |
| } |
| s.incoming-- |
| }) |
| return nil |
| } |
| |
| // write is used by all things that write outgoing messages, including replies. |
| // it makes sure that writes are atomic |
| func (c *Connection) write(ctx context.Context, msg Message) error { |
| writer := <-c.writer |
| defer func() { c.writer <- writer }() |
| n, err := writer.Write(ctx, msg) |
| event.Metric(ctx, tag.SentBytes.Of(n)) |
| |
| if err != nil && ctx.Err() == nil { |
| // The call to Write failed, and since ctx.Err() is nil we can't attribute |
| // the failure (even indirectly) to Context cancellation. The writer appears |
| // to be broken, and future writes are likely to also fail. |
| // |
| // If the read side of the connection is also broken, we might not even be |
| // able to receive cancellation notifications. Since we can't reliably write |
| // the results of incoming calls and can't receive explicit cancellations, |
| // cancel the calls now. |
| c.updateInFlight(func(s *inFlightState) { |
| if s.writeErr == nil { |
| s.writeErr = err |
| for _, r := range s.incomingByID { |
| r.cancel() |
| } |
| } |
| }) |
| } |
| |
| return err |
| } |
| |
| // internalErrorf reports an internal error. By default it panics, but if |
| // c.onInternalError is non-nil it instead calls that and returns an error |
| // wrapping ErrInternal. |
| func (c *Connection) internalErrorf(format string, args ...interface{}) error { |
| err := fmt.Errorf(format, args...) |
| if c.onInternalError == nil { |
| panic("jsonrpc2: " + err.Error()) |
| } |
| c.onInternalError(err) |
| |
| return fmt.Errorf("%w: %v", ErrInternal, err) |
| } |
| |
| // labelStatus labels the status of the event in ctx based on whether err is nil. |
| func labelStatus(ctx context.Context, err error) { |
| if err == nil { |
| event.Label(ctx, tag.StatusCode.Of("OK")) |
| } else { |
| event.Label(ctx, tag.StatusCode.Of("ERROR")) |
| } |
| } |
| |
| // notDone is a context.Context wrapper that returns a nil Done channel. |
| type notDone struct{ ctx context.Context } |
| |
| func (ic notDone) Value(key interface{}) interface{} { |
| return ic.ctx.Value(key) |
| } |
| |
| func (notDone) Done() <-chan struct{} { return nil } |
| func (notDone) Err() error { return nil } |
| func (notDone) Deadline() (time.Time, bool) { return time.Time{}, false } |