refactor naming package
diff --git a/naming/etcd/etcd.go b/naming/etcd/etcd.go
index 362649d..8a877ed 100644
--- a/naming/etcd/etcd.go
+++ b/naming/etcd/etcd.go
@@ -34,115 +34,49 @@
package etcd
import (
+ "sync"
+
etcdcl "github.com/coreos/etcd/client"
"golang.org/x/net/context"
"google.golang.org/grpc/naming"
)
-// update defines an etcd key-value update.
-type update struct {
- key, val string
-}
-// getNode reports the set of changes starting from node recursively.
-func getNode(node *etcdcl.Node) (updates []*update) {
- for _, v := range node.Nodes {
- updates = append(updates, getNode(v)...)
- }
+// getNode builds the key-value map starting from node recursively. It returns the
+// max etcdcl.Node.ModifiedIndex starting from that node.
+func getNode(node *etcdcl.Node, kv map[string]string) uint64 {
if !node.Dir {
- u := &update{
- key: node.Key,
- val: node.Value,
- }
- updates = []*update{u}
+ kv[node.Key] = node.Value
+ return node.ModifiedIndex
}
- return
-}
-
-type watcher struct {
- wr etcdcl.Watcher
- ctx context.Context
- cancel context.CancelFunc
- kv map[string]string
-}
-
-func (w *watcher) Next() (nu []*naming.Update, _ error) {
- for {
- resp, err := w.wr.Next(w.ctx)
- if err != nil {
- return nil, err
- }
- updates := getNode(resp.Node)
- for _, u := range updates {
- switch resp.Action {
- case "set":
- if resp.PrevNode == nil {
- w.kv[u.key] = u.val
- nu = append(nu, &naming.Update{
- Op: naming.Add,
- Addr: u.val,
- })
- } else {
- nu = append(nu, &naming.Update{
- Op: naming.Delete,
- Addr: w.kv[u.key],
- })
- nu = append(nu, &naming.Update{
- Op: naming.Add,
- Addr: u.val,
- })
- w.kv[u.key] = u.val
- }
- case "delete":
- nu = append(nu, &naming.Update{
- Op: naming.Delete,
- Addr: w.kv[u.key],
- })
- delete(w.kv, u.key)
- }
- }
- if len(nu) > 0 {
- break
+ var max uint64
+ for _, v := range node.Nodes {
+ i := getNode(v, kv)
+ if max < i {
+ max = i
}
}
- return nu, nil
-}
-
-func (w *watcher) Stop() {
- w.cancel()
+ return max
}
type resolver struct {
kapi etcdcl.KeysAPI
- kv map[string]string
}
-func (r *resolver) NewWatcher(target string) naming.Watcher {
- ctx, cancel := context.WithCancel(context.Background())
- w := &watcher{
- wr: r.kapi.Watcher(target, &etcdcl.WatcherOptions{Recursive: true}),
- ctx: ctx,
- cancel: cancel,
- }
- for k, v := range r.kv {
- w.kv[k] = v
- }
- return w
-}
-
-func (r *resolver) Resolve(target string) (nu []*naming.Update, _ error) {
+func (r *resolver) Resolve(target string) (naming.Watcher, error) {
resp, err := r.kapi.Get(context.Background(), target, &etcdcl.GetOptions{Recursive: true})
if err != nil {
return nil, err
}
- updates := getNode(resp.Node)
- for _, u := range updates {
- r.kv[u.key] = u.val
- nu = append(nu, &naming.Update{
- Op: naming.Add,
- Addr: u.val,
- })
- }
- return nu, nil
+ kv := make(map[string]string)
+ // Record the index in order to avoid missing updates between Get returning and
+ // watch starting.
+ index := getNode(resp.Node, kv)
+ return &watcher{
+ wr: r.kapi.Watcher(target, &etcdcl.WatcherOptions{
+ AfterIndex: index,
+ Recursive: true}),
+ kv: kv,
+ }, nil
}
// NewResolver creates an etcd-based naming.Resolver.
@@ -153,6 +87,68 @@
}
return &resolver{
kapi: etcdcl.NewKeysAPI(c),
- kv: make(map[string]string),
}, nil
}
+
+type watcher struct {
+ wr etcdcl.Watcher
+ kv map[string]string
+}
+
+var once sync.Once
+
+func (w *watcher) Next(ctx context.Context) (nu []*naming.Update, err error) {
+ once.Do(func() {
+ select {
+ case <-ctx.Done():
+ err = ctx.Err()
+ default:
+ for _, v := range w.kv {
+ nu = append(nu, &naming.Update{
+ Op: naming.Add,
+ Addr: v,
+ })
+ }
+ }
+ })
+ if len(nu) > 0 || err != nil {
+ // once.Do ran. Return directly.
+ return
+ }
+ for {
+ resp, err := w.wr.Next(ctx)
+ if err != nil {
+ return nil, err
+ }
+ if resp.Node.Dir {
+ continue
+ }
+ switch resp.Action {
+ case "set":
+ if resp.PrevNode == nil {
+ nu = append(nu, &naming.Update{
+ Op: naming.Add,
+ Addr: resp.Node.Value,
+ })
+ w.kv[resp.Node.Key] = resp.Node.Value
+ } else {
+ nu = append(nu, &naming.Update{
+ Op: naming.Delete,
+ Addr: w.kv[resp.Node.Key],
+ })
+ nu = append(nu, &naming.Update{
+ Op: naming.Add,
+ Addr: resp.Node.Value,
+ })
+ w.kv[resp.Node.Key] = resp.Node.Value
+ }
+ case "delete":
+ nu = append(nu, &naming.Update{
+ Op: naming.Delete,
+ Addr: resp.Node.Value,
+ })
+ delete(w.kv, resp.Node.Key)
+ }
+ return nu, nil
+ }
+}
diff --git a/naming/naming.go b/naming/naming.go
index 610eb81..d0a4f46 100644
--- a/naming/naming.go
+++ b/naming/naming.go
@@ -35,40 +35,41 @@
// The interface is EXPERIMENTAL and may be suject to change.
package naming
-// OP defines the corresponding operations for a name resolution change.
-type OP uint8
+import (
+ "golang.org/x/net/context"
+)
+
+// Operation defines the corresponding operations for a name resolution change.
+type Operation uint8
const (
// Add indicates a new address is added.
- Add = iota
+ Add Operation = iota
// Delete indicates an exisiting address is deleted.
Delete
)
-type ServiceConfig interface{}
-
-// Update defines a name resolution change.
+// Update defines a name resolution update. Notice that it is not valid having both
+// empty string Addr and nil Metadata in an Update.
type Update struct {
// Op indicates the operation of the update.
- Op OP
- Addr string
- Config ServiceConfig
+ Op Operation
+ // Addr is the updated address. It is empty string if there is no address update.
+ Addr string
+ // Metadata is the updated metadata. It is nil if there is no metadata update.
+ // Metadata is not required for a custom naming implementation.
+ Metadata interface{}
}
-// Resolver does one-shot name resolution and creates a Watcher to
-// watch the future updates.
+// Resolver creates a Watcher for a target to track its resolution changes.
type Resolver interface {
- // Resolve returns the name resolution results.
- Resolve(target string) ([]*Update, error)
- // NewWatcher creates a Watcher to watch the changes on target.
- NewWatcher(target string) Watcher
+ // Resolve creates a Watcher for target.
+ Resolve(target string) (Watcher, error)
}
-// Watcher watches the updates for a particular target.
+// Watcher watches for the updates on the specified target.
type Watcher interface {
// Next blocks until an update or error happens. It may return one or more
- // updates.
- Next() ([]*Update, error)
- // Stop stops the Watcher.
- Stop()
+ // updates. The first call should get the full set of the results.
+ Next(ctx context.Context) ([]*Update, error)
}