Skip to content

Commit

Permalink
fixup! fixup! fixup! fixup! fixup! fix: etcd discovery mechanism on g…
Browse files Browse the repository at this point in the history
…rpc with idle manager
  • Loading branch information
kevwan committed Jan 21, 2025
1 parent c4d68ce commit 107a9f4
Showing 1 changed file with 65 additions and 45 deletions.
110 changes: 65 additions & 45 deletions core/discov/internal/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,22 @@ import (

"github.com/zeromicro/go-zero/core/lang"
"github.com/zeromicro/go-zero/core/logx"
"github.com/zeromicro/go-zero/core/mathx"
"github.com/zeromicro/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/threading"
"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
clientv3 "go.etcd.io/etcd/client/v3"
)

const coolDownDeviation = 0.05

var (
registry = Registry{
clusters: make(map[string]*cluster),
}
connManager = syncx.NewResourceManager()
errClosed = errors.New("etcd monitor chan has been closed")
connManager = syncx.NewResourceManager()
coolDownUnstable = mathx.NewUnstable(coolDownDeviation)
errClosed = errors.New("etcd monitor chan has been closed")
)

// A Registry is a registry that manages the etcd client connections.
Expand Down Expand Up @@ -200,9 +204,6 @@ func (c *cluster) getCurrent(key watchKey) []KV {
}

func (c *cluster) handleChanges(key watchKey, kvs []KV) {
var add []KV
var remove []KV

c.lock.Lock()
watcher, ok := c.watchers[key]
if !ok {
Expand All @@ -217,22 +218,7 @@ func (c *cluster) handleChanges(key watchKey, kvs []KV) {
for _, kv := range kvs {
newVals[kv.Key] = kv.Val
}
for k, v := range vals {
if val, ok := newVals[k]; !ok || v != val {
remove = append(remove, KV{
Key: k,
Val: v,
})
}
}
for k, v := range newVals {
if val, ok := vals[k]; !ok || v != val {
add = append(add, KV{
Key: k,
Val: v,
})
}
}
add, remove := calculateChanges(vals, newVals)
watcher.values = newVals
c.lock.Unlock()

Expand Down Expand Up @@ -304,7 +290,7 @@ func (c *cluster) load(cli EtcdClient, key watchKey) int64 {
}

logx.Errorf("%s, key: %s, exactMatch: %t", err.Error(), key.key, key.exactMatch)
time.Sleep(coolDownInterval)
time.Sleep(coolDownUnstable.AroundDuration(coolDownInterval))
}

var kvs []KV
Expand Down Expand Up @@ -360,17 +346,22 @@ func (c *cluster) newClient() (EtcdClient, error) {

func (c *cluster) reload(cli EtcdClient) {
c.lock.Lock()
// cancel the previous watches
close(c.done)
c.watchGroup.Wait()
c.done = make(chan lang.PlaceholderType)
c.watchGroup = threading.NewRoutineGroup()
var keys []watchKey
for wk, wval := range c.watchers {
keys = append(keys, wk)
wval.cancel()
if wval.cancel != nil {
wval.cancel()
}
}

c.done = make(chan lang.PlaceholderType)
c.watchGroup = threading.NewRoutineGroup()
c.lock.Unlock()

// start new watches
for _, key := range keys {
k := key
c.watchGroup.Run(func() {
Expand Down Expand Up @@ -398,11 +389,37 @@ func (c *cluster) watch(cli EtcdClient, key watchKey, rev int64) {
}

func (c *cluster) watchStream(cli EtcdClient, key watchKey, rev int64) error {
ctx, rch := c.setupWatch(cli, key, rev)

for {
select {
case wresp, ok := <-rch:
if !ok {
return errClosed
}
if wresp.Canceled {
return fmt.Errorf("etcd monitor chan has been canceled, error: %w", wresp.Err())
}
if wresp.Err() != nil {
return fmt.Errorf("etcd monitor chan error: %w", wresp.Err())
}

c.handleWatchEvents(key, wresp.Events)
case <-ctx.Done():
return nil
case <-c.done:
return nil
}
}
}

func (c *cluster) setupWatch(cli EtcdClient, key watchKey, rev int64) (context.Context, clientv3.WatchChan) {
var (
rch clientv3.WatchChan
ops []clientv3.OpOption
wkey = key.key
)

if !key.exactMatch {
wkey = makeKeyPrefix(key.key)
ops = append(ops, clientv3.WithPrefix())
Expand All @@ -425,26 +442,7 @@ func (c *cluster) watchStream(cli EtcdClient, key watchKey, rev int64) error {

rch = cli.Watch(clientv3.WithRequireLeader(ctx), wkey, ops...)

for {
select {
case wresp, ok := <-rch:
if !ok {
return errClosed
}
if wresp.Canceled {
return fmt.Errorf("etcd monitor chan has been canceled, error: %w", wresp.Err())
}
if wresp.Err() != nil {
return fmt.Errorf("etcd monitor chan error: %w", wresp.Err())
}

c.handleWatchEvents(key, wresp.Events)
case <-ctx.Done():
return nil
case <-c.done:
return nil
}
}
return ctx, rch
}

func (c *cluster) watchConnState(cli EtcdClient) {
Expand Down Expand Up @@ -475,6 +473,28 @@ func DialClient(endpoints []string) (EtcdClient, error) {
return clientv3.New(cfg)
}

func calculateChanges(oldVals, newVals map[string]string) (add, remove []KV) {
for k, v := range newVals {
if val, ok := oldVals[k]; !ok || v != val {
add = append(add, KV{
Key: k,
Val: v,
})
}
}

for k, v := range oldVals {
if val, ok := newVals[k]; !ok || v != val {
remove = append(remove, KV{
Key: k,
Val: v,
})
}
}

return add, remove
}

func getClusterKey(endpoints []string) string {
sort.Strings(endpoints)
return strings.Join(endpoints, endpointsSeparator)
Expand Down

0 comments on commit 107a9f4

Please sign in to comment.