Skip to content

Commit

Permalink
fix: prevent storing old vbID when .Ack() trigger in future cases
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Dec 6, 2023
1 parent a43ebc8 commit cf76bb4
Showing 1 changed file with 15 additions and 6 deletions.
21 changes: 15 additions & 6 deletions stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,17 @@ type stream struct {
vBucketDiscovery VBucketDiscovery
bus EventBus.Bus
eventHandler models.EventHandler
stopCh chan struct{}
finishStreamWithCloseCh chan struct{}
listener models.Listener
finishStreamWithEndEventCh chan struct{}
rebalanceTimer *time.Timer
dirtyOffsets *wrapper.ConcurrentSwissMap[uint16, bool]
listener models.Listener
stopCh chan struct{}
config *config.Dcp
metric *Metric
finishStreamWithEndEventCh chan struct{}
finishStreamWithCloseCh chan struct{}
collectionIDs map[uint32]string
offsets *wrapper.ConcurrentSwissMap[uint16, *models.Offset]
vbIds *wrapper.ConcurrentSwissMap[uint16, struct{}]
activeStreams int
rebalanceLock sync.Mutex
anyDirtyOffset bool
Expand All @@ -70,8 +71,12 @@ type stream struct {
}

func (s *stream) setOffset(vbID uint16, offset *models.Offset, dirty bool) {
s.offsets.Store(vbID, offset)
s.dirtyOffsets.Store(vbID, dirty)
if _, ok := s.vbIds.Load(vbID); ok {
s.offsets.Store(vbID, offset)
s.dirtyOffsets.Store(vbID, dirty)
} else {
logger.Log.Warn("vbId=%v not belong our vbId range", vbID)
}
}

func (s *stream) waitAndForward(payload interface{}, offset *models.Offset, vbID uint16, eventTime time.Time) {
Expand Down Expand Up @@ -172,6 +177,10 @@ func (s *stream) Open() {
s.activeStreams = len(vbIds)

s.checkpoint = NewCheckpoint(s, vbIds, s.client, s.metadata, s.config)
s.vbIds = wrapper.CreateConcurrentSwissMap[uint16, struct{}](1024)
for _, vbID := range vbIds {
s.vbIds.Store(vbID, struct{}{})
}
s.offsets, s.dirtyOffsets, s.anyDirtyOffset = s.checkpoint.Load()
s.observer = couchbase.NewObserver(s.config, s.collectionIDs, s.bus)

Expand Down

0 comments on commit cf76bb4

Please sign in to comment.