diff --git a/core/discov/internal/registry.go b/core/discov/internal/registry.go index c12a6d6fb221..ab13bd0f140a 100644 --- a/core/discov/internal/registry.go +++ b/core/discov/internal/registry.go @@ -12,18 +12,22 @@ import ( "github.com/zeromicro/go-zero/core/lang" "github.com/zeromicro/go-zero/core/logx" + "github.com/zeromicro/go-zero/core/mathx" "github.com/zeromicro/go-zero/core/syncx" "github.com/zeromicro/go-zero/core/threading" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" clientv3 "go.etcd.io/etcd/client/v3" ) +const coolDownDeviation = 0.05 + var ( registry = Registry{ clusters: make(map[string]*cluster), } - connManager = syncx.NewResourceManager() - errClosed = errors.New("etcd monitor chan has been closed") + connManager = syncx.NewResourceManager() + coolDownUnstable = mathx.NewUnstable(coolDownDeviation) + errClosed = errors.New("etcd monitor chan has been closed") ) // A Registry is a registry that manages the etcd client connections. @@ -200,9 +204,6 @@ func (c *cluster) getCurrent(key watchKey) []KV { } func (c *cluster) handleChanges(key watchKey, kvs []KV) { - var add []KV - var remove []KV - c.lock.Lock() watcher, ok := c.watchers[key] if !ok { @@ -217,22 +218,7 @@ func (c *cluster) handleChanges(key watchKey, kvs []KV) { for _, kv := range kvs { newVals[kv.Key] = kv.Val } - for k, v := range vals { - if val, ok := newVals[k]; !ok || v != val { - remove = append(remove, KV{ - Key: k, - Val: v, - }) - } - } - for k, v := range newVals { - if val, ok := vals[k]; !ok || v != val { - add = append(add, KV{ - Key: k, - Val: v, - }) - } - } + add, remove := calculateChanges(vals, newVals) watcher.values = newVals c.lock.Unlock() @@ -304,7 +290,7 @@ func (c *cluster) load(cli EtcdClient, key watchKey) int64 { } logx.Errorf("%s, key: %s, exactMatch: %t", err.Error(), key.key, key.exactMatch) - time.Sleep(coolDownInterval) + time.Sleep(coolDownUnstable.AroundDuration(coolDownInterval)) } var kvs []KV @@ -360,17 +346,22 @@ func (c *cluster) newClient() (EtcdClient, error) { func (c *cluster) reload(cli EtcdClient) { c.lock.Lock() + // cancel the previous watches close(c.done) c.watchGroup.Wait() - c.done = make(chan lang.PlaceholderType) - c.watchGroup = threading.NewRoutineGroup() var keys []watchKey for wk, wval := range c.watchers { keys = append(keys, wk) - wval.cancel() + if wval.cancel != nil { + wval.cancel() + } } + + c.done = make(chan lang.PlaceholderType) + c.watchGroup = threading.NewRoutineGroup() c.lock.Unlock() + // start new watches for _, key := range keys { k := key c.watchGroup.Run(func() { @@ -398,11 +389,37 @@ func (c *cluster) watch(cli EtcdClient, key watchKey, rev int64) { } func (c *cluster) watchStream(cli EtcdClient, key watchKey, rev int64) error { + ctx, rch := c.setupWatch(cli, key, rev) + + for { + select { + case wresp, ok := <-rch: + if !ok { + return errClosed + } + if wresp.Canceled { + return fmt.Errorf("etcd monitor chan has been canceled, error: %w", wresp.Err()) + } + if wresp.Err() != nil { + return fmt.Errorf("etcd monitor chan error: %w", wresp.Err()) + } + + c.handleWatchEvents(key, wresp.Events) + case <-ctx.Done(): + return nil + case <-c.done: + return nil + } + } +} + +func (c *cluster) setupWatch(cli EtcdClient, key watchKey, rev int64) (context.Context, clientv3.WatchChan) { var ( rch clientv3.WatchChan ops []clientv3.OpOption wkey = key.key ) + if !key.exactMatch { wkey = makeKeyPrefix(key.key) ops = append(ops, clientv3.WithPrefix()) @@ -425,26 +442,7 @@ func (c *cluster) watchStream(cli EtcdClient, key watchKey, rev int64) error { rch = cli.Watch(clientv3.WithRequireLeader(ctx), wkey, ops...) - for { - select { - case wresp, ok := <-rch: - if !ok { - return errClosed - } - if wresp.Canceled { - return fmt.Errorf("etcd monitor chan has been canceled, error: %w", wresp.Err()) - } - if wresp.Err() != nil { - return fmt.Errorf("etcd monitor chan error: %w", wresp.Err()) - } - - c.handleWatchEvents(key, wresp.Events) - case <-ctx.Done(): - return nil - case <-c.done: - return nil - } - } + return ctx, rch } func (c *cluster) watchConnState(cli EtcdClient) { @@ -475,6 +473,28 @@ func DialClient(endpoints []string) (EtcdClient, error) { return clientv3.New(cfg) } +func calculateChanges(oldVals, newVals map[string]string) (add, remove []KV) { + for k, v := range newVals { + if val, ok := oldVals[k]; !ok || v != val { + add = append(add, KV{ + Key: k, + Val: v, + }) + } + } + + for k, v := range oldVals { + if val, ok := newVals[k]; !ok || v != val { + remove = append(remove, KV{ + Key: k, + Val: v, + }) + } + } + + return add, remove +} + func getClusterKey(endpoints []string) string { sort.Strings(endpoints) return strings.Join(endpoints, endpointsSeparator)