Skip to content

Commit

Permalink
Fixed message filtering for batch subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
szczygiel-m committed Aug 21, 2024
1 parent 9bf86d6 commit 5d0f48b
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,21 @@ public MessageBatchingResult next(Subscription subscription, Runnable signalsInt
if (maybeMessage.isPresent()) {
Message message = maybeMessage.get();

if (batch.canFit(message.getData())) {
batch.append(message.getData(), toMessageMetadata(message, subscription, batch.getId()));
} else if (batch.isBiggerThanTotalCapacity(message.getData())) {
logger.error("Message size exceeds buffer total capacity [size={}, capacity={}, subscription={}]",
message.getData().length, batch.getCapacity(), subscription.getQualifiedName());
discarded.add(toMessageMetadata(message, subscription));
} else {
logger.debug(
"Message too large for current batch [message_size={}, subscription={}]",
message.getData().length, subscription.getQualifiedName()
);
checkArgument(inflight.offer(message));
break;
if (!message.isFiltered()) {
if (batch.canFit(message.getData())) {
batch.append(message.getData(), toMessageMetadata(message, subscription, batch.getId()));
} else if (batch.isBiggerThanTotalCapacity(message.getData())) {
logger.error("Message size exceeds buffer total capacity [size={}, capacity={}, subscription={}]",
message.getData().length, batch.getCapacity(), subscription.getQualifiedName());
discarded.add(toMessageMetadata(message, subscription));
} else {
logger.debug(
"Message too large for current batch [message_size={}, subscription={}]",
message.getData().length, subscription.getQualifiedName()
);
checkArgument(inflight.offer(message));
break;
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.junit.jupiter.api.extension.RegisterExtension;
import pl.allegro.tech.hermes.api.BatchSubscriptionPolicy;
import pl.allegro.tech.hermes.api.ContentType;
import pl.allegro.tech.hermes.api.MessageFilterSpecification;
import pl.allegro.tech.hermes.api.Subscription;
import pl.allegro.tech.hermes.api.Topic;
import pl.allegro.tech.hermes.integrationtests.setup.HermesExtension;
Expand All @@ -23,10 +24,12 @@
import static com.github.tomakehurst.wiremock.client.WireMock.post;
import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
import static com.github.tomakehurst.wiremock.stubbing.Scenario.STARTED;
import static com.google.common.collect.ImmutableMap.of;
import static java.util.Arrays.stream;
import static pl.allegro.tech.hermes.api.BatchSubscriptionPolicy.Builder.batchSubscriptionPolicy;
import static pl.allegro.tech.hermes.api.TopicWithSchema.topicWithSchema;
import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscription;
import static pl.allegro.tech.hermes.test.helper.builder.SubscriptionBuilder.subscriptionWithRandomName;
import static pl.allegro.tech.hermes.test.helper.builder.TopicBuilder.topicWithRandomName;

public class BatchDeliveryTest {
Expand All @@ -39,10 +42,19 @@ public class BatchDeliveryTest {
@RegisterExtension
public static final TestSubscribersExtension subscribers = new TestSubscribersExtension();

static final AvroUser BOB = new AvroUser("Bob", 50, "blue");

static final AvroUser ALICE = new AvroUser("Alice", 20, "magenta");

private static final TestMessage[] SMALL_BATCH = TestMessage.simpleMessages(2);

private static final TestMessage SINGLE_MESSAGE = TestMessage.simple();

private static final TestMessage SINGLE_MESSAGE_FILTERED = BOB.asTestMessage();

private static final MessageFilterSpecification MESSAGE_NAME_FILTER =
new MessageFilterSpecification(of("type", "jsonpath", "path", ".name", "matcher", "^Bob.*"));

@Test
public void shouldDeliverMessagesInBatch() {
// given
Expand All @@ -67,6 +79,28 @@ public void shouldDeliverMessagesInBatch() {
expectSingleBatch(subscriber, SMALL_BATCH);
}

@Test
public void shouldFilterIncomingEventsForBatch() {
// given
TestSubscriber subscriber = subscribers.createSubscriber();
Topic topic = hermes.initHelper().createTopic(topicWithRandomName().build());
hermes.initHelper().createSubscription(subscriptionWithRandomName(topic.getName(), subscriber.getEndpoint())
.withSubscriptionPolicy(buildBatchPolicy()
.withBatchSize(2)
.withBatchTime(Integer.MAX_VALUE)
.withBatchVolume(1024)
.build())
.withFilter(MESSAGE_NAME_FILTER)
.build());

// when
hermes.api().publishUntilSuccess(topic.getQualifiedName(), ALICE.asJson());
hermes.api().publishUntilSuccess(topic.getQualifiedName(), BOB.asJson());

// then
expectSingleBatch(subscriber, SINGLE_MESSAGE_FILTERED);
}

@Test
public void shouldDeliverBatchInGivenTimePeriod() {
// given
Expand Down

0 comments on commit 5d0f48b

Please sign in to comment.