Merge pull request #546 from bradfitz/quiet

test: reduce end2end log spam
diff --git a/test/end2end_test.go b/test/end2end_test.go
index e19333b..e71cf6f 100644
--- a/test/end2end_test.go
+++ b/test/end2end_test.go
@@ -38,8 +38,10 @@
 	"flag"
 	"fmt"
 	"io"
+	"log"
 	"math"
 	"net"
+	"os"
 	"reflect"
 	"runtime"
 	"sort"
@@ -54,6 +56,7 @@
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/credentials"
+	"google.golang.org/grpc/grpclog"
 	"google.golang.org/grpc/health"
 	healthpb "google.golang.org/grpc/health/grpc_health_v1alpha"
 	"google.golang.org/grpc/metadata"
@@ -280,6 +283,13 @@
 
 func TestReconnectTimeout(t *testing.T) {
 	defer leakCheck(t)()
+	restore := declareLogNoise(t,
+		"transport: http2Client.notifyError got notified that the client transport was broken",
+		"grpc: Conn.resetTransport failed to create client transport: connection error: desc = \"transport",
+		"grpc: Conn.transportMonitor exits due to: grpc: timed out trying to connect",
+	)
+	defer restore()
+
 	lis, err := net.Listen("tcp", ":0")
 	if err != nil {
 		t.Fatalf("Failed to listen: %v", err)
@@ -520,6 +530,14 @@
 }
 
 func testTimeoutOnDeadServer(t *testing.T, e env) {
+	restore := declareLogNoise(t,
+		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
+		"grpc: Conn.transportMonitor exits due to: grpc: the client connection is closing",
+		"grpc: Conn.resetTransport failed to create client transport: connection error",
+		"grpc: Conn.resetTransport failed to create client transport: connection error: desc = \"transport: dial unix",
+	)
+	defer restore()
+
 	s, addr := serverSetUp(t, true, nil, math.MaxUint32, nil, nil, e)
 	cc := clientSetUp(t, addr, nil, nil, "", e)
 	tc := testpb.NewTestServiceClient(cc)
@@ -554,6 +572,7 @@
 		t.Fatalf("cc.State() = %s, %v, want %s or %s, <nil>", state, err, grpc.Connecting, grpc.TransientFailure)
 	}
 	cc.Close()
+	awaitNewConnLogOutput()
 }
 
 func healthCheck(d time.Duration, cc *grpc.ClientConn, serviceName string) (*healthpb.HealthCheckResponse, error) {
@@ -591,6 +610,12 @@
 }
 
 func testHealthCheckOnFailure(t *testing.T, e env) {
+	defer leakCheck(t)()
+	restore := declareLogNoise(t,
+		"Failed to dial ",
+		"grpc: the client connection is closing; please retry",
+	)
+	defer restore()
 	hs := health.NewHealthServer()
 	hs.SetServingStatus("grpc.health.v1alpha.HealthCheck", 1)
 	s, addr := serverSetUp(t, true, hs, math.MaxUint32, nil, nil, e)
@@ -599,6 +624,7 @@
 	if _, err := healthCheck(0*time.Second, cc, "grpc.health.v1alpha.Health"); err != grpc.Errorf(codes.DeadlineExceeded, "context deadline exceeded") {
 		t.Fatalf("Health/Check(_, _) = _, %v, want _, error code %d", err, codes.DeadlineExceeded)
 	}
+	awaitNewConnLogOutput()
 }
 
 func TestHealthCheckOff(t *testing.T) {
@@ -841,6 +867,10 @@
 // TODO(zhaoq): Refactor to make this clearer and add more cases to test racy
 // and error-prone paths.
 func testRetry(t *testing.T, e env) {
+	restore := declareLogNoise(t,
+		"transport: http2Client.notifyError got notified that the client transport was broken",
+	)
+	defer restore()
 	s, addr := serverSetUp(t, true, nil, math.MaxUint32, nil, nil, e)
 	cc := clientSetUp(t, addr, nil, nil, "", e)
 	tc := testpb.NewTestServiceClient(cc)
@@ -922,6 +952,10 @@
 }
 
 func testCancel(t *testing.T, e env) {
+	restore := declareLogNoise(t,
+		"grpc: the client connection is closing; please retry",
+	)
+	defer restore()
 	s, addr := serverSetUp(t, true, nil, math.MaxUint32, nil, nil, e)
 	cc := clientSetUp(t, addr, nil, nil, "", e)
 	tc := testpb.NewTestServiceClient(cc)
@@ -945,6 +979,9 @@
 	if grpc.Code(err) != codes.Canceled {
 		t.Fatalf(`TestService/UnaryCall(_, _) = %v, %v; want <nil>, error code: %d`, reply, err, codes.Canceled)
 	}
+	cc.Close()
+
+	awaitNewConnLogOutput()
 }
 
 func TestCancelNoIO(t *testing.T) {
@@ -1585,3 +1622,126 @@
 		}
 	}
 }
+
+type lockingWriter struct {
+	mu sync.Mutex
+	w  io.Writer
+}
+
+func (lw *lockingWriter) Write(p []byte) (n int, err error) {
+	lw.mu.Lock()
+	defer lw.mu.Unlock()
+	return lw.w.Write(p)
+}
+
+func (lw *lockingWriter) setWriter(w io.Writer) {
+	lw.mu.Lock()
+	defer lw.mu.Unlock()
+	lw.w = w
+}
+
+var testLogOutput = &lockingWriter{w: os.Stderr}
+
+// awaitNewConnLogOutput waits for any of grpc.NewConn's goroutines to
+// terminate, if they're still running. Such a goroutine spams logs with
+// this message. We wait for it so our log filter is still in place when
+// it fires; otherwise the "defer restore()" at the top of various test
+// functions removes the filter first and the late goroutine spams stderr.
+func awaitNewConnLogOutput() {
+	awaitLogOutput(50*time.Millisecond, "grpc: the client connection is closing; please retry")
+}
+
+func awaitLogOutput(maxWait time.Duration, phrase string) {
+	pb := []byte(phrase)
+
+	timer := time.NewTimer(maxWait)
+	defer timer.Stop()
+	wakeup := make(chan bool, 1)
+	for {
+		if logOutputHasContents(pb, wakeup) {
+			return
+		}
+		select {
+		case <-timer.C:
+			// Too slow. Oh well.
+			return
+		case <-wakeup:
+		}
+	}
+}
+
+func logOutputHasContents(v []byte, wakeup chan<- bool) bool {
+	testLogOutput.mu.Lock()
+	defer testLogOutput.mu.Unlock()
+	fw, ok := testLogOutput.w.(*filterWriter)
+	if !ok {
+		return false
+	}
+	fw.mu.Lock()
+	defer fw.mu.Unlock()
+	if bytes.Contains(fw.buf.Bytes(), v) {
+		return true
+	}
+	fw.wakeup = wakeup
+	return false
+}
+
+func init() {
+	grpclog.SetLogger(log.New(testLogOutput, "", log.LstdFlags))
+}
+
+var verboseLogs = flag.Bool("verbose_logs", false, "show all grpclog output, without filtering")
+
+func noop() {}
+
+// declareLogNoise declares that t is expected to emit the given noisy
+// phrases, even on success. Those phrases are filtered from grpclog output
+// and only shown if *verbose_logs is set or t ends up failing.
+// The returned restore function should be called with defer so it runs
+// before the test ends.
+func declareLogNoise(t *testing.T, phrases ...string) (restore func()) {
+	if *verboseLogs {
+		return noop
+	}
+	fw := &filterWriter{dst: os.Stderr, filter: phrases}
+	testLogOutput.setWriter(fw)
+	return func() {
+		if t.Failed() {
+			fw.mu.Lock()
+			defer fw.mu.Unlock()
+			if fw.buf.Len() > 0 {
+				t.Logf("Complete log output:\n%s", fw.buf.Bytes())
+			}
+		}
+		testLogOutput.setWriter(os.Stderr)
+	}
+}
+
+type filterWriter struct {
+	dst    io.Writer
+	filter []string
+
+	mu     sync.Mutex
+	buf    bytes.Buffer
+	wakeup chan<- bool // if non-nil, gets true on write
+}
+
+func (fw *filterWriter) Write(p []byte) (n int, err error) {
+	fw.mu.Lock()
+	fw.buf.Write(p)
+	if fw.wakeup != nil {
+		select {
+		case fw.wakeup <- true:
+		default:
+		}
+	}
+	fw.mu.Unlock()
+
+	ps := string(p)
+	for _, f := range fw.filter {
+		if strings.Contains(ps, f) {
+			return len(p), nil
+		}
+	}
+	return fw.dst.Write(p)
+}