Jonathan Amsterdam | f956177 | 2021-06-10 11:31:29 -0400 | [diff] [blame] | 1 | // Copyright 2018 The Go Authors. All rights reserved. |
| 2 | // Use of this source code is governed by a BSD-style |
| 3 | // license that can be found in the LICENSE file. |
| 4 | |
| 5 | package jsonrpc2 |
| 6 | |
| 7 | import ( |
| 8 | "context" |
| 9 | "encoding/json" |
| 10 | "fmt" |
| 11 | "io" |
| 12 | "sync/atomic" |
| 13 | |
| 14 | "golang.org/x/exp/event" |
| 15 | errors "golang.org/x/xerrors" |
| 16 | ) |
| 17 | |
// Binder builds a connection configuration.
// This may be used in servers to generate a new configuration per connection.
// ConnectionOptions itself implements Binder returning itself unmodified, to
// allow for the simple cases where no per connection information is needed.
type Binder interface {
	// Bind is invoked when creating a new connection, and returns the options
	// that connection will be configured with. A non-nil error aborts the
	// creation of the connection.
	// The connection is not ready to use when Bind is called.
	Bind(context.Context, *Connection) (ConnectionOptions, error)
}
| 27 | |
// ConnectionOptions holds the options for new connections.
// Any field left nil is replaced with a default when the connection is built.
type ConnectionOptions struct {
	// Framer allows control over the message framing and encoding.
	// If nil, HeaderFramer will be used.
	Framer Framer
	// Preempter allows registration of a pre-queue message handler.
	// If nil, no messages will be preempted.
	Preempter Preempter
	// Handler is used as the queued message handler for inbound messages.
	// If nil, all responses will be ErrNotHandled.
	Handler Handler
}
| 40 | |
// Connection manages the jsonrpc2 protocol, connecting responses back to their
// calls.
// Connection is bidirectional; it does not have a designated server or client
// end.
type Connection struct {
	seq int64 // source of outgoing call ids; must only be accessed using atomic operations
	// closer is the underlying stream, closed by Close.
	closer io.Closer
	// writerBox holds the message writer; a sender takes the writer out of the
	// box for the duration of a write and puts it back afterwards.
	writerBox chan Writer
	// outgoingBox holds the map of outgoing calls still waiting for a response,
	// keyed by call id, to the channel the response is delivered on.
	outgoingBox chan map[ID]chan<- *Response
	// incomingBox holds the map of incoming calls that have not been replied to
	// yet, keyed by call id.
	incomingBox chan map[ID]*incoming
	// async is used to wait for the connection to finish and to record the
	// error that stopped the reader.
	// NOTE(review): the async type is declared outside this file — confirm.
	async async
}
| 53 | |
// AsyncCall is the handle returned by Connection.Call.
// It identifies the outgoing call and is used to wait for its result.
type AsyncCall struct {
	id        ID               // the id the call was sent with
	response  chan *Response   // the channel a response will be delivered on
	resultBox chan asyncResult // holds the final result once it is known, so that it can be read repeatedly
	ctx       context.Context  // the context carrying the event span started by Call and ended by Await
}
| 60 | |
// asyncResult is the final outcome of an AsyncCall: either the raw result
// payload of the response or the error the call finished with.
type asyncResult struct {
	result []byte // the still encoded result, unmarshaled by Await
	err    error  // the error that ended the call, if any
}
| 65 | |
// incoming is used to track an incoming request as it is being handled.
// One is created for every request read from the stream, whether it is a call
// or a notification.
type incoming struct {
	request   *Request        // the request being processed
	baseCtx   context.Context // a base context for the message processing
	handleCtx context.Context // the context for handling the message, child of baseCtx
	cancel    func()          // a function that cancels the handling context; set to nil once it has been invoked
}
| 73 | |
| 74 | // Bind returns the options unmodified. |
| 75 | func (o ConnectionOptions) Bind(context.Context, *Connection) (ConnectionOptions, error) { |
| 76 | return o, nil |
| 77 | } |
| 78 | |
| 79 | // newConnection creates a new connection and runs it. |
| 80 | // This is used by the Dial and Serve functions to build the actual connection. |
| 81 | func newConnection(ctx context.Context, rwc io.ReadWriteCloser, binder Binder) (*Connection, error) { |
| 82 | c := &Connection{ |
| 83 | closer: rwc, |
| 84 | writerBox: make(chan Writer, 1), |
| 85 | outgoingBox: make(chan map[ID]chan<- *Response, 1), |
| 86 | incomingBox: make(chan map[ID]*incoming, 1), |
| 87 | } |
Maxime Soulé | 3640c57 | 2022-10-11 14:25:48 +0000 | [diff] [blame] | 88 | c.async.init() |
Jonathan Amsterdam | f956177 | 2021-06-10 11:31:29 -0400 | [diff] [blame] | 89 | |
| 90 | options, err := binder.Bind(ctx, c) |
| 91 | if err != nil { |
| 92 | return nil, err |
| 93 | } |
| 94 | if options.Framer == nil { |
| 95 | options.Framer = HeaderFramer() |
| 96 | } |
| 97 | if options.Preempter == nil { |
| 98 | options.Preempter = defaultHandler{} |
| 99 | } |
| 100 | if options.Handler == nil { |
| 101 | options.Handler = defaultHandler{} |
| 102 | } |
| 103 | c.outgoingBox <- make(map[ID]chan<- *Response) |
| 104 | c.incomingBox <- make(map[ID]*incoming) |
Jonathan Amsterdam | f956177 | 2021-06-10 11:31:29 -0400 | [diff] [blame] | 105 | // the goroutines started here will continue until the underlying stream is closed |
| 106 | reader := options.Framer.Reader(rwc) |
| 107 | readToQueue := make(chan *incoming) |
| 108 | queueToDeliver := make(chan *incoming) |
| 109 | go c.readIncoming(ctx, reader, readToQueue) |
| 110 | go c.manageQueue(ctx, options.Preempter, readToQueue, queueToDeliver) |
| 111 | go c.deliverMessages(ctx, options.Handler, queueToDeliver) |
| 112 | // releaseing the writer must be the last thing we do in case any requests |
| 113 | // are blocked waiting for the connection to be ready |
| 114 | c.writerBox <- options.Framer.Writer(rwc) |
| 115 | return c, nil |
| 116 | } |
| 117 | |
| 118 | // Notify invokes the target method but does not wait for a response. |
| 119 | // The params will be marshaled to JSON before sending over the wire, and will |
| 120 | // be handed to the method invoked. |
| 121 | func (c *Connection) Notify(ctx context.Context, method string, params interface{}) error { |
| 122 | notify, err := NewNotification(method, params) |
| 123 | if err != nil { |
| 124 | return errors.Errorf("marshaling notify parameters: %v", err) |
| 125 | } |
Ian Cottrell | ca6c8a1 | 2021-06-21 00:44:05 -0400 | [diff] [blame] | 126 | ctx = event.Start(ctx, method, RPCDirection(Outbound)) |
| 127 | Started.Record(ctx, 1, Method(method)) |
Ian Cottrell | e1f8bae | 2021-06-10 17:26:03 -0400 | [diff] [blame] | 128 | var errLabel event.Label |
| 129 | if err = c.write(ctx, notify); err != nil { |
Ian Cottrell | ca6c8a1 | 2021-06-21 00:44:05 -0400 | [diff] [blame] | 130 | errLabel = event.Value("error", err) |
Ian Cottrell | e1f8bae | 2021-06-10 17:26:03 -0400 | [diff] [blame] | 131 | } |
| 132 | Finished.Record(ctx, 1, errLabel) |
| 133 | event.End(ctx) |
Jonathan Amsterdam | f956177 | 2021-06-10 11:31:29 -0400 | [diff] [blame] | 134 | return err |
| 135 | } |
| 136 | |
| 137 | // Call invokes the target method and returns an object that can be used to await the response. |
| 138 | // The params will be marshaled to JSON before sending over the wire, and will |
| 139 | // be handed to the method invoked. |
| 140 | // You do not have to wait for the response, it can just be ignored if not needed. |
| 141 | // If sending the call failed, the response will be ready and have the error in it. |
| 142 | func (c *Connection) Call(ctx context.Context, method string, params interface{}) *AsyncCall { |
| 143 | result := &AsyncCall{ |
| 144 | id: Int64ID(atomic.AddInt64(&c.seq, 1)), |
| 145 | resultBox: make(chan asyncResult, 1), |
| 146 | } |
Maxime Soulé | 7b3493d | 2023-08-08 11:06:49 +0200 | [diff] [blame] | 147 | // TODO: rewrite this using the new target/prototype stuff |
Ian Cottrell | e1f8bae | 2021-06-10 17:26:03 -0400 | [diff] [blame] | 148 | ctx = event.Start(ctx, method, |
Ian Cottrell | ca6c8a1 | 2021-06-21 00:44:05 -0400 | [diff] [blame] | 149 | Method(method), RPCDirection(Outbound), RPCID(fmt.Sprintf("%q", result.id))) |
| 150 | Started.Record(ctx, 1, Method(method)) |
Ian Cottrell | e1f8bae | 2021-06-10 17:26:03 -0400 | [diff] [blame] | 151 | result.ctx = ctx |
Maxime Soulé | 7b3493d | 2023-08-08 11:06:49 +0200 | [diff] [blame] | 152 | // generate a new request identifier |
| 153 | call, err := NewCall(result.id, method, params) |
| 154 | if err != nil { |
| 155 | // set the result to failed |
| 156 | result.resultBox <- asyncResult{err: errors.Errorf("marshaling call parameters: %w", err)} |
| 157 | return result |
| 158 | } |
Jonathan Amsterdam | f956177 | 2021-06-10 11:31:29 -0400 | [diff] [blame] | 159 | // We have to add ourselves to the pending map before we send, otherwise we |
| 160 | // are racing the response. |
| 161 | // rchan is buffered in case the response arrives without a listener. |
| 162 | result.response = make(chan *Response, 1) |
| 163 | pending := <-c.outgoingBox |
| 164 | pending[result.id] = result.response |
| 165 | c.outgoingBox <- pending |
| 166 | // now we are ready to send |
| 167 | if err := c.write(ctx, call); err != nil { |
| 168 | // sending failed, we will never get a response, so deliver a fake one |
| 169 | r, _ := NewResponse(result.id, nil, err) |
| 170 | c.incomingResponse(r) |
| 171 | } |
| 172 | return result |
| 173 | } |
| 174 | |
| 175 | // ID used for this call. |
| 176 | // This can be used to cancel the call if needed. |
| 177 | func (a *AsyncCall) ID() ID { return a.id } |
| 178 | |
| 179 | // IsReady can be used to check if the result is already prepared. |
| 180 | // This is guaranteed to return true on a result for which Await has already |
| 181 | // returned, or a call that failed to send in the first place. |
| 182 | func (a *AsyncCall) IsReady() bool { |
| 183 | select { |
| 184 | case r := <-a.resultBox: |
| 185 | a.resultBox <- r |
| 186 | return true |
| 187 | default: |
| 188 | return false |
| 189 | } |
| 190 | } |
| 191 | |
| 192 | // Await the results of a Call. |
| 193 | // The response will be unmarshaled from JSON into the result. |
| 194 | func (a *AsyncCall) Await(ctx context.Context, result interface{}) error { |
| 195 | status := "NONE" |
Ian Cottrell | ca6c8a1 | 2021-06-21 00:44:05 -0400 | [diff] [blame] | 196 | defer event.End(a.ctx, StatusCode(status)) |
Jonathan Amsterdam | f956177 | 2021-06-10 11:31:29 -0400 | [diff] [blame] | 197 | var r asyncResult |
| 198 | select { |
| 199 | case response := <-a.response: |
| 200 | // response just arrived, prepare the result |
| 201 | switch { |
| 202 | case response.Error != nil: |
| 203 | r.err = response.Error |
| 204 | status = "ERROR" |
| 205 | default: |
| 206 | r.result = response.Result |
| 207 | status = "OK" |
| 208 | } |
| 209 | case r = <-a.resultBox: |
| 210 | // result already available |
| 211 | case <-ctx.Done(): |
| 212 | status = "CANCELLED" |
| 213 | return ctx.Err() |
| 214 | } |
| 215 | // refill the box for the next caller |
| 216 | a.resultBox <- r |
| 217 | // and unpack the result |
| 218 | if r.err != nil { |
| 219 | return r.err |
| 220 | } |
| 221 | if result == nil || len(r.result) == 0 { |
| 222 | return nil |
| 223 | } |
| 224 | return json.Unmarshal(r.result, result) |
| 225 | } |
| 226 | |
| 227 | // Respond deliverers a response to an incoming Call. |
| 228 | // It is an error to not call this exactly once for any message for which a |
| 229 | // handler has previously returned ErrAsyncResponse. It is also an error to |
| 230 | // call this for any other message. |
| 231 | func (c *Connection) Respond(id ID, result interface{}, rerr error) error { |
| 232 | pending := <-c.incomingBox |
| 233 | defer func() { c.incomingBox <- pending }() |
| 234 | entry, found := pending[id] |
| 235 | if !found { |
| 236 | return nil |
| 237 | } |
| 238 | delete(pending, id) |
| 239 | return c.respond(entry, result, rerr) |
| 240 | } |
| 241 | |
| 242 | // Cancel is used to cancel an inbound message by ID, it does not cancel |
| 243 | // outgoing messages. |
| 244 | // This is only used inside a message handler that is layering a |
| 245 | // cancellation protocol on top of JSON RPC 2. |
| 246 | // It will not complain if the ID is not a currently active message, and it will |
| 247 | // not cause any messages that have not arrived yet with that ID to be |
| 248 | // cancelled. |
| 249 | func (c *Connection) Cancel(id ID) { |
| 250 | pending := <-c.incomingBox |
| 251 | defer func() { c.incomingBox <- pending }() |
| 252 | if entry, found := pending[id]; found && entry.cancel != nil { |
| 253 | entry.cancel() |
| 254 | entry.cancel = nil |
| 255 | } |
| 256 | } |
| 257 | |
| 258 | // Wait blocks until the connection is fully closed, but does not close it. |
| 259 | func (c *Connection) Wait() error { |
| 260 | return c.async.wait() |
| 261 | } |
| 262 | |
| 263 | // Close can be used to close the underlying stream, and then wait for the connection to |
| 264 | // fully shut down. |
| 265 | // This does not cancel in flight requests, but waits for them to gracefully complete. |
| 266 | func (c *Connection) Close() error { |
| 267 | // close the underlying stream |
| 268 | if err := c.closer.Close(); err != nil && !isClosingError(err) { |
| 269 | return err |
| 270 | } |
| 271 | // and then wait for it to cause the connection to close |
| 272 | if err := c.Wait(); err != nil && !isClosingError(err) { |
| 273 | return err |
| 274 | } |
| 275 | return nil |
| 276 | } |
| 277 | |
| 278 | // readIncoming collects inbound messages from the reader and delivers them, either responding |
| 279 | // to outgoing calls or feeding requests to the queue. |
| 280 | func (c *Connection) readIncoming(ctx context.Context, reader Reader, toQueue chan<- *incoming) { |
| 281 | defer close(toQueue) |
| 282 | for { |
| 283 | // get the next message |
| 284 | // no lock is needed, this is the only reader |
| 285 | msg, n, err := reader.Read(ctx) |
| 286 | if err != nil { |
| 287 | // The stream failed, we cannot continue |
| 288 | c.async.setError(err) |
| 289 | return |
| 290 | } |
| 291 | switch msg := msg.(type) { |
| 292 | case *Request: |
| 293 | entry := &incoming{ |
| 294 | request: msg, |
| 295 | } |
| 296 | // add a span to the context for this request |
Ian Cottrell | e1f8bae | 2021-06-10 17:26:03 -0400 | [diff] [blame] | 297 | var idLabel event.Label |
Jonathan Amsterdam | f956177 | 2021-06-10 11:31:29 -0400 | [diff] [blame] | 298 | if msg.IsCall() { |
Ian Cottrell | ca6c8a1 | 2021-06-21 00:44:05 -0400 | [diff] [blame] | 299 | idLabel = RPCID(fmt.Sprintf("%q", msg.ID)) |
Jonathan Amsterdam | f956177 | 2021-06-10 11:31:29 -0400 | [diff] [blame] | 300 | } |
Ian Cottrell | e1f8bae | 2021-06-10 17:26:03 -0400 | [diff] [blame] | 301 | entry.baseCtx = event.Start(ctx, msg.Method, |
Ian Cottrell | ca6c8a1 | 2021-06-21 00:44:05 -0400 | [diff] [blame] | 302 | Method(msg.Method), RPCDirection(Inbound), idLabel) |
| 303 | Started.Record(entry.baseCtx, 1, Method(msg.Method)) |
| 304 | ReceivedBytes.Record(entry.baseCtx, n, Method(msg.Method)) |
Jonathan Amsterdam | f956177 | 2021-06-10 11:31:29 -0400 | [diff] [blame] | 305 | // in theory notifications cannot be cancelled, but we build them a cancel context anyway |
| 306 | entry.handleCtx, entry.cancel = context.WithCancel(entry.baseCtx) |
| 307 | // if the request is a call, add it to the incoming map so it can be |
| 308 | // cancelled by id |
| 309 | if msg.IsCall() { |
| 310 | pending := <-c.incomingBox |
Jonathan Amsterdam | f956177 | 2021-06-10 11:31:29 -0400 | [diff] [blame] | 311 | pending[msg.ID] = entry |
Maxime Soulé | 3a778c5 | 2022-09-30 13:05:53 +0000 | [diff] [blame] | 312 | c.incomingBox <- pending |
Jonathan Amsterdam | f956177 | 2021-06-10 11:31:29 -0400 | [diff] [blame] | 313 | } |
| 314 | // send the message to the incoming queue |
| 315 | toQueue <- entry |
| 316 | case *Response: |
| 317 | // If method is not set, this should be a response, in which case we must |
| 318 | // have an id to send the response back to the caller. |
| 319 | c.incomingResponse(msg) |
| 320 | } |
| 321 | } |
| 322 | } |
| 323 | |
| 324 | func (c *Connection) incomingResponse(msg *Response) { |
| 325 | pending := <-c.outgoingBox |
| 326 | response, ok := pending[msg.ID] |
| 327 | if ok { |
| 328 | delete(pending, msg.ID) |
| 329 | } |
| 330 | c.outgoingBox <- pending |
| 331 | if response != nil { |
| 332 | response <- msg |
| 333 | } |
| 334 | } |
| 335 | |
cuishuang | a2e15db | 2022-03-03 12:53:37 +0000 | [diff] [blame] | 336 | // manageQueue reads incoming requests, attempts to process them with the preempter, or queue them |
Jonathan Amsterdam | f956177 | 2021-06-10 11:31:29 -0400 | [diff] [blame] | 337 | // up for normal handling. |
| 338 | func (c *Connection) manageQueue(ctx context.Context, preempter Preempter, fromRead <-chan *incoming, toDeliver chan<- *incoming) { |
| 339 | defer close(toDeliver) |
| 340 | q := []*incoming{} |
| 341 | ok := true |
| 342 | for { |
| 343 | var nextReq *incoming |
| 344 | if len(q) == 0 { |
| 345 | // no messages in the queue |
| 346 | // if we were closing, then we are done |
| 347 | if !ok { |
| 348 | return |
| 349 | } |
| 350 | // not closing, but nothing in the queue, so just block waiting for a read |
| 351 | nextReq, ok = <-fromRead |
| 352 | } else { |
| 353 | // we have a non empty queue, so pick whichever of reading or delivering |
| 354 | // that we can make progress on |
| 355 | select { |
| 356 | case nextReq, ok = <-fromRead: |
| 357 | case toDeliver <- q[0]: |
Maxime Soulé | 7b3493d | 2023-08-08 11:06:49 +0200 | [diff] [blame] | 358 | // TODO: this causes a lot of shuffling, should we use a growing ring buffer? compaction? |
Jonathan Amsterdam | f956177 | 2021-06-10 11:31:29 -0400 | [diff] [blame] | 359 | q = q[1:] |
| 360 | } |
| 361 | } |
| 362 | if nextReq != nil { |
| 363 | // TODO: should we allow to limit the queue size? |
| 364 | var result interface{} |
| 365 | rerr := nextReq.handleCtx.Err() |
| 366 | if rerr == nil { |
| 367 | // only preempt if not already cancelled |
| 368 | result, rerr = preempter.Preempt(nextReq.handleCtx, nextReq.request) |
| 369 | } |
| 370 | switch { |
| 371 | case rerr == ErrNotHandled: |
| 372 | // message not handled, add it to the queue for the main handler |
| 373 | q = append(q, nextReq) |
| 374 | case rerr == ErrAsyncResponse: |
| 375 | // message handled but the response will come later |
| 376 | default: |
| 377 | // anything else means the message is fully handled |
| 378 | c.reply(nextReq, result, rerr) |
| 379 | } |
| 380 | } |
| 381 | } |
| 382 | } |
| 383 | |
| 384 | func (c *Connection) deliverMessages(ctx context.Context, handler Handler, fromQueue <-chan *incoming) { |
| 385 | defer c.async.done() |
| 386 | for entry := range fromQueue { |
| 387 | // cancel any messages in the queue that we have a pending cancel for |
| 388 | var result interface{} |
| 389 | rerr := entry.handleCtx.Err() |
| 390 | if rerr == nil { |
| 391 | // only deliver if not already cancelled |
| 392 | result, rerr = handler.Handle(entry.handleCtx, entry.request) |
| 393 | } |
| 394 | switch { |
| 395 | case rerr == ErrNotHandled: |
| 396 | // message not handled, report it back to the caller as an error |
| 397 | c.reply(entry, nil, errors.Errorf("%w: %q", ErrMethodNotFound, entry.request.Method)) |
| 398 | case rerr == ErrAsyncResponse: |
| 399 | // message handled but the response will come later |
| 400 | default: |
| 401 | c.reply(entry, result, rerr) |
| 402 | } |
| 403 | } |
| 404 | } |
| 405 | |
| 406 | // reply is used to reply to an incoming request that has just been handled |
| 407 | func (c *Connection) reply(entry *incoming, result interface{}, rerr error) { |
| 408 | if entry.request.IsCall() { |
| 409 | // we have a call finishing, remove it from the incoming map |
| 410 | pending := <-c.incomingBox |
| 411 | defer func() { c.incomingBox <- pending }() |
| 412 | delete(pending, entry.request.ID) |
| 413 | } |
| 414 | if err := c.respond(entry, result, rerr); err != nil { |
| 415 | // no way to propagate this error |
Maxime Soulé | 7b3493d | 2023-08-08 11:06:49 +0200 | [diff] [blame] | 416 | // TODO: should we do more than just log it? |
Ian Cottrell | ab7525a | 2021-06-24 09:52:57 -0400 | [diff] [blame] | 417 | event.Error(entry.baseCtx, "jsonrpc2 message delivery failed", err) |
Jonathan Amsterdam | f956177 | 2021-06-10 11:31:29 -0400 | [diff] [blame] | 418 | } |
| 419 | } |
| 420 | |
| 421 | // respond sends a response. |
| 422 | // This is the code shared between reply and SendResponse. |
| 423 | func (c *Connection) respond(entry *incoming, result interface{}, rerr error) error { |
| 424 | var err error |
| 425 | if entry.request.IsCall() { |
| 426 | // send the response |
| 427 | if result == nil && rerr == nil { |
| 428 | // call with no response, send an error anyway |
| 429 | rerr = errors.Errorf("%w: %q produced no response", ErrInternal, entry.request.Method) |
| 430 | } |
| 431 | var response *Response |
| 432 | response, err = NewResponse(entry.request.ID, result, rerr) |
| 433 | if err == nil { |
| 434 | // we write the response with the base context, in case the message was cancelled |
| 435 | err = c.write(entry.baseCtx, response) |
| 436 | } |
| 437 | } else { |
| 438 | switch { |
| 439 | case rerr != nil: |
| 440 | // notification failed |
| 441 | err = errors.Errorf("%w: %q notification failed: %v", ErrInternal, entry.request.Method, rerr) |
| 442 | rerr = nil |
| 443 | case result != nil: |
Maxime Soulé | 7b3493d | 2023-08-08 11:06:49 +0200 | [diff] [blame] | 444 | // notification produced a response, which is an error |
Jonathan Amsterdam | f956177 | 2021-06-10 11:31:29 -0400 | [diff] [blame] | 445 | err = errors.Errorf("%w: %q produced unwanted response", ErrInternal, entry.request.Method) |
| 446 | default: |
| 447 | // normal notification finish |
| 448 | } |
| 449 | } |
| 450 | var status string |
| 451 | switch { |
| 452 | case rerr != nil || err != nil: |
| 453 | status = "ERROR" |
| 454 | default: |
| 455 | status = "OK" |
| 456 | } |
| 457 | // and just to be clean, invoke and clear the cancel if needed |
| 458 | if entry.cancel != nil { |
| 459 | entry.cancel() |
| 460 | entry.cancel = nil |
| 461 | } |
| 462 | // mark the entire request processing as done |
Ian Cottrell | ca6c8a1 | 2021-06-21 00:44:05 -0400 | [diff] [blame] | 463 | event.End(entry.baseCtx, StatusCode(status)) |
Jonathan Amsterdam | f956177 | 2021-06-10 11:31:29 -0400 | [diff] [blame] | 464 | return err |
| 465 | } |
| 466 | |
| 467 | // write is used by all things that write outgoing messages, including replies. |
| 468 | // it makes sure that writes are atomic |
| 469 | func (c *Connection) write(ctx context.Context, msg Message) error { |
| 470 | writer := <-c.writerBox |
| 471 | defer func() { c.writerBox <- writer }() |
| 472 | n, err := writer.Write(ctx, msg) |
| 473 | // TODO: get a method label in here somehow. |
Ian Cottrell | e1f8bae | 2021-06-10 17:26:03 -0400 | [diff] [blame] | 474 | SentBytes.Record(ctx, n) |
Jonathan Amsterdam | f956177 | 2021-06-10 11:31:29 -0400 | [diff] [blame] | 475 | return err |
| 476 | } |