Skip to content

Commit

Permalink
Fixed message filtering for batch subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
szczygiel-m committed Aug 21, 2024
1 parent f8dad25 commit 5db6818
Showing 1 changed file with 13 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,21 +81,19 @@ public MessageBatchingResult next(Subscription subscription, Runnable signalsInt
if (maybeMessage.isPresent()) {
Message message = maybeMessage.get();

if (!message.isFiltered()) {
if (batch.canFit(message.getData())) {
batch.append(message.getData(), toMessageMetadata(message, subscription, batch.getId()));
} else if (batch.isBiggerThanTotalCapacity(message.getData())) {
logger.error("Message size exceeds buffer total capacity [size={}, capacity={}, subscription={}]",
message.getData().length, batch.getCapacity(), subscription.getQualifiedName());
discarded.add(toMessageMetadata(message, subscription));
} else {
logger.debug(
"Message too large for current batch [message_size={}, subscription={}]",
message.getData().length, subscription.getQualifiedName()
);
checkArgument(inflight.offer(message));
break;
}
if (batch.canFit(message.getData())) {
batch.append(message.getData(), toMessageMetadata(message, subscription, batch.getId()));
} else if (batch.isBiggerThanTotalCapacity(message.getData())) {
logger.error("Message size exceeds buffer total capacity [size={}, capacity={}, subscription={}]",
message.getData().length, batch.getCapacity(), subscription.getQualifiedName());
discarded.add(toMessageMetadata(message, subscription));
} else {
logger.debug(
"Message too large for current batch [message_size={}, subscription={}]",
message.getData().length, subscription.getQualifiedName()
);
checkArgument(inflight.offer(message));
break;
}
}
}
Expand Down

0 comments on commit 5db6818

Please sign in to comment.