Merge pull request #393 from iamqizhao/master

Part1: revise Picker API
diff --git a/call.go b/call.go
index a91892e..9d815af 100644
--- a/call.go
+++ b/call.go
@@ -144,21 +144,16 @@
 			err    error
 			t      transport.ClientTransport
 			stream *transport.Stream
-			conn   *Conn
 		)
 		// TODO(zhaoq): Need a formal spec of retry strategy for non-failfast rpcs.
 		if lastErr != nil && c.failFast {
 			return toRPCErr(lastErr)
 		}
-		conn, err = cc.dopts.picker.Pick()
-		if err != nil {
-			return toRPCErr(err)
-		}
 		callHdr := &transport.CallHdr{
-			Host:   conn.authority,
+			Host:   cc.authority,
 			Method: method,
 		}
-		t, err = conn.Wait(ctx)
+		t, err = cc.dopts.picker.Pick(ctx)
 		if err != nil {
 			if lastErr != nil {
 				// This was a retry; return the error from the last attempt.
@@ -169,7 +164,7 @@
 		if c.traceInfo.tr != nil {
 			c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true)
 		}
-		stream, err = sendRequest(ctx, conn.dopts.codec, callHdr, t, args, topts)
+		stream, err = sendRequest(ctx, cc.dopts.codec, callHdr, t, args, topts)
 		if err != nil {
 			if _, ok := err.(transport.ConnectionError); ok {
 				lastErr = err
@@ -181,7 +176,7 @@
 			return toRPCErr(err)
 		}
 		// Receive the response
-		lastErr = recvResponse(conn.dopts.codec, t, &c, stream, reply)
+		lastErr = recvResponse(cc.dopts.codec, t, &c, stream, reply)
 		if _, ok := lastErr.(transport.ConnectionError); ok {
 			continue
 		}
diff --git a/clientconn.go b/clientconn.go
index a96fcb8..85a42d9 100644
--- a/clientconn.go
+++ b/clientconn.go
@@ -149,12 +149,21 @@
 	for _, opt := range opts {
 		opt(&cc.dopts)
 	}
+	if cc.dopts.codec == nil {
+		// Set the default codec.
+		cc.dopts.codec = protoCodec{}
+	}
 	if cc.dopts.picker == nil {
 		cc.dopts.picker = &unicastPicker{}
 	}
 	if err := cc.dopts.picker.Init(cc); err != nil {
 		return nil, err
 	}
+	colonPos := strings.LastIndex(target, ":")
+	if colonPos == -1 {
+		colonPos = len(target)
+	}
+	cc.authority = target[:colonPos]
 	return cc, nil
 }
 
@@ -193,8 +202,9 @@
 
 // ClientConn represents a client connection to an RPC service.
 type ClientConn struct {
-	target string
-	dopts  dialOptions
+	target    string
+	authority string
+	dopts     dialOptions
 }
 
 // State returns the connectivity state of cc.
@@ -218,7 +228,6 @@
 // Conn is a client connection to a single destination.
 type Conn struct {
 	target       string
-	authority    string
 	dopts        dialOptions
 	shutdownChan chan struct{}
 	events       trace.EventLog
@@ -263,15 +272,6 @@
 			}
 		}
 	}
-	colonPos := strings.LastIndex(c.target, ":")
-	if colonPos == -1 {
-		colonPos = len(c.target)
-	}
-	c.authority = c.target[:colonPos]
-	if c.dopts.codec == nil {
-		// Set the default codec.
-		c.dopts.codec = protoCodec{}
-	}
 	c.stateCV = sync.NewCond(&c.mu)
 	if c.dopts.block {
 		if err := c.resetTransport(false); err != nil {
diff --git a/picker.go b/picker.go
index 79f9886..d285504 100644
--- a/picker.go
+++ b/picker.go
@@ -35,6 +35,9 @@
 
 import (
 	"time"
+
+	"golang.org/x/net/context"
+	"google.golang.org/grpc/transport"
 )
 
 // Picker picks a Conn for RPC requests.
@@ -42,9 +45,9 @@
 type Picker interface {
 	// Init does initial processing for the Picker, e.g., initiate some connections.
 	Init(cc *ClientConn) error
-	// Pick returns the Conn to use for the upcoming RPC. It may return different
-	// Conn's up to the implementation.
-	Pick() (*Conn, error)
+	// Pick blocks until either a transport.ClientTransport is ready for the upcoming RPC
+	// or some error happens.
+	Pick(ctx context.Context) (transport.ClientTransport, error)
 	// State returns the connectivity state of the underlying connections.
 	State() ConnectivityState
 	// WaitForStateChange blocks until the state changes to something other than
@@ -70,8 +73,8 @@
 	return nil
 }
 
-func (p *unicastPicker) Pick() (*Conn, error) {
-	return p.conn, nil
+func (p *unicastPicker) Pick(ctx context.Context) (transport.ClientTransport, error) {
+	return p.conn.Wait(ctx)
 }
 
 func (p *unicastPicker) State() ConnectivityState {
diff --git a/stream.go b/stream.go
index 55201c2..e72cd3d 100644
--- a/stream.go
+++ b/stream.go
@@ -97,29 +97,21 @@
 // by generated code.
 func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
 	var (
-		conn *Conn
-		t    transport.ClientTransport
-		err  error
+		t   transport.ClientTransport
+		err error
 	)
-	for {
-		conn, err = cc.dopts.picker.Pick()
-		if err != nil {
-			return nil, toRPCErr(err)
-		}
-		t, err = conn.Wait(ctx)
-		if err != nil {
-			return nil, toRPCErr(err)
-		}
-		break
+	t, err = cc.dopts.picker.Pick(ctx)
+	if err != nil {
+		return nil, toRPCErr(err)
 	}
 	// TODO(zhaoq): CallOption is omitted. Add support when it is needed.
 	callHdr := &transport.CallHdr{
-		Host:   conn.authority,
+		Host:   cc.authority,
 		Method: method,
 	}
 	cs := &clientStream{
 		desc:    desc,
-		codec:   conn.dopts.codec,
+		codec:   cc.dopts.codec,
 		tracing: EnableTracing,
 	}
 	if cs.tracing {