Skip to content

Commit

Permalink
feat: qol improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Oct 1, 2024
1 parent 0b22765 commit 255b107
Show file tree
Hide file tree
Showing 9 changed files with 38 additions and 22 deletions.
14 changes: 11 additions & 3 deletions api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type api struct {
app *fiber.App
config *dcp.Dcp
registerer *metric.Registerer
membershipInfo *membership.Model
bus EventBus.Bus
}

Expand Down Expand Up @@ -94,12 +95,19 @@ func (s *api) info(c *fiber.Ctx) error {
if err := c.BodyParser(&req); err != nil {
return c.Status(fiber.StatusBadRequest).SendString("invalid request body")
}
logger.Log.Debug("new info arrived for member: %v/%v", req.MemberNumber, req.TotalMembers)

s.bus.Publish(helpers.MembershipChangedBusEventName, &membership.Model{
newInfo := &membership.Model{
MemberNumber: req.MemberNumber,
TotalMembers: req.TotalMembers,
})
}

if newInfo.IsChanged(s.membershipInfo) {
s.membershipInfo = newInfo

logger.Log.Debug("new info arrived for member: %v/%v", newInfo.MemberNumber, newInfo.TotalMembers)

s.bus.Publish(helpers.MembershipChangedBusEventName, newInfo)
}

return c.SendString("OK")
}
Expand Down
10 changes: 5 additions & 5 deletions config/dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ type DCPGroup struct {
}

type DCPListener struct {
BufferSize uint `yaml:"bufferSize"`
SkipUntil *time.Time `yaml:"skipUntil"`
BufferSize uint `yaml:"bufferSize"`
}

type ExternalDcpConfig struct {
Expand All @@ -61,9 +61,9 @@ type ExternalDcpConfig struct {
type ExternalDcp struct {
BufferSize any `yaml:"bufferSize"`
ConnectionBufferSize any `yaml:"connectionBufferSize"`
Listener DCPListener `yaml:"listener"`
Group DCPGroup `yaml:"group"`
ConnectionTimeout time.Duration `yaml:"connectionTimeout"`
Listener DCPListener `yaml:"listener"`
Config ExternalDcpConfig `yaml:"config"`
}

Expand Down Expand Up @@ -118,16 +118,16 @@ type Logging struct {

type Dcp struct {
ConnectionBufferSize any `yaml:"connectionBufferSize"`
Metric Metric `yaml:"metric"`
BucketName string `yaml:"bucketName"`
ScopeName string `yaml:"scopeName"`
Password string `yaml:"password"`
RootCAPath string `yaml:"rootCAPath"`
Username string `yaml:"username"`
Logging Logging `yaml:"logging"`
ScopeName string `yaml:"scopeName"`
Password string `yaml:"password"`
Metadata Metadata `yaml:"metadata"`
CollectionNames []string `yaml:"collectionNames"`
Hosts []string `yaml:"hosts"`
Metric Metric `yaml:"metric"`
Checkpoint Checkpoint `yaml:"checkpoint"`
LeaderElection LeaderElection `yaml:"leaderElection"`
Dcp ExternalDcp `yaml:"dcp"`
Expand Down
10 changes: 8 additions & 2 deletions couchbase/membership.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,10 +266,16 @@ func (h *cbMembership) rebalance(instances []Instance) {
logger.Log.Error("error while rebalance, self = %v, err: %v", string(h.id), err)
panic(err)
} else {
h.bus.Publish(helpers.MembershipChangedBusEventName, &membership.Model{
newInfo := &membership.Model{
MemberNumber: selfOrder,
TotalMembers: len(instances),
})
}

if newInfo.IsChanged(h.info) {
logger.Log.Debug("new info arrived for member: %v/%v", newInfo.MemberNumber, newInfo.TotalMembers)

h.bus.Publish(helpers.MembershipChangedBusEventName, newInfo)
}

h.lastActiveInstances = instances
}
Expand Down
10 changes: 2 additions & 8 deletions dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,8 @@ func (s *dcp) SetEventHandler(eventHandler models.EventHandler) {
s.eventHandler = eventHandler
}

func (s *dcp) membershipChangedListener(m *membership.Model) {
currentInfo := &membership.Model{
MemberNumber: s.vBucketDiscovery.GetMetric().MemberNumber,
TotalMembers: s.vBucketDiscovery.GetMetric().TotalMembers,
}
if s.stream.IsOpen() && currentInfo.IsChanged(m) {
s.stream.Rebalance()
}
func (s *dcp) membershipChangedListener(_ *membership.Model) {
s.stream.Rebalance()
}

//nolint:funlen
Expand Down
2 changes: 1 addition & 1 deletion example/config_dynamic.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
hosts:
- localhost:8091
username: user
password: password
password: 123456
bucketName: dcp-test
dcp:
group:
Expand Down
2 changes: 1 addition & 1 deletion example/config_k8s_default.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
hosts:
- couchbase.couchbase.svc.cluster.local:8091
username: user
password: password
password: 123456
bucketName: dcp-test
dcp:
group:
Expand Down
2 changes: 1 addition & 1 deletion example/config_k8s_leader_election.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
hosts:
- couchbase.couchbase.svc.cluster.local:8091
username: user
password: password
password: 123456
bucketName: dcp-test
dcp:
group:
Expand Down
1 change: 1 addition & 0 deletions membership/dynamic_membership.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ func (d *dynamicMembership) GetInfo() *Model {
if d.info != nil {
return d.info
}
logger.Log.Info("dynamic membership waiting first request")
return <-d.infoChan
}

Expand Down
9 changes: 8 additions & 1 deletion membership/membership.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package membership

import "github.com/Trendyol/go-dcp/logger"

type Membership interface {
GetInfo() *Model
Close()
Expand All @@ -23,5 +25,10 @@ func (s *Model) IsChanged(other *Model) bool {
return true
}

return s.MemberNumber != other.MemberNumber || s.TotalMembers != other.TotalMembers
res := s.MemberNumber != other.MemberNumber || s.TotalMembers != other.TotalMembers
if !res {
logger.Log.Info("membership info not changed")
}

return res
}

0 comments on commit 255b107

Please sign in to comment.