Skip to content

Commit

Permalink
Fixed message filtering for batch subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
szczygiel-m committed Aug 21, 2024
1 parent 497351f commit e0d04c7
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,4 @@
public enum MessageState {
INFLIGHT,
PROCESSED,
FILTERED
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,7 @@ public Set<SubscriptionPartitionOffset> calculateOffsetsToBeCommitted(Map<Subscr
try (HermesTimerContext ignored = timer.time()) {
List<SubscriptionPartitionOffset> processedOffsets = new ArrayList<>();
for (Map.Entry<SubscriptionPartitionOffset, MessageState> entry : offsets.entrySet()) {
// we consider filtered messages as processed
if (entry.getValue() != MessageState.INFLIGHT) {
if (entry.getValue() == MessageState.PROCESSED) {
processedOffsets.add(entry.getKey());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public Map<SubscriptionPartitionOffset, MessageState> getOffsetsSnapshotAndRelea

for (Map.Entry<SubscriptionPartitionOffset, MessageState> entry : slots.entrySet()) {
offsetSnapshot.put(entry.getKey(), entry.getValue());
if (entry.getValue() != MessageState.INFLIGHT) {
if (entry.getValue() == MessageState.PROCESSED) {
slots.remove(entry.getKey());
permitsReleased++;
}
Expand Down

0 comments on commit e0d04c7

Please sign in to comment.