internal/lsp: decouple message processing from stream processing.

The means that the stream reader can move forward while a message is being processed.
This will significantly improve responsivness and cancellation handling, and also
allow message handlers to send messages themselves, reducing the need to spin up
new go routines inside handlers.
The flow control changes from blocking to failing when a server is busy, which removes
the main current cause of deadlock, but may break non deadlock cases that currently wait
if the queue is not sufficiently large.

Change-Id: Ia73eb049b38d0651344abdbf16c477a8ce1a6fd1
Reviewed-on: https://go-review.googlesource.com/c/tools/+/170007
Reviewed-by: Rebecca Stambler <rstambler@golang.org>
diff --git a/internal/jsonrpc2/jsonrpc2.go b/internal/jsonrpc2/jsonrpc2.go
index 5c30829..95f9daa 100644
--- a/internal/jsonrpc2/jsonrpc2.go
+++ b/internal/jsonrpc2/jsonrpc2.go
@@ -19,16 +19,24 @@
 // Conn is a JSON RPC 2 client server connection.
 // Conn is bidirectional; it does not have a designated server or client end.
 type Conn struct {
-	seq        int64 // must only be accessed using atomic operations
-	Handler    Handler
-	Canceler   Canceler
-	Logger     Logger
-	stream     Stream
-	err        error
-	pendingMu  sync.Mutex // protects the pending map
-	pending    map[ID]chan *Response
-	handlingMu sync.Mutex // protects the handling map
-	handling   map[ID]handling
+	seq                int64 // must only be accessed using atomic operations
+	Handler            Handler
+	Canceler           Canceler
+	Logger             Logger
+	Capacity           int
+	RejectIfOverloaded bool
+	stream             Stream
+	err                error
+	pendingMu          sync.Mutex // protects the pending map
+	pending            map[ID]chan *Response
+	handlingMu         sync.Mutex // protects the handling map
+	handling           map[ID]handling
+}
+
+type queueEntry struct {
+	ctx context.Context
+	c   *Conn
+	r   *Request
 }
 
 // Handler is an option you can pass to NewConn to handle incoming requests.
@@ -237,11 +245,37 @@
 	Error      *Error           `json:"error,omitempty"`
 }
 
+func (c *Conn) deliver(ctx context.Context, q chan queueEntry, request *Request) bool {
+	e := queueEntry{ctx: ctx, c: c, r: request}
+	if !c.RejectIfOverloaded {
+		q <- e
+		return true
+	}
+	select {
+	case q <- e:
+		return true
+	default:
+		return false
+	}
+}
+
 // Run blocks until the connection is terminated, and returns any error that
 // caused the termination.
 // It must be called exactly once for each Conn.
 // It returns only when the reader is closed or there is an error in the stream.
 func (c *Conn) Run(ctx context.Context) error {
+	q := make(chan queueEntry, c.Capacity)
+	defer close(q)
+	// start the queue processor
+	go func() {
+		// TODO: idle notification?
+		for e := range q {
+			if e.ctx.Err() != nil {
+				continue
+			}
+			c.Handler(e.ctx, e.c, e.r)
+		}
+	}()
 	for {
 		// get the data for a message
 		data, err := c.stream.Read(ctx)
@@ -268,10 +302,11 @@
 			}
 			if request.IsNotify() {
 				c.Logger(Receive, request.ID, -1, request.Method, request.Params, nil)
-				// we have a Notify, forward to the handler in a go routine
-				c.Handler(ctx, c, request)
+				// we have a Notify, add to the processor queue
+				c.deliver(ctx, q, request)
+				//TODO: log when we drop a message?
 			} else {
-				// we have a Call, forward to the handler in another go routine
+				// we have a Call, add to the processor queue
 				reqCtx, cancelReq := context.WithCancel(ctx)
 				c.handlingMu.Lock()
 				c.handling[*request.ID] = handling{
@@ -281,7 +316,10 @@
 				}
 				c.handlingMu.Unlock()
 				c.Logger(Receive, request.ID, -1, request.Method, request.Params, nil)
-				c.Handler(reqCtx, c, request)
+				if !c.deliver(reqCtx, q, request) {
+					// queue is full, reject the message by directly replying
+					c.Reply(ctx, request, nil, NewErrorf(CodeServerOverloaded, "no room in queue"))
+				}
 			}
 		case msg.ID != nil:
 			// we have a response, get the pending entry from the map
diff --git a/internal/jsonrpc2/wire.go b/internal/jsonrpc2/wire.go
index bb59ad5..bcf4d65 100644
--- a/internal/jsonrpc2/wire.go
+++ b/internal/jsonrpc2/wire.go
@@ -28,6 +28,10 @@
 	CodeInvalidParams = -32602
 	// CodeInternalError is not currently returned but defined for completeness.
 	CodeInternalError = -32603
+
+	//CodeServerOverloaded is returned when a message was refused due to a
+	//server being temporarily unable to accept any new messages.
+	CodeServerOverloaded = -32000
 )
 
 // Request is sent to a server to represent a Call or Notify operaton.