Skip to content

Commit

Permalink
Fixed message filtering for batch subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
szczygiel-m committed Aug 21, 2024
1 parent 5d0f48b commit f8dad25
Showing 1 changed file with 6 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,14 @@ private Optional<Message> readAndTransform(Subscription subscription, String bat
if (maybeMessage.isPresent()) {
Message message = maybeMessage.get();

Message transformed = messageConverterResolver.converterFor(message, subscription).convert(message, topic);
transformed = message().fromMessage(transformed).withData(wrap(subscription, transformed)).build();
if (!message.isFiltered()) {
Message transformed = messageConverterResolver.converterFor(message, subscription).convert(message, topic);
transformed = message().fromMessage(transformed).withData(wrap(subscription, transformed)).build();

trackers.get(subscription).logInflight(toMessageMetadata(transformed, subscription, batchId));
trackers.get(subscription).logInflight(toMessageMetadata(transformed, subscription, batchId));

return Optional.of(transformed);
return Optional.of(transformed);
}
}
return Optional.empty();
}
Expand Down

0 comments on commit f8dad25

Please sign in to comment.