diff --git a/pkg/pgcapture/debounce.go b/pkg/pgcapture/debounce.go index 28913ed..276f9fc 100644 --- a/pkg/pgcapture/debounce.go +++ b/pkg/pgcapture/debounce.go @@ -96,26 +96,45 @@ func (b *DebounceHandler) Handle(fn ModelAsyncHandlerFunc, checkpoint cursor.Che switch change.Op { case pb.Change_INSERT: - if prev, ok := b.store[debounceKey(change.New)]; ok { + key := debounceKey(change.New) + if prev, ok := b.store[key]; ok { b.handle(prev) + delete(b.store, key) } b.handle(e) case pb.Change_DELETE: - if prev, ok := b.store[debounceKey(change.Old)]; ok { + key := debounceKey(change.Old) + if prev, ok := b.store[key]; ok { b.handle(prev) + delete(b.store, key) } b.handle(e) case pb.Change_UPDATE: if change.Old != nil { - if prev, ok := b.store[debounceKey(change.Old)]; ok { + key := debounceKey(change.Old) + if prev, ok := b.store[key]; ok { b.handle(prev) + delete(b.store, key) } } key := debounceKey(change.New) if prev, ok := b.store[key]; ok { - b.source.Commit(prev.Checkpoint) + // since requeue order is not guaranteed, we need to check if the new event is newer than the previous one + // then we commit the previous one and store the new one + // workaround for the LSN == 0 issue because schedule dump lsn is 0 and should be always the latest event + // also, when the checkpoint is equal, we cannot commit the previous event because it might be a same event + if change.Checkpoint.LSN == 0 || change.Checkpoint.After(prev.Checkpoint) { + b.source.Commit(prev.Checkpoint) + b.store[key] = e + } else if change.Checkpoint.Equal(prev.Checkpoint) { + b.handle(prev) + b.store[key] = e + } else { + b.source.Commit(change.Checkpoint) + } + } else { + b.store[key] = e } - b.store[key] = e } }