Merge pull request #372 from mwitkow-io/bugfix/fix_stream_codec_errors

Fix difference between unary and stream codec error handling
diff --git a/.travis.yml b/.travis.yml
index 2503560..3f83776 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,4 +1,14 @@
 language: go
 
+before_install:
+  - go get github.com/axw/gocov/gocov
+  - go get github.com/mattn/goveralls
+  - go get golang.org/x/tools/cmd/cover
+
+install:
+  - mkdir -p "$GOPATH/src/google.golang.org"
+  - mv "$TRAVIS_BUILD_DIR" "$GOPATH/src/google.golang.org/grpc"
+
 script:
   - make test testrace
+  - make coverage
diff --git a/Makefile b/Makefile
index 0dc225f..5bc38be 100644
--- a/Makefile
+++ b/Makefile
@@ -45,3 +45,6 @@
 
 clean:
 	go clean google.golang.org/grpc/...
+
+coverage: testdeps
+	goveralls -v google.golang.org/grpc/...
diff --git a/README.md b/README.md
index 94dc739..37b05f0 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
 #gRPC-Go
 
-[![Build Status](https://travis-ci.org/grpc/grpc-go.svg)](https://travis-ci.org/grpc/grpc-go) [![GoDoc](https://godoc.org/google.golang.org/grpc?status.svg)](https://godoc.org/google.golang.org/grpc) [![Coverage Status](https://coveralls.io/repos/grpc/grpc-go/badge.svg?branch=master&service=github)](https://coveralls.io/github/grpc/grpc-go?branch=master)
+[![Build Status](https://travis-ci.org/grpc/grpc-go.svg)](https://travis-ci.org/grpc/grpc-go) [![GoDoc](https://godoc.org/google.golang.org/grpc?status.svg)](https://godoc.org/google.golang.org/grpc)
 
 The Go implementation of [gRPC](http://www.grpc.io/): A high performance, open source, general RPC framework that puts mobile and HTTP/2 first. For more information see the [gRPC Quick Start](http://www.grpc.io/docs/) guide.
 
@@ -20,11 +20,11 @@
 
 Constraints
 -----------
-The grpc package should only depend on standard Go packages and a short list of exceptions. A new addition to the list requires a discussion with gRPC-Go authors and consultants.
+The grpc package should only depend on standard Go packages and a small number of exceptions. If your contribution introduces new dependencies that are NOT in the [list](http://godoc.org/google.golang.org/grpc?imports), please discuss them with the gRPC-Go authors and consultants first.
 
 Documentation
 -------------
-You can find more detailed documentation and examples in the [examples directory](examples/).
+See the [API documentation](https://godoc.org/google.golang.org/grpc) for package and API descriptions, and find examples in the [examples directory](examples/).
 
 Status
 ------
diff --git a/benchmark/grpc_testing/test.pb.go b/benchmark/grpc_testing/test.pb.go
index 619c450..74e13c9 100644
--- a/benchmark/grpc_testing/test.pb.go
+++ b/benchmark/grpc_testing/test.pb.go
@@ -419,9 +419,9 @@
 	s.RegisterService(&_TestService_serviceDesc, srv)
 }
 
-func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) {
+func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
 	in := new(SimpleRequest)
-	if err := codec.Unmarshal(buf, in); err != nil {
+	if err := dec(in); err != nil {
 		return nil, err
 	}
 	out, err := srv.(TestServiceServer).UnaryCall(ctx, in)
diff --git a/examples/helloworld/helloworld/helloworld.pb.go b/examples/helloworld/helloworld/helloworld.pb.go
index 1ff931a..366b23b 100644
--- a/examples/helloworld/helloworld/helloworld.pb.go
+++ b/examples/helloworld/helloworld/helloworld.pb.go
@@ -84,9 +84,9 @@
 	s.RegisterService(&_Greeter_serviceDesc, srv)
 }
 
-func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) {
+func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
 	in := new(HelloRequest)
-	if err := codec.Unmarshal(buf, in); err != nil {
+	if err := dec(in); err != nil {
 		return nil, err
 	}
 	out, err := srv.(GreeterServer).SayHello(ctx, in)
diff --git a/examples/route_guide/routeguide/route_guide.pb.go b/examples/route_guide/routeguide/route_guide.pb.go
index fcf5c74..9ac9029 100644
--- a/examples/route_guide/routeguide/route_guide.pb.go
+++ b/examples/route_guide/routeguide/route_guide.pb.go
@@ -310,9 +310,9 @@
 	s.RegisterService(&_RouteGuide_serviceDesc, srv)
 }
 
-func _RouteGuide_GetFeature_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) {
+func _RouteGuide_GetFeature_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
 	in := new(Point)
-	if err := codec.Unmarshal(buf, in); err != nil {
+	if err := dec(in); err != nil {
 		return nil, err
 	}
 	out, err := srv.(RouteGuideServer).GetFeature(ctx, in)
diff --git a/health/grpc_health_v1alpha/health.pb.go b/health/grpc_health_v1alpha/health.pb.go
index c333a97..96eba6f 100644
--- a/health/grpc_health_v1alpha/health.pb.go
+++ b/health/grpc_health_v1alpha/health.pb.go
@@ -108,9 +108,9 @@
 	s.RegisterService(&_Health_serviceDesc, srv)
 }
 
-func _Health_Check_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) {
+func _Health_Check_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
 	in := new(HealthCheckRequest)
-	if err := codec.Unmarshal(buf, in); err != nil {
+	if err := dec(in); err != nil {
 		return nil, err
 	}
 	out, err := srv.(HealthServer).Check(ctx, in)
diff --git a/interop/grpc_testing/test.pb.go b/interop/grpc_testing/test.pb.go
index b25e98b..bd492fe 100755
--- a/interop/grpc_testing/test.pb.go
+++ b/interop/grpc_testing/test.pb.go
@@ -539,9 +539,9 @@
 	s.RegisterService(&_TestService_serviceDesc, srv)
 }
 
-func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) {
+func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
 	in := new(Empty)
-	if err := codec.Unmarshal(buf, in); err != nil {
+	if err := dec(in); err != nil {
 		return nil, err
 	}
 	out, err := srv.(TestServiceServer).EmptyCall(ctx, in)
@@ -551,9 +551,9 @@
 	return out, nil
 }
 
-func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) {
+func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
 	in := new(SimpleRequest)
-	if err := codec.Unmarshal(buf, in); err != nil {
+	if err := dec(in); err != nil {
 		return nil, err
 	}
 	out, err := srv.(TestServiceServer).UnaryCall(ctx, in)
diff --git a/server.go b/server.go
index ba68a21..ee44d1e 100644
--- a/server.go
+++ b/server.go
@@ -42,6 +42,7 @@
 	"runtime"
 	"strings"
 	"sync"
+	"time"
 
 	"golang.org/x/net/context"
 	"golang.org/x/net/trace"
@@ -52,7 +53,7 @@
 	"google.golang.org/grpc/transport"
 )
 
-type methodHandler func(srv interface{}, ctx context.Context, codec Codec, buf []byte) (interface{}, error)
+type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error)
 
 // MethodDesc represents an RPC service's method specification.
 type MethodDesc struct {
@@ -291,6 +292,10 @@
 		traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
 		defer traceInfo.tr.Finish()
 		traceInfo.firstLine.client = false
+		traceInfo.firstLine.remoteAddr = t.RemoteAddr()
+		if dl, ok := ctx.Deadline(); ok {
+			traceInfo.firstLine.deadline = dl.Sub(time.Now())
+		}
 		traceInfo.tr.LazyLog(&traceInfo.firstLine, false)
 		ctx = trace.NewContext(ctx, traceInfo.tr)
 		defer func() {
@@ -320,16 +325,20 @@
 			}
 			return err
 		}
-		if traceInfo.tr != nil {
-			// TODO: set payload.msg to something that
-			// prints usefully with %s; req is a []byte.
-			traceInfo.tr.LazyLog(&payload{sent: false}, true)
-		}
 		switch pf {
 		case compressionNone:
 			statusCode := codes.OK
 			statusDesc := ""
-			reply, appErr := md.Handler(srv.server, ctx, s.opts.codec, req)
+			df := func(v interface{}) error {
+				if err := s.opts.codec.Unmarshal(req, v); err != nil {
+					return err
+				}
+				if traceInfo.tr != nil {
+					traceInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
+				}
+				return nil
+			}
+			reply, appErr := md.Handler(srv.server, ctx, df)
 			if appErr != nil {
 				if err, ok := appErr.(rpcError); ok {
 					statusCode = err.code
@@ -393,6 +402,10 @@
 	if ss.tracing {
 		ss.traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
 		ss.traceInfo.firstLine.client = false
+		ss.traceInfo.firstLine.remoteAddr = t.RemoteAddr()
+		if dl, ok := ctx.Deadline(); ok {
+			ss.traceInfo.firstLine.deadline = dl.Sub(time.Now())
+		}
 		ss.traceInfo.tr.LazyLog(&ss.traceInfo.firstLine, false)
 		ss.ctx = trace.NewContext(ss.ctx, ss.traceInfo.tr)
 		defer func() {
diff --git a/test/grpc_testing/test.pb.go b/test/grpc_testing/test.pb.go
index b25e98b..bd492fe 100644
--- a/test/grpc_testing/test.pb.go
+++ b/test/grpc_testing/test.pb.go
@@ -539,9 +539,9 @@
 	s.RegisterService(&_TestService_serviceDesc, srv)
 }
 
-func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) {
+func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
 	in := new(Empty)
-	if err := codec.Unmarshal(buf, in); err != nil {
+	if err := dec(in); err != nil {
 		return nil, err
 	}
 	out, err := srv.(TestServiceServer).EmptyCall(ctx, in)
@@ -551,9 +551,9 @@
 	return out, nil
 }
 
-func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) {
+func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
 	in := new(SimpleRequest)
-	if err := codec.Unmarshal(buf, in); err != nil {
+	if err := dec(in); err != nil {
 		return nil, err
 	}
 	out, err := srv.(TestServiceServer).UnaryCall(ctx, in)
diff --git a/transport/http2_server.go b/transport/http2_server.go
index 057d936..c9a2a36 100644
--- a/transport/http2_server.go
+++ b/transport/http2_server.go
@@ -689,3 +689,7 @@
 	// other goroutines.
 	s.cancel()
 }
+
+func (t *http2Server) RemoteAddr() net.Addr {
+	return t.conn.RemoteAddr()
+}
diff --git a/transport/transport.go b/transport/transport.go
index 2dd38a8..d33f2de 100644
--- a/transport/transport.go
+++ b/transport/transport.go
@@ -390,6 +390,8 @@
 	// should not be accessed any more. All the pending streams and their
 	// handlers will be terminated asynchronously.
 	Close() error
+	// RemoteAddr returns the remote network address.
+	RemoteAddr() net.Addr
 }
 
 // StreamErrorf creates an StreamError with the specified error code and description.