internal/jsonrpc2: clean up the tests

This runs the tests as subtests and makes sure they clean up
correctly.
It also installs the telemetry debug handlers.

Change-Id: I75b4f7a19be378603d97875fc31091be316949e0
Reviewed-on: https://go-review.googlesource.com/c/tools/+/228718
Run-TryBot: Ian Cottrell <iancottrell@google.com>
TryBot-Result: Gobot Gobot <gobot@golang.org>
Reviewed-by: Robert Findley <rfindley@google.com>
diff --git a/internal/jsonrpc2/jsonrpc2_test.go b/internal/jsonrpc2/jsonrpc2_test.go
index 0de16d2..55b3d87 100644
--- a/internal/jsonrpc2/jsonrpc2_test.go
+++ b/internal/jsonrpc2/jsonrpc2_test.go
@@ -7,14 +7,17 @@
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"flag"
 	"fmt"
 	"io"
 	"path"
 	"reflect"
+	"sync"
 	"testing"
 
 	"golang.org/x/tools/internal/jsonrpc2"
+	"golang.org/x/tools/internal/telemetry/export/eventtest"
 )
 
 var logRPC = flag.Bool("logrpc", false, "Enable jsonrpc2 communication logging")
@@ -58,47 +61,52 @@
 	}
 }
 
-func TestPlainCall(t *testing.T) {
-	ctx := context.Background()
-	a, b := prepare(ctx, t, false)
-	for _, test := range callTests {
-		results := test.newResults()
-		if _, err := a.Call(ctx, test.method, test.params, results); err != nil {
-			t.Fatalf("%v:Call failed: %v", test.method, err)
+func TestCall(t *testing.T) {
+	ctx := eventtest.NewContext(context.Background(), t)
+	for _, headers := range []bool{false, true} {
+		name := "Plain"
+		if headers {
+			name = "Headers"
 		}
-		test.verifyResults(t, results)
-		if _, err := b.Call(ctx, test.method, test.params, results); err != nil {
-			t.Fatalf("%v:Call failed: %v", test.method, err)
-		}
-		test.verifyResults(t, results)
+		t.Run(name, func(t *testing.T) {
+			ctx := eventtest.NewContext(ctx, t)
+			a, b, done := prepare(ctx, t, headers)
+			defer done()
+			for _, test := range callTests {
+				t.Run(test.method, func(t *testing.T) {
+					ctx := eventtest.NewContext(ctx, t)
+					results := test.newResults()
+					if _, err := a.Call(ctx, test.method, test.params, results); err != nil {
+						t.Fatalf("%v:Call failed: %v", test.method, err)
+					}
+					test.verifyResults(t, results)
+					if _, err := b.Call(ctx, test.method, test.params, results); err != nil {
+						t.Fatalf("%v:Call failed: %v", test.method, err)
+					}
+					test.verifyResults(t, results)
+				})
+			}
+		})
 	}
 }
 
-func TestHeaderCall(t *testing.T) {
-	ctx := context.Background()
-	a, b := prepare(ctx, t, true)
-	for _, test := range callTests {
-		results := test.newResults()
-		if _, err := a.Call(ctx, test.method, test.params, results); err != nil {
-			t.Fatalf("%v:Call failed: %v", test.method, err)
-		}
-		test.verifyResults(t, results)
-		if _, err := b.Call(ctx, test.method, test.params, results); err != nil {
-			t.Fatalf("%v:Call failed: %v", test.method, err)
-		}
-		test.verifyResults(t, results)
-	}
-}
-
-func prepare(ctx context.Context, t *testing.T, withHeaders bool) (*jsonrpc2.Conn, *jsonrpc2.Conn) {
+func prepare(ctx context.Context, t *testing.T, withHeaders bool) (*jsonrpc2.Conn, *jsonrpc2.Conn, func()) {
+	// Make a sync.WaitGroup that can be used to wait for the system to shut down.
+	wg := &sync.WaitGroup{}
 	aR, bW := io.Pipe()
 	bR, aW := io.Pipe()
-	a := run(ctx, t, withHeaders, aR, aW)
-	b := run(ctx, t, withHeaders, bR, bW)
-	return a, b
+	a := run(ctx, t, withHeaders, aR, aW, wg)
+	b := run(ctx, t, withHeaders, bR, bW, wg)
+	return a, b, func() {
+		// We close the main writer; this should cascade through the server and
+		// cause normal shutdown of the entire chain.
+		aW.Close()
+		// This then waits for that entire cascade to complete.
+		wg.Wait()
+	}
 }
 
-func run(ctx context.Context, t *testing.T, withHeaders bool, r io.ReadCloser, w io.WriteCloser) *jsonrpc2.Conn {
+func run(ctx context.Context, t *testing.T, withHeaders bool, r io.ReadCloser, w io.WriteCloser, wg *sync.WaitGroup) *jsonrpc2.Conn {
 	var stream jsonrpc2.Stream
 	if withHeaders {
 		stream = jsonrpc2.NewHeaderStream(r, w)
@@ -106,12 +114,19 @@
 		stream = jsonrpc2.NewStream(r, w)
 	}
 	conn := jsonrpc2.NewConn(stream)
+	wg.Add(1)
 	go func() {
 		defer func() {
+			// This runs when Run returns, which means at least one of the
+			// streams has already been closed.
+			// We close both streams anyway; this may be redundant but is safe.
 			r.Close()
 			w.Close()
+			// Then signal that this connection is done.
+			wg.Done()
 		}()
-		if err := conn.Run(ctx, testHandler(*logRPC)); err != nil {
+		err := conn.Run(ctx, testHandler(*logRPC))
+		if err != nil && !errors.Is(err, jsonrpc2.ErrDisconnected) {
 			t.Errorf("Stream failed: %v", err)
 		}
 	}()
diff --git a/internal/jsonrpc2/serve.go b/internal/jsonrpc2/serve.go
index fe1899e..d3d8370 100644
--- a/internal/jsonrpc2/serve.go
+++ b/internal/jsonrpc2/serve.go
@@ -7,10 +7,11 @@
 import (
 	"context"
 	"fmt"
-	"log"
 	"net"
 	"os"
 	"time"
+
+	"golang.org/x/tools/internal/telemetry/event"
 )
 
 // NOTE: This file provides an experimental API for serving multiple remote
@@ -96,7 +97,7 @@
 		case err := <-doneListening:
 			return err
 		case err := <-closedConns:
-			log.Printf("closed a connection with error: %v", err)
+			event.Error(ctx, "closed a connection", err)
 			activeConns--
 			if activeConns == 0 {
 				connTimer.Reset(idleTimeout)