From 8f62a586dd38d0901ed46374ad12ddf2fa5b4b1f Mon Sep 17 00:00:00 2001 From: lidengfu Date: Mon, 11 Dec 2023 15:09:25 +0800 Subject: [PATCH] fix endless loop caused by ErrCompacted --- core/discov/internal/registry.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/core/discov/internal/registry.go b/core/discov/internal/registry.go index 97dd97aa8113..da5d6fef44a3 100644 --- a/core/discov/internal/registry.go +++ b/core/discov/internal/registry.go @@ -2,6 +2,7 @@ package internal import ( "context" + "errors" "fmt" "io" "sort" @@ -14,6 +15,7 @@ import ( "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/syncx" "github.com/zeromicro/go-zero/core/threading" + v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" clientv3 "go.etcd.io/etcd/client/v3" ) @@ -288,13 +290,18 @@ func (c *cluster) reload(cli EtcdClient) { func (c *cluster) watch(cli EtcdClient, key string, rev int64) { for { - if c.watchStream(cli, key, rev) { + err := c.watchStream(cli, key, rev) + if err == nil { return } + if errors.Is(err, v3rpc.ErrCompacted) { + logx.Errorf("etcd watch stream has been compacted, try to reload, rev %v", rev) + c.reload(cli) + } } } -func (c *cluster) watchStream(cli EtcdClient, key string, rev int64) bool { +func (c *cluster) watchStream(cli EtcdClient, key string, rev int64) error { var rch clientv3.WatchChan if rev != 0 { rch = cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix(), @@ -308,20 +315,20 @@ func (c *cluster) watchStream(cli EtcdClient, key string, rev int64) bool { case wresp, ok := <-rch: if !ok { logx.Error("etcd monitor chan has been closed") - return false + return errors.New("etcd monitor chan has been closed") } if wresp.Canceled { logx.Errorf("etcd monitor chan has been canceled, error: %v", wresp.Err()) - return false + return wresp.Err() } if wresp.Err() != nil { logx.Error(fmt.Sprintf("etcd monitor chan error: %v", wresp.Err())) - return false + return wresp.Err() } c.handleWatchEvents(key, wresp.Events) case <-c.done: - return true + return nil } } }