From 255b1073dc153267e6dcb2307760dcdd0aa83f60 Mon Sep 17 00:00:00 2001 From: Eray Arslan Date: Tue, 1 Oct 2024 10:56:18 +0300 Subject: [PATCH] feat: qol improvements --- api/api.go | 14 +++++++++++--- config/dcp.go | 10 +++++----- couchbase/membership.go | 10 ++++++++-- dcp.go | 10 ++-------- example/config_dynamic.yml | 2 +- example/config_k8s_default.yml | 2 +- example/config_k8s_leader_election.yml | 2 +- membership/dynamic_membership.go | 1 + membership/membership.go | 9 ++++++++- 9 files changed, 38 insertions(+), 22 deletions(-) diff --git a/api/api.go b/api/api.go index ed410c4..0bac0b8 100644 --- a/api/api.go +++ b/api/api.go @@ -38,6 +38,7 @@ type api struct { app *fiber.App config *dcp.Dcp registerer *metric.Registerer + membershipInfo *membership.Model bus EventBus.Bus } @@ -94,12 +95,19 @@ func (s *api) info(c *fiber.Ctx) error { if err := c.BodyParser(&req); err != nil { return c.Status(fiber.StatusBadRequest).SendString("invalid request body") } - logger.Log.Debug("new info arrived for member: %v/%v", req.MemberNumber, req.TotalMembers) - s.bus.Publish(helpers.MembershipChangedBusEventName, &membership.Model{ + newInfo := &membership.Model{ MemberNumber: req.MemberNumber, TotalMembers: req.TotalMembers, - }) + } + + if newInfo.IsChanged(s.membershipInfo) { + s.membershipInfo = newInfo + + logger.Log.Debug("new info arrived for member: %v/%v", newInfo.MemberNumber, newInfo.TotalMembers) + + s.bus.Publish(helpers.MembershipChangedBusEventName, newInfo) + } return c.SendString("OK") } diff --git a/config/dcp.go b/config/dcp.go index 3cabde0..aef8c29 100644 --- a/config/dcp.go +++ b/config/dcp.go @@ -50,8 +50,8 @@ type DCPGroup struct { } type DCPListener struct { - BufferSize uint `yaml:"bufferSize"` SkipUntil *time.Time `yaml:"skipUntil"` + BufferSize uint `yaml:"bufferSize"` } type ExternalDcpConfig struct { @@ -61,9 +61,9 @@ type ExternalDcpConfig struct { type ExternalDcp struct { BufferSize any `yaml:"bufferSize"` ConnectionBufferSize any `yaml:"connectionBufferSize"` + Listener DCPListener `yaml:"listener"` Group DCPGroup `yaml:"group"` ConnectionTimeout time.Duration `yaml:"connectionTimeout"` - Listener DCPListener `yaml:"listener"` Config ExternalDcpConfig `yaml:"config"` } @@ -118,16 +118,16 @@ type Logging struct { type Dcp struct { ConnectionBufferSize any `yaml:"connectionBufferSize"` + Metric Metric `yaml:"metric"` BucketName string `yaml:"bucketName"` - ScopeName string `yaml:"scopeName"` - Password string `yaml:"password"` RootCAPath string `yaml:"rootCAPath"` Username string `yaml:"username"` Logging Logging `yaml:"logging"` + ScopeName string `yaml:"scopeName"` + Password string `yaml:"password"` Metadata Metadata `yaml:"metadata"` CollectionNames []string `yaml:"collectionNames"` Hosts []string `yaml:"hosts"` - Metric Metric `yaml:"metric"` Checkpoint Checkpoint `yaml:"checkpoint"` LeaderElection LeaderElection `yaml:"leaderElection"` Dcp ExternalDcp `yaml:"dcp"` diff --git a/couchbase/membership.go b/couchbase/membership.go index 7e08ecf..cda1176 100644 --- a/couchbase/membership.go +++ b/couchbase/membership.go @@ -266,10 +266,16 @@ func (h *cbMembership) rebalance(instances []Instance) { logger.Log.Error("error while rebalance, self = %v, err: %v", string(h.id), err) panic(err) } else { - h.bus.Publish(helpers.MembershipChangedBusEventName, &membership.Model{ + newInfo := &membership.Model{ MemberNumber: selfOrder, TotalMembers: len(instances), - }) + } + + if newInfo.IsChanged(h.info) { + logger.Log.Debug("new info arrived for member: %v/%v", newInfo.MemberNumber, newInfo.TotalMembers) + + h.bus.Publish(helpers.MembershipChangedBusEventName, newInfo) + } h.lastActiveInstances = instances } diff --git a/dcp.go b/dcp.go index 3b51fed..f291fa3 100644 --- a/dcp.go +++ b/dcp.go @@ -84,14 +84,8 @@ func (s *dcp) SetEventHandler(eventHandler models.EventHandler) { s.eventHandler = eventHandler } -func (s *dcp) membershipChangedListener(m *membership.Model) { - currentInfo := &membership.Model{ - MemberNumber: s.vBucketDiscovery.GetMetric().MemberNumber, - TotalMembers: s.vBucketDiscovery.GetMetric().TotalMembers, - } - if s.stream.IsOpen() && currentInfo.IsChanged(m) { - s.stream.Rebalance() - } +func (s *dcp) membershipChangedListener(_ *membership.Model) { + s.stream.Rebalance() } //nolint:funlen diff --git a/example/config_dynamic.yml b/example/config_dynamic.yml index 98c4604..65badf6 100644 --- a/example/config_dynamic.yml +++ b/example/config_dynamic.yml @@ -1,7 +1,7 @@ hosts: - localhost:8091 username: user -password: password +password: 123456 bucketName: dcp-test dcp: group: diff --git a/example/config_k8s_default.yml b/example/config_k8s_default.yml index 502fb42..30ab9db 100644 --- a/example/config_k8s_default.yml +++ b/example/config_k8s_default.yml @@ -1,7 +1,7 @@ hosts: - couchbase.couchbase.svc.cluster.local:8091 username: user -password: password +password: 123456 bucketName: dcp-test dcp: group: diff --git a/example/config_k8s_leader_election.yml b/example/config_k8s_leader_election.yml index c90d25f..f9be545 100644 --- a/example/config_k8s_leader_election.yml +++ b/example/config_k8s_leader_election.yml @@ -1,7 +1,7 @@ hosts: - couchbase.couchbase.svc.cluster.local:8091 username: user -password: password +password: 123456 bucketName: dcp-test dcp: group: diff --git a/membership/dynamic_membership.go b/membership/dynamic_membership.go index 17521e5..37a701a 100644 --- a/membership/dynamic_membership.go +++ b/membership/dynamic_membership.go @@ -16,6 +16,7 @@ func (d *dynamicMembership) GetInfo() *Model { if d.info != nil { return d.info } + logger.Log.Info("dynamic membership waiting first request") return <-d.infoChan } diff --git a/membership/membership.go b/membership/membership.go index e5b9c50..e5f5e50 100644 --- a/membership/membership.go +++ b/membership/membership.go @@ -1,5 +1,7 @@ package membership +import "github.com/Trendyol/go-dcp/logger" + type Membership interface { GetInfo() *Model Close() @@ -23,5 +25,10 @@ func (s *Model) IsChanged(other *Model) bool { return true } - return s.MemberNumber != other.MemberNumber || s.TotalMembers != other.TotalMembers + res := s.MemberNumber != other.MemberNumber || s.TotalMembers != other.TotalMembers + if !res { + logger.Log.Info("membership info not changed") + } + + return res }