Skip to content

Commit

Permalink
chore: prepare for 1.2.0
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Nov 8, 2023
1 parent 83f74f7 commit fa3cb73
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 53 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ You can adjust the average window time for the metrics by specifying the value o
| cbgo_process_latency_ms_current | The average process latency in milliseconds for the last metric.averageWindowSec | N/A | Gauge |
| cbgo_dcp_latency_ms_current | The latest consumed dcp message latency in milliseconds | N/A | Counter |
| cbgo_rebalance_current | The number of total rebalance | N/A | Gauge |
| cbgo_active_stream_current | The number of total active stream | N/A | Gauge |
| cbgo_total_members_current | The total number of members in the cluster | N/A | Gauge |
| cbgo_member_number_current | The number of the current member | N/A | Gauge |
| cbgo_membership_type_current | The type of membership of the current member | Membership type | Gauge |
Expand Down
8 changes: 1 addition & 7 deletions couchbase/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func (so *observer) needCatchup(vbID uint16, seqNo uint64) bool {
so.catchup.Delete(vbID)
so.catchupNeededVbIDCount--

logger.Log.Info("catchup completed for vbID: %d", vbID)
logger.Log.Info("catchup completed for vbID: %d, remaining catchup: %d", vbID, so.catchupNeededVbIDCount)

return seqNo == catchupSeqNo
}
Expand Down Expand Up @@ -269,12 +269,6 @@ func (so *observer) End(event models.DcpStreamEnd, err error) {
}
}()

if err != nil {
logger.Log.Error("end stream vbId: %v got error: %v", event.VbID, err)
} else {
logger.Log.Info("end stream vbId: %v", event.VbID)
}

so.listenerEndCh <- models.DcpStreamEndContext{
Event: event,
Err: err,
Expand Down
41 changes: 14 additions & 27 deletions couchbase/rollback_mitigation.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ type rollbackMitigation struct {
configSnapshot *gocbcore.ConfigSnapshot
persistedSeqNos *wrapper.ConcurrentSwissMap[uint16, []*vbUUIDAndSeqNo]
vbUUIDMap *wrapper.ConcurrentSwissMap[uint16, gocbcore.VbUUID]
failOverUUIDTimer *time.Ticker
configWatchTimer *time.Ticker
observeTimer *time.Ticker
observeCloseCh chan struct{}
Expand Down Expand Up @@ -107,13 +106,12 @@ func (r *rollbackMitigation) observeVbID(
callback func(*gocbcore.ObserveVbResult, error),
) { //nolint:unused
_, err := r.client.GetAgent().ObserveVb(gocbcore.ObserveVbOptions{
VbID: vbID,
ReplicaIdx: replica,
VbUUID: vbUUID,
RetryStrategy: gocbcore.NewBestEffortRetryStrategy(nil),
VbID: vbID,
ReplicaIdx: replica,
VbUUID: vbUUID,
}, callback)
if err != nil {
logger.Log.Error("ObserveVBID error for vbId: %v, replica:%v, vbUUID: %v, err: %v", vbID, replica, vbUUID, err)
logger.Log.Error("observeVBID error for vbId: %v, replica:%v, vbUUID: %v, err: %v", vbID, replica, vbUUID, err)
callback(nil, err)
}
}
Expand Down Expand Up @@ -164,15 +162,15 @@ func (r *rollbackMitigation) markAbsentInstances() error { //nolint:unused
serverIndex, err := r.configSnapshot.VbucketToServer(vbID, uint32(idx))
if err != nil {
if errors.Is(err, gocbcore.ErrInvalidReplica) {
logger.Log.Error("invalid replica of vbId: %v, replica: %v, err: %v", vbID, idx, err)
logger.Log.Debug("invalid replica of vbId: %v, replica: %v, err: %v", vbID, idx, err)
replica.SetAbsent()
} else {
outerError = err
return false
}
} else {
if serverIndex < 0 {
logger.Log.Error("invalid server index of vbId: %v, replica: %v, serverIndex: %v", vbID, idx, serverIndex)
logger.Log.Debug("invalid server index of vbId: %v, replica: %v, serverIndex: %v", vbID, idx, serverIndex)
replica.SetAbsent()
}
}
Expand All @@ -187,8 +185,7 @@ func (r *rollbackMitigation) markAbsentInstances() error { //nolint:unused
func (r *rollbackMitigation) startObserve(groupID int) {
r.vbUUIDMap = wrapper.CreateConcurrentSwissMap[uint16, gocbcore.VbUUID](1024)

r.LoadVbUUIDMap()
go r.observeVbUUIDMap()
r.loadVbUUIDMap()

r.observeTimer = time.NewTicker(r.config.RollbackMitigation.Interval)
for {
Expand Down Expand Up @@ -219,24 +216,15 @@ func (r *rollbackMitigation) startObserve(groupID int) {
}
}

func (r *rollbackMitigation) observeVbUUIDMap() {
if r.failOverUUIDTimer != nil {
return
}

r.failOverUUIDTimer = time.NewTicker(time.Minute)
for range r.failOverUUIDTimer.C {
r.LoadVbUUIDMap()
}
}

func (r *rollbackMitigation) LoadVbUUIDMap() {
func (r *rollbackMitigation) loadVbUUIDMap() {
r.persistedSeqNos.Range(func(vbID uint16, _ []*vbUUIDAndSeqNo) bool {
failoverLogs, err := r.client.GetFailoverLogs(vbID)
if err != nil {
panic(err)
}

r.vbUUIDMap.Store(vbID, failoverLogs[0].VbUUID)

var failoverInfos []string
for index, failoverLog := range failoverLogs {
failoverInfos = append(
Expand Down Expand Up @@ -305,6 +293,10 @@ func (r *rollbackMitigation) observe(vbID uint16, replica int, groupID int, vbUU
VbID: vbID,
SeqNo: r.getMinSeqNo(vbID),
})

if vbUUID != result.VbUUID {
r.vbUUIDMap.Store(vbID, result.VbUUID)
}
} else {
logger.Log.Error("replica: %v not found", replica)
}
Expand Down Expand Up @@ -348,7 +340,6 @@ func (r *rollbackMitigation) waitFirstConfig() error {

err = opm.Wait(op, err)
if err != nil {
logger.Log.Error("Error occurred while waiting first config err: %v", err)
return err
}

Expand Down Expand Up @@ -386,10 +377,6 @@ func (r *rollbackMitigation) Stop() {
r.configWatchTimer.Stop()
}

if r.failOverUUIDTimer != nil {
r.failOverUUIDTimer.Stop()
}

logger.Log.Info("rollback mitigation stopped")
}

Expand Down
4 changes: 3 additions & 1 deletion dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ type dcp struct {
readyCh chan struct{}
cancelCh chan os.Signal
metricCollectors []prometheus.Collector
closeWithCancel bool
}

func (s *dcp) startHealthCheck() {
Expand Down Expand Up @@ -164,6 +165,7 @@ func (s *dcp) Start() {
select {
case <-s.stopCh:
case <-s.cancelCh:
s.closeWithCancel = true
case <-s.healCheckFailedCh:
}
}
Expand All @@ -181,7 +183,7 @@ func (s *dcp) Close() {
if s.config.Checkpoint.Type == stream.CheckpointTypeAuto {
s.stream.Save()
}
s.stream.Close()
s.stream.Close(s.closeWithCancel)

if s.config.LeaderElection.Enabled {
s.leaderElection.Stop()
Expand Down
6 changes: 3 additions & 3 deletions example/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ import (
func listener(ctx *models.ListenerContext) {
switch event := ctx.Event.(type) {
case models.DcpMutation:
logger.Log.Debug(
logger.Log.Info(
"mutated(vb=%v,eventTime=%v) | id: %v, value: %v | isCreated: %v",
event.VbID, event.EventTime, string(event.Key), string(event.Value), event.IsCreated(),
)
case models.DcpDeletion:
logger.Log.Debug(
logger.Log.Info(
"deleted(vb=%v,eventTime=%v) | id: %v",
event.VbID, event.EventTime, string(event.Key),
)
case models.DcpExpiration:
logger.Log.Debug(
logger.Log.Info(
"expired(vb=%v,eventTime=%v) | id: %v",
event.VbID, event.EventTime, string(event.Key),
)
Expand Down
47 changes: 32 additions & 15 deletions stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ type Stream interface {
Open()
Rebalance()
Save()
Close()
Close(bool)
GetOffsets() (*wrapper.ConcurrentSwissMap[uint16, *models.Offset], *wrapper.ConcurrentSwissMap[uint16, bool], bool)
GetObserver() couchbase.Observer
GetMetric() (*Metric, int)
Expand Down Expand Up @@ -64,6 +64,7 @@ type stream struct {
rebalanceLock sync.Mutex
anyDirtyOffset bool
balancing bool
closeWithCancel bool
}

func (s *stream) setOffset(vbID uint16, offset *models.Offset, dirty bool) {
Expand Down Expand Up @@ -113,20 +114,34 @@ func (s *stream) listen() {
}
}

func (s *stream) reopenStream(vbID uint16) {
go func(innerVbID uint16) {
for {
err := s.openStream(innerVbID)
if err == nil {
logger.Log.Info("re-open stream, vbID: %d", innerVbID)
break
} else {
logger.Log.Error("cannot re-open stream, vbID: %d, err: %v", innerVbID, err)
}

time.Sleep(time.Second)
}
}(vbID)
}

func (s *stream) listenEnd() {
for endContext := range s.observer.ListenEnd() {
if endContext.Err != nil && errors.Is(endContext.Err, gocbcore.ErrSocketClosed) {
go func(vbID uint16) {
for {
err := s.openStream(vbID)
if err == nil {
logger.Log.Info("re-open stream, vbID: %d", vbID)
break
} else {
logger.Log.Error("cannot re-open stream, vbID: %d, err: %v", vbID, err)
}
}
}(endContext.Event.VbID)
if !s.closeWithCancel && endContext.Err != nil {
logger.Log.Error("end stream vbId: %v got error: %v", endContext.Event.VbID, endContext.Err)
}

if endContext.Err == nil {
logger.Log.Debug("end stream vbId: %v", endContext.Event.VbID)
}

if !s.closeWithCancel && endContext.Err != nil && errors.Is(endContext.Err, gocbcore.ErrSocketClosed) {
s.reopenStream(endContext.Event.VbID)
} else {
s.activeStreams--
if s.activeStreams == 0 {
Expand Down Expand Up @@ -185,7 +200,7 @@ func (s *stream) Rebalance() {
if !s.balancing {
s.balancing = true
s.Save()
s.Close()
s.Close(false)
}

s.eventHandler.AfterRebalanceStart()
Expand Down Expand Up @@ -282,7 +297,9 @@ func (s *stream) wait() {
}
}

func (s *stream) Close() {
func (s *stream) Close(closeWithCancel bool) {
s.closeWithCancel = closeWithCancel

s.eventHandler.BeforeStreamStop()

if !s.config.RollbackMitigation.Disabled {
Expand Down

0 comments on commit fa3cb73

Please sign in to comment.