internal/jsonrpc2_v2: an updated jsonrpc2 library
Change-Id: I609173baa6842d33068a7e9596d54f03d89c5401
Reviewed-on: https://go-review.googlesource.com/c/tools/+/292169
Run-TryBot: Ian Cottrell <iancottrell@google.com>
gopls-CI: kokoro <noreply+kokoro@google.com>
TryBot-Result: Go Bot <gobot@golang.org>
Trust: Ian Cottrell <iancottrell@google.com>
Reviewed-by: Robert Findley <rfindley@google.com>
diff --git a/internal/jsonrpc2_v2/conn.go b/internal/jsonrpc2_v2/conn.go
new file mode 100644
index 0000000..6d92c0c
--- /dev/null
+++ b/internal/jsonrpc2_v2/conn.go
@@ -0,0 +1,486 @@
+// Copyright 2018 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package jsonrpc2
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "sync/atomic"
+
+ "golang.org/x/tools/internal/event"
+ "golang.org/x/tools/internal/event/label"
+ "golang.org/x/tools/internal/lsp/debug/tag"
+ errors "golang.org/x/xerrors"
+)
+
+// Binder builds a connection configuration.
+// This may be used in servers to generate a new configuration per connection.
+// ConnectionOptions itself implements Binder returning itself unmodified, to
+// allow for the simple cases where no per connection information is needed.
+type Binder interface {
+ // Bind is invoked when creating a new connection.
+ // The connection is not ready to use when Bind is called.
+ Bind(context.Context, *Connection) (ConnectionOptions, error)
+}
+
+// ConnectionOptions holds the options for new connections.
+type ConnectionOptions struct {
+ // Framer allows control over the message framing and encoding.
+ // If nil, HeaderFramer will be used.
+ Framer Framer
+ // Preempter allows registration of a pre-queue message handler.
+ // If nil, no messages will be preempted.
+ Preempter Preempter
+ // Handler is used as the queued message handler for inbound messages.
+ // If nil, all responses will be ErrNotHandled.
+ Handler Handler
+}
+
+// Connection manages the jsonrpc2 protocol, connecting responses back to their
+// calls.
+// Connection is bidirectional; it does not have a designated server or client
+// end.
+type Connection struct {
+ seq int64 // must only be accessed using atomic operations
+ closer io.Closer
+ writerBox chan Writer
+ outgoingBox chan map[ID]chan<- *Response
+ incomingBox chan map[ID]*incoming
+ async async
+}
+
+type AsyncCall struct {
+ id ID
+ response chan *Response // the channel a response will be delivered on
+ resultBox chan asyncResult
+ endSpan func() // close the tracing span when all processing for the message is complete
+}
+
+type asyncResult struct {
+ result []byte
+ err error
+}
+
+// incoming is used to track an incoming request as it is being handled
+type incoming struct {
+ request *Request // the request being processed
+ baseCtx context.Context // a base context for the message processing
+ done func() // a function called when all processing for the message is complete
+ handleCtx context.Context // the context for handling the message, child of baseCtx
+ cancel func() // a function that cancels the handling context
+}
+
+// Bind returns the options unmodified.
+func (o ConnectionOptions) Bind(context.Context, *Connection) (ConnectionOptions, error) {
+ return o, nil
+}
+
+// newConnection creates a new connection and runs it.
+// This is used by the Dial and Serve functions to build the actual connection.
+func newConnection(ctx context.Context, rwc io.ReadWriteCloser, binder Binder) (*Connection, error) {
+ c := &Connection{
+ closer: rwc,
+ writerBox: make(chan Writer, 1),
+ outgoingBox: make(chan map[ID]chan<- *Response, 1),
+ incomingBox: make(chan map[ID]*incoming, 1),
+ }
+
+ options, err := binder.Bind(ctx, c)
+ if err != nil {
+ return nil, err
+ }
+ if options.Framer == nil {
+ options.Framer = HeaderFramer()
+ }
+ if options.Preempter == nil {
+ options.Preempter = defaultHandler{}
+ }
+ if options.Handler == nil {
+ options.Handler = defaultHandler{}
+ }
+ c.outgoingBox <- make(map[ID]chan<- *Response)
+ c.incomingBox <- make(map[ID]*incoming)
+ c.async.init()
+ // the goroutines started here will continue until the underlying stream is closed
+ reader := options.Framer.Reader(rwc)
+ readToQueue := make(chan *incoming)
+ queueToDeliver := make(chan *incoming)
+ go c.readIncoming(ctx, reader, readToQueue)
+ go c.manageQueue(ctx, options.Preempter, readToQueue, queueToDeliver)
+ go c.deliverMessages(ctx, options.Handler, queueToDeliver)
+ // releaseing the writer must be the last thing we do in case any requests
+ // are blocked waiting for the connection to be ready
+ c.writerBox <- options.Framer.Writer(rwc)
+ return c, nil
+}
+
+// Notify invokes the target method but does not wait for a response.
+// The params will be marshaled to JSON before sending over the wire, and will
+// be handed to the method invoked.
+func (c *Connection) Notify(ctx context.Context, method string, params interface{}) error {
+ notify, err := NewNotification(method, params)
+ if err != nil {
+ return errors.Errorf("marshaling notify parameters: %v", err)
+ }
+ ctx, done := event.Start(ctx, method,
+ tag.Method.Of(method),
+ tag.RPCDirection.Of(tag.Outbound),
+ )
+ event.Metric(ctx, tag.Started.Of(1))
+ err = c.write(ctx, notify)
+ switch {
+ case err != nil:
+ event.Label(ctx, tag.StatusCode.Of("ERROR"))
+ default:
+ event.Label(ctx, tag.StatusCode.Of("OK"))
+ }
+ done()
+ return err
+}
+
+// Call invokes the target method and returns an object that can be used to await the response.
+// The params will be marshaled to JSON before sending over the wire, and will
+// be handed to the method invoked.
+// You do not have to wait for the response, it can just be ignored if not needed.
+// If sending the call failed, the response will be ready and have the error in it.
+func (c *Connection) Call(ctx context.Context, method string, params interface{}) *AsyncCall {
+ result := &AsyncCall{
+ id: Int64ID(atomic.AddInt64(&c.seq, 1)),
+ resultBox: make(chan asyncResult, 1),
+ }
+ // generate a new request identifier
+ call, err := NewCall(result.id, method, params)
+ if err != nil {
+ //set the result to failed
+ result.resultBox <- asyncResult{err: errors.Errorf("marshaling call parameters: %w", err)}
+ return result
+ }
+ ctx, endSpan := event.Start(ctx, method,
+ tag.Method.Of(method),
+ tag.RPCDirection.Of(tag.Outbound),
+ tag.RPCID.Of(fmt.Sprintf("%q", result.id)),
+ )
+ result.endSpan = endSpan
+ event.Metric(ctx, tag.Started.Of(1))
+ // We have to add ourselves to the pending map before we send, otherwise we
+ // are racing the response.
+ // rchan is buffered in case the response arrives without a listener.
+ result.response = make(chan *Response, 1)
+ pending := <-c.outgoingBox
+ pending[result.id] = result.response
+ c.outgoingBox <- pending
+ // now we are ready to send
+ if err := c.write(ctx, call); err != nil {
+ // sending failed, we will never get a response, so deliver a fake one
+ r, _ := NewResponse(result.id, nil, err)
+ c.incomingResponse(r)
+ }
+ return result
+}
+
+// ID used for this call.
+// This can be used to cancel the call if needed.
+func (a *AsyncCall) ID() ID { return a.id }
+
+// IsReady can be used to check if the result is already prepared.
+// This is guaranteed to return true on a result for which Await has already
+// returned, or a call that failed to send in the first place.
+func (a *AsyncCall) IsReady() bool {
+ select {
+ case r := <-a.resultBox:
+ a.resultBox <- r
+ return true
+ default:
+ return false
+ }
+}
+
+// Await the results of a Call.
+// The response will be unmarshaled from JSON into the result.
+func (a *AsyncCall) Await(ctx context.Context, result interface{}) error {
+ defer a.endSpan()
+ var r asyncResult
+ select {
+ case response := <-a.response:
+ // response just arrived, prepare the result
+ switch {
+ case response.Error != nil:
+ r.err = response.Error
+ event.Label(ctx, tag.StatusCode.Of("ERROR"))
+ default:
+ r.result = response.Result
+ event.Label(ctx, tag.StatusCode.Of("OK"))
+ }
+ case r = <-a.resultBox:
+ // result already available
+ case <-ctx.Done():
+ event.Label(ctx, tag.StatusCode.Of("CANCELLED"))
+ return ctx.Err()
+ }
+ // refill the box for the next caller
+ a.resultBox <- r
+ // and unpack the result
+ if r.err != nil {
+ return r.err
+ }
+ if result == nil || len(r.result) == 0 {
+ return nil
+ }
+ return json.Unmarshal(r.result, result)
+}
+
+// Respond deliverers a response to an incoming Call.
+// It is an error to not call this exactly once for any message for which a
+// handler has previously returned ErrAsyncResponse. It is also an error to
+// call this for any other message.
+func (c *Connection) Respond(id ID, result interface{}, rerr error) error {
+ pending := <-c.incomingBox
+ defer func() { c.incomingBox <- pending }()
+ entry, found := pending[id]
+ if !found {
+ return nil
+ }
+ delete(pending, id)
+ return c.respond(entry, result, rerr)
+}
+
+// Cancel is used to cancel an inbound message by ID, it does not cancel
+// outgoing messages.
+// This is only used inside a message handler that is layering a
+// cancellation protocol on top of JSON RPC 2.
+// It will not complain if the ID is not a currently active message, and it will
+// not cause any messages that have not arrived yet with that ID to be
+// cancelled.
+func (c *Connection) Cancel(id ID) {
+ pending := <-c.incomingBox
+ defer func() { c.incomingBox <- pending }()
+ if entry, found := pending[id]; found && entry.cancel != nil {
+ entry.cancel()
+ entry.cancel = nil
+ }
+}
+
+// Wait blocks until the connection is fully closed, but does not close it.
+func (c *Connection) Wait() error {
+ return c.async.wait()
+}
+
+// Close can be used to close the underlying stream, and then wait for the connection to
+// fully shut down.
+// This does not cancel in flight requests, but waits for them to gracefully complete.
+func (c *Connection) Close() error {
+ // close the underlying stream
+ if err := c.closer.Close(); err != nil && !isClosingError(err) {
+ return err
+ }
+ // and then wait for it to cause the connection to close
+ if err := c.Wait(); err != nil && !isClosingError(err) {
+ return err
+ }
+ return nil
+}
+
+// readIncoming collects inbound messages from the reader and delivers them, either responding
+// to outgoing calls or feeding requests to the queue.
+func (c *Connection) readIncoming(ctx context.Context, reader Reader, toQueue chan<- *incoming) {
+ defer close(toQueue)
+ for {
+ // get the next message
+ // no lock is needed, this is the only reader
+ msg, n, err := reader.Read(ctx)
+ if err != nil {
+ // The stream failed, we cannot continue
+ c.async.setError(err)
+ return
+ }
+ switch msg := msg.(type) {
+ case *Request:
+ entry := &incoming{
+ request: msg,
+ }
+ // add a span to the context for this request
+ labels := append(make([]label.Label, 0, 3), // make space for the id if present
+ tag.Method.Of(msg.Method),
+ tag.RPCDirection.Of(tag.Inbound),
+ )
+ if msg.IsCall() {
+ labels = append(labels, tag.RPCID.Of(fmt.Sprintf("%q", msg.ID)))
+ }
+ entry.baseCtx, entry.done = event.Start(ctx, msg.Method, labels...)
+ event.Metric(entry.baseCtx,
+ tag.Started.Of(1),
+ tag.ReceivedBytes.Of(n))
+ // in theory notifications cannot be cancelled, but we build them a cancel context anyway
+ entry.handleCtx, entry.cancel = context.WithCancel(entry.baseCtx)
+ // if the request is a call, add it to the incoming map so it can be
+ // cancelled by id
+ if msg.IsCall() {
+ pending := <-c.incomingBox
+ c.incomingBox <- pending
+ pending[msg.ID] = entry
+ }
+ // send the message to the incoming queue
+ toQueue <- entry
+ case *Response:
+ // If method is not set, this should be a response, in which case we must
+ // have an id to send the response back to the caller.
+ c.incomingResponse(msg)
+ }
+ }
+}
+
+func (c *Connection) incomingResponse(msg *Response) {
+ pending := <-c.outgoingBox
+ response, ok := pending[msg.ID]
+ if ok {
+ delete(pending, msg.ID)
+ }
+ c.outgoingBox <- pending
+ if response != nil {
+ response <- msg
+ }
+}
+
+// manageQueue reads incoming requests, attempts to proccess them with the preempter, or queue them
+// up for normal handling.
+func (c *Connection) manageQueue(ctx context.Context, preempter Preempter, fromRead <-chan *incoming, toDeliver chan<- *incoming) {
+ defer close(toDeliver)
+ q := []*incoming{}
+ ok := true
+ for {
+ var nextReq *incoming
+ if len(q) == 0 {
+ // no messages in the queue
+ // if we were closing, then we are done
+ if !ok {
+ return
+ }
+ // not closing, but nothing in the queue, so just block waiting for a read
+ nextReq, ok = <-fromRead
+ } else {
+ // we have a non empty queue, so pick whichever of reading or delivering
+ // that we can make progress on
+ select {
+ case nextReq, ok = <-fromRead:
+ case toDeliver <- q[0]:
+ //TODO: this causes a lot of shuffling, should we use a growing ring buffer? compaction?
+ q = q[1:]
+ }
+ }
+ if nextReq != nil {
+ // TODO: should we allow to limit the queue size?
+ var result interface{}
+ rerr := nextReq.handleCtx.Err()
+ if rerr == nil {
+ // only preempt if not already cancelled
+ result, rerr = preempter.Preempt(nextReq.handleCtx, nextReq.request)
+ }
+ switch {
+ case rerr == ErrNotHandled:
+ // message not handled, add it to the queue for the main handler
+ q = append(q, nextReq)
+ case rerr == ErrAsyncResponse:
+ // message handled but the response will come later
+ default:
+ // anything else means the message is fully handled
+ c.reply(nextReq, result, rerr)
+ }
+ }
+ }
+}
+
+func (c *Connection) deliverMessages(ctx context.Context, handler Handler, fromQueue <-chan *incoming) {
+ defer c.async.done()
+ for entry := range fromQueue {
+ // cancel any messages in the queue that we have a pending cancel for
+ var result interface{}
+ rerr := entry.handleCtx.Err()
+ if rerr == nil {
+ // only deliver if not already cancelled
+ result, rerr = handler.Handle(entry.handleCtx, entry.request)
+ }
+ switch {
+ case rerr == ErrNotHandled:
+ // message not handled, report it back to the caller as an error
+ c.reply(entry, nil, errors.Errorf("%w: %q", ErrMethodNotFound, entry.request.Method))
+ case rerr == ErrAsyncResponse:
+ // message handled but the response will come later
+ default:
+ c.reply(entry, result, rerr)
+ }
+ }
+}
+
+// reply is used to reply to an incoming request that has just been handled
+func (c *Connection) reply(entry *incoming, result interface{}, rerr error) {
+ if entry.request.IsCall() {
+ // we have a call finishing, remove it from the incoming map
+ pending := <-c.incomingBox
+ defer func() { c.incomingBox <- pending }()
+ delete(pending, entry.request.ID)
+ }
+ if err := c.respond(entry, result, rerr); err != nil {
+ // no way to propagate this error
+ //TODO: should we do more than just log it?
+ event.Error(entry.baseCtx, "jsonrpc2 message delivery failed", err)
+ }
+}
+
+// respond sends a response.
+// This is the code shared between reply and SendResponse.
+func (c *Connection) respond(entry *incoming, result interface{}, rerr error) error {
+ var err error
+ if entry.request.IsCall() {
+ // send the response
+ if result == nil && rerr == nil {
+ // call with no response, send an error anyway
+ rerr = errors.Errorf("%w: %q produced no response", ErrInternal, entry.request.Method)
+ }
+ var response *Response
+ response, err = NewResponse(entry.request.ID, result, rerr)
+ if err == nil {
+ // we write the response with the base context, in case the message was cancelled
+ err = c.write(entry.baseCtx, response)
+ }
+ } else {
+ switch {
+ case rerr != nil:
+ // notification failed
+ err = errors.Errorf("%w: %q notification failed: %v", ErrInternal, entry.request.Method, rerr)
+ rerr = nil
+ case result != nil:
+ //notification produced a response, which is an error
+ err = errors.Errorf("%w: %q produced unwanted response", ErrInternal, entry.request.Method)
+ default:
+ // normal notification finish
+ }
+ }
+ switch {
+ case rerr != nil || err != nil:
+ event.Label(entry.baseCtx, tag.StatusCode.Of("ERROR"))
+ default:
+ event.Label(entry.baseCtx, tag.StatusCode.Of("OK"))
+ }
+ // and just to be clean, invoke and clear the cancel if needed
+ if entry.cancel != nil {
+ entry.cancel()
+ entry.cancel = nil
+ }
+ // mark the entire request processing as done
+ entry.done()
+ return err
+}
+
+// write is used by all things that write outgoing messages, including replies.
+// it makes sure that writes are atomic
+func (c *Connection) write(ctx context.Context, msg Message) error {
+ writer := <-c.writerBox
+ defer func() { c.writerBox <- writer }()
+ n, err := writer.Write(ctx, msg)
+ event.Metric(ctx, tag.SentBytes.Of(n))
+ return err
+}
diff --git a/internal/jsonrpc2_v2/frame.go b/internal/jsonrpc2_v2/frame.go
new file mode 100644
index 0000000..634717c
--- /dev/null
+++ b/internal/jsonrpc2_v2/frame.go
@@ -0,0 +1,179 @@
+// Copyright 2018 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package jsonrpc2
+
+import (
+ "bufio"
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "strconv"
+ "strings"
+
+ errors "golang.org/x/xerrors"
+)
+
+// Reader abstracts the transport mechanics from the JSON RPC protocol.
+// A Conn reads messages from the reader it was provided on construction,
+// and assumes that each call to Read fully transfers a single message,
+// or returns an error.
+// A reader is not safe for concurrent use, it is expected it will be used by
+// a single Conn in a safe manner.
+type Reader interface {
+ // Read gets the next message from the stream.
+ Read(context.Context) (Message, int64, error)
+}
+
+// Writer abstracts the transport mechanics from the JSON RPC protocol.
+// A Conn writes messages using the writer it was provided on construction,
+// and assumes that each call to Write fully transfers a single message,
+// or returns an error.
+// A writer is not safe for concurrent use, it is expected it will be used by
+// a single Conn in a safe manner.
+type Writer interface {
+ // Write sends a message to the stream.
+ Write(context.Context, Message) (int64, error)
+}
+
+// Framer wraps low level byte readers and writers into jsonrpc2 message
+// readers and writers.
+// It is responsible for the framing and encoding of messages into wire form.
+type Framer interface {
+ // Reader wraps a byte reader into a message reader.
+ Reader(rw io.Reader) Reader
+ // Writer wraps a byte writer into a message writer.
+ Writer(rw io.Writer) Writer
+}
+
+// RawFramer returns a new Framer.
+// The messages are sent with no wrapping, and rely on json decode consistency
+// to determine message boundaries.
+func RawFramer() Framer { return rawFramer{} }
+
+type rawFramer struct{}
+type rawReader struct{ in *json.Decoder }
+type rawWriter struct{ out io.Writer }
+
+func (rawFramer) Reader(rw io.Reader) Reader {
+ return &rawReader{in: json.NewDecoder(rw)}
+}
+
+func (rawFramer) Writer(rw io.Writer) Writer {
+ return &rawWriter{out: rw}
+}
+
+func (r *rawReader) Read(ctx context.Context) (Message, int64, error) {
+ select {
+ case <-ctx.Done():
+ return nil, 0, ctx.Err()
+ default:
+ }
+ var raw json.RawMessage
+ if err := r.in.Decode(&raw); err != nil {
+ return nil, 0, err
+ }
+ msg, err := DecodeMessage(raw)
+ return msg, int64(len(raw)), err
+}
+
+func (w *rawWriter) Write(ctx context.Context, msg Message) (int64, error) {
+ select {
+ case <-ctx.Done():
+ return 0, ctx.Err()
+ default:
+ }
+ data, err := EncodeMessage(msg)
+ if err != nil {
+ return 0, errors.Errorf("marshaling message: %v", err)
+ }
+ n, err := w.out.Write(data)
+ return int64(n), err
+}
+
+// HeaderFramer returns a new Framer.
+// The messages are sent with HTTP content length and MIME type headers.
+// This is the format used by LSP and others.
+func HeaderFramer() Framer { return headerFramer{} }
+
+type headerFramer struct{}
+type headerReader struct{ in *bufio.Reader }
+type headerWriter struct{ out io.Writer }
+
+func (headerFramer) Reader(rw io.Reader) Reader {
+ return &headerReader{in: bufio.NewReader(rw)}
+}
+
+func (headerFramer) Writer(rw io.Writer) Writer {
+ return &headerWriter{out: rw}
+}
+
+func (r *headerReader) Read(ctx context.Context) (Message, int64, error) {
+ select {
+ case <-ctx.Done():
+ return nil, 0, ctx.Err()
+ default:
+ }
+ var total, length int64
+ // read the header, stop on the first empty line
+ for {
+ line, err := r.in.ReadString('\n')
+ total += int64(len(line))
+ if err != nil {
+ return nil, total, errors.Errorf("failed reading header line: %w", err)
+ }
+ line = strings.TrimSpace(line)
+ // check we have a header line
+ if line == "" {
+ break
+ }
+ colon := strings.IndexRune(line, ':')
+ if colon < 0 {
+ return nil, total, errors.Errorf("invalid header line %q", line)
+ }
+ name, value := line[:colon], strings.TrimSpace(line[colon+1:])
+ switch name {
+ case "Content-Length":
+ if length, err = strconv.ParseInt(value, 10, 32); err != nil {
+ return nil, total, errors.Errorf("failed parsing Content-Length: %v", value)
+ }
+ if length <= 0 {
+ return nil, total, errors.Errorf("invalid Content-Length: %v", length)
+ }
+ default:
+ // ignoring unknown headers
+ }
+ }
+ if length == 0 {
+ return nil, total, errors.Errorf("missing Content-Length header")
+ }
+ data := make([]byte, length)
+ n, err := io.ReadFull(r.in, data)
+ total += int64(n)
+ if err != nil {
+ return nil, total, err
+ }
+ msg, err := DecodeMessage(data)
+ return msg, total, err
+}
+
+func (w *headerWriter) Write(ctx context.Context, msg Message) (int64, error) {
+ select {
+ case <-ctx.Done():
+ return 0, ctx.Err()
+ default:
+ }
+ data, err := EncodeMessage(msg)
+ if err != nil {
+ return 0, errors.Errorf("marshaling message: %v", err)
+ }
+ n, err := fmt.Fprintf(w.out, "Content-Length: %v\r\n\r\n", len(data))
+ total := int64(n)
+ if err == nil {
+ n, err = w.out.Write(data)
+ total += int64(n)
+ }
+ return total, err
+}
diff --git a/internal/jsonrpc2_v2/jsonrpc2.go b/internal/jsonrpc2_v2/jsonrpc2.go
new file mode 100644
index 0000000..49f32cb
--- /dev/null
+++ b/internal/jsonrpc2_v2/jsonrpc2.go
@@ -0,0 +1,84 @@
+// Copyright 2018 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package jsonrpc2 is a minimal implementation of the JSON RPC 2 spec.
+// https://www.jsonrpc.org/specification
+// It is intended to be compatible with other implementations at the wire level.
+package jsonrpc2
+
+import (
+ "context"
+ "errors"
+)
+
+var (
+ // ErrIdleTimeout is returned when serving timed out waiting for new connections.
+ ErrIdleTimeout = errors.New("timed out waiting for new connections")
+ // ErrNotHandled is returned from a handler to indicate it did not handle the
+ // message.
+ ErrNotHandled = errors.New("JSON RPC not handled")
+ // ErrAsyncResponse is returned from a handler to indicate it will generate a
+ // response asynchronously.
+ ErrAsyncResponse = errors.New("JSON RPC asynchronous response")
+)
+
+// Preempter handles messages on a connection before they are queued to the main
+// handler.
+// Primarily this is used for cancel handlers or notifications for which out of
+// order processing is not an issue.
+type Preempter interface {
+ // Preempt is invoked for each incoming request before it is queued.
+ // If the request is a call, it must return a value or an error for the reply.
+ // Preempt should not block or start any new messages on the connection.
+ Preempt(ctx context.Context, req *Request) (interface{}, error)
+}
+
+// Handler handles messages on a connection.
+type Handler interface {
+ // Handle is invoked for each incoming request.
+ // If the request is a call, it must return a value or an error for the reply.
+ Handle(ctx context.Context, req *Request) (interface{}, error)
+}
+
+type defaultHandler struct{}
+
+func (defaultHandler) Preempt(context.Context, *Request) (interface{}, error) {
+ return nil, ErrNotHandled
+}
+
+func (defaultHandler) Handle(context.Context, *Request) (interface{}, error) {
+ return nil, ErrNotHandled
+}
+
+// async is a small helper for things with an asynchronous result that you can
+// wait for.
+type async struct {
+ ready chan struct{}
+ errBox chan error
+}
+
+func (a *async) init() {
+ a.ready = make(chan struct{})
+ a.errBox = make(chan error, 1)
+ a.errBox <- nil
+}
+
+func (a *async) done() {
+ close(a.ready)
+}
+
+func (a *async) wait() error {
+ <-a.ready
+ err := <-a.errBox
+ a.errBox <- err
+ return err
+}
+
+func (a *async) setError(err error) {
+ storedErr := <-a.errBox
+ if storedErr == nil {
+ storedErr = err
+ }
+ a.errBox <- storedErr
+}
diff --git a/internal/jsonrpc2_v2/jsonrpc2_test.go b/internal/jsonrpc2_v2/jsonrpc2_test.go
new file mode 100644
index 0000000..8f2eca1
--- /dev/null
+++ b/internal/jsonrpc2_v2/jsonrpc2_test.go
@@ -0,0 +1,389 @@
+// Copyright 2018 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package jsonrpc2_test
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "path"
+ "reflect"
+ "testing"
+ "time"
+
+ "golang.org/x/tools/internal/event/export/eventtest"
+ jsonrpc2 "golang.org/x/tools/internal/jsonrpc2_v2"
+ "golang.org/x/tools/internal/stack/stacktest"
+ errors "golang.org/x/xerrors"
+)
+
+var callTests = []invoker{
+ call{"no_args", nil, true},
+ call{"one_string", "fish", "got:fish"},
+ call{"one_number", 10, "got:10"},
+ call{"join", []string{"a", "b", "c"}, "a/b/c"},
+ sequence{"notify", []invoker{
+ notify{"set", 3},
+ notify{"add", 5},
+ call{"get", nil, 8},
+ }},
+ sequence{"preempt", []invoker{
+ async{"a", "wait", "a"},
+ notify{"unblock", "a"},
+ collect{"a", true, false},
+ }},
+ sequence{"basic cancel", []invoker{
+ async{"b", "wait", "b"},
+ cancel{"b"},
+ collect{"b", nil, true},
+ }},
+ sequence{"queue", []invoker{
+ async{"a", "wait", "a"},
+ notify{"set", 1},
+ notify{"add", 2},
+ notify{"add", 3},
+ notify{"add", 4},
+ call{"peek", nil, 0}, // accumulator will not have any adds yet
+ notify{"unblock", "a"},
+ collect{"a", true, false},
+ call{"get", nil, 10}, // accumulator now has all the adds
+ }},
+ sequence{"fork", []invoker{
+ async{"a", "fork", "a"},
+ notify{"set", 1},
+ notify{"add", 2},
+ notify{"add", 3},
+ notify{"add", 4},
+ call{"get", nil, 10}, // fork will not have blocked the adds
+ notify{"unblock", "a"},
+ collect{"a", true, false},
+ }},
+}
+
+type binder struct {
+ framer jsonrpc2.Framer
+ runTest func(*handler)
+}
+
+type handler struct {
+ conn *jsonrpc2.Connection
+ accumulator int
+ waitersBox chan map[string]chan struct{}
+ calls map[string]*jsonrpc2.AsyncCall
+}
+
+type invoker interface {
+ Name() string
+ Invoke(t *testing.T, ctx context.Context, h *handler)
+}
+
+type notify struct {
+ method string
+ params interface{}
+}
+
+type call struct {
+ method string
+ params interface{}
+ expect interface{}
+}
+
+type async struct {
+ name string
+ method string
+ params interface{}
+}
+
+type collect struct {
+ name string
+ expect interface{}
+ fails bool
+}
+
+type cancel struct {
+ name string
+}
+
+type sequence struct {
+ name string
+ tests []invoker
+}
+
+type echo call
+
+type cancelParams struct{ ID int64 }
+
+func TestConnectionRaw(t *testing.T) {
+ testConnection(t, jsonrpc2.RawFramer())
+}
+
+func TestConnectionHeader(t *testing.T) {
+ testConnection(t, jsonrpc2.HeaderFramer())
+}
+
+func testConnection(t *testing.T, framer jsonrpc2.Framer) {
+ stacktest.NoLeak(t)
+ ctx := eventtest.NewContext(context.Background(), t)
+ listener, err := jsonrpc2.NetPipe(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ server, err := jsonrpc2.Serve(ctx, listener, binder{framer, nil}, jsonrpc2.ServeOptions{})
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer func() {
+ listener.Close()
+ server.Wait()
+ }()
+
+ for _, test := range callTests {
+ t.Run(test.Name(), func(t *testing.T) {
+ client, err := jsonrpc2.Dial(ctx,
+ listener.Dialer(), binder{framer, func(h *handler) {
+ defer h.conn.Close()
+ ctx := eventtest.NewContext(ctx, t)
+ test.Invoke(t, ctx, h)
+ if call, ok := test.(*call); ok {
+ // also run all simple call tests in echo mode
+ (*echo)(call).Invoke(t, ctx, h)
+ }
+ }})
+ if err != nil {
+ t.Fatal(err)
+ }
+ client.Wait()
+ })
+ }
+}
+
+func (test notify) Name() string { return test.method }
+func (test notify) Invoke(t *testing.T, ctx context.Context, h *handler) {
+ if err := h.conn.Notify(ctx, test.method, test.params); err != nil {
+ t.Fatalf("%v:Notify failed: %v", test.method, err)
+ }
+}
+
+func (test call) Name() string { return test.method }
+func (test call) Invoke(t *testing.T, ctx context.Context, h *handler) {
+ results := newResults(test.expect)
+ if err := h.conn.Call(ctx, test.method, test.params).Await(ctx, results); err != nil {
+ t.Fatalf("%v:Call failed: %v", test.method, err)
+ }
+ verifyResults(t, test.method, results, test.expect)
+}
+
+func (test echo) Invoke(t *testing.T, ctx context.Context, h *handler) {
+ results := newResults(test.expect)
+ if err := h.conn.Call(ctx, "echo", []interface{}{test.method, test.params}).Await(ctx, results); err != nil {
+ t.Fatalf("%v:Echo failed: %v", test.method, err)
+ }
+ verifyResults(t, test.method, results, test.expect)
+}
+
+func (test async) Name() string { return test.name }
+func (test async) Invoke(t *testing.T, ctx context.Context, h *handler) {
+ h.calls[test.name] = h.conn.Call(ctx, test.method, test.params)
+}
+
+func (test collect) Name() string { return test.name }
+func (test collect) Invoke(t *testing.T, ctx context.Context, h *handler) {
+ o := h.calls[test.name]
+ results := newResults(test.expect)
+ err := o.Await(ctx, results)
+ switch {
+ case test.fails && err == nil:
+ t.Fatalf("%v:Collect was supposed to fail", test.name)
+ case !test.fails && err != nil:
+ t.Fatalf("%v:Collect failed: %v", test.name, err)
+ }
+ verifyResults(t, test.name, results, test.expect)
+}
+
+func (test cancel) Name() string { return test.name }
+func (test cancel) Invoke(t *testing.T, ctx context.Context, h *handler) {
+ o := h.calls[test.name]
+ if err := h.conn.Notify(ctx, "cancel", &cancelParams{o.ID().Raw().(int64)}); err != nil {
+ t.Fatalf("%v:Collect failed: %v", test.name, err)
+ }
+}
+
+func (test sequence) Name() string { return test.name }
+func (test sequence) Invoke(t *testing.T, ctx context.Context, h *handler) {
+ for _, child := range test.tests {
+ child.Invoke(t, ctx, h)
+ }
+}
+
+// newResults makes a new empty copy of the expected type to put the results into
+func newResults(expect interface{}) interface{} {
+ switch e := expect.(type) {
+ case []interface{}:
+ var r []interface{}
+ for _, v := range e {
+ r = append(r, reflect.New(reflect.TypeOf(v)).Interface())
+ }
+ return r
+ case nil:
+ return nil
+ default:
+ return reflect.New(reflect.TypeOf(expect)).Interface()
+ }
+}
+
+// verifyResults compares the results to the expected values
+func verifyResults(t *testing.T, method string, results interface{}, expect interface{}) {
+ if expect == nil {
+ if results != nil {
+ t.Errorf("%v:Got results %+v where none expeted", method, expect)
+ }
+ return
+ }
+ val := reflect.Indirect(reflect.ValueOf(results)).Interface()
+ if !reflect.DeepEqual(val, expect) {
+ t.Errorf("%v:Results are incorrect, got %+v expect %+v", method, val, expect)
+ }
+}
+
+func (b binder) Bind(ctx context.Context, conn *jsonrpc2.Connection) (jsonrpc2.ConnectionOptions, error) {
+ h := &handler{
+ conn: conn,
+ waitersBox: make(chan map[string]chan struct{}, 1),
+ calls: make(map[string]*jsonrpc2.AsyncCall),
+ }
+ h.waitersBox <- make(map[string]chan struct{})
+ if b.runTest != nil {
+ go b.runTest(h)
+ }
+ return jsonrpc2.ConnectionOptions{
+ Framer: b.framer,
+ Preempter: h,
+ Handler: h,
+ }, nil
+}
+
+func (h *handler) waiter(name string) chan struct{} {
+ waiters := <-h.waitersBox
+ defer func() { h.waitersBox <- waiters }()
+ waiter, found := waiters[name]
+ if !found {
+ waiter = make(chan struct{})
+ waiters[name] = waiter
+ }
+ return waiter
+}
+
+func (h *handler) Preempt(ctx context.Context, req *jsonrpc2.Request) (interface{}, error) {
+ switch req.Method {
+ case "unblock":
+ var name string
+ if err := json.Unmarshal(req.Params, &name); err != nil {
+ return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
+ }
+ close(h.waiter(name))
+ return nil, nil
+ case "peek":
+ if len(req.Params) > 0 {
+ return nil, errors.Errorf("%w: expected no params", jsonrpc2.ErrInvalidParams)
+ }
+ return h.accumulator, nil
+ case "cancel":
+ var params cancelParams
+ if err := json.Unmarshal(req.Params, ¶ms); err != nil {
+ return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
+ }
+ h.conn.Cancel(jsonrpc2.Int64ID(params.ID))
+ return nil, nil
+ default:
+ return nil, jsonrpc2.ErrNotHandled
+ }
+}
+
+func (h *handler) Handle(ctx context.Context, req *jsonrpc2.Request) (interface{}, error) {
+ switch req.Method {
+ case "no_args":
+ if len(req.Params) > 0 {
+ return nil, errors.Errorf("%w: expected no params", jsonrpc2.ErrInvalidParams)
+ }
+ return true, nil
+ case "one_string":
+ var v string
+ if err := json.Unmarshal(req.Params, &v); err != nil {
+ return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
+ }
+ return "got:" + v, nil
+ case "one_number":
+ var v int
+ if err := json.Unmarshal(req.Params, &v); err != nil {
+ return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
+ }
+ return fmt.Sprintf("got:%d", v), nil
+ case "set":
+ var v int
+ if err := json.Unmarshal(req.Params, &v); err != nil {
+ return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
+ }
+ h.accumulator = v
+ return nil, nil
+ case "add":
+ var v int
+ if err := json.Unmarshal(req.Params, &v); err != nil {
+ return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
+ }
+ h.accumulator += v
+ return nil, nil
+ case "get":
+ if len(req.Params) > 0 {
+ return nil, errors.Errorf("%w: expected no params", jsonrpc2.ErrInvalidParams)
+ }
+ return h.accumulator, nil
+ case "join":
+ var v []string
+ if err := json.Unmarshal(req.Params, &v); err != nil {
+ return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
+ }
+ return path.Join(v...), nil
+ case "echo":
+ var v []interface{}
+ if err := json.Unmarshal(req.Params, &v); err != nil {
+ return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
+ }
+ var result interface{}
+ err := h.conn.Call(ctx, v[0].(string), v[1]).Await(ctx, &result)
+ return result, err
+ case "wait":
+ var name string
+ if err := json.Unmarshal(req.Params, &name); err != nil {
+ return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
+ }
+ select {
+ case <-h.waiter(name):
+ return true, nil
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ case <-time.After(time.Second):
+ return nil, errors.Errorf("wait for %q timed out", name)
+ }
+ case "fork":
+ var name string
+ if err := json.Unmarshal(req.Params, &name); err != nil {
+ return nil, errors.Errorf("%w: %s", jsonrpc2.ErrParse, err)
+ }
+ waitFor := h.waiter(name)
+ go func() {
+ select {
+ case <-waitFor:
+ h.conn.Respond(req.ID, true, nil)
+ case <-ctx.Done():
+ h.conn.Respond(req.ID, nil, ctx.Err())
+ case <-time.After(time.Second):
+ h.conn.Respond(req.ID, nil, errors.Errorf("wait for %q timed out", name))
+ }
+ }()
+ return nil, jsonrpc2.ErrAsyncResponse
+ default:
+ return nil, jsonrpc2.ErrNotHandled
+ }
+}
diff --git a/internal/jsonrpc2_v2/messages.go b/internal/jsonrpc2_v2/messages.go
new file mode 100644
index 0000000..652ac81
--- /dev/null
+++ b/internal/jsonrpc2_v2/messages.go
@@ -0,0 +1,181 @@
+// Copyright 2018 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package jsonrpc2
+
+import (
+ "encoding/json"
+
+ errors "golang.org/x/xerrors"
+)
+
+// ID is a Request identifier.
+type ID struct {
+ value interface{}
+}
+
+// Message is the interface to all jsonrpc2 message types.
+// They share no common functionality, but are a closed set of concrete types
+// that are allowed to implement this interface. The message types are *Request
+// and *Response.
+type Message interface {
+ // marshal builds the wire form from the API form.
+ // It is private, which makes the set of Message implementations closed.
+ marshal(to *wireCombined)
+}
+
+// Request is a Message sent to a peer to request behavior.
+// If it has an ID it is a call, otherwise it is a notification.
+type Request struct {
+ // ID of this request, used to tie the Response back to the request.
+ // This will be nil for notifications.
+ ID ID
+ // Method is a string containing the method name to invoke.
+ Method string
+ // Params is either a struct or an array with the parameters of the method.
+ Params json.RawMessage
+}
+
+// Response is a Message used as a reply to a call Request.
+// It will have the same ID as the call it is a response to.
+type Response struct {
+ // result is the content of the response.
+ Result json.RawMessage
+ // err is set only if the call failed.
+ Error error
+ // id of the request this is a response to.
+ ID ID
+}
+
+// StringID creates a new string request identifier.
+func StringID(s string) ID { return ID{value: s} }
+
+// Int64ID creates a new integer request identifier.
+func Int64ID(i int64) ID { return ID{value: i} }
+
+// IsValid returns true if the ID is a valid identifier.
+// The default value for ID will return false.
+func (id ID) IsValid() bool { return id.value != nil }
+
+// Raw returns the underlying value of the ID.
+func (id ID) Raw() interface{} { return id.value }
+
+// NewNotification constructs a new Notification message for the supplied
+// method and parameters.
+func NewNotification(method string, params interface{}) (*Request, error) {
+ p, merr := marshalToRaw(params)
+ return &Request{Method: method, Params: p}, merr
+}
+
+// NewCall constructs a new Call message for the supplied ID, method and
+// parameters.
+func NewCall(id ID, method string, params interface{}) (*Request, error) {
+ p, merr := marshalToRaw(params)
+ return &Request{ID: id, Method: method, Params: p}, merr
+}
+
+func (msg *Request) IsCall() bool { return msg.ID.IsValid() }
+
+func (msg *Request) marshal(to *wireCombined) {
+ to.ID = msg.ID.value
+ to.Method = msg.Method
+ to.Params = msg.Params
+}
+
+// NewResponse constructs a new Response message that is a reply to the
+// supplied. If err is set result may be ignored.
+func NewResponse(id ID, result interface{}, rerr error) (*Response, error) {
+ r, merr := marshalToRaw(result)
+ return &Response{ID: id, Result: r, Error: rerr}, merr
+}
+
+func (msg *Response) marshal(to *wireCombined) {
+ to.ID = msg.ID.value
+ to.Error = toWireError(msg.Error)
+ to.Result = msg.Result
+}
+
+func toWireError(err error) *wireError {
+ if err == nil {
+ // no error, the response is complete
+ return nil
+ }
+ if err, ok := err.(*wireError); ok {
+ // already a wire error, just use it
+ return err
+ }
+ result := &wireError{Message: err.Error()}
+ var wrapped *wireError
+ if errors.As(err, &wrapped) {
+ // if we wrapped a wire error, keep the code from the wrapped error
+ // but the message from the outer error
+ result.Code = wrapped.Code
+ }
+ return result
+}
+
+func EncodeMessage(msg Message) ([]byte, error) {
+ wire := wireCombined{VersionTag: wireVersion}
+ msg.marshal(&wire)
+ data, err := json.Marshal(&wire)
+ if err != nil {
+ return data, errors.Errorf("marshaling jsonrpc message: %w", err)
+ }
+ return data, nil
+}
+
+func DecodeMessage(data []byte) (Message, error) {
+ msg := wireCombined{}
+ if err := json.Unmarshal(data, &msg); err != nil {
+ return nil, errors.Errorf("unmarshaling jsonrpc message: %w", err)
+ }
+ if msg.VersionTag != wireVersion {
+ return nil, errors.Errorf("invalid message version tag %s expected %s", msg.VersionTag, wireVersion)
+ }
+ id := ID{}
+ switch v := msg.ID.(type) {
+ case nil:
+ case float64:
+ // coerce the id type to int64 if it is float64, the spec does not allow fractional parts
+ id = Int64ID(int64(v))
+ case int64:
+ id = Int64ID(v)
+ case string:
+ id = StringID(v)
+ default:
+ return nil, errors.Errorf("invalid message id type <%T>%v", v, v)
+ }
+ if msg.Method != "" {
+ // has a method, must be a call
+ return &Request{
+ Method: msg.Method,
+ ID: id,
+ Params: msg.Params,
+ }, nil
+ }
+ // no method, should be a response
+ if !id.IsValid() {
+ return nil, ErrInvalidRequest
+ }
+ resp := &Response{
+ ID: id,
+ Result: msg.Result,
+ }
+ // we have to check if msg.Error is nil to avoid a typed error
+ if msg.Error != nil {
+ resp.Error = msg.Error
+ }
+ return resp, nil
+}
+
+func marshalToRaw(obj interface{}) (json.RawMessage, error) {
+ if obj == nil {
+ return nil, nil
+ }
+ data, err := json.Marshal(obj)
+ if err != nil {
+ return nil, err
+ }
+ return json.RawMessage(data), nil
+}
diff --git a/internal/jsonrpc2_v2/net.go b/internal/jsonrpc2_v2/net.go
new file mode 100644
index 0000000..c8cfaab
--- /dev/null
+++ b/internal/jsonrpc2_v2/net.go
@@ -0,0 +1,129 @@
+// Copyright 2018 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package jsonrpc2
+
+import (
+ "context"
+ "io"
+ "net"
+ "os"
+ "time"
+)
+
+// This file contains implementations of the transport primitives that use the standard network
+// package.
+
+// NetListenOptions is the optional arguments to the NetListen function.
+type NetListenOptions struct {
+ NetListenConfig net.ListenConfig
+ NetDialer net.Dialer
+}
+
+// NetListener returns a new Listener that listents on a socket using the net package.
+func NetListener(ctx context.Context, network, address string, options NetListenOptions) (Listener, error) {
+ ln, err := options.NetListenConfig.Listen(ctx, network, address)
+ if err != nil {
+ return nil, err
+ }
+ return &netListener{net: ln}, nil
+}
+
+// netListener is the implementation of Listener for connections made using the net package.
+type netListener struct {
+ net net.Listener
+}
+
+// Accept blocks waiting for an incoming connection to the listener.
+func (l *netListener) Accept(ctx context.Context) (io.ReadWriteCloser, error) {
+ return l.net.Accept()
+}
+
+// Close will cause the listener to stop listening. It will not close any connections that have
+// already been accepted.
+func (l *netListener) Close() error {
+ addr := l.net.Addr()
+ err := l.net.Close()
+ if addr.Network() == "unix" {
+ rerr := os.Remove(addr.String())
+ if rerr != nil && err == nil {
+ err = rerr
+ }
+ }
+ return err
+}
+
+// Dialer returns a dialer that can be used to connect to the listener.
+func (l *netListener) Dialer() Dialer {
+ return NetDialer(l.net.Addr().Network(), l.net.Addr().String(), net.Dialer{
+ Timeout: 5 * time.Second,
+ })
+}
+
+// NetDialer returns a Dialer using the supplied standard network dialer.
+func NetDialer(network, address string, nd net.Dialer) Dialer {
+ return &netDialer{
+ network: network,
+ address: address,
+ dialer: nd,
+ }
+}
+
+type netDialer struct {
+ network string
+ address string
+ dialer net.Dialer
+}
+
+func (n *netDialer) Dial(ctx context.Context) (io.ReadWriteCloser, error) {
+ return n.dialer.DialContext(ctx, n.network, n.address)
+}
+
+// NetPipe returns a new Listener that listens using net.Pipe.
+// It is only possibly to connect to it using the Dialier returned by the
+// Dialer method, each call to that method will generate a new pipe the other
+// side of which will be returnd from the Accept call.
+func NetPipe(ctx context.Context) (Listener, error) {
+ return &netPiper{
+ done: make(chan struct{}),
+ dialed: make(chan io.ReadWriteCloser),
+ }, nil
+}
+
+// netPiper is the implementation of Listener build on top of net.Pipes.
+type netPiper struct {
+ done chan struct{}
+ dialed chan io.ReadWriteCloser
+}
+
+// Accept blocks waiting for an incoming connection to the listener.
+func (l *netPiper) Accept(ctx context.Context) (io.ReadWriteCloser, error) {
+ // block until we have a listener, or are closed or cancelled
+ select {
+ case rwc := <-l.dialed:
+ return rwc, nil
+ case <-l.done:
+ return nil, io.EOF
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ }
+}
+
+// Close will cause the listener to stop listening. It will not close any connections that have
+// already been accepted.
+func (l *netPiper) Close() error {
+ // unblock any accept calls that are pending
+ close(l.done)
+ return nil
+}
+
+func (l *netPiper) Dialer() Dialer {
+ return l
+}
+
+func (l *netPiper) Dial(ctx context.Context) (io.ReadWriteCloser, error) {
+ client, server := net.Pipe()
+ l.dialed <- server
+ return client, nil
+}
diff --git a/internal/jsonrpc2_v2/serve.go b/internal/jsonrpc2_v2/serve.go
new file mode 100644
index 0000000..1bac974
--- /dev/null
+++ b/internal/jsonrpc2_v2/serve.go
@@ -0,0 +1,208 @@
+// Copyright 2020 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package jsonrpc2
+
+import (
+ "context"
+ "io"
+ "time"
+
+ "golang.org/x/tools/internal/event"
+ errors "golang.org/x/xerrors"
+)
+
+// Listener is implemented by protocols to accept new inbound connections.
+type Listener interface {
+ // Accept an inbound connection to a server.
+ // It must block until an inbound connection is made, or the listener is
+ // shut down.
+ Accept(context.Context) (io.ReadWriteCloser, error)
+
+ // Close is used to ask a listener to stop accepting new connections.
+ Close() error
+
+ // Dialer returns a dialer that can be used to connect to this listener
+ // locally.
+ // If a listener does not implement this it will return a nil.
+ Dialer() Dialer
+}
+
+// Dialer is used by clients to dial a server.
+type Dialer interface {
+ // Dial returns a new communication byte stream to a listening server.
+ Dial(ctx context.Context) (io.ReadWriteCloser, error)
+}
+
+// Server is a running server that is accepting incoming connections.
+type Server struct {
+ listener Listener
+ binder Binder
+ options ServeOptions // a copy of the config that started this server
+ async async
+}
+
+// ServeOptions holds the options to the Serve function.
+//TODO: kill ServeOptions and push timeout into the listener
+type ServeOptions struct {
+ // IdleTimeout is the maximum amount of time to remain idle and running.
+ IdleTimeout time.Duration
+}
+
+// Dial uses the dialer to make a new connection, wraps the returned
+// reader and writer using the framer to make a stream, and then builds
+// a connection on top of that stream using the binder.
+func Dial(ctx context.Context, dialer Dialer, binder Binder) (*Connection, error) {
+ // dial a server
+ rwc, err := dialer.Dial(ctx)
+ if err != nil {
+ return nil, err
+ }
+ return newConnection(ctx, rwc, binder)
+}
+
+// Serve starts a new server listening for incoming connections and returns
+// it.
+// This returns a fully running and connected server, it does not block on
+// the listener.
+// You can call Wait to block on the server, or Shutdown to get the sever to
+// terminate gracefully.
+// To notice incoming connections, use an intercepting Binder.
+func Serve(ctx context.Context, listener Listener, binder Binder, options ServeOptions) (*Server, error) {
+ server := &Server{
+ listener: listener,
+ binder: binder,
+ options: options,
+ }
+ server.async.init()
+ go server.run(ctx)
+ return server, nil
+}
+
+// Wait returns only when the server has shut down.
+func (s *Server) Wait() error {
+ return s.async.wait()
+}
+
+// run accepts incoming connections from the listener,
+// If IdleTimeout is non-zero, run exits after there are no clients for this
+// duration, otherwise it exits only on error.
+func (s *Server) run(ctx context.Context) {
+ defer s.async.done()
+ // Max duration: ~290 years; surely that's long enough.
+ const forever = 1<<63 - 1
+ idleTimeout := s.options.IdleTimeout
+ if idleTimeout <= 0 {
+ idleTimeout = forever
+ }
+ idleTimer := time.NewTimer(idleTimeout)
+
+ // run a goroutine that listens for incoming connections and posts them
+ // back to the worker
+ newStreams := make(chan io.ReadWriteCloser)
+ go func() {
+ for {
+ // we never close the accepted connection, we rely on the other end
+ // closing or the socket closing itself naturally
+ rwc, err := s.listener.Accept(ctx)
+ if err != nil {
+ if !isClosingError(err) {
+ event.Error(ctx, "Accept", err)
+ }
+ // signal we are done generating new connections for good
+ close(newStreams)
+ return
+ }
+ newStreams <- rwc
+ }
+ }()
+
+ closedConns := make(chan struct{})
+ activeConns := 0
+ lnClosed := false
+ for {
+ select {
+ case rwc := <-newStreams:
+ // whatever happes we are not idle anymore
+ idleTimer.Stop()
+ if rwc == nil {
+ // the net listener has been closed
+ lnClosed = true
+ if activeConns == 0 {
+ // accept is done and there are no active connections, so just stop now
+ return
+ }
+ // replace the channel with one that will never trigger
+ // this is save because the only writer has already quit
+ newStreams = nil
+ // and then wait for all active connections to stop
+ continue
+ }
+ // a new inbound connection,
+ conn, err := newConnection(ctx, rwc, s.binder)
+ if err != nil {
+ if !isClosingError(err) {
+ event.Error(ctx, "NewConn", err)
+ }
+ continue
+ }
+ // register the new conn as active
+ activeConns++
+ // wrap the conn in a close monitor
+ //TODO: we do this to maintain our active count correctly, is there a better way?
+ go func() {
+ err := conn.Wait()
+ if err != nil && !isClosingError(err) {
+ event.Error(ctx, "closed a connection", err)
+ }
+ closedConns <- struct{}{}
+ }()
+ case <-closedConns:
+ activeConns--
+ if activeConns == 0 {
+ // no more active connections, restart the idle timer
+ if lnClosed {
+ // we can never get a new connection, so we are done
+ return
+ }
+ // we are idle, but might get a new connection still
+ idleTimer.Reset(idleTimeout)
+ }
+ case <-idleTimer.C:
+ // no activity for a while, time to stop serving
+ s.async.setError(ErrIdleTimeout)
+ return
+ case <-ctx.Done():
+ s.async.setError(ctx.Err())
+ return
+ }
+ }
+}
+
+// isClosingError reports if the error occurs normally during the process of
+// closing a network connection. It uses imperfect heuristics that err on the
+// side of false negatives, and should not be used for anything critical.
+func isClosingError(err error) bool {
+ if err == nil {
+ return false
+ }
+ // fully unwrap the error, so the following tests work
+ for wrapped := err; wrapped != nil; wrapped = errors.Unwrap(err) {
+ err = wrapped
+ }
+
+ // was it based on an EOF error?
+ if err == io.EOF {
+ return true
+ }
+
+ // Per https://github.com/golang/go/issues/4373, this error string should not
+ // change. This is not ideal, but since the worst that could happen here is
+ // some superfluous logging, it is acceptable.
+ if err.Error() == "use of closed network connection" {
+ return true
+ }
+
+ return false
+}
diff --git a/internal/jsonrpc2_v2/serve_test.go b/internal/jsonrpc2_v2/serve_test.go
new file mode 100644
index 0000000..1b6b3b2
--- /dev/null
+++ b/internal/jsonrpc2_v2/serve_test.go
@@ -0,0 +1,144 @@
+// Copyright 2020 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package jsonrpc2_test
+
+import (
+ "context"
+ "errors"
+ "testing"
+ "time"
+
+ jsonrpc2 "golang.org/x/tools/internal/jsonrpc2_v2"
+ "golang.org/x/tools/internal/stack/stacktest"
+)
+
+func TestIdleTimeout(t *testing.T) {
+ stacktest.NoLeak(t)
+ ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+ defer cancel()
+
+ listener, err := jsonrpc2.NetListener(ctx, "tcp", "localhost:0", jsonrpc2.NetListenOptions{})
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer listener.Close()
+
+ server, err := jsonrpc2.Serve(ctx, listener, jsonrpc2.ConnectionOptions{},
+ jsonrpc2.ServeOptions{
+ IdleTimeout: 100 * time.Millisecond,
+ })
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ connect := func() *jsonrpc2.Connection {
+ client, err := jsonrpc2.Dial(ctx,
+ listener.Dialer(),
+ jsonrpc2.ConnectionOptions{})
+ if err != nil {
+ t.Fatal(err)
+ }
+ return client
+ }
+ // Exercise some connection/disconnection patterns, and then assert that when
+ // our timer fires, the server exits.
+ conn1 := connect()
+ conn2 := connect()
+ if err := conn1.Close(); err != nil {
+ t.Fatalf("conn1.Close failed with error: %v", err)
+ }
+ if err := conn2.Close(); err != nil {
+ t.Fatalf("conn2.Close failed with error: %v", err)
+ }
+ conn3 := connect()
+ if err := conn3.Close(); err != nil {
+ t.Fatalf("conn3.Close failed with error: %v", err)
+ }
+
+ serverError := server.Wait()
+
+ if !errors.Is(serverError, jsonrpc2.ErrIdleTimeout) {
+ t.Errorf("run() returned error %v, want %v", serverError, jsonrpc2.ErrIdleTimeout)
+ }
+}
+
+type msg struct {
+ Msg string
+}
+
+type fakeHandler struct{}
+
+func (fakeHandler) Handle(ctx context.Context, req *jsonrpc2.Request) (interface{}, error) {
+ switch req.Method {
+ case "ping":
+ return &msg{"pong"}, nil
+ default:
+ return nil, jsonrpc2.ErrNotHandled
+ }
+}
+
+func TestServe(t *testing.T) {
+ stacktest.NoLeak(t)
+ ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
+ defer cancel()
+
+ tests := []struct {
+ name string
+ factory func(context.Context) (jsonrpc2.Listener, error)
+ }{
+ {"tcp", func(ctx context.Context) (jsonrpc2.Listener, error) {
+ return jsonrpc2.NetListener(ctx, "tcp", "localhost:0", jsonrpc2.NetListenOptions{})
+ }},
+ {"pipe", func(ctx context.Context) (jsonrpc2.Listener, error) {
+ return jsonrpc2.NetPipe(ctx)
+ }},
+ }
+
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+ fake, err := test.factory(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+ conn, shutdown, err := newFake(ctx, fake)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer shutdown(ctx)
+ var got msg
+ if err := conn.Call(ctx, "ping", &msg{"ting"}).Await(ctx, &got); err != nil {
+ t.Fatal(err)
+ }
+ if want := "pong"; got.Msg != want {
+ t.Errorf("conn.Call(...): returned %q, want %q", got, want)
+ }
+ })
+ }
+}
+
+func newFake(ctx context.Context, l jsonrpc2.Listener) (*jsonrpc2.Connection, func(context.Context), error) {
+ server, err := jsonrpc2.Serve(ctx, l, jsonrpc2.ConnectionOptions{
+ Handler: fakeHandler{},
+ }, jsonrpc2.ServeOptions{
+ IdleTimeout: 100 * time.Millisecond,
+ })
+ if err != nil {
+ return nil, nil, err
+ }
+
+ client, err := jsonrpc2.Dial(ctx,
+ l.Dialer(),
+ jsonrpc2.ConnectionOptions{
+ Handler: fakeHandler{},
+ })
+ if err != nil {
+ return nil, nil, err
+ }
+ return client, func(ctx context.Context) {
+ l.Close()
+ client.Close()
+ server.Wait()
+ }, nil
+}
diff --git a/internal/jsonrpc2_v2/wire.go b/internal/jsonrpc2_v2/wire.go
new file mode 100644
index 0000000..97b1ae8
--- /dev/null
+++ b/internal/jsonrpc2_v2/wire.go
@@ -0,0 +1,74 @@
+// Copyright 2018 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package jsonrpc2
+
+import (
+ "encoding/json"
+)
+
+// This file contains the go forms of the wire specification.
+// see http://www.jsonrpc.org/specification for details
+
+var (
+ // ErrUnknown should be used for all non coded errors.
+ ErrUnknown = NewError(-32001, "JSON RPC unknown error")
+ // ErrParse is used when invalid JSON was received by the server.
+ ErrParse = NewError(-32700, "JSON RPC parse error")
+ // ErrInvalidRequest is used when the JSON sent is not a valid Request object.
+ ErrInvalidRequest = NewError(-32600, "JSON RPC invalid request")
+ // ErrMethodNotFound should be returned by the handler when the method does
+ // not exist / is not available.
+ ErrMethodNotFound = NewError(-32601, "JSON RPC method not found")
+ // ErrInvalidParams should be returned by the handler when method
+ // parameter(s) were invalid.
+ ErrInvalidParams = NewError(-32602, "JSON RPC invalid params")
+ // ErrInternal indicates a failure to process a call correctly
+ ErrInternal = NewError(-32603, "JSON RPC internal error")
+
+ // The following errors are not part of the json specification, but
+ // compliant extensions specific to this implimentation.
+
+ // ErrServerOverloaded is returned when a message was refused due to a
+ // server being temporarily unable to accept any new messages.
+ ErrServerOverloaded = NewError(-32000, "JSON RPC overloaded")
+)
+
+const wireVersion = "2.0"
+
+// wireCombined has all the fields of both Request and Response.
+// We can decode this and then work out which it is.
+type wireCombined struct {
+ VersionTag string `json:"jsonrpc"`
+ ID interface{} `json:"id,omitempty"`
+ Method string `json:"method,omitempty"`
+ Params json.RawMessage `json:"params,omitempty"`
+ Result json.RawMessage `json:"result,omitempty"`
+ Error *wireError `json:"error,omitempty"`
+}
+
+// wireError represents a structured error in a Response.
+type wireError struct {
+ // Code is an error code indicating the type of failure.
+ Code int64 `json:"code"`
+ // Message is a short description of the error.
+ Message string `json:"message"`
+ // Data is optional structured data containing additional information about the error.
+ Data json.RawMessage `json:"data,omitempty"`
+}
+
+// NewError returns an error that will encode on the wire correctly.
+// The standard codes are made available from this package, this function should
+// only be used to build errors for application specific codes as allowed by the
+// specification.
+func NewError(code int64, message string) error {
+ return &wireError{
+ Code: code,
+ Message: message,
+ }
+}
+
+func (err *wireError) Error() string {
+ return err.Message
+}
diff --git a/internal/jsonrpc2_v2/wire_test.go b/internal/jsonrpc2_v2/wire_test.go
new file mode 100644
index 0000000..e933737
--- /dev/null
+++ b/internal/jsonrpc2_v2/wire_test.go
@@ -0,0 +1,118 @@
+// Copyright 2020 The Go Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package jsonrpc2_test
+
+import (
+ "bytes"
+ "encoding/json"
+ "reflect"
+ "testing"
+
+ jsonrpc2 "golang.org/x/tools/internal/jsonrpc2_v2"
+)
+
+func TestWireMessage(t *testing.T) {
+ for _, test := range []struct {
+ name string
+ msg jsonrpc2.Message
+ encoded []byte
+ }{{
+ name: "notification",
+ msg: newNotification("alive", nil),
+ encoded: []byte(`{"jsonrpc":"2.0","method":"alive"}`),
+ }, {
+ name: "call",
+ msg: newCall("msg1", "ping", nil),
+ encoded: []byte(`{"jsonrpc":"2.0","id":"msg1","method":"ping"}`),
+ }, {
+ name: "response",
+ msg: newResponse("msg2", "pong", nil),
+ encoded: []byte(`{"jsonrpc":"2.0","id":"msg2","result":"pong"}`),
+ }, {
+ name: "numerical id",
+ msg: newCall(1, "poke", nil),
+ encoded: []byte(`{"jsonrpc":"2.0","id":1,"method":"poke"}`),
+ }, {
+ // originally reported in #39719, this checks that result is not present if
+ // it is an error response
+ name: "computing fix edits",
+ msg: newResponse(3, nil, jsonrpc2.NewError(0, "computing fix edits")),
+ encoded: []byte(`{
+ "jsonrpc":"2.0",
+ "id":3,
+ "error":{
+ "code":0,
+ "message":"computing fix edits"
+ }
+ }`),
+ }} {
+ b, err := jsonrpc2.EncodeMessage(test.msg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ checkJSON(t, b, test.encoded)
+ msg, err := jsonrpc2.DecodeMessage(test.encoded)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !reflect.DeepEqual(msg, test.msg) {
+ t.Errorf("decoded message does not match\nGot:\n%+#v\nWant:\n%+#v", msg, test.msg)
+ }
+ }
+}
+
+func newNotification(method string, params interface{}) jsonrpc2.Message {
+ msg, err := jsonrpc2.NewNotification(method, params)
+ if err != nil {
+ panic(err)
+ }
+ return msg
+}
+
+func newID(id interface{}) jsonrpc2.ID {
+ switch v := id.(type) {
+ case nil:
+ return jsonrpc2.ID{}
+ case string:
+ return jsonrpc2.StringID(v)
+ case int:
+ return jsonrpc2.Int64ID(int64(v))
+ case int64:
+ return jsonrpc2.Int64ID(v)
+ default:
+ panic("invalid ID type")
+ }
+}
+
+func newCall(id interface{}, method string, params interface{}) jsonrpc2.Message {
+ msg, err := jsonrpc2.NewCall(newID(id), method, params)
+ if err != nil {
+ panic(err)
+ }
+ return msg
+}
+
+func newResponse(id interface{}, result interface{}, rerr error) jsonrpc2.Message {
+ msg, err := jsonrpc2.NewResponse(newID(id), result, rerr)
+ if err != nil {
+ panic(err)
+ }
+ return msg
+}
+
+func checkJSON(t *testing.T, got, want []byte) {
+ // compare the compact form, to allow for formatting differences
+ g := &bytes.Buffer{}
+ if err := json.Compact(g, []byte(got)); err != nil {
+ t.Fatal(err)
+ }
+ w := &bytes.Buffer{}
+ if err := json.Compact(w, []byte(want)); err != nil {
+ t.Fatal(err)
+ }
+ if g.String() != w.String() {
+ t.Errorf("encoded message does not match\nGot:\n%s\nWant:\n%s", g, w)
+ }
+}