Merge pull request #382 from iamqizhao/master
Refactor server side tracing
diff --git a/call.go b/call.go
index 8b68809..b5f9292 100644
--- a/call.go
+++ b/call.go
@@ -117,7 +117,7 @@
}
}()
if EnableTracing {
- c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
+ c.traceInfo.tr = trace.New("grpc.Sent."+transport.MethodFamily(method), method)
defer c.traceInfo.tr.Finish()
c.traceInfo.firstLine.client = true
if deadline, ok := ctx.Deadline(); ok {
diff --git a/server.go b/server.go
index ee44d1e..a5c8dc3 100644
--- a/server.go
+++ b/server.go
@@ -247,7 +247,7 @@
c.Close()
return nil
}
- st, err := transport.NewServerTransport("http2", c, s.opts.maxConcurrentStreams, authInfo)
+ st, err := transport.NewServerTransport("http2", c, s.opts.maxConcurrentStreams, authInfo, EnableTracing)
if err != nil {
s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
s.mu.Unlock()
@@ -285,19 +285,16 @@
}
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc) (err error) {
- ctx, cancel := context.WithCancel(stream.Context())
- defer cancel()
var traceInfo traceInfo
if EnableTracing {
- traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
+ traceInfo.tr = stream.Trace()
defer traceInfo.tr.Finish()
traceInfo.firstLine.client = false
traceInfo.firstLine.remoteAddr = t.RemoteAddr()
- if dl, ok := ctx.Deadline(); ok {
+ if dl, ok := stream.Context().Deadline(); ok {
traceInfo.firstLine.deadline = dl.Sub(time.Now())
}
traceInfo.tr.LazyLog(&traceInfo.firstLine, false)
- ctx = trace.NewContext(ctx, traceInfo.tr)
defer func() {
if err != nil && err != io.EOF {
traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
@@ -338,7 +335,7 @@
}
return nil
}
- reply, appErr := md.Handler(srv.server, ctx, df)
+ reply, appErr := md.Handler(srv.server, stream.Context(), df)
if appErr != nil {
if err, ok := appErr.(rpcError); ok {
statusCode = err.code
@@ -389,25 +386,21 @@
}
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc) (err error) {
- ctx, cancel := context.WithCancel(stream.Context())
- defer cancel()
ss := &serverStream{
t: t,
s: stream,
- ctx: ctx,
p: &parser{s: stream},
codec: s.opts.codec,
tracing: EnableTracing,
}
if ss.tracing {
- ss.traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
+ ss.traceInfo.tr = stream.Trace()
ss.traceInfo.firstLine.client = false
ss.traceInfo.firstLine.remoteAddr = t.RemoteAddr()
- if dl, ok := ctx.Deadline(); ok {
+ if dl, ok := stream.Context().Deadline(); ok {
ss.traceInfo.firstLine.deadline = dl.Sub(time.Now())
}
ss.traceInfo.tr.LazyLog(&ss.traceInfo.firstLine, false)
- ss.ctx = trace.NewContext(ss.ctx, ss.traceInfo.tr)
defer func() {
ss.mu.Lock()
if err != nil && err != io.EOF {
diff --git a/stream.go b/stream.go
index 34774f0..28938a3 100644
--- a/stream.go
+++ b/stream.go
@@ -126,7 +126,7 @@
tracing: EnableTracing,
}
if cs.tracing {
- cs.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
+ cs.traceInfo.tr = trace.New("grpc.Sent."+transport.MethodFamily(method), method)
cs.traceInfo.firstLine.client = true
if deadline, ok := ctx.Deadline(); ok {
cs.traceInfo.firstLine.deadline = deadline.Sub(time.Now())
@@ -294,7 +294,6 @@
type serverStream struct {
t transport.ServerTransport
s *transport.Stream
- ctx context.Context // provides trace.FromContext when tracing
p *parser
codec Codec
statusCode codes.Code
@@ -309,7 +308,7 @@
}
func (ss *serverStream) Context() context.Context {
- return ss.ctx
+ return ss.s.Context()
}
func (ss *serverStream) SendHeader(md metadata.MD) error {
diff --git a/trace.go b/trace.go
index cde04fb..9b88444 100644
--- a/trace.go
+++ b/trace.go
@@ -38,7 +38,6 @@
"fmt"
"io"
"net"
- "strings"
"time"
"golang.org/x/net/trace"
@@ -48,19 +47,6 @@
// This should only be set before any RPCs are sent or received by this program.
var EnableTracing = true
-// methodFamily returns the trace family for the given method.
-// It turns "/pkg.Service/GetFoo" into "pkg.Service".
-func methodFamily(m string) string {
- m = strings.TrimPrefix(m, "/") // remove leading slash
- if i := strings.Index(m, "/"); i >= 0 {
- m = m[:i] // remove everything from second slash
- }
- if i := strings.LastIndex(m, "."); i >= 0 {
- m = m[i+1:] // cut down to last dotted component
- }
- return m
-}
-
// traceInfo contains tracing information for an RPC.
type traceInfo struct {
tr trace.Trace
diff --git a/transport/http2_server.go b/transport/http2_server.go
index c9a2a36..89371c2 100644
--- a/transport/http2_server.go
+++ b/transport/http2_server.go
@@ -45,6 +45,7 @@
"golang.org/x/net/context"
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
+ "golang.org/x/net/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
@@ -80,6 +81,8 @@
fc *inFlow
// sendQuotaPool provides flow control to outbound message.
sendQuotaPool *quotaPool
+ // tracing indicates whether tracing is on for this http2Server transport.
+ tracing bool
mu sync.Mutex // guard the following
state transportState
@@ -90,7 +93,7 @@
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
// returned if something goes wrong.
-func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (_ ServerTransport, err error) {
+func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo, tracing bool) (_ ServerTransport, err error) {
framer := newFramer(conn)
// Send initial settings as connection preface to client.
var settings []http2.Setting
@@ -124,6 +127,7 @@
controlBuf: newRecvBuffer(),
fc: &inFlow{limit: initialConnWindowSize},
sendQuotaPool: newQuotaPool(defaultWindowSize),
+ tracing: tracing,
state: reachable,
writableChan: make(chan int, 1),
shutdownChan: make(chan struct{}),
@@ -202,7 +206,10 @@
recv: s.buf,
}
s.method = hDec.state.method
-
+ if t.tracing {
+ s.tr = trace.New("grpc.Recv."+MethodFamily(s.method), s.method)
+ s.ctx = trace.NewContext(s.ctx, s.tr)
+ }
wg.Add(1)
go func() {
handle(s)
diff --git a/transport/transport.go b/transport/transport.go
index d33f2de..93efeae 100644
--- a/transport/transport.go
+++ b/transport/transport.go
@@ -43,15 +43,30 @@
"fmt"
"io"
"net"
+ "strings"
"sync"
"time"
"golang.org/x/net/context"
+ "golang.org/x/net/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
)
+// MethodFamily returns the trace family for the given method.
+// It turns "/pkg.Service/GetFoo" into "pkg.Service".
+func MethodFamily(m string) string {
+ m = strings.TrimPrefix(m, "/") // remove leading slash
+ if i := strings.Index(m, "/"); i >= 0 {
+ m = m[:i] // remove everything from second slash
+ }
+ if i := strings.LastIndex(m, "."); i >= 0 {
+ m = m[i+1:] // cut down to last dotted component
+ }
+ return m
+}
+
// recvMsg represents the received msg from the transport. All transport
// protocol specific info has been removed.
type recvMsg struct {
@@ -198,6 +213,8 @@
// the status received from the server.
statusCode codes.Code
statusDesc string
+ // tracing information
+ tr trace.Trace
}
// Header acquires the key-value pairs of header metadata once it
@@ -232,6 +249,11 @@
return s.ctx
}
+// Trace returns the trace.Trace of the stream.
+func (s *Stream) Trace() trace.Trace {
+ return s.tr
+}
+
// Method returns the method for the stream.
func (s *Stream) Method() string {
return s.method
@@ -308,8 +330,8 @@
// NewServerTransport creates a ServerTransport with conn or non-nil error
// if it fails.
-func NewServerTransport(protocol string, conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (ServerTransport, error) {
- return newHTTP2Server(conn, maxStreams, authInfo)
+func NewServerTransport(protocol string, conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo, tracing bool) (ServerTransport, error) {
+ return newHTTP2Server(conn, maxStreams, authInfo, tracing)
}
// ConnectOptions covers all relevant options for dialing a server.
diff --git a/transport/transport_test.go b/transport/transport_test.go
index 07ba005..70d345a 100644
--- a/transport/transport_test.go
+++ b/transport/transport_test.go
@@ -150,7 +150,7 @@
if err != nil {
return
}
- transport, err := NewServerTransport("http2", conn, maxStreams, nil)
+ transport, err := NewServerTransport("http2", conn, maxStreams, nil, false)
if err != nil {
return
}