Merge pull request #517 from zellyn/zellyn-grpclog-comments

Comment: explain concurrent access to grpclog.logger
diff --git a/call_test.go b/call_test.go
index 22e42c2..58cef3c 100644
--- a/call_test.go
+++ b/call_test.go
@@ -34,6 +34,7 @@
 package grpc
 
 import (
+	"fmt"
 	"io"
 	"math"
 	"net"
@@ -90,7 +91,8 @@
 		var v string
 		codec := testCodec{}
 		if err := codec.Unmarshal(req, &v); err != nil {
-			t.Fatalf("Failed to unmarshal the received message %v", err)
+			t.Errorf("Failed to unmarshal the received message: %v", err)
+			return
 		}
 		if v != expectedRequest {
 			h.t.WriteStatus(s, codes.Internal, string(make([]byte, sizeLargeErr)))
@@ -100,22 +102,26 @@
 	// send a response back to end the stream.
 	reply, err := encode(testCodec{}, &expectedResponse, nil, nil)
 	if err != nil {
-		t.Fatalf("Failed to encode the response: %v", err)
+		t.Errorf("Failed to encode the response: %v", err)
+		return
 	}
 	h.t.Write(s, reply, &transport.Options{})
 	h.t.WriteStatus(s, codes.OK, "")
 }
 
 type server struct {
-	lis  net.Listener
-	port string
-	// channel to signal server is ready to serve.
-	readyChan chan bool
-	mu        sync.Mutex
-	conns     map[transport.ServerTransport]bool
+	lis        net.Listener
+	port       string
+	startedErr chan error // sent nil or an error after server starts
+	mu         sync.Mutex
+	conns      map[transport.ServerTransport]bool
 }
 
-// start starts server. Other goroutines should block on s.readyChan for futher operations.
+func newTestServer() *server {
+	return &server{startedErr: make(chan error, 1)}
+}
+
+// start starts server. Other goroutines should block on s.startedErr for further operations.
 func (s *server) start(t *testing.T, port int, maxStreams uint32) {
 	var err error
 	if port == 0 {
@@ -124,17 +130,17 @@
 		s.lis, err = net.Listen("tcp", ":"+strconv.Itoa(port))
 	}
 	if err != nil {
-		t.Fatalf("failed to listen: %v", err)
+		s.startedErr <- fmt.Errorf("failed to listen: %v", err)
+		return
 	}
 	_, p, err := net.SplitHostPort(s.lis.Addr().String())
 	if err != nil {
-		t.Fatalf("failed to parse listener address: %v", err)
+		s.startedErr <- fmt.Errorf("failed to parse listener address: %v", err)
+		return
 	}
 	s.port = p
 	s.conns = make(map[transport.ServerTransport]bool)
-	if s.readyChan != nil {
-		close(s.readyChan)
-	}
+	s.startedErr <- nil
 	for {
 		conn, err := s.lis.Accept()
 		if err != nil {
@@ -161,7 +167,10 @@
 
 func (s *server) wait(t *testing.T, timeout time.Duration) {
 	select {
-	case <-s.readyChan:
+	case err := <-s.startedErr:
+		if err != nil {
+			t.Fatal(err)
+		}
 	case <-time.After(timeout):
 		t.Fatalf("Timed out after %v waiting for server to be ready", timeout)
 	}
@@ -178,7 +187,7 @@
 }
 
 func setUp(t *testing.T, port int, maxStreams uint32) (*server, *ClientConn) {
-	server := &server{readyChan: make(chan bool)}
+	server := newTestServer()
 	go server.start(t, port, maxStreams)
 	server.wait(t, 2*time.Second)
 	addr := "localhost:" + server.port
diff --git a/picker_test.go b/picker_test.go
index e328205..dd29497 100644
--- a/picker_test.go
+++ b/picker_test.go
@@ -103,7 +103,7 @@
 func startServers(t *testing.T, numServers, port int, maxStreams uint32) ([]*server, *testNameResolver) {
 	var servers []*server
 	for i := 0; i < numServers; i++ {
-		s := &server{readyChan: make(chan bool)}
+		s := newTestServer()
 		servers = append(servers, s)
 		go s.start(t, port, maxStreams)
 		s.wait(t, 2*time.Second)
diff --git a/test/end2end_test.go b/test/end2end_test.go
index d0e4ea2..4abe165 100644
--- a/test/end2end_test.go
+++ b/test/end2end_test.go
@@ -286,21 +286,23 @@
 	waitC := make(chan struct{})
 	go func() {
 		defer close(waitC)
-		argSize := 271828
-		respSize := 314159
+		const argSize = 271828
+		const respSize = 314159
 
-		payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(argSize))
+		payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
 		if err != nil {
-			t.Fatal(err)
+			t.Error(err)
+			return
 		}
 
 		req := &testpb.SimpleRequest{
 			ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
-			ResponseSize: proto.Int32(int32(respSize)),
+			ResponseSize: proto.Int32(respSize),
 			Payload:      payload,
 		}
 		if _, err := tc.UnaryCall(context.Background(), req); err == nil {
-			t.Fatalf("TestService/UnaryCall(_, _) = _, <nil>, want _, non-nil")
+			t.Errorf("TestService/UnaryCall(_, _) = _, <nil>, want _, non-nil")
+			return
 		}
 	}()
 	// Block untill reconnect times out.
@@ -664,29 +666,32 @@
 }
 
 func performOneRPC(t *testing.T, tc testpb.TestServiceClient, wg *sync.WaitGroup) {
-	argSize := 2718
-	respSize := 314
+	defer wg.Done()
+	const argSize = 2718
+	const respSize = 314
 
-	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(argSize))
+	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, argSize)
 	if err != nil {
-		t.Fatal(err)
+		t.Error(err)
+		return
 	}
 
 	req := &testpb.SimpleRequest{
 		ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
-		ResponseSize: proto.Int32(int32(respSize)),
+		ResponseSize: proto.Int32(respSize),
 		Payload:      payload,
 	}
 	reply, err := tc.UnaryCall(context.Background(), req)
 	if err != nil {
-		t.Fatalf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
+		t.Errorf("TestService/UnaryCall(_, _) = _, %v, want _, <nil>", err)
+		return
 	}
 	pt := reply.GetPayload().GetType()
 	ps := len(reply.GetPayload().GetBody())
 	if pt != testpb.PayloadType_COMPRESSABLE || ps != respSize {
-		t.Fatalf("Got the reply with type %d len %d; want %d, %d", pt, ps, testpb.PayloadType_COMPRESSABLE, respSize)
+		t.Errorf("Got reply with type %d len %d; want %d, %d", pt, ps, testpb.PayloadType_COMPRESSABLE, respSize)
+		return
 	}
-	wg.Done()
 }
 
 func TestRetry(t *testing.T) {
@@ -853,7 +858,7 @@
 	tc := testpb.NewTestServiceClient(cc)
 	defer tearDown(s, cc)
 	// Make sure setting ack has been sent.
-	time.Sleep(2*time.Second)
+	time.Sleep(2 * time.Second)
 	stream, err := tc.FullDuplexCall(context.Background())
 	if err != nil {
 		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
diff --git a/transport/transport_test.go b/transport/transport_test.go
index 84c8d0f..4b50f73 100644
--- a/transport/transport_test.go
+++ b/transport/transport_test.go
@@ -35,6 +35,7 @@
 
 import (
 	"bytes"
+	"fmt"
 	"io"
 	"math"
 	"net"
@@ -50,12 +51,11 @@
 )
 
 type server struct {
-	lis  net.Listener
-	port string
-	// channel to signal server is ready to serve.
-	readyChan chan bool
-	mu        sync.Mutex
-	conns     map[ServerTransport]bool
+	lis        net.Listener
+	port       string
+	startedErr chan error // sent nil or an error after server starts
+	mu         sync.Mutex
+	conns      map[ServerTransport]bool
 }
 
 var (
@@ -136,17 +136,17 @@
 		s.lis, err = net.Listen("tcp", ":"+strconv.Itoa(port))
 	}
 	if err != nil {
-		t.Fatalf("failed to listen: %v", err)
+		s.startedErr <- fmt.Errorf("failed to listen: %v", err)
+		return
 	}
 	_, p, err := net.SplitHostPort(s.lis.Addr().String())
 	if err != nil {
-		t.Fatalf("failed to parse listener address: %v", err)
+		s.startedErr <- fmt.Errorf("failed to parse listener address: %v", err)
+		return
 	}
 	s.port = p
 	s.conns = make(map[ServerTransport]bool)
-	if s.readyChan != nil {
-		close(s.readyChan)
-	}
+	s.startedErr <- nil
 	for {
 		conn, err := s.lis.Accept()
 		if err != nil {
@@ -182,7 +182,10 @@
 
 func (s *server) wait(t *testing.T, timeout time.Duration) {
 	select {
-	case <-s.readyChan:
+	case err := <-s.startedErr:
+		if err != nil {
+			t.Fatal(err)
+		}
 	case <-time.After(timeout):
 		t.Fatalf("Timed out after %v waiting for server to be ready", timeout)
 	}
@@ -199,7 +202,7 @@
 }
 
 func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, ClientTransport) {
-	server := &server{readyChan: make(chan bool)}
+	server := &server{startedErr: make(chan error, 1)}
 	go server.start(t, port, maxStreams, ht)
 	server.wait(t, 2*time.Second)
 	addr := "localhost:" + server.port