internal/jsonrpc2/servertest: support both TCP and pipe connection

Update the servertest package to support connecting to a jsonrpc2 server
using either TCP or io.Pipes. The latter is provided so that regtests
can more accurately mimic the current gopls execution mode, where gopls
is run as a sidecar and communicated with via a pipe.

Updates golang/go#36879

Change-Id: I0e14ed0e628333ba2cc7b088009f1887fcaa82a5
Reviewed-on: https://go-review.googlesource.com/c/tools/+/218777
Run-TryBot: Robert Findley <rfindley@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Heschi Kreinick <heschi@google.com>
diff --git a/gopls/test/gopls_test.go b/gopls/test/gopls_test.go
index 1adcf48..343ae21 100644
--- a/gopls/test/gopls_test.go
+++ b/gopls/test/gopls_test.go
@@ -43,7 +43,7 @@
 	ctx := tests.Context(t)
 	cache := cache.New(commandLineOptions)
 	ss := lsprpc.NewStreamServer(cache, false)
-	ts := servertest.NewServer(ctx, ss)
+	ts := servertest.NewTCPServer(ctx, ss)
 	for _, data := range data {
 		defer data.Exported.Cleanup()
 		t.Run(data.Folder, func(t *testing.T) {
diff --git a/internal/jsonrpc2/servertest/servertest.go b/internal/jsonrpc2/servertest/servertest.go
index b6ef7df..818662d 100644
--- a/internal/jsonrpc2/servertest/servertest.go
+++ b/internal/jsonrpc2/servertest/servertest.go
@@ -9,46 +9,118 @@
 import (
 	"context"
 	"fmt"
+	"io"
 	"net"
+	"sync"
 
 	"golang.org/x/tools/internal/jsonrpc2"
 )
 
-// Server is a helper for executing tests against a remote jsonrpc2 connection.
-// Once initialized, its Addr field may be used to connect a jsonrpc2 client.
-type Server struct {
+// Connector is the interface used to connect to a server.
+type Connector interface {
+	Connect(context.Context) *jsonrpc2.Conn
+}
+
+// TCPServer is a helper for executing tests against a remote jsonrpc2
+// connection. Once initialized, its Addr field may be used to connect a
+// jsonrpc2 client.
+type TCPServer struct {
 	Addr string
 
 	ln net.Listener
+	cls *closerList
 }
 
-// NewServer returns a new test server listening on local tcp port and serving
-// incoming jsonrpc2 streams using the provided stream server. It panics on any
-// error.
-func NewServer(ctx context.Context, server jsonrpc2.StreamServer) *Server {
+// NewTCPServer returns a new test server listening on local tcp port and
+// serving incoming jsonrpc2 streams using the provided stream server. It
+// panics on any error.
+func NewTCPServer(ctx context.Context, server jsonrpc2.StreamServer) *TCPServer {
 	ln, err := net.Listen("tcp", "127.0.0.1:0")
 	if err != nil {
 		panic(fmt.Sprintf("servertest: failed to listen: %v", err))
 	}
 	go jsonrpc2.Serve(ctx, ln, server)
-	return &Server{Addr: ln.Addr().String(), ln: ln}
+	return &TCPServer{Addr: ln.Addr().String(), ln: ln, cls: &closerList{}}
 }
 
 // Connect dials the test server and returns a jsonrpc2 Connection that is
 // ready for use.
-func (s *Server) Connect(ctx context.Context) *jsonrpc2.Conn {
+func (s *TCPServer) Connect(ctx context.Context) *jsonrpc2.Conn {
 	netConn, err := net.Dial("tcp", s.Addr)
 	if err != nil {
 		panic(fmt.Sprintf("servertest: failed to connect to test instance: %v", err))
 	}
+	s.cls.add(func() {
+		netConn.Close()
+	})
 	conn := jsonrpc2.NewConn(jsonrpc2.NewHeaderStream(netConn, netConn))
 	go conn.Run(ctx)
 	return conn
 }
 
-// Close is a placeholder for proper test server shutdown.
-// TODO: implement proper shutdown, which gracefully closes existing
-// connections to the test server.
-func (s *Server) Close() error {
+// Close closes all connected pipes.
+func (s *TCPServer) Close() error {
+	s.cls.closeAll()
 	return nil
 }
+
+// PipeServer is a test server that handles connections over io.Pipes.
+type PipeServer struct {
+	server jsonrpc2.StreamServer
+	cls *closerList
+}
+
+// NewPipeServer returns a test server that can be connected to via io.Pipes.
+func NewPipeServer(ctx context.Context, server jsonrpc2.StreamServer) *PipeServer {
+	return &PipeServer{server: server, cls: &closerList{}}
+}
+
+// Connect creates new io.Pipes and binds them to the underlying StreamServer.
+func (s *PipeServer) Connect(ctx context.Context) *jsonrpc2.Conn {
+	// Pipes connect like this:
+	// Client🡒(sWriter)🡒(sReader)🡒Server
+	//       🡔(cReader)🡐(cWriter)🡗
+	sReader, sWriter := io.Pipe()
+	cReader, cWriter := io.Pipe()
+	s.cls.add(func() {
+		sReader.Close()
+		sWriter.Close()
+		cReader.Close()
+		cWriter.Close()
+	})
+	serverStream := jsonrpc2.NewStream(sReader, cWriter)
+	go s.server.ServeStream(ctx, serverStream)
+
+	clientStream := jsonrpc2.NewStream(cReader, sWriter)
+	clientConn := jsonrpc2.NewConn(clientStream)
+	go clientConn.Run(ctx)
+	return clientConn
+}
+
+// Close closes all connected pipes.
+func (s *PipeServer) Close() error {
+	s.cls.closeAll()
+	return nil
+}
+
+// closerList tracks closers to run when a testserver is closed.  This is a
+// convenience, so that callers don't have to worry about closing each
+// connection.
+type closerList struct {
+	mu sync.Mutex
+	closers []func()
+}
+
+func (l *closerList) add(closer func()) {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	l.closers = append(l.closers, closer)
+}
+
+func (l *closerList) closeAll() {
+	l.mu.Lock()
+	defer l.mu.Unlock()
+	for _, closer := range l.closers {
+		closer()
+	}
+}
diff --git a/internal/jsonrpc2/servertest/servertest_test.go b/internal/jsonrpc2/servertest/servertest_test.go
index 578a56e..851c0f1 100644
--- a/internal/jsonrpc2/servertest/servertest_test.go
+++ b/internal/jsonrpc2/servertest/servertest_test.go
@@ -30,14 +30,31 @@
 func TestTestServer(t *testing.T) {
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
-	ts := NewServer(ctx, jsonrpc2.HandlerServer(fakeHandler{}))
-	defer ts.Close()
-	conn := ts.Connect(ctx)
-	var got msg
-	if err := conn.Call(ctx, "ping", &msg{"ping"}, &got); err != nil {
-		t.Fatal(err)
+	server := jsonrpc2.HandlerServer(fakeHandler{})
+	tcpTS := NewTCPServer(ctx, server)
+	defer tcpTS.Close()
+	pipeTS := NewPipeServer(ctx, server)
+	defer pipeTS.Close()
+
+
+	tests := []struct {
+		name string
+		connector Connector
+	} {
+		{"tcp", tcpTS},
+		{"pipe", pipeTS},
 	}
-	if want := "pong"; got.Msg != want {
-		t.Errorf("conn.Call(...): returned %q, want %q", got, want)
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			conn := test.connector.Connect(ctx)
+			var got msg
+			if err := conn.Call(ctx, "ping", &msg{"ping"}, &got); err != nil {
+				t.Fatal(err)
+			}
+			if want := "pong"; got.Msg != want {
+				t.Errorf("conn.Call(...): returned %q, want %q", got, want)
+			}
+		})
 	}
 }
diff --git a/internal/lsp/cmd/cmd_test.go b/internal/lsp/cmd/cmd_test.go
index 0e9c97f..56ebace 100644
--- a/internal/lsp/cmd/cmd_test.go
+++ b/internal/lsp/cmd/cmd_test.go
@@ -45,10 +45,10 @@
 	}
 }
 
-func testServer(ctx context.Context) *servertest.Server {
+func testServer(ctx context.Context) *servertest.TCPServer {
 	cache := cache.New(nil)
 	ss := lsprpc.NewStreamServer(cache, false)
-	return servertest.NewServer(ctx, ss)
+	return servertest.NewTCPServer(ctx, ss)
 }
 
 func TestDefinitionHelpExample(t *testing.T) {
diff --git a/internal/lsp/lsprpc/lsprpc_test.go b/internal/lsp/lsprpc/lsprpc_test.go
index b7c20fe..4ecf974 100644
--- a/internal/lsp/lsprpc/lsprpc_test.go
+++ b/internal/lsp/lsprpc/lsprpc_test.go
@@ -46,7 +46,7 @@
 			return server
 		},
 	}
-	ts := servertest.NewServer(ctx, ss)
+	ts := servertest.NewPipeServer(ctx, ss)
 	cc := ts.Connect(ctx)
 	cc.AddHandler(protocol.ClientHandler(client))
 
@@ -100,14 +100,14 @@
 		},
 	}
 	ctx := context.Background()
-	tsDirect := servertest.NewServer(ctx, ss)
+	tsDirect := servertest.NewTCPServer(ctx, ss)
 
 	forwarder := NewForwarder(tsDirect.Addr, false)
-	tsForwarded := servertest.NewServer(ctx, forwarder)
+	tsForwarded := servertest.NewPipeServer(ctx, forwarder)
 
 	tests := []struct {
 		serverType string
-		ts         *servertest.Server
+		ts         servertest.Connector
 	}{
 		{"direct", tsDirect},
 		{"forwarder", tsForwarded},
diff --git a/internal/lsp/regtest/env.go b/internal/lsp/regtest/env.go
index 8bc100f..61eca0b 100644
--- a/internal/lsp/regtest/env.go
+++ b/internal/lsp/regtest/env.go
@@ -40,7 +40,7 @@
 // remote), any tests that execute on the same Runner will share the same
 // state.
 type Runner struct {
-	ts      *servertest.Server
+	ts      *servertest.TCPServer
 	modes   EnvMode
 	timeout time.Duration
 }
@@ -49,7 +49,7 @@
 // run tests.
 func NewTestRunner(modes EnvMode, testTimeout time.Duration) *Runner {
 	ss := lsprpc.NewStreamServer(cache.New(nil), false)
-	ts := servertest.NewServer(context.Background(), ss)
+	ts := servertest.NewTCPServer(context.Background(), ss)
 	return &Runner{
 		ts:      ts,
 		modes:   modes,
@@ -69,9 +69,9 @@
 	t.Helper()
 
 	tests := []struct {
-		name       string
-		mode       EnvMode
-		makeServer func(context.Context, *testing.T) (*servertest.Server, func())
+		name         string
+		mode         EnvMode
+		getConnector func(context.Context, *testing.T) (servertest.Connector, func())
 	}{
 		{"singleton", Singleton, r.singletonEnv},
 		{"shared", Shared, r.sharedEnv},
@@ -92,7 +92,7 @@
 				t.Fatal(err)
 			}
 			defer ws.Close()
-			ts, cleanup := tc.makeServer(ctx, t)
+			ts, cleanup := tc.getConnector(ctx, t)
 			defer cleanup()
 			env := NewEnv(ctx, t, ws, ts)
 			test(ctx, t, env)
@@ -100,22 +100,22 @@
 	}
 }
 
-func (r *Runner) singletonEnv(ctx context.Context, t *testing.T) (*servertest.Server, func()) {
+func (r *Runner) singletonEnv(ctx context.Context, t *testing.T) (servertest.Connector, func()) {
 	ss := lsprpc.NewStreamServer(cache.New(nil), false)
-	ts := servertest.NewServer(ctx, ss)
+	ts := servertest.NewPipeServer(ctx, ss)
 	cleanup := func() {
 		ts.Close()
 	}
 	return ts, cleanup
 }
 
-func (r *Runner) sharedEnv(ctx context.Context, t *testing.T) (*servertest.Server, func()) {
+func (r *Runner) sharedEnv(ctx context.Context, t *testing.T) (servertest.Connector, func()) {
 	return r.ts, func() {}
 }
 
-func (r *Runner) forwardedEnv(ctx context.Context, t *testing.T) (*servertest.Server, func()) {
+func (r *Runner) forwardedEnv(ctx context.Context, t *testing.T) (servertest.Connector, func()) {
 	forwarder := lsprpc.NewForwarder(r.ts.Addr, false)
-	ts2 := servertest.NewServer(ctx, forwarder)
+	ts2 := servertest.NewTCPServer(ctx, forwarder)
 	cleanup := func() {
 		ts2.Close()
 	}
@@ -134,7 +134,7 @@
 	// but they are available if needed.
 	W      *fake.Workspace
 	E      *fake.Editor
-	Server *servertest.Server
+	Server servertest.Connector
 
 	// mu guards the fields below, for the purpose of checking conditions on
 	// every change to diagnostics.
@@ -154,7 +154,7 @@
 
 // NewEnv creates a new test environment using the given workspace and gopls
 // server.
-func NewEnv(ctx context.Context, t *testing.T, ws *fake.Workspace, ts *servertest.Server) *Env {
+func NewEnv(ctx context.Context, t *testing.T, ws *fake.Workspace, ts servertest.Connector) *Env {
 	t.Helper()
 	conn := ts.Connect(ctx)
 	editor, err := fake.NewConnectedEditor(ctx, ws, conn)