blob: 7995f404e58b3270511c338d60109d64a6eca26b [file] [log] [blame] [edit]
// Copyright 2018 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package jsonrpc2
import (
"context"
"encoding/json"
"fmt"
"io"
"sync"
"sync/atomic"
"golang.org/x/tools/internal/event"
"golang.org/x/tools/internal/event/label"
"golang.org/x/tools/internal/event/tag"
)
// Binder builds a connection configuration.
// This may be used in servers to generate a new configuration per connection.
// ConnectionOptions itself implements Binder returning itself unmodified, to
// allow for the simple cases where no per connection information is needed.
type Binder interface {
// Bind returns the ConnectionOptions to use when establishing the passed-in
// Connection.
// The connection is not ready to use when Bind is called.
Bind(context.Context, *Connection) (ConnectionOptions, error)
}
// A BinderFunc implements the Binder interface for a standalone Bind function.
type BinderFunc func(context.Context, *Connection) (ConnectionOptions, error)
func (f BinderFunc) Bind(ctx context.Context, c *Connection) (ConnectionOptions, error) {
return f(ctx, c)
}
var _ Binder = BinderFunc(nil)
// ConnectionOptions holds the options for new connections.
type ConnectionOptions struct {
// Framer allows control over the message framing and encoding.
// If nil, HeaderFramer will be used.
Framer Framer
// Preempter allows registration of a pre-queue message handler.
// If nil, no messages will be preempted.
Preempter Preempter
// Handler is used as the queued message handler for inbound messages.
// If nil, all responses will be ErrNotHandled.
Handler Handler
}
// Connection manages the jsonrpc2 protocol, connecting responses back to their
// calls.
// Connection is bidirectional; it does not have a designated server or client
// end.
type Connection struct {
seq int64 // must only be accessed using atomic operations
closeOnce sync.Once
closer io.Closer
writer chan Writer
outgoing chan map[ID]chan<- *Response
incoming chan map[ID]*incoming
async *async
}
type AsyncCall struct {
id ID
response chan *Response // the channel a response will be delivered on
result chan asyncResult
endSpan func() // close the tracing span when all processing for the message is complete
}
type asyncResult struct {
result []byte
err error
}
// incoming is used to track an incoming request as it is being handled
type incoming struct {
request *Request // the request being processed
baseCtx context.Context // a base context for the message processing
done func() // a function called when all processing for the message is complete
handleCtx context.Context // the context for handling the message, child of baseCtx
cancel func() // a function that cancels the handling context
}
// Bind returns the options unmodified.
func (o ConnectionOptions) Bind(context.Context, *Connection) (ConnectionOptions, error) {
return o, nil
}
// newConnection creates a new connection and runs it.
// This is used by the Dial and Serve functions to build the actual connection.
func newConnection(ctx context.Context, rwc io.ReadWriteCloser, binder Binder) (*Connection, error) {
c := &Connection{
closer: rwc,
writer: make(chan Writer, 1),
outgoing: make(chan map[ID]chan<- *Response, 1),
incoming: make(chan map[ID]*incoming, 1),
async: newAsync(),
}
options, err := binder.Bind(ctx, c)
if err != nil {
return nil, err
}
if options.Framer == nil {
options.Framer = HeaderFramer()
}
if options.Preempter == nil {
options.Preempter = defaultHandler{}
}
if options.Handler == nil {
options.Handler = defaultHandler{}
}
c.outgoing <- make(map[ID]chan<- *Response)
c.incoming <- make(map[ID]*incoming)
// the goroutines started here will continue until the underlying stream is closed
reader := options.Framer.Reader(rwc)
readToQueue := make(chan *incoming)
queueToDeliver := make(chan *incoming)
go c.readIncoming(ctx, reader, readToQueue)
go c.manageQueue(ctx, options.Preempter, readToQueue, queueToDeliver)
go c.deliverMessages(ctx, options.Handler, queueToDeliver)
// releaseing the writer must be the last thing we do in case any requests
// are blocked waiting for the connection to be ready
c.writer <- options.Framer.Writer(rwc)
return c, nil
}
// Notify invokes the target method but does not wait for a response.
// The params will be marshaled to JSON before sending over the wire, and will
// be handed to the method invoked.
func (c *Connection) Notify(ctx context.Context, method string, params interface{}) error {
notify, err := NewNotification(method, params)
if err != nil {
return fmt.Errorf("marshaling notify parameters: %v", err)
}
ctx, done := event.Start(ctx, method,
tag.Method.Of(method),
tag.RPCDirection.Of(tag.Outbound),
)
event.Metric(ctx, tag.Started.Of(1))
err = c.write(ctx, notify)
switch {
case err != nil:
event.Label(ctx, tag.StatusCode.Of("ERROR"))
default:
event.Label(ctx, tag.StatusCode.Of("OK"))
}
done()
return err
}
// Call invokes the target method and returns an object that can be used to await the response.
// The params will be marshaled to JSON before sending over the wire, and will
// be handed to the method invoked.
// You do not have to wait for the response, it can just be ignored if not needed.
// If sending the call failed, the response will be ready and have the error in it.
func (c *Connection) Call(ctx context.Context, method string, params interface{}) *AsyncCall {
result := &AsyncCall{
id: Int64ID(atomic.AddInt64(&c.seq, 1)),
result: make(chan asyncResult, 1),
}
// generate a new request identifier
call, err := NewCall(result.id, method, params)
if err != nil {
//set the result to failed
result.result <- asyncResult{err: fmt.Errorf("marshaling call parameters: %w", err)}
return result
}
ctx, endSpan := event.Start(ctx, method,
tag.Method.Of(method),
tag.RPCDirection.Of(tag.Outbound),
tag.RPCID.Of(fmt.Sprintf("%q", result.id)),
)
result.endSpan = endSpan
event.Metric(ctx, tag.Started.Of(1))
// We have to add ourselves to the pending map before we send, otherwise we
// are racing the response.
// rchan is buffered in case the response arrives without a listener.
result.response = make(chan *Response, 1)
outgoing, ok := <-c.outgoing
if !ok {
// If the call failed due to (say) an I/O error or broken pipe, attribute it
// as such. (If the error is nil, then the connection must have been shut
// down cleanly.)
err := c.async.wait()
if err == nil {
err = ErrClientClosing
}
resp, respErr := NewResponse(result.id, nil, err)
if respErr != nil {
panic(fmt.Errorf("unexpected error from NewResponse: %w", respErr))
}
result.response <- resp
return result
}
outgoing[result.id] = result.response
c.outgoing <- outgoing
// now we are ready to send
if err := c.write(ctx, call); err != nil {
// sending failed, we will never get a response, so deliver a fake one
r, _ := NewResponse(result.id, nil, err)
c.incomingResponse(r)
}
return result
}
// ID used for this call.
// This can be used to cancel the call if needed.
func (a *AsyncCall) ID() ID { return a.id }
// IsReady can be used to check if the result is already prepared.
// This is guaranteed to return true on a result for which Await has already
// returned, or a call that failed to send in the first place.
func (a *AsyncCall) IsReady() bool {
select {
case r := <-a.result:
a.result <- r
return true
default:
return false
}
}
// Await the results of a Call.
// The response will be unmarshaled from JSON into the result.
func (a *AsyncCall) Await(ctx context.Context, result interface{}) error {
defer a.endSpan()
var r asyncResult
select {
case response := <-a.response:
// response just arrived, prepare the result
switch {
case response.Error != nil:
r.err = response.Error
event.Label(ctx, tag.StatusCode.Of("ERROR"))
default:
r.result = response.Result
event.Label(ctx, tag.StatusCode.Of("OK"))
}
case r = <-a.result:
// result already available
case <-ctx.Done():
event.Label(ctx, tag.StatusCode.Of("CANCELLED"))
return ctx.Err()
}
// refill the box for the next caller
a.result <- r
// and unpack the result
if r.err != nil {
return r.err
}
if result == nil || len(r.result) == 0 {
return nil
}
return json.Unmarshal(r.result, result)
}
// Respond delivers a response to an incoming Call.
//
// Respond must be called exactly once for any message for which a handler
// returns ErrAsyncResponse. It must not be called for any other message.
func (c *Connection) Respond(id ID, result interface{}, rerr error) error {
pending := <-c.incoming
defer func() { c.incoming <- pending }()
entry, found := pending[id]
if !found {
return nil
}
delete(pending, id)
return c.respond(entry, result, rerr)
}
// Cancel is used to cancel an inbound message by ID, it does not cancel
// outgoing messages.
// This is only used inside a message handler that is layering a
// cancellation protocol on top of JSON RPC 2.
// It will not complain if the ID is not a currently active message, and it will
// not cause any messages that have not arrived yet with that ID to be
// cancelled.
func (c *Connection) Cancel(id ID) {
pending := <-c.incoming
defer func() { c.incoming <- pending }()
if entry, found := pending[id]; found && entry.cancel != nil {
entry.cancel()
entry.cancel = nil
}
}
// Wait blocks until the connection is fully closed, but does not close it.
func (c *Connection) Wait() error {
return c.async.wait()
}
// Close can be used to close the underlying stream, and then wait for the connection to
// fully shut down.
// This does not cancel in flight requests, but waits for them to gracefully complete.
func (c *Connection) Close() error {
// close the underlying stream
c.closeOnce.Do(func() {
if err := c.closer.Close(); err != nil {
c.async.setError(err)
}
})
// and then wait for it to cause the connection to close
return c.Wait()
}
// readIncoming collects inbound messages from the reader and delivers them, either responding
// to outgoing calls or feeding requests to the queue.
func (c *Connection) readIncoming(ctx context.Context, reader Reader, toQueue chan<- *incoming) (err error) {
defer func() {
// Retire any outgoing requests that were still in flight.
// With the Reader no longer being processed, they necessarily cannot receive a response.
outgoing := <-c.outgoing
close(c.outgoing) // Prevent new outgoing requests, which would deadlock.
for id, response := range outgoing {
response <- &Response{ID: id, Error: err}
}
close(toQueue)
}()
for {
// get the next message
// no lock is needed, this is the only reader
msg, n, err := reader.Read(ctx)
if err != nil {
// The stream failed, we cannot continue
if !isClosingError(err) {
c.async.setError(err)
}
return err
}
switch msg := msg.(type) {
case *Request:
entry := &incoming{
request: msg,
}
// add a span to the context for this request
labels := append(make([]label.Label, 0, 3), // make space for the id if present
tag.Method.Of(msg.Method),
tag.RPCDirection.Of(tag.Inbound),
)
if msg.IsCall() {
labels = append(labels, tag.RPCID.Of(fmt.Sprintf("%q", msg.ID)))
}
entry.baseCtx, entry.done = event.Start(ctx, msg.Method, labels...)
event.Metric(entry.baseCtx,
tag.Started.Of(1),
tag.ReceivedBytes.Of(n))
// in theory notifications cannot be cancelled, but we build them a cancel context anyway
entry.handleCtx, entry.cancel = context.WithCancel(entry.baseCtx)
// if the request is a call, add it to the incoming map so it can be
// cancelled by id
if msg.IsCall() {
pending := <-c.incoming
pending[msg.ID] = entry
c.incoming <- pending
}
// send the message to the incoming queue
toQueue <- entry
case *Response:
// If method is not set, this should be a response, in which case we must
// have an id to send the response back to the caller.
c.incomingResponse(msg)
}
}
}
func (c *Connection) incomingResponse(msg *Response) {
var response chan<- *Response
if outgoing, ok := <-c.outgoing; ok {
response = outgoing[msg.ID]
delete(outgoing, msg.ID)
c.outgoing <- outgoing
}
if response != nil {
response <- msg
}
}
// manageQueue reads incoming requests, attempts to process them with the preempter, or queue them
// up for normal handling.
func (c *Connection) manageQueue(ctx context.Context, preempter Preempter, fromRead <-chan *incoming, toDeliver chan<- *incoming) {
defer close(toDeliver)
q := []*incoming{}
ok := true
for {
var nextReq *incoming
if len(q) == 0 {
// no messages in the queue
// if we were closing, then we are done
if !ok {
return
}
// not closing, but nothing in the queue, so just block waiting for a read
nextReq, ok = <-fromRead
} else {
// we have a non empty queue, so pick whichever of reading or delivering
// that we can make progress on
select {
case nextReq, ok = <-fromRead:
case toDeliver <- q[0]:
//TODO: this causes a lot of shuffling, should we use a growing ring buffer? compaction?
q = q[1:]
}
}
if nextReq != nil {
// TODO: should we allow to limit the queue size?
var result interface{}
rerr := nextReq.handleCtx.Err()
if rerr == nil {
// only preempt if not already cancelled
result, rerr = preempter.Preempt(nextReq.handleCtx, nextReq.request)
}
switch {
case rerr == ErrNotHandled:
// message not handled, add it to the queue for the main handler
q = append(q, nextReq)
case rerr == ErrAsyncResponse:
// message handled but the response will come later
default:
// anything else means the message is fully handled
c.reply(nextReq, result, rerr)
}
}
}
}
func (c *Connection) deliverMessages(ctx context.Context, handler Handler, fromQueue <-chan *incoming) {
defer func() {
// Close the underlying ReadWriteCloser if not already closed. We're about
// to mark the Connection as done, so we'd better actually be done! 😅
//
// TODO(bcmills): This is actually a bit premature, since we may have
// asynchronous handlers still in flight at this point, but it's at least no
// more premature than calling c.async.done at this point (which we were
// already doing). This will get a proper fix in https://go.dev/cl/388134.
c.closeOnce.Do(func() {
if err := c.closer.Close(); err != nil {
c.async.setError(err)
}
})
c.async.done()
}()
for entry := range fromQueue {
// cancel any messages in the queue that we have a pending cancel for
var result interface{}
rerr := entry.handleCtx.Err()
if rerr == nil {
// only deliver if not already cancelled
result, rerr = handler.Handle(entry.handleCtx, entry.request)
}
switch {
case rerr == ErrNotHandled:
// message not handled, report it back to the caller as an error
c.reply(entry, nil, fmt.Errorf("%w: %q", ErrMethodNotFound, entry.request.Method))
case rerr == ErrAsyncResponse:
// message handled but the response will come later
default:
c.reply(entry, result, rerr)
}
}
}
// reply is used to reply to an incoming request that has just been handled
func (c *Connection) reply(entry *incoming, result interface{}, rerr error) {
if entry.request.IsCall() {
// we have a call finishing, remove it from the incoming map
pending := <-c.incoming
defer func() { c.incoming <- pending }()
delete(pending, entry.request.ID)
}
if err := c.respond(entry, result, rerr); err != nil {
// no way to propagate this error
//TODO: should we do more than just log it?
event.Error(entry.baseCtx, "jsonrpc2 message delivery failed", err)
}
}
// respond sends a response.
// This is the code shared between reply and SendResponse.
func (c *Connection) respond(entry *incoming, result interface{}, rerr error) error {
var err error
if entry.request.IsCall() {
// send the response
if result == nil && rerr == nil {
// call with no response, send an error anyway
rerr = fmt.Errorf("%w: %q produced no response", ErrInternal, entry.request.Method)
}
var response *Response
response, err = NewResponse(entry.request.ID, result, rerr)
if err == nil {
// we write the response with the base context, in case the message was cancelled
err = c.write(entry.baseCtx, response)
}
} else {
switch {
case rerr != nil:
// notification failed
err = fmt.Errorf("%w: %q notification failed: %v", ErrInternal, entry.request.Method, rerr)
rerr = nil
case result != nil:
//notification produced a response, which is an error
err = fmt.Errorf("%w: %q produced unwanted response", ErrInternal, entry.request.Method)
default:
// normal notification finish
}
}
switch {
case rerr != nil || err != nil:
event.Label(entry.baseCtx, tag.StatusCode.Of("ERROR"))
default:
event.Label(entry.baseCtx, tag.StatusCode.Of("OK"))
}
// and just to be clean, invoke and clear the cancel if needed
if entry.cancel != nil {
entry.cancel()
entry.cancel = nil
}
// mark the entire request processing as done
entry.done()
return err
}
// write is used by all things that write outgoing messages, including replies.
// it makes sure that writes are atomic
func (c *Connection) write(ctx context.Context, msg Message) error {
writer := <-c.writer
defer func() { c.writer <- writer }()
n, err := writer.Write(ctx, msg)
event.Metric(ctx, tag.SentBytes.Of(n))
return err
}