diff --git a/stream/stream.go b/stream/stream.go index 7b20981..1c17e8a 100644 --- a/stream/stream.go +++ b/stream/stream.go @@ -52,16 +52,17 @@ type stream struct { vBucketDiscovery VBucketDiscovery bus EventBus.Bus eventHandler models.EventHandler - stopCh chan struct{} - finishStreamWithCloseCh chan struct{} + listener models.Listener + finishStreamWithEndEventCh chan struct{} rebalanceTimer *time.Timer dirtyOffsets *wrapper.ConcurrentSwissMap[uint16, bool] - listener models.Listener + stopCh chan struct{} config *config.Dcp metric *Metric - finishStreamWithEndEventCh chan struct{} + finishStreamWithCloseCh chan struct{} collectionIDs map[uint32]string offsets *wrapper.ConcurrentSwissMap[uint16, *models.Offset] + vbIds *wrapper.ConcurrentSwissMap[uint16, struct{}] activeStreams int rebalanceLock sync.Mutex anyDirtyOffset bool @@ -70,8 +71,12 @@ type stream struct { } func (s *stream) setOffset(vbID uint16, offset *models.Offset, dirty bool) { - s.offsets.Store(vbID, offset) - s.dirtyOffsets.Store(vbID, dirty) + if _, ok := s.vbIds.Load(vbID); ok { + s.offsets.Store(vbID, offset) + s.dirtyOffsets.Store(vbID, dirty) + } else { + logger.Log.Warn("vbId=%v not belong our vbId range", vbID) + } } func (s *stream) waitAndForward(payload interface{}, offset *models.Offset, vbID uint16, eventTime time.Time) { @@ -172,6 +177,10 @@ func (s *stream) Open() { s.activeStreams = len(vbIds) s.checkpoint = NewCheckpoint(s, vbIds, s.client, s.metadata, s.config) + s.vbIds = wrapper.CreateConcurrentSwissMap[uint16, struct{}](1024) + for _, vbID := range vbIds { + s.vbIds.Store(vbID, struct{}{}) + } s.offsets, s.dirtyOffsets, s.anyDirtyOffset = s.checkpoint.Load() s.observer = couchbase.NewObserver(s.config, s.collectionIDs, s.bus)