internal/jsonrpc2: change StreamServer to operate on Conn instead of Stream

Really the name is wrong now, but this is just a stepping stone towards removing
it entirely in favour of a new listener/dialer/server/client pattern, so I am
minimizing the churn by leaving the names alone for now.

Change-Id: I771d117490763ebe05ed2a8c52d490deeb4d5333
Reviewed-on: https://go-review.googlesource.com/c/tools/+/232878
Run-TryBot: Ian Cottrell <iancottrell@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Robert Findley <rfindley@google.com>
diff --git a/internal/jsonrpc2/serve.go b/internal/jsonrpc2/serve.go
index 670491d..6d0a9dc 100644
--- a/internal/jsonrpc2/serve.go
+++ b/internal/jsonrpc2/serve.go
@@ -20,25 +20,24 @@
 // semantics.
 
 // A StreamServer is used to serve incoming jsonrpc2 clients communicating over
-// a newly created stream.
+// a newly created connection.
 type StreamServer interface {
-	ServeStream(context.Context, Stream) error
+	ServeStream(context.Context, Conn) error
 }
 
 // The ServerFunc type is an adapter that implements the StreamServer interface
 // using an ordinary function.
-type ServerFunc func(context.Context, Stream) error
+type ServerFunc func(context.Context, Conn) error
 
 // ServeStream calls f(ctx, s).
-func (f ServerFunc) ServeStream(ctx context.Context, s Stream) error {
-	return f(ctx, s)
+func (f ServerFunc) ServeStream(ctx context.Context, c Conn) error {
+	return f(ctx, c)
 }
 
 // HandlerServer returns a StreamServer that handles incoming streams using the
 // provided handler.
 func HandlerServer(h Handler) StreamServer {
-	return ServerFunc(func(ctx context.Context, s Stream) error {
-		conn := NewConn(s)
+	return ServerFunc(func(ctx context.Context, conn Conn) error {
 		conn.Go(ctx, h)
 		<-conn.Done()
 		return conn.Err()
@@ -82,7 +81,7 @@
 			nc, err := ln.Accept()
 			if err != nil {
 				select {
-				case doneListening <- fmt.Errorf("Accept(): %v", err):
+				case doneListening <- fmt.Errorf("Accept(): %w", err):
 				case <-ctx.Done():
 				}
 				return
@@ -99,7 +98,8 @@
 			connTimer.Stop()
 			stream := NewHeaderStream(netConn)
 			go func() {
-				closedConns <- server.ServeStream(ctx, stream)
+				conn := NewConn(stream)
+				closedConns <- server.ServeStream(ctx, conn)
 				stream.Close()
 			}()
 		case err := <-doneListening:
diff --git a/internal/jsonrpc2/servertest/servertest.go b/internal/jsonrpc2/servertest/servertest.go
index 9c93f6f..9281bd8 100644
--- a/internal/jsonrpc2/servertest/servertest.go
+++ b/internal/jsonrpc2/servertest/servertest.go
@@ -88,7 +88,8 @@
 		cPipe.Close()
 	})
 	serverStream := s.framer(sPipe)
-	go s.server.ServeStream(ctx, serverStream)
+	serverConn := jsonrpc2.NewConn(serverStream)
+	go s.server.ServeStream(ctx, serverConn)
 
 	clientStream := s.framer(cPipe)
 	return jsonrpc2.NewConn(clientStream)
diff --git a/internal/lsp/cmd/serve.go b/internal/lsp/cmd/serve.go
index 273c38d..00157bd 100644
--- a/internal/lsp/cmd/serve.go
+++ b/internal/lsp/cmd/serve.go
@@ -97,7 +97,8 @@
 	if s.Trace && di != nil {
 		stream = protocol.LoggingStream(stream, di.LogWriter)
 	}
-	return ss.ServeStream(ctx, stream)
+	conn := jsonrpc2.NewConn(stream)
+	return ss.ServeStream(ctx, conn)
 }
 
 // parseAddr parses the -listen flag in to a network, and address.
diff --git a/internal/lsp/lsprpc/lsprpc.go b/internal/lsp/lsprpc/lsprpc.go
index 313f947..449537d 100644
--- a/internal/lsp/lsprpc/lsprpc.go
+++ b/internal/lsp/lsprpc/lsprpc.go
@@ -105,10 +105,9 @@
 
 // ServeStream implements the jsonrpc2.StreamServer interface, by handling
 // incoming streams using a new lsp server.
-func (s *StreamServer) ServeStream(ctx context.Context, stream jsonrpc2.Stream) error {
+func (s *StreamServer) ServeStream(ctx context.Context, conn jsonrpc2.Conn) error {
 	index := atomic.AddInt64(&clientIndex, 1)
 
-	conn := jsonrpc2.NewConn(stream)
 	client := protocol.ClientDispatcher(conn)
 	session := s.cache.NewSession(ctx)
 	dc := &debugClient{
@@ -245,8 +244,7 @@
 
 // ServeStream dials the forwarder remote and binds the remote to serve the LSP
 // on the incoming stream.
-func (f *Forwarder) ServeStream(ctx context.Context, stream jsonrpc2.Stream) error {
-	clientConn := jsonrpc2.NewConn(stream)
+func (f *Forwarder) ServeStream(ctx context.Context, clientConn jsonrpc2.Conn) error {
 	client := protocol.ClientDispatcher(clientConn)
 
 	netConn, err := f.connectToRemote(ctx)