Merge pull request #505 from iamqizhao/master

Force flush headers frame for bi-di streaming
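
Why the flush matters: in a bidirectional streaming call the client may block in Recv() before it ever calls Send(), so the HEADERS frame that opens the stream has to reach the server even though no DATA frame follows it yet; without a forced flush the headers can sit in the client's write buffer and the call hangs. A minimal client-side sketch of that pattern, using the grpc_testing TestService exercised by the tests below (dial target, options, and import paths are illustrative and may not match this exact revision):

package main

import (
	"log"

	"golang.org/x/net/context"
	"google.golang.org/grpc"
	testpb "google.golang.org/grpc/test/grpc_testing"
)

func main() {
	// Illustrative dial target/options; a real setup may need credentials.
	cc, err := grpc.Dial("localhost:10000", grpc.WithInsecure())
	if err != nil {
		log.Fatalf("grpc.Dial(_) = _, %v", err)
	}
	defer cc.Close()
	tc := testpb.NewTestServiceClient(cc)
	// Opening the stream only queues the HEADERS frame; with the Flush hint
	// set for bi-di calls the transport writes it out immediately.
	stream, err := tc.FullDuplexCall(context.Background())
	if err != nil {
		log.Fatalf("%v.FullDuplexCall(_) = _, %v", tc, err)
	}
	// Recv before any Send: the server can only reply if it has already seen
	// the stream, which is exactly what the eager header flush guarantees.
	if _, err := stream.Recv(); err != nil {
		log.Printf("stream.Recv() = _, %v", err)
	}
}
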
diff --git a/stream.go b/stream.go
index 63f934d..e649c4c 100644
--- a/stream.go
+++ b/stream.go
@@ -113,6 +113,7 @@
 	callHdr := &transport.CallHdr{
 		Host:   cc.authority,
 		Method: method,
+		Flush:  desc.ServerStreams && desc.ClientStreams,
 	}
 	if cp != nil {
 		callHdr.SendCompress = cp.Type()
diff --git a/test/end2end_test.go b/test/end2end_test.go
index 22ca877..82f8373 100644
--- a/test/end2end_test.go
+++ b/test/end2end_test.go
@@ -327,7 +327,7 @@
 	return []env{{"tcp", nil, ""}, {"tcp", nil, "tls"}, {"unix", unixDialer, ""}, {"unix", unixDialer, "tls"}}
 }
 
-func serverSetUp(t *testing.T, hs *health.HealthServer, maxStream uint32, cg grpc.CompressorGenerator, dg grpc.DecompressorGenerator, e env) (s *grpc.Server, addr string) {
+func serverSetUp(t *testing.T, servON bool, hs *health.HealthServer, maxStream uint32, cg grpc.CompressorGenerator, dg grpc.DecompressorGenerator, e env) (s *grpc.Server, addr string) {
 	sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(maxStream), grpc.CompressON(cg), grpc.DecompressON(dg)}
 	la := ":0"
 	switch e.network {
@@ -350,7 +350,9 @@
 	if hs != nil {
 		healthpb.RegisterHealthServer(s, hs)
 	}
-	testpb.RegisterTestServiceServer(s, &testServer{security: e.security})
+	if servON {
+		testpb.RegisterTestServiceServer(s, &testServer{security: e.security})
+	}
 	go s.Serve(lis)
 	addr = la
 	switch e.network {
@@ -394,7 +396,7 @@
 }
 
 func testTimeoutOnDeadServer(t *testing.T, e env) {
-	s, addr := serverSetUp(t, nil, math.MaxUint32, nil, nil, e)
+	s, addr := serverSetUp(t, true, nil, math.MaxUint32, nil, nil, e)
 	cc := clientSetUp(t, addr, nil, nil, "", e)
 	tc := testpb.NewTestServiceClient(cc)
 	ctx, _ := context.WithTimeout(context.Background(), time.Second)
@@ -448,7 +450,7 @@
 func testHealthCheckOnSuccess(t *testing.T, e env) {
 	hs := health.NewHealthServer()
 	hs.SetServingStatus("grpc.health.v1alpha.Health", 1)
-	s, addr := serverSetUp(t, hs, math.MaxUint32, nil, nil, e)
+	s, addr := serverSetUp(t, true, hs, math.MaxUint32, nil, nil, e)
 	cc := clientSetUp(t, addr, nil, nil, "", e)
 	defer tearDown(s, cc)
 	if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.Health"); err != nil {
@@ -465,7 +467,7 @@
 func testHealthCheckOnFailure(t *testing.T, e env) {
 	hs := health.NewHealthServer()
 	hs.SetServingStatus("grpc.health.v1alpha.HealthCheck", 1)
-	s, addr := serverSetUp(t, hs, math.MaxUint32, nil, nil, e)
+	s, addr := serverSetUp(t, true, hs, math.MaxUint32, nil, nil, e)
 	cc := clientSetUp(t, addr, nil, nil, "", e)
 	defer tearDown(s, cc)
 	if _, err := healthCheck(0*time.Second, cc, "grpc.health.v1alpha.Health"); err != grpc.Errorf(codes.DeadlineExceeded, "context deadline exceeded") {
@@ -480,7 +482,7 @@
 }
 
 func testHealthCheckOff(t *testing.T, e env) {
-	s, addr := serverSetUp(t, nil, math.MaxUint32, nil, nil, e)
+	s, addr := serverSetUp(t, true, nil, math.MaxUint32, nil, nil, e)
 	cc := clientSetUp(t, addr, nil, nil, "", e)
 	defer tearDown(s, cc)
 	if _, err := healthCheck(1*time.Second, cc, ""); err != grpc.Errorf(codes.Unimplemented, "unknown service grpc.health.v1alpha.Health") {
@@ -496,7 +498,7 @@
 
 func testHealthCheckServingStatus(t *testing.T, e env) {
 	hs := health.NewHealthServer()
-	s, addr := serverSetUp(t, hs, math.MaxUint32, nil, nil, e)
+	s, addr := serverSetUp(t, true, hs, math.MaxUint32, nil, nil, e)
 	cc := clientSetUp(t, addr, nil, nil, "", e)
 	defer tearDown(s, cc)
 	out, err := healthCheck(1*time.Second, cc, "")
@@ -535,7 +537,7 @@
 }
 
 func testEmptyUnaryWithUserAgent(t *testing.T, e env) {
-	s, addr := serverSetUp(t, nil, math.MaxUint32, nil, nil, e)
+	s, addr := serverSetUp(t, true, nil, math.MaxUint32, nil, nil, e)
 	cc := clientSetUp(t, addr, nil, nil, testAppUA, e)
 	// Wait until cc is connected.
 	ctx, _ := context.WithTimeout(context.Background(), time.Second)
@@ -579,7 +581,7 @@
 }
 
 func testFailedEmptyUnary(t *testing.T, e env) {
-	s, addr := serverSetUp(t, nil, math.MaxUint32, nil, nil, e)
+	s, addr := serverSetUp(t, true, nil, math.MaxUint32, nil, nil, e)
 	cc := clientSetUp(t, addr, nil, nil, "", e)
 	tc := testpb.NewTestServiceClient(cc)
 	defer tearDown(s, cc)
@@ -596,7 +598,7 @@
 }
 
 func testLargeUnary(t *testing.T, e env) {
-	s, addr := serverSetUp(t, nil, math.MaxUint32, nil, nil, e)
+	s, addr := serverSetUp(t, true, nil, math.MaxUint32, nil, nil, e)
 	cc := clientSetUp(t, addr, nil, nil, "", e)
 	tc := testpb.NewTestServiceClient(cc)
 	defer tearDown(s, cc)
@@ -631,7 +633,7 @@
 }
 
 func testMetadataUnaryRPC(t *testing.T, e env) {
-	s, addr := serverSetUp(t, nil, math.MaxUint32, nil, nil, e)
+	s, addr := serverSetUp(t, true, nil, math.MaxUint32, nil, nil, e)
 	cc := clientSetUp(t, addr, nil, nil, "", e)
 	tc := testpb.NewTestServiceClient(cc)
 	defer tearDown(s, cc)
@@ -697,7 +699,7 @@
 // TODO(zhaoq): Refactor to make this clearer and add more cases to test racy
 // and error-prone paths.
 func testRetry(t *testing.T, e env) {
-	s, addr := serverSetUp(t, nil, math.MaxUint32, nil, nil, e)
+	s, addr := serverSetUp(t, true, nil, math.MaxUint32, nil, nil, e)
 	cc := clientSetUp(t, addr, nil, nil, "", e)
 	tc := testpb.NewTestServiceClient(cc)
 	defer tearDown(s, cc)
@@ -728,7 +730,7 @@
 
 // TODO(zhaoq): Have a better test coverage of timeout and cancellation mechanism.
 func testRPCTimeout(t *testing.T, e env) {
-	s, addr := serverSetUp(t, nil, math.MaxUint32, nil, nil, e)
+	s, addr := serverSetUp(t, true, nil, math.MaxUint32, nil, nil, e)
 	cc := clientSetUp(t, addr, nil, nil, "", e)
 	tc := testpb.NewTestServiceClient(cc)
 	defer tearDown(s, cc)
@@ -761,7 +763,7 @@
 }
 
 func testCancel(t *testing.T, e env) {
-	s, addr := serverSetUp(t, nil, math.MaxUint32, nil, nil, e)
+	s, addr := serverSetUp(t, true, nil, math.MaxUint32, nil, nil, e)
 	cc := clientSetUp(t, addr, nil, nil, "", e)
 	tc := testpb.NewTestServiceClient(cc)
 	defer tearDown(s, cc)
@@ -794,7 +796,7 @@
 
 func testCancelNoIO(t *testing.T, e env) {
 	// Only allows 1 live stream per server transport.
-	s, addr := serverSetUp(t, nil, 1, nil, nil, e)
+	s, addr := serverSetUp(t, true, nil, 1, nil, nil, e)
 	cc := clientSetUp(t, addr, nil, nil, "", e)
 	tc := testpb.NewTestServiceClient(cc)
 	defer tearDown(s, cc)
@@ -839,6 +841,28 @@
 	respSizes = []int{31415, 9, 2653, 58979}
 )
 
+func TestNoService(t *testing.T) {
+	for _, e := range listTestEnv() {
+		testNoService(t, e)
+	}
+}
+
+func testNoService(t *testing.T, e env) {
+	s, addr := serverSetUp(t, false, nil, math.MaxUint32, nil, nil, e)
+	cc := clientSetUp(t, addr, nil, nil, "", e)
+	tc := testpb.NewTestServiceClient(cc)
+	defer tearDown(s, cc)
+	// Make sure the settings ack has been sent.
+	time.Sleep(2 * time.Second)
+	stream, err := tc.FullDuplexCall(context.Background())
+	if err != nil {
+		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
+	}
+	if _, err := stream.Recv(); grpc.Code(err) != codes.Unimplemented {
+		t.Fatalf("stream.Recv() = _, %v, want _, error code %d", err, codes.Unimplemented)
+	}
+}
+
 func TestPingPong(t *testing.T) {
 	for _, e := range listTestEnv() {
 		testPingPong(t, e)
@@ -846,7 +870,7 @@
 }
 
 func testPingPong(t *testing.T, e env) {
-	s, addr := serverSetUp(t, nil, math.MaxUint32, nil, nil, e)
+	s, addr := serverSetUp(t, true, nil, math.MaxUint32, nil, nil, e)
 	cc := clientSetUp(t, addr, nil, nil, "", e)
 	tc := testpb.NewTestServiceClient(cc)
 	defer tearDown(s, cc)
@@ -904,7 +928,7 @@
 }
 
 func testMetadataStreamingRPC(t *testing.T, e env) {
-	s, addr := serverSetUp(t, nil, math.MaxUint32, nil, nil, e)
+	s, addr := serverSetUp(t, true, nil, math.MaxUint32, nil, nil, e)
 	cc := clientSetUp(t, addr, nil, nil, "", e)
 	tc := testpb.NewTestServiceClient(cc)
 	defer tearDown(s, cc)
@@ -971,7 +995,7 @@
 }
 
 func testServerStreaming(t *testing.T, e env) {
-	s, addr := serverSetUp(t, nil, math.MaxUint32, nil, nil, e)
+	s, addr := serverSetUp(t, true, nil, math.MaxUint32, nil, nil, e)
 	cc := clientSetUp(t, addr, nil, nil, "", e)
 	tc := testpb.NewTestServiceClient(cc)
 	defer tearDown(s, cc)
@@ -1024,7 +1048,7 @@
 }
 
 func testFailedServerStreaming(t *testing.T, e env) {
-	s, addr := serverSetUp(t, nil, math.MaxUint32, nil, nil, e)
+	s, addr := serverSetUp(t, true, nil, math.MaxUint32, nil, nil, e)
 	cc := clientSetUp(t, addr, nil, nil, "", e)
 	tc := testpb.NewTestServiceClient(cc)
 	defer tearDown(s, cc)
@@ -1055,7 +1079,7 @@
 }
 
 func testClientStreaming(t *testing.T, e env) {
-	s, addr := serverSetUp(t, nil, math.MaxUint32, nil, nil, e)
+	s, addr := serverSetUp(t, true, nil, math.MaxUint32, nil, nil, e)
 	cc := clientSetUp(t, addr, nil, nil, "", e)
 	tc := testpb.NewTestServiceClient(cc)
 	defer tearDown(s, cc)
@@ -1096,7 +1120,7 @@
 
 func testExceedMaxStreamsLimit(t *testing.T, e env) {
 	// Only allows 1 live stream per server transport.
-	s, addr := serverSetUp(t, nil, 1, nil, nil, e)
+	s, addr := serverSetUp(t, true, nil, 1, nil, nil, e)
 	cc := clientSetUp(t, addr, nil, nil, "", e)
 	tc := testpb.NewTestServiceClient(cc)
 	defer tearDown(s, cc)
@@ -1126,7 +1150,7 @@
 }
 
 func testCompressServerHasNoSupport(t *testing.T, e env) {
-	s, addr := serverSetUp(t, nil, math.MaxUint32, nil, nil, e)
+	s, addr := serverSetUp(t, true, nil, math.MaxUint32, nil, nil, e)
 	cc := clientSetUp(t, addr, grpc.NewGZIPCompressor, nil, "", e)
 	// Unary call
 	tc := testpb.NewTestServiceClient(cc)
@@ -1179,7 +1203,7 @@
 }
 
 func testCompressOK(t *testing.T, e env) {
-	s, addr := serverSetUp(t, nil, math.MaxUint32, grpc.NewGZIPCompressor, grpc.NewGZIPDecompressor, e)
+	s, addr := serverSetUp(t, true, nil, math.MaxUint32, grpc.NewGZIPCompressor, grpc.NewGZIPDecompressor, e)
 	cc := clientSetUp(t, addr, grpc.NewGZIPCompressor, grpc.NewGZIPDecompressor, "", e)
 	// Unary call
 	tc := testpb.NewTestServiceClient(cc)
diff --git a/transport/http2_client.go b/transport/http2_client.go
index 7006cd8..7cf700f 100644
--- a/transport/http2_client.go
+++ b/transport/http2_client.go
@@ -353,6 +353,10 @@
 		} else {
 			endHeaders = true
 		}
+		var flush bool
+		if endHeaders && (hasMD || callHdr.Flush) {
+			flush = true
+		}
 		if first {
 			// Sends a HeadersFrame to server to start a new stream.
 			p := http2.HeadersFrameParam{
@@ -364,11 +368,11 @@
-			// Do a force flush for the buffered frames iff it is the last headers frame
-			// and there is header metadata to be sent. Otherwise, there is flushing until
-			// the corresponding data frame is written.
-			err = t.framer.writeHeaders(hasMD && endHeaders, p)
+			// Do a force flush for the buffered frames iff it is the last headers frame
+			// and there is header metadata or a flush was requested via CallHdr. Otherwise,
+			// there is no flushing until the corresponding data frame is written.
+			err = t.framer.writeHeaders(flush, p)
 			first = false
 		} else {
 			// Sends Continuation frames for the leftover headers.
-			err = t.framer.writeContinuation(hasMD && endHeaders, s.id, endHeaders, t.hBuf.Next(size))
+			err = t.framer.writeContinuation(flush, s.id, endHeaders, t.hBuf.Next(size))
 		}
 		if err != nil {
 			t.notifyError(err)
diff --git a/transport/transport.go b/transport/transport.go
index 7956479..6c3b943 100644
--- a/transport/transport.go
+++ b/transport/transport.go
@@ -369,6 +369,10 @@
 	RecvCompress string
 	// SendCompress specifies the compression algorithm applied on outbound message.
 	SendCompress string
+	// Flush indicates whether the new stream command should be sent to the peer
+	// without waiting for the first data. This is only a hint; the transport may
+	// modify the flush decision for performance purposes.
+	Flush bool
 }
 
 // ClientTransport is the common interface for all gRPC client side transport
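
For callers sitting directly on top of the transport package, the new field is just another member of transport.CallHdr. A hypothetical helper (not part of this change; the package, function name, and arguments are illustrative) showing how the hint would be supplied:

package transportutil

import (
	"golang.org/x/net/context"

	"google.golang.org/grpc/transport"
)

// openBidiStream opens a stream and asks the transport to flush the HEADERS
// frame immediately, since a bi-di client may Recv before it ever Sends.
func openBidiStream(ctx context.Context, ct transport.ClientTransport, host, method string) (*transport.Stream, error) {
	callHdr := &transport.CallHdr{
		Host:   host,
		Method: method,
		// Hint only: the transport may still adjust the flush decision.
		Flush:  true,
	}
	return ct.NewStream(ctx, callHdr)
}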