blob: f0e527cc123dac0351c41abd987bbe163425cfcf [file] [log] [blame]
Jonathan Amsterdamf9561772021-06-10 11:31:29 -04001// Copyright 2018 The Go Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package jsonrpc2
6
7import (
8 "context"
9 "encoding/json"
10 "fmt"
11 "io"
12 "sync/atomic"
13
14 "golang.org/x/exp/event"
15 errors "golang.org/x/xerrors"
16)
17
18// Binder builds a connection configuration.
19// This may be used in servers to generate a new configuration per connection.
20// ConnectionOptions itself implements Binder returning itself unmodified, to
21// allow for the simple cases where no per connection information is needed.
22type Binder interface {
23 // Bind is invoked when creating a new connection.
24 // The connection is not ready to use when Bind is called.
25 Bind(context.Context, *Connection) (ConnectionOptions, error)
26}
27
28// ConnectionOptions holds the options for new connections.
29type ConnectionOptions struct {
30 // Framer allows control over the message framing and encoding.
31 // If nil, HeaderFramer will be used.
32 Framer Framer
33 // Preempter allows registration of a pre-queue message handler.
34 // If nil, no messages will be preempted.
35 Preempter Preempter
36 // Handler is used as the queued message handler for inbound messages.
37 // If nil, all responses will be ErrNotHandled.
38 Handler Handler
39}
40
41// Connection manages the jsonrpc2 protocol, connecting responses back to their
42// calls.
43// Connection is bidirectional; it does not have a designated server or client
44// end.
45type Connection struct {
46 seq int64 // must only be accessed using atomic operations
47 closer io.Closer
48 writerBox chan Writer
49 outgoingBox chan map[ID]chan<- *Response
50 incomingBox chan map[ID]*incoming
51 async async
52}
53
54type AsyncCall struct {
Ian Cottrelle1f8bae2021-06-10 17:26:03 -040055 id ID
56 response chan *Response // the channel a response will be delivered on
57 resultBox chan asyncResult
58 ctx context.Context
Jonathan Amsterdamf9561772021-06-10 11:31:29 -040059}
60
61type asyncResult struct {
62 result []byte
63 err error
64}
65
66// incoming is used to track an incoming request as it is being handled
67type incoming struct {
Ian Cottrelle1f8bae2021-06-10 17:26:03 -040068 request *Request // the request being processed
69 baseCtx context.Context // a base context for the message processing
70 handleCtx context.Context // the context for handling the message, child of baseCtx
71 cancel func() // a function that cancels the handling context
Jonathan Amsterdamf9561772021-06-10 11:31:29 -040072}
73
74// Bind returns the options unmodified.
75func (o ConnectionOptions) Bind(context.Context, *Connection) (ConnectionOptions, error) {
76 return o, nil
77}
78
79// newConnection creates a new connection and runs it.
80// This is used by the Dial and Serve functions to build the actual connection.
81func newConnection(ctx context.Context, rwc io.ReadWriteCloser, binder Binder) (*Connection, error) {
82 c := &Connection{
83 closer: rwc,
84 writerBox: make(chan Writer, 1),
85 outgoingBox: make(chan map[ID]chan<- *Response, 1),
86 incomingBox: make(chan map[ID]*incoming, 1),
87 }
Maxime Soulé3640c572022-10-11 14:25:48 +000088 c.async.init()
Jonathan Amsterdamf9561772021-06-10 11:31:29 -040089
90 options, err := binder.Bind(ctx, c)
91 if err != nil {
92 return nil, err
93 }
94 if options.Framer == nil {
95 options.Framer = HeaderFramer()
96 }
97 if options.Preempter == nil {
98 options.Preempter = defaultHandler{}
99 }
100 if options.Handler == nil {
101 options.Handler = defaultHandler{}
102 }
103 c.outgoingBox <- make(map[ID]chan<- *Response)
104 c.incomingBox <- make(map[ID]*incoming)
Jonathan Amsterdamf9561772021-06-10 11:31:29 -0400105 // the goroutines started here will continue until the underlying stream is closed
106 reader := options.Framer.Reader(rwc)
107 readToQueue := make(chan *incoming)
108 queueToDeliver := make(chan *incoming)
109 go c.readIncoming(ctx, reader, readToQueue)
110 go c.manageQueue(ctx, options.Preempter, readToQueue, queueToDeliver)
111 go c.deliverMessages(ctx, options.Handler, queueToDeliver)
112 // releaseing the writer must be the last thing we do in case any requests
113 // are blocked waiting for the connection to be ready
114 c.writerBox <- options.Framer.Writer(rwc)
115 return c, nil
116}
117
118// Notify invokes the target method but does not wait for a response.
119// The params will be marshaled to JSON before sending over the wire, and will
120// be handed to the method invoked.
121func (c *Connection) Notify(ctx context.Context, method string, params interface{}) error {
122 notify, err := NewNotification(method, params)
123 if err != nil {
124 return errors.Errorf("marshaling notify parameters: %v", err)
125 }
Ian Cottrellca6c8a12021-06-21 00:44:05 -0400126 ctx = event.Start(ctx, method, RPCDirection(Outbound))
127 Started.Record(ctx, 1, Method(method))
Ian Cottrelle1f8bae2021-06-10 17:26:03 -0400128 var errLabel event.Label
129 if err = c.write(ctx, notify); err != nil {
Ian Cottrellca6c8a12021-06-21 00:44:05 -0400130 errLabel = event.Value("error", err)
Ian Cottrelle1f8bae2021-06-10 17:26:03 -0400131 }
132 Finished.Record(ctx, 1, errLabel)
133 event.End(ctx)
Jonathan Amsterdamf9561772021-06-10 11:31:29 -0400134 return err
135}
136
137// Call invokes the target method and returns an object that can be used to await the response.
138// The params will be marshaled to JSON before sending over the wire, and will
139// be handed to the method invoked.
140// You do not have to wait for the response, it can just be ignored if not needed.
141// If sending the call failed, the response will be ready and have the error in it.
142func (c *Connection) Call(ctx context.Context, method string, params interface{}) *AsyncCall {
143 result := &AsyncCall{
144 id: Int64ID(atomic.AddInt64(&c.seq, 1)),
145 resultBox: make(chan asyncResult, 1),
146 }
Maxime Soulé7b3493d2023-08-08 11:06:49 +0200147 // TODO: rewrite this using the new target/prototype stuff
Ian Cottrelle1f8bae2021-06-10 17:26:03 -0400148 ctx = event.Start(ctx, method,
Ian Cottrellca6c8a12021-06-21 00:44:05 -0400149 Method(method), RPCDirection(Outbound), RPCID(fmt.Sprintf("%q", result.id)))
150 Started.Record(ctx, 1, Method(method))
Ian Cottrelle1f8bae2021-06-10 17:26:03 -0400151 result.ctx = ctx
Maxime Soulé7b3493d2023-08-08 11:06:49 +0200152 // generate a new request identifier
153 call, err := NewCall(result.id, method, params)
154 if err != nil {
155 // set the result to failed
156 result.resultBox <- asyncResult{err: errors.Errorf("marshaling call parameters: %w", err)}
157 return result
158 }
Jonathan Amsterdamf9561772021-06-10 11:31:29 -0400159 // We have to add ourselves to the pending map before we send, otherwise we
160 // are racing the response.
161 // rchan is buffered in case the response arrives without a listener.
162 result.response = make(chan *Response, 1)
163 pending := <-c.outgoingBox
164 pending[result.id] = result.response
165 c.outgoingBox <- pending
166 // now we are ready to send
167 if err := c.write(ctx, call); err != nil {
168 // sending failed, we will never get a response, so deliver a fake one
169 r, _ := NewResponse(result.id, nil, err)
170 c.incomingResponse(r)
171 }
172 return result
173}
174
175// ID used for this call.
176// This can be used to cancel the call if needed.
177func (a *AsyncCall) ID() ID { return a.id }
178
179// IsReady can be used to check if the result is already prepared.
180// This is guaranteed to return true on a result for which Await has already
181// returned, or a call that failed to send in the first place.
182func (a *AsyncCall) IsReady() bool {
183 select {
184 case r := <-a.resultBox:
185 a.resultBox <- r
186 return true
187 default:
188 return false
189 }
190}
191
192// Await the results of a Call.
193// The response will be unmarshaled from JSON into the result.
194func (a *AsyncCall) Await(ctx context.Context, result interface{}) error {
195 status := "NONE"
Ian Cottrellca6c8a12021-06-21 00:44:05 -0400196 defer event.End(a.ctx, StatusCode(status))
Jonathan Amsterdamf9561772021-06-10 11:31:29 -0400197 var r asyncResult
198 select {
199 case response := <-a.response:
200 // response just arrived, prepare the result
201 switch {
202 case response.Error != nil:
203 r.err = response.Error
204 status = "ERROR"
205 default:
206 r.result = response.Result
207 status = "OK"
208 }
209 case r = <-a.resultBox:
210 // result already available
211 case <-ctx.Done():
212 status = "CANCELLED"
213 return ctx.Err()
214 }
215 // refill the box for the next caller
216 a.resultBox <- r
217 // and unpack the result
218 if r.err != nil {
219 return r.err
220 }
221 if result == nil || len(r.result) == 0 {
222 return nil
223 }
224 return json.Unmarshal(r.result, result)
225}
226
227// Respond deliverers a response to an incoming Call.
228// It is an error to not call this exactly once for any message for which a
229// handler has previously returned ErrAsyncResponse. It is also an error to
230// call this for any other message.
231func (c *Connection) Respond(id ID, result interface{}, rerr error) error {
232 pending := <-c.incomingBox
233 defer func() { c.incomingBox <- pending }()
234 entry, found := pending[id]
235 if !found {
236 return nil
237 }
238 delete(pending, id)
239 return c.respond(entry, result, rerr)
240}
241
242// Cancel is used to cancel an inbound message by ID, it does not cancel
243// outgoing messages.
244// This is only used inside a message handler that is layering a
245// cancellation protocol on top of JSON RPC 2.
246// It will not complain if the ID is not a currently active message, and it will
247// not cause any messages that have not arrived yet with that ID to be
248// cancelled.
249func (c *Connection) Cancel(id ID) {
250 pending := <-c.incomingBox
251 defer func() { c.incomingBox <- pending }()
252 if entry, found := pending[id]; found && entry.cancel != nil {
253 entry.cancel()
254 entry.cancel = nil
255 }
256}
257
258// Wait blocks until the connection is fully closed, but does not close it.
259func (c *Connection) Wait() error {
260 return c.async.wait()
261}
262
263// Close can be used to close the underlying stream, and then wait for the connection to
264// fully shut down.
265// This does not cancel in flight requests, but waits for them to gracefully complete.
266func (c *Connection) Close() error {
267 // close the underlying stream
268 if err := c.closer.Close(); err != nil && !isClosingError(err) {
269 return err
270 }
271 // and then wait for it to cause the connection to close
272 if err := c.Wait(); err != nil && !isClosingError(err) {
273 return err
274 }
275 return nil
276}
277
278// readIncoming collects inbound messages from the reader and delivers them, either responding
279// to outgoing calls or feeding requests to the queue.
280func (c *Connection) readIncoming(ctx context.Context, reader Reader, toQueue chan<- *incoming) {
281 defer close(toQueue)
282 for {
283 // get the next message
284 // no lock is needed, this is the only reader
285 msg, n, err := reader.Read(ctx)
286 if err != nil {
287 // The stream failed, we cannot continue
288 c.async.setError(err)
289 return
290 }
291 switch msg := msg.(type) {
292 case *Request:
293 entry := &incoming{
294 request: msg,
295 }
296 // add a span to the context for this request
Ian Cottrelle1f8bae2021-06-10 17:26:03 -0400297 var idLabel event.Label
Jonathan Amsterdamf9561772021-06-10 11:31:29 -0400298 if msg.IsCall() {
Ian Cottrellca6c8a12021-06-21 00:44:05 -0400299 idLabel = RPCID(fmt.Sprintf("%q", msg.ID))
Jonathan Amsterdamf9561772021-06-10 11:31:29 -0400300 }
Ian Cottrelle1f8bae2021-06-10 17:26:03 -0400301 entry.baseCtx = event.Start(ctx, msg.Method,
Ian Cottrellca6c8a12021-06-21 00:44:05 -0400302 Method(msg.Method), RPCDirection(Inbound), idLabel)
303 Started.Record(entry.baseCtx, 1, Method(msg.Method))
304 ReceivedBytes.Record(entry.baseCtx, n, Method(msg.Method))
Jonathan Amsterdamf9561772021-06-10 11:31:29 -0400305 // in theory notifications cannot be cancelled, but we build them a cancel context anyway
306 entry.handleCtx, entry.cancel = context.WithCancel(entry.baseCtx)
307 // if the request is a call, add it to the incoming map so it can be
308 // cancelled by id
309 if msg.IsCall() {
310 pending := <-c.incomingBox
Jonathan Amsterdamf9561772021-06-10 11:31:29 -0400311 pending[msg.ID] = entry
Maxime Soulé3a778c52022-09-30 13:05:53 +0000312 c.incomingBox <- pending
Jonathan Amsterdamf9561772021-06-10 11:31:29 -0400313 }
314 // send the message to the incoming queue
315 toQueue <- entry
316 case *Response:
317 // If method is not set, this should be a response, in which case we must
318 // have an id to send the response back to the caller.
319 c.incomingResponse(msg)
320 }
321 }
322}
323
324func (c *Connection) incomingResponse(msg *Response) {
325 pending := <-c.outgoingBox
326 response, ok := pending[msg.ID]
327 if ok {
328 delete(pending, msg.ID)
329 }
330 c.outgoingBox <- pending
331 if response != nil {
332 response <- msg
333 }
334}
335
cuishuanga2e15db2022-03-03 12:53:37 +0000336// manageQueue reads incoming requests, attempts to process them with the preempter, or queue them
Jonathan Amsterdamf9561772021-06-10 11:31:29 -0400337// up for normal handling.
338func (c *Connection) manageQueue(ctx context.Context, preempter Preempter, fromRead <-chan *incoming, toDeliver chan<- *incoming) {
339 defer close(toDeliver)
340 q := []*incoming{}
341 ok := true
342 for {
343 var nextReq *incoming
344 if len(q) == 0 {
345 // no messages in the queue
346 // if we were closing, then we are done
347 if !ok {
348 return
349 }
350 // not closing, but nothing in the queue, so just block waiting for a read
351 nextReq, ok = <-fromRead
352 } else {
353 // we have a non empty queue, so pick whichever of reading or delivering
354 // that we can make progress on
355 select {
356 case nextReq, ok = <-fromRead:
357 case toDeliver <- q[0]:
Maxime Soulé7b3493d2023-08-08 11:06:49 +0200358 // TODO: this causes a lot of shuffling, should we use a growing ring buffer? compaction?
Jonathan Amsterdamf9561772021-06-10 11:31:29 -0400359 q = q[1:]
360 }
361 }
362 if nextReq != nil {
363 // TODO: should we allow to limit the queue size?
364 var result interface{}
365 rerr := nextReq.handleCtx.Err()
366 if rerr == nil {
367 // only preempt if not already cancelled
368 result, rerr = preempter.Preempt(nextReq.handleCtx, nextReq.request)
369 }
370 switch {
371 case rerr == ErrNotHandled:
372 // message not handled, add it to the queue for the main handler
373 q = append(q, nextReq)
374 case rerr == ErrAsyncResponse:
375 // message handled but the response will come later
376 default:
377 // anything else means the message is fully handled
378 c.reply(nextReq, result, rerr)
379 }
380 }
381 }
382}
383
384func (c *Connection) deliverMessages(ctx context.Context, handler Handler, fromQueue <-chan *incoming) {
385 defer c.async.done()
386 for entry := range fromQueue {
387 // cancel any messages in the queue that we have a pending cancel for
388 var result interface{}
389 rerr := entry.handleCtx.Err()
390 if rerr == nil {
391 // only deliver if not already cancelled
392 result, rerr = handler.Handle(entry.handleCtx, entry.request)
393 }
394 switch {
395 case rerr == ErrNotHandled:
396 // message not handled, report it back to the caller as an error
397 c.reply(entry, nil, errors.Errorf("%w: %q", ErrMethodNotFound, entry.request.Method))
398 case rerr == ErrAsyncResponse:
399 // message handled but the response will come later
400 default:
401 c.reply(entry, result, rerr)
402 }
403 }
404}
405
406// reply is used to reply to an incoming request that has just been handled
407func (c *Connection) reply(entry *incoming, result interface{}, rerr error) {
408 if entry.request.IsCall() {
409 // we have a call finishing, remove it from the incoming map
410 pending := <-c.incomingBox
411 defer func() { c.incomingBox <- pending }()
412 delete(pending, entry.request.ID)
413 }
414 if err := c.respond(entry, result, rerr); err != nil {
415 // no way to propagate this error
Maxime Soulé7b3493d2023-08-08 11:06:49 +0200416 // TODO: should we do more than just log it?
Ian Cottrellab7525a2021-06-24 09:52:57 -0400417 event.Error(entry.baseCtx, "jsonrpc2 message delivery failed", err)
Jonathan Amsterdamf9561772021-06-10 11:31:29 -0400418 }
419}
420
421// respond sends a response.
422// This is the code shared between reply and SendResponse.
423func (c *Connection) respond(entry *incoming, result interface{}, rerr error) error {
424 var err error
425 if entry.request.IsCall() {
426 // send the response
427 if result == nil && rerr == nil {
428 // call with no response, send an error anyway
429 rerr = errors.Errorf("%w: %q produced no response", ErrInternal, entry.request.Method)
430 }
431 var response *Response
432 response, err = NewResponse(entry.request.ID, result, rerr)
433 if err == nil {
434 // we write the response with the base context, in case the message was cancelled
435 err = c.write(entry.baseCtx, response)
436 }
437 } else {
438 switch {
439 case rerr != nil:
440 // notification failed
441 err = errors.Errorf("%w: %q notification failed: %v", ErrInternal, entry.request.Method, rerr)
442 rerr = nil
443 case result != nil:
Maxime Soulé7b3493d2023-08-08 11:06:49 +0200444 // notification produced a response, which is an error
Jonathan Amsterdamf9561772021-06-10 11:31:29 -0400445 err = errors.Errorf("%w: %q produced unwanted response", ErrInternal, entry.request.Method)
446 default:
447 // normal notification finish
448 }
449 }
450 var status string
451 switch {
452 case rerr != nil || err != nil:
453 status = "ERROR"
454 default:
455 status = "OK"
456 }
457 // and just to be clean, invoke and clear the cancel if needed
458 if entry.cancel != nil {
459 entry.cancel()
460 entry.cancel = nil
461 }
462 // mark the entire request processing as done
Ian Cottrellca6c8a12021-06-21 00:44:05 -0400463 event.End(entry.baseCtx, StatusCode(status))
Jonathan Amsterdamf9561772021-06-10 11:31:29 -0400464 return err
465}
466
467// write is used by all things that write outgoing messages, including replies.
468// it makes sure that writes are atomic
469func (c *Connection) write(ctx context.Context, msg Message) error {
470 writer := <-c.writerBox
471 defer func() { c.writerBox <- writer }()
472 n, err := writer.Write(ctx, msg)
473 // TODO: get a method label in here somehow.
Ian Cottrelle1f8bae2021-06-10 17:26:03 -0400474 SentBytes.Record(ctx, n)
Jonathan Amsterdamf9561772021-06-10 11:31:29 -0400475 return err
476}