fix travis
diff --git a/.travis.yml b/.travis.yml
index b0ce1f7..3f83776 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -6,7 +6,6 @@
- go get golang.org/x/tools/cmd/cover
install:
- - export GOPATH="$HOME/gopath"
- mkdir -p "$GOPATH/src/google.golang.org"
- mv "$TRAVIS_BUILD_DIR" "$GOPATH/src/google.golang.org/grpc"
diff --git a/README.md b/README.md
index 60d5c5d..94dc739 100644
--- a/README.md
+++ b/README.md
@@ -18,6 +18,10 @@
This requires Go 1.4 or above.
+Constraints
+-----------
+The grpc package should only depend on standard Go packages and a short list of exceptions. A new addition to the list requires a discussion with gRPC-Go authors and consultants.
+
Documentation
-------------
You can find more detailed documentation and examples in the [examples directory](examples/).
diff --git a/benchmark/grpc_testing/test.pb.go b/benchmark/grpc_testing/test.pb.go
index 619c450..74e13c9 100644
--- a/benchmark/grpc_testing/test.pb.go
+++ b/benchmark/grpc_testing/test.pb.go
@@ -419,9 +419,9 @@
s.RegisterService(&_TestService_serviceDesc, srv)
}
-func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) {
+func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
in := new(SimpleRequest)
- if err := codec.Unmarshal(buf, in); err != nil {
+ if err := dec(in); err != nil {
return nil, err
}
out, err := srv.(TestServiceServer).UnaryCall(ctx, in)
diff --git a/call.go b/call.go
index 0115a28..8b68809 100644
--- a/call.go
+++ b/call.go
@@ -116,7 +116,6 @@
o.after(&c)
}
}()
-
if EnableTracing {
c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
defer c.traceInfo.tr.Finish()
@@ -133,10 +132,6 @@
}
}()
}
- callHdr := &transport.CallHdr{
- Host: cc.authority,
- Method: method,
- }
topts := &transport.Options{
Last: true,
Delay: false,
@@ -149,13 +144,25 @@
err error
t transport.ClientTransport
stream *transport.Stream
+ conn *Conn
)
// TODO(zhaoq): Need a formal spec of retry strategy for non-failfast rpcs.
if lastErr != nil && c.failFast {
return toRPCErr(lastErr)
}
- t, err = cc.wait(ctx)
+ conn, err = cc.dopts.picker.Pick()
if err != nil {
+ return toRPCErr(err)
+ }
+ callHdr := &transport.CallHdr{
+ Host: conn.authority,
+ Method: method,
+ }
+ t, err = conn.Wait(ctx)
+ if err != nil {
+ if err == ErrTransientFailure {
+ continue
+ }
if lastErr != nil {
// This was a retry; return the error from the last attempt.
return toRPCErr(lastErr)
@@ -165,7 +172,7 @@
if c.traceInfo.tr != nil {
c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true)
}
- stream, err = sendRequest(ctx, cc.dopts.codec, callHdr, t, args, topts)
+ stream, err = sendRequest(ctx, conn.dopts.codec, callHdr, t, args, topts)
if err != nil {
if _, ok := err.(transport.ConnectionError); ok {
lastErr = err
@@ -177,7 +184,7 @@
return toRPCErr(err)
}
// Receive the response
- lastErr = recvResponse(cc.dopts.codec, t, &c, stream, reply)
+ lastErr = recvResponse(conn.dopts.codec, t, &c, stream, reply)
if _, ok := lastErr.(transport.ConnectionError); ok {
continue
}
diff --git a/clientconn.go b/clientconn.go
index 87f302f..ea3ccd0 100644
--- a/clientconn.go
+++ b/clientconn.go
@@ -65,6 +65,8 @@
// ErrClientConnTimeout indicates that the connection could not be
// established or re-established within the specified timeout.
ErrClientConnTimeout = errors.New("grpc: timed out trying to connect")
+ // ErrTransientFailure indicates the connection failed due to a transient error.
+ ErrTransientFailure = errors.New("grpc: transient connection failure")
// minimum time to give a connection to complete
minConnectTimeout = 20 * time.Second
)
@@ -73,6 +75,7 @@
// values passed to Dial.
type dialOptions struct {
codec Codec
+ picker Picker
block bool
insecure bool
copts transport.ConnectOptions
@@ -142,90 +145,21 @@
// Dial creates a client connection the given target.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
- if target == "" {
- return nil, ErrUnspecTarget
- }
cc := &ClientConn{
- target: target,
- shutdownChan: make(chan struct{}),
- }
- if EnableTracing {
- cc.events = trace.NewEventLog("grpc.ClientConn", target)
+ target: target,
}
for _, opt := range opts {
opt(&cc.dopts)
}
- if !cc.dopts.insecure {
- var ok bool
- for _, c := range cc.dopts.copts.AuthOptions {
- if _, ok := c.(credentials.TransportAuthenticator); !ok {
- continue
- }
- ok = true
- }
- if !ok {
- return nil, ErrNoTransportSecurity
- }
- } else {
- for _, c := range cc.dopts.copts.AuthOptions {
- if c.RequireTransportSecurity() {
- return nil, ErrCredentialsMisuse
- }
- }
+ if cc.dopts.picker == nil {
+ cc.dopts.picker = &unicastPicker{}
}
- colonPos := strings.LastIndex(target, ":")
- if colonPos == -1 {
- colonPos = len(target)
- }
- cc.authority = target[:colonPos]
- if cc.dopts.codec == nil {
- // Set the default codec.
- cc.dopts.codec = protoCodec{}
- }
- cc.stateCV = sync.NewCond(&cc.mu)
- if cc.dopts.block {
- if err := cc.resetTransport(false); err != nil {
- cc.mu.Lock()
- cc.errorf("dial failed: %v", err)
- cc.mu.Unlock()
- cc.Close()
- return nil, err
- }
- // Start to monitor the error status of transport.
- go cc.transportMonitor()
- } else {
- // Start a goroutine connecting to the server asynchronously.
- go func() {
- if err := cc.resetTransport(false); err != nil {
- cc.mu.Lock()
- cc.errorf("dial failed: %v", err)
- cc.mu.Unlock()
- grpclog.Printf("Failed to dial %s: %v; please retry.", target, err)
- cc.Close()
- return
- }
- go cc.transportMonitor()
- }()
+ if err := cc.dopts.picker.Init(cc); err != nil {
+ return nil, err
}
return cc, nil
}
-// printf records an event in cc's event log, unless cc has been closed.
-// REQUIRES cc.mu is held.
-func (cc *ClientConn) printf(format string, a ...interface{}) {
- if cc.events != nil {
- cc.events.Printf(format, a...)
- }
-}
-
-// errorf records an error in cc's event log, unless cc has been closed.
-// REQUIRES cc.mu is held.
-func (cc *ClientConn) errorf(format string, a ...interface{}) {
- if cc.events != nil {
- cc.events.Errorf(format, a...)
- }
-}
-
// ConnectivityState indicates the state of a client connection.
type ConnectivityState int
@@ -261,6 +195,30 @@
// ClientConn represents a client connection to an RPC service.
type ClientConn struct {
+ target string
+ dopts dialOptions
+}
+
+// State returns the connectivity state of cc.
+// This is EXPERIMENTAL API.
+func (cc *ClientConn) State() ConnectivityState {
+ return cc.dopts.picker.State()
+}
+
+// WaitForStateChange blocks until the state changes to something other than the sourceState
+// or timeout fires on cc. It returns false if timeout fires, and true otherwise.
+// This is EXPERIMENTAL API.
+func (cc *ClientConn) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool {
+ return cc.dopts.picker.WaitForStateChange(timeout, sourceState)
+}
+
+// Close starts to tear down the ClientConn.
+func (cc *ClientConn) Close() error {
+ return cc.dopts.picker.Close()
+}
+
+// Conn is a client connection to a single destination.
+type Conn struct {
target string
authority string
dopts dialOptions
@@ -276,8 +234,86 @@
transport transport.ClientTransport
}
-// State returns the connectivity state of the ClientConn
-func (cc *ClientConn) State() ConnectivityState {
+// NewConn creates a Conn.
+func NewConn(cc *ClientConn) (*Conn, error) {
+ if cc.target == "" {
+ return nil, ErrUnspecTarget
+ }
+ c := &Conn{
+ target: cc.target,
+ dopts: cc.dopts,
+ shutdownChan: make(chan struct{}),
+ }
+ if EnableTracing {
+ c.events = trace.NewEventLog("grpc.ClientConn", c.target)
+ }
+ if !c.dopts.insecure {
+ var ok bool
+ for _, cd := range c.dopts.copts.AuthOptions {
+ if _, ok := cd.(credentials.TransportAuthenticator); !ok {
+ continue
+ }
+ ok = true
+ }
+ if !ok {
+ return nil, ErrNoTransportSecurity
+ }
+ } else {
+ for _, cd := range c.dopts.copts.AuthOptions {
+ if cd.RequireTransportSecurity() {
+ return nil, ErrCredentialsMisuse
+ }
+ }
+ }
+ colonPos := strings.LastIndex(c.target, ":")
+ if colonPos == -1 {
+ colonPos = len(c.target)
+ }
+ c.authority = c.target[:colonPos]
+ if c.dopts.codec == nil {
+ // Set the default codec.
+ c.dopts.codec = protoCodec{}
+ }
+ c.stateCV = sync.NewCond(&c.mu)
+ if c.dopts.block {
+ if err := c.resetTransport(false); err != nil {
+ c.Close()
+ return nil, err
+ }
+ // Start to monitor the error status of transport.
+ go c.transportMonitor()
+ } else {
+ // Start a goroutine connecting to the server asynchronously.
+ go func() {
+ if err := c.resetTransport(false); err != nil {
+ grpclog.Printf("Failed to dial %s: %v; please retry.", c.target, err)
+ c.Close()
+ return
+ }
+ go c.transportMonitor()
+ }()
+ }
+ return c, nil
+}
+
+// printf records an event in cc's event log, unless cc has been closed.
+// REQUIRES cc.mu is held.
+func (cc *Conn) printf(format string, a ...interface{}) {
+ if cc.events != nil {
+ cc.events.Printf(format, a...)
+ }
+}
+
+// errorf records an error in cc's event log, unless cc has been closed.
+// REQUIRES cc.mu is held.
+func (cc *Conn) errorf(format string, a ...interface{}) {
+ if cc.events != nil {
+ cc.events.Errorf(format, a...)
+ }
+}
+
+// State returns the connectivity state of the Conn
+func (cc *Conn) State() ConnectivityState {
cc.mu.Lock()
defer cc.mu.Unlock()
return cc.state
@@ -285,7 +321,8 @@
// WaitForStateChange blocks until the state changes to something other than the sourceState
// or timeout fires. It returns false if timeout fires and true otherwise.
-func (cc *ClientConn) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool {
+// TODO(zhaoq): Rewrite for complex Picker.
+func (cc *Conn) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool {
start := time.Now()
cc.mu.Lock()
defer cc.mu.Unlock()
@@ -317,7 +354,7 @@
return true
}
-func (cc *ClientConn) resetTransport(closeTransport bool) error {
+func (cc *Conn) resetTransport(closeTransport bool) error {
var retries int
start := time.Now()
for {
@@ -361,6 +398,10 @@
cc.errorf("transient failure: %v", err)
cc.state = TransientFailure
cc.stateCV.Broadcast()
+ if cc.ready != nil {
+ close(cc.ready)
+ cc.ready = nil
+ }
cc.mu.Unlock()
sleepTime -= time.Since(connectTime)
if sleepTime < 0 {
@@ -402,7 +443,7 @@
// Run in a goroutine to track the error in transport and create the
// new transport if an error happens. It returns when the channel is closing.
-func (cc *ClientConn) transportMonitor() {
+func (cc *Conn) transportMonitor() {
for {
select {
// shutdownChan is needed to detect the teardown when
@@ -427,9 +468,8 @@
}
}
-// When wait returns, either the new transport is up or ClientConn is
-// closing.
-func (cc *ClientConn) wait(ctx context.Context) (transport.ClientTransport, error) {
+// Wait blocks until i) the new transport is up or ii) ctx is done or iii)
+func (cc *Conn) Wait(ctx context.Context) (transport.ClientTransport, error) {
for {
cc.mu.Lock()
switch {
@@ -439,6 +479,11 @@
case cc.state == Ready:
cc.mu.Unlock()
return cc.transport, nil
+ case cc.state == TransientFailure:
+ cc.mu.Unlock()
+ // Break out so that the caller gets chance to pick another transport to
+ // perform rpc instead of sticking to this transport.
+ return nil, ErrTransientFailure
default:
ready := cc.ready
if ready == nil {
@@ -456,12 +501,12 @@
}
}
-// Close starts to tear down the ClientConn. Returns ErrClientConnClosing if
+// Close starts to tear down the Conn. Returns ErrClientConnClosing if
// it has been closed (mostly due to dial time-out).
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
// some edge cases (e.g., the caller opens and closes many ClientConn's in a
// tight loop.
-func (cc *ClientConn) Close() error {
+func (cc *Conn) Close() error {
cc.mu.Lock()
defer cc.mu.Unlock()
if cc.state == Shutdown {
diff --git a/examples/helloworld/helloworld/helloworld.pb.go b/examples/helloworld/helloworld/helloworld.pb.go
index 1ff931a..366b23b 100644
--- a/examples/helloworld/helloworld/helloworld.pb.go
+++ b/examples/helloworld/helloworld/helloworld.pb.go
@@ -84,9 +84,9 @@
s.RegisterService(&_Greeter_serviceDesc, srv)
}
-func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) {
+func _Greeter_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
in := new(HelloRequest)
- if err := codec.Unmarshal(buf, in); err != nil {
+ if err := dec(in); err != nil {
return nil, err
}
out, err := srv.(GreeterServer).SayHello(ctx, in)
diff --git a/examples/route_guide/routeguide/route_guide.pb.go b/examples/route_guide/routeguide/route_guide.pb.go
index fcf5c74..9ac9029 100644
--- a/examples/route_guide/routeguide/route_guide.pb.go
+++ b/examples/route_guide/routeguide/route_guide.pb.go
@@ -310,9 +310,9 @@
s.RegisterService(&_RouteGuide_serviceDesc, srv)
}
-func _RouteGuide_GetFeature_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) {
+func _RouteGuide_GetFeature_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
in := new(Point)
- if err := codec.Unmarshal(buf, in); err != nil {
+ if err := dec(in); err != nil {
return nil, err
}
out, err := srv.(RouteGuideServer).GetFeature(ctx, in)
diff --git a/health/grpc_health_v1alpha/health.pb.go b/health/grpc_health_v1alpha/health.pb.go
index c333a97..96eba6f 100644
--- a/health/grpc_health_v1alpha/health.pb.go
+++ b/health/grpc_health_v1alpha/health.pb.go
@@ -108,9 +108,9 @@
s.RegisterService(&_Health_serviceDesc, srv)
}
-func _Health_Check_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) {
+func _Health_Check_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
in := new(HealthCheckRequest)
- if err := codec.Unmarshal(buf, in); err != nil {
+ if err := dec(in); err != nil {
return nil, err
}
out, err := srv.(HealthServer).Check(ctx, in)
diff --git a/interop/grpc_testing/test.pb.go b/interop/grpc_testing/test.pb.go
index b25e98b..bd492fe 100755
--- a/interop/grpc_testing/test.pb.go
+++ b/interop/grpc_testing/test.pb.go
@@ -539,9 +539,9 @@
s.RegisterService(&_TestService_serviceDesc, srv)
}
-func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) {
+func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
in := new(Empty)
- if err := codec.Unmarshal(buf, in); err != nil {
+ if err := dec(in); err != nil {
return nil, err
}
out, err := srv.(TestServiceServer).EmptyCall(ctx, in)
@@ -551,9 +551,9 @@
return out, nil
}
-func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) {
+func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
in := new(SimpleRequest)
- if err := codec.Unmarshal(buf, in); err != nil {
+ if err := dec(in); err != nil {
return nil, err
}
out, err := srv.(TestServiceServer).UnaryCall(ctx, in)
diff --git a/naming/etcd/etcd.go b/naming/etcd/etcd.go
index e140068..362649d 100644
--- a/naming/etcd/etcd.go
+++ b/naming/etcd/etcd.go
@@ -1,3 +1,36 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
package etcd
import (
@@ -5,41 +38,73 @@
"golang.org/x/net/context"
"google.golang.org/grpc/naming"
)
+// update defines an etcd key-value update.
+type update struct {
+ key, val string
+}
+
+// getNode reports the set of changes starting from node recursively.
+func getNode(node *etcdcl.Node) (updates []*update) {
+ for _, v := range node.Nodes {
+ updates = append(updates, getNode(v)...)
+ }
+ if !node.Dir {
+ u := &update{
+ key: node.Key,
+ val: node.Value,
+ }
+ updates = []*update{u}
+ }
+ return
+}
type watcher struct {
wr etcdcl.Watcher
ctx context.Context
cancel context.CancelFunc
+ kv map[string]string
}
-func (w *watcher) Next() (*naming.Update, error) {
+func (w *watcher) Next() (nu []*naming.Update, _ error) {
for {
resp, err := w.wr.Next(w.ctx)
if err != nil {
return nil, err
}
- if resp.Node.Dir {
- continue
- }
- var act naming.OP
- if resp.Action == "set" {
- if resp.PrevNode == nil {
- act = naming.Add
- } else {
- act = naming.Modify
+ updates := getNode(resp.Node)
+ for _, u := range updates {
+ switch resp.Action {
+ case "set":
+ if resp.PrevNode == nil {
+ w.kv[u.key] = u.val
+ nu = append(nu, &naming.Update{
+ Op: naming.Add,
+ Addr: u.val,
+ })
+ } else {
+ nu = append(nu, &naming.Update{
+ Op: naming.Delete,
+ Addr: w.kv[u.key],
+ })
+ nu = append(nu, &naming.Update{
+ Op: naming.Add,
+ Addr: u.val,
+ })
+ w.kv[u.key] = u.val
+ }
+ case "delete":
+ nu = append(nu, &naming.Update{
+ Op: naming.Delete,
+ Addr: w.kv[u.key],
+ })
+ delete(w.kv, u.key)
}
- } else if resp.Action == "delete" {
- act = naming.Delete
}
- if act == naming.No {
- continue
+ if len(nu) > 0 {
+ break
}
- return &naming.Update{
- Op: act,
- Key: resp.Node.Key,
- Val: resp.Node.Value,
- }, nil
}
+ return nu, nil
}
func (w *watcher) Stop() {
@@ -48,41 +113,36 @@
type resolver struct {
kapi etcdcl.KeysAPI
+ kv map[string]string
}
func (r *resolver) NewWatcher(target string) naming.Watcher {
ctx, cancel := context.WithCancel(context.Background())
- return &watcher{
+ w := &watcher{
wr: r.kapi.Watcher(target, &etcdcl.WatcherOptions{Recursive: true}),
ctx: ctx,
cancel: cancel,
}
-
+ for k, v := range r.kv {
+ w.kv[k] = v
+ }
+ return w
}
-// getNode reports the naming.Update starting from node recursively.
-func getNode(node *etcdcl.Node) (updates []*naming.Update) {
- for _, v := range node.Nodes {
- updates = append(updates, getNode(v)...)
- }
- if !node.Dir {
- entry := &naming.Update{
- Op: naming.Add,
- Key: node.Key,
- Val: node.Value,
- }
- updates = []*naming.Update{entry}
- }
- return
-}
-
-func (r *resolver) Resolve(target string) ([]*naming.Update, error) {
+func (r *resolver) Resolve(target string) (nu []*naming.Update, _ error) {
resp, err := r.kapi.Get(context.Background(), target, &etcdcl.GetOptions{Recursive: true})
if err != nil {
return nil, err
}
updates := getNode(resp.Node)
- return updates, nil
+ for _, u := range updates {
+ r.kv[u.key] = u.val
+ nu = append(nu, &naming.Update{
+ Op: naming.Add,
+ Addr: u.val,
+ })
+ }
+ return nu, nil
}
// NewResolver creates an etcd-based naming.Resolver.
@@ -93,5 +153,6 @@
}
return &resolver{
kapi: etcdcl.NewKeysAPI(c),
+ kv: make(map[string]string),
}, nil
}
diff --git a/naming/naming.go b/naming/naming.go
index a1fd335..610eb81 100644
--- a/naming/naming.go
+++ b/naming/naming.go
@@ -1,17 +1,48 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+// Package naming defines the naming API and related data structures for gRPC.
+// The interface is EXPERIMENTAL and may be suject to change.
package naming
// OP defines the corresponding operations for a name resolution change.
type OP uint8
const (
- // No indicates there are no changes.
- No OP = iota
// Add indicates a new address is added.
- Add
+ Add = iota
// Delete indicates an exisiting address is deleted.
Delete
- // Modify indicates an existing address is modified.
- Modify
)
type ServiceConfig interface{}
@@ -20,8 +51,7 @@
type Update struct {
// Op indicates the operation of the update.
Op OP
- Key string
- Val string
+ Addr string
Config ServiceConfig
}
@@ -36,8 +66,9 @@
// Watcher watches the updates for a particular target.
type Watcher interface {
- // Next blocks until an update or error happens.
- Next() (*Update, error)
+ // Next blocks until an update or error happens. It may return one or more
+ // updates.
+ Next() ([]*Update, error)
// Stop stops the Watcher.
Stop()
}
diff --git a/picker.go b/picker.go
new file mode 100644
index 0000000..79f9886
--- /dev/null
+++ b/picker.go
@@ -0,0 +1,90 @@
+/*
+ *
+ * Copyright 2014, Google Inc.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+package grpc
+
+import (
+ "time"
+)
+
+// Picker picks a Conn for RPC requests.
+// This is EXPERIMENTAL and Please do not implement your own Picker for now.
+type Picker interface {
+ // Init does initial processing for the Picker, e.g., initiate some connections.
+ Init(cc *ClientConn) error
+ // Pick returns the Conn to use for the upcoming RPC. It may return different
+ // Conn's up to the implementation.
+ Pick() (*Conn, error)
+ // State returns the connectivity state of the underlying connections.
+ State() ConnectivityState
+ // WaitForStateChange blocks until the state changes to something other than
+ // the sourceState or timeout fires on cc. It returns false if timeout fires,
+ // and true otherwise.
+ WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool
+ // Close closes all the Conn's owned by this Picker.
+ Close() error
+}
+
+// unicastPicker is the default Picker which is used when there is no custom Picker
+// specified by users. It always picks the same Conn.
+type unicastPicker struct {
+ conn *Conn
+}
+
+func (p *unicastPicker) Init(cc *ClientConn) error {
+ c, err := NewConn(cc)
+ if err != nil {
+ return err
+ }
+ p.conn = c
+ return nil
+}
+
+func (p *unicastPicker) Pick() (*Conn, error) {
+ return p.conn, nil
+}
+
+func (p *unicastPicker) State() ConnectivityState {
+ return p.conn.State()
+}
+
+func (p *unicastPicker) WaitForStateChange(timeout time.Duration, sourceState ConnectivityState) bool {
+ return p.conn.WaitForStateChange(timeout, sourceState)
+}
+
+func (p *unicastPicker) Close() error {
+ if p.conn != nil {
+ return p.conn.Close()
+ }
+ return nil
+}
diff --git a/server.go b/server.go
index 274f732..ee44d1e 100644
--- a/server.go
+++ b/server.go
@@ -42,6 +42,7 @@
"runtime"
"strings"
"sync"
+ "time"
"golang.org/x/net/context"
"golang.org/x/net/trace"
@@ -52,7 +53,7 @@
"google.golang.org/grpc/transport"
)
-type methodHandler func(srv interface{}, ctx context.Context, codec Codec, buf []byte) (interface{}, error)
+type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error)
// MethodDesc represents an RPC service's method specification.
type MethodDesc struct {
@@ -284,12 +285,19 @@
}
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc) (err error) {
+ ctx, cancel := context.WithCancel(stream.Context())
+ defer cancel()
var traceInfo traceInfo
if EnableTracing {
traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
defer traceInfo.tr.Finish()
traceInfo.firstLine.client = false
+ traceInfo.firstLine.remoteAddr = t.RemoteAddr()
+ if dl, ok := ctx.Deadline(); ok {
+ traceInfo.firstLine.deadline = dl.Sub(time.Now())
+ }
traceInfo.tr.LazyLog(&traceInfo.firstLine, false)
+ ctx = trace.NewContext(ctx, traceInfo.tr)
defer func() {
if err != nil && err != io.EOF {
traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
@@ -317,14 +325,20 @@
}
return err
}
- if traceInfo.tr != nil {
- traceInfo.tr.LazyLog(&payload{sent: false, msg: req}, true)
- }
switch pf {
case compressionNone:
statusCode := codes.OK
statusDesc := ""
- reply, appErr := md.Handler(srv.server, stream.Context(), s.opts.codec, req)
+ df := func(v interface{}) error {
+ if err := s.opts.codec.Unmarshal(req, v); err != nil {
+ return err
+ }
+ if traceInfo.tr != nil {
+ traceInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
+ }
+ return nil
+ }
+ reply, appErr := md.Handler(srv.server, ctx, df)
if appErr != nil {
if err, ok := appErr.(rpcError); ok {
statusCode = err.code
@@ -333,12 +347,20 @@
statusCode = convertCode(appErr)
statusDesc = appErr.Error()
}
+ if traceInfo.tr != nil && statusCode != codes.OK {
+ traceInfo.tr.LazyLog(stringer(statusDesc), true)
+ traceInfo.tr.SetError()
+ }
+
if err := t.WriteStatus(stream, statusCode, statusDesc); err != nil {
grpclog.Printf("grpc: Server.processUnaryRPC failed to write status: %v", err)
return err
}
return nil
}
+ if traceInfo.tr != nil {
+ traceInfo.tr.LazyLog(stringer("OK"), false)
+ }
opts := &transport.Options{
Last: true,
Delay: false,
@@ -367,9 +389,12 @@
}
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc) (err error) {
+ ctx, cancel := context.WithCancel(stream.Context())
+ defer cancel()
ss := &serverStream{
t: t,
s: stream,
+ ctx: ctx,
p: &parser{s: stream},
codec: s.opts.codec,
tracing: EnableTracing,
@@ -377,7 +402,12 @@
if ss.tracing {
ss.traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
ss.traceInfo.firstLine.client = false
+ ss.traceInfo.firstLine.remoteAddr = t.RemoteAddr()
+ if dl, ok := ctx.Deadline(); ok {
+ ss.traceInfo.firstLine.deadline = dl.Sub(time.Now())
+ }
ss.traceInfo.tr.LazyLog(&ss.traceInfo.firstLine, false)
+ ss.ctx = trace.NewContext(ss.ctx, ss.traceInfo.tr)
defer func() {
ss.mu.Lock()
if err != nil && err != io.EOF {
@@ -398,6 +428,16 @@
ss.statusDesc = appErr.Error()
}
}
+ if ss.tracing {
+ ss.mu.Lock()
+ if ss.statusCode != codes.OK {
+ ss.traceInfo.tr.LazyLog(stringer(ss.statusDesc), true)
+ ss.traceInfo.tr.SetError()
+ } else {
+ ss.traceInfo.tr.LazyLog(stringer("OK"), false)
+ }
+ ss.mu.Unlock()
+ }
return t.WriteStatus(ss.s, ss.statusCode, ss.statusDesc)
}
diff --git a/stream.go b/stream.go
index e14664c..34774f0 100644
--- a/stream.go
+++ b/stream.go
@@ -96,14 +96,33 @@
// NewClientStream creates a new Stream for the client side. This is called
// by generated code.
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
+ var (
+ conn *Conn
+ t transport.ClientTransport
+ err error
+ )
+ for {
+ conn, err = cc.dopts.picker.Pick()
+ if err != nil {
+ return nil, toRPCErr(err)
+ }
+ t, err = conn.Wait(ctx)
+ if err != nil {
+ if err == ErrTransientFailure {
+ continue
+ }
+ return nil, toRPCErr(err)
+ }
+ break
+ }
// TODO(zhaoq): CallOption is omitted. Add support when it is needed.
callHdr := &transport.CallHdr{
- Host: cc.authority,
+ Host: conn.authority,
Method: method,
}
cs := &clientStream{
desc: desc,
- codec: cc.dopts.codec,
+ codec: conn.dopts.codec,
tracing: EnableTracing,
}
if cs.tracing {
@@ -113,10 +132,7 @@
cs.traceInfo.firstLine.deadline = deadline.Sub(time.Now())
}
cs.traceInfo.tr.LazyLog(&cs.traceInfo.firstLine, false)
- }
- t, err := cc.wait(ctx)
- if err != nil {
- return nil, toRPCErr(err)
+ ctx = trace.NewContext(ctx, cs.traceInfo.tr)
}
s, err := t.NewStream(ctx, callHdr)
if err != nil {
@@ -278,6 +294,7 @@
type serverStream struct {
t transport.ServerTransport
s *transport.Stream
+ ctx context.Context // provides trace.FromContext when tracing
p *parser
codec Codec
statusCode codes.Code
@@ -292,7 +309,7 @@
}
func (ss *serverStream) Context() context.Context {
- return ss.s.Context()
+ return ss.ctx
}
func (ss *serverStream) SendHeader(md metadata.MD) error {
@@ -317,7 +334,6 @@
ss.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
ss.traceInfo.tr.SetError()
}
-
ss.mu.Unlock()
}
}()
diff --git a/test/grpc_testing/test.pb.go b/test/grpc_testing/test.pb.go
index b25e98b..bd492fe 100644
--- a/test/grpc_testing/test.pb.go
+++ b/test/grpc_testing/test.pb.go
@@ -539,9 +539,9 @@
s.RegisterService(&_TestService_serviceDesc, srv)
}
-func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) {
+func _TestService_EmptyCall_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
in := new(Empty)
- if err := codec.Unmarshal(buf, in); err != nil {
+ if err := dec(in); err != nil {
return nil, err
}
out, err := srv.(TestServiceServer).EmptyCall(ctx, in)
@@ -551,9 +551,9 @@
return out, nil
}
-func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, codec grpc.Codec, buf []byte) (interface{}, error) {
+func _TestService_UnaryCall_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
in := new(SimpleRequest)
- if err := codec.Unmarshal(buf, in); err != nil {
+ if err := dec(in); err != nil {
return nil, err
}
out, err := srv.(TestServiceServer).UnaryCall(ctx, in)
diff --git a/trace.go b/trace.go
index 2463574..cde04fb 100644
--- a/trace.go
+++ b/trace.go
@@ -114,3 +114,7 @@
func (f *fmtStringer) String() string {
return fmt.Sprintf(f.format, f.a...)
}
+
+type stringer string
+
+func (s stringer) String() string { return string(s) }
diff --git a/transport/http2_server.go b/transport/http2_server.go
index 057d936..c9a2a36 100644
--- a/transport/http2_server.go
+++ b/transport/http2_server.go
@@ -689,3 +689,7 @@
// other goroutines.
s.cancel()
}
+
+func (t *http2Server) RemoteAddr() net.Addr {
+ return t.conn.RemoteAddr()
+}
diff --git a/transport/transport.go b/transport/transport.go
index 2dd38a8..d33f2de 100644
--- a/transport/transport.go
+++ b/transport/transport.go
@@ -390,6 +390,8 @@
// should not be accessed any more. All the pending streams and their
// handlers will be terminated asynchronously.
Close() error
+ // RemoteAddr returns the remote network address.
+ RemoteAddr() net.Addr
}
// StreamErrorf creates an StreamError with the specified error code and description.