Merge pull request #393 from iamqizhao/master
Part1: revise Picker API
diff --git a/call.go b/call.go
index a91892e..9d815af 100644
--- a/call.go
+++ b/call.go
@@ -144,21 +144,16 @@
err error
t transport.ClientTransport
stream *transport.Stream
- conn *Conn
)
// TODO(zhaoq): Need a formal spec of retry strategy for non-failfast rpcs.
if lastErr != nil && c.failFast {
return toRPCErr(lastErr)
}
- conn, err = cc.dopts.picker.Pick()
- if err != nil {
- return toRPCErr(err)
- }
callHdr := &transport.CallHdr{
- Host: conn.authority,
+ Host: cc.authority,
Method: method,
}
- t, err = conn.Wait(ctx)
+ t, err = cc.dopts.picker.Pick(ctx)
if err != nil {
if lastErr != nil {
// This was a retry; return the error from the last attempt.
@@ -169,7 +164,7 @@
if c.traceInfo.tr != nil {
c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true)
}
- stream, err = sendRequest(ctx, conn.dopts.codec, callHdr, t, args, topts)
+ stream, err = sendRequest(ctx, cc.dopts.codec, callHdr, t, args, topts)
if err != nil {
if _, ok := err.(transport.ConnectionError); ok {
lastErr = err
@@ -181,7 +176,7 @@
return toRPCErr(err)
}
// Receive the response
- lastErr = recvResponse(conn.dopts.codec, t, &c, stream, reply)
+ lastErr = recvResponse(cc.dopts.codec, t, &c, stream, reply)
if _, ok := lastErr.(transport.ConnectionError); ok {
continue
}
diff --git a/clientconn.go b/clientconn.go
index a96fcb8..85a42d9 100644
--- a/clientconn.go
+++ b/clientconn.go
@@ -149,12 +149,21 @@
for _, opt := range opts {
opt(&cc.dopts)
}
+ if cc.dopts.codec == nil {
+ // Set the default codec.
+ cc.dopts.codec = protoCodec{}
+ }
if cc.dopts.picker == nil {
cc.dopts.picker = &unicastPicker{}
}
if err := cc.dopts.picker.Init(cc); err != nil {
return nil, err
}
+ colonPos := strings.LastIndex(target, ":")
+ if colonPos == -1 {
+ colonPos = len(target)
+ }
+ cc.authority = target[:colonPos]
return cc, nil
}
@@ -193,8 +202,9 @@
// ClientConn represents a client connection to an RPC service.
type ClientConn struct {
- target string
- dopts dialOptions
+ target string
+ authority string
+ dopts dialOptions
}
// State returns the connectivity state of cc.
@@ -218,7 +228,6 @@
// Conn is a client connection to a single destination.
type Conn struct {
target string
- authority string
dopts dialOptions
shutdownChan chan struct{}
events trace.EventLog
@@ -263,15 +272,6 @@
}
}
}
- colonPos := strings.LastIndex(c.target, ":")
- if colonPos == -1 {
- colonPos = len(c.target)
- }
- c.authority = c.target[:colonPos]
- if c.dopts.codec == nil {
- // Set the default codec.
- c.dopts.codec = protoCodec{}
- }
c.stateCV = sync.NewCond(&c.mu)
if c.dopts.block {
if err := c.resetTransport(false); err != nil {
diff --git a/picker.go b/picker.go
index 79f9886..d285504 100644
--- a/picker.go
+++ b/picker.go
@@ -35,6 +35,9 @@
import (
"time"
+
+ "golang.org/x/net/context"
+ "google.golang.org/grpc/transport"
)
-// Picker picks a Conn for RPC requests.
+// Picker picks a transport.ClientTransport for RPC requests.
@@ -42,9 +45,9 @@
type Picker interface {
// Init does initial processing for the Picker, e.g., initiate some connections.
Init(cc *ClientConn) error
- // Pick returns the Conn to use for the upcoming RPC. It may return different
- // Conn's up to the implementation.
- Pick() (*Conn, error)
+ // Pick blocks until either a transport.ClientTransport is ready for the upcoming RPC
+ // or some error happens.
+ Pick(ctx context.Context) (transport.ClientTransport, error)
// State returns the connectivity state of the underlying connections.
State() ConnectivityState
// WaitForStateChange blocks until the state changes to something other than
@@ -70,8 +73,8 @@
return nil
}
-func (p *unicastPicker) Pick() (*Conn, error) {
- return p.conn, nil
+func (p *unicastPicker) Pick(ctx context.Context) (transport.ClientTransport, error) {
+ return p.conn.Wait(ctx)
}
func (p *unicastPicker) State() ConnectivityState {
diff --git a/stream.go b/stream.go
index 55201c2..e72cd3d 100644
--- a/stream.go
+++ b/stream.go
@@ -97,29 +97,21 @@
// by generated code.
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
var (
- conn *Conn
- t transport.ClientTransport
- err error
+ t transport.ClientTransport
+ err error
)
- for {
- conn, err = cc.dopts.picker.Pick()
- if err != nil {
- return nil, toRPCErr(err)
- }
- t, err = conn.Wait(ctx)
- if err != nil {
- return nil, toRPCErr(err)
- }
- break
+ t, err = cc.dopts.picker.Pick(ctx)
+ if err != nil {
+ return nil, toRPCErr(err)
}
// TODO(zhaoq): CallOption is omitted. Add support when it is needed.
callHdr := &transport.CallHdr{
- Host: conn.authority,
+ Host: cc.authority,
Method: method,
}
cs := &clientStream{
desc: desc,
- codec: conn.dopts.codec,
+ codec: cc.dopts.codec,
tracing: EnableTracing,
}
if cs.tracing {