From f6a7cfcbabb4633be4b0daba7bd9bf8a7449e10c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristo=20Kuusk=C3=BCll?= Date: Fri, 15 Dec 2023 12:47:36 +0200 Subject: [PATCH] Fix. --- .../com/transferwise/kafka/tkms/EndToEndIntTest.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java index 2d3063a..010d3ff 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java @@ -251,13 +251,19 @@ void testExactlyOnceDelivery(int scenario) throws Exception { void testThatMessagesWithSameKeyEndUpInOnePartition(boolean deferUntilCommit) { setupConfig(deferUntilCommit); - String message = "Hello World!"; + String protoMessage = "Hello Estonia!"; + String message = StringUtils.repeat(protoMessage, 100); String key = "GrailsRocks"; - int n = 20; + int n = 200; ConcurrentHashMap partitionsMap = new ConcurrentHashMap<>(); AtomicInteger receivedCount = new AtomicInteger(); Consumer> messageCounter = cr -> { + var testEvent = ExceptionUtils.doUnchecked(() -> objectMapper.readValue(cr.value(), TestEvent.class)); + if (!message.equals(testEvent.getMessage())) { + throw new IllegalStateException("Unexpected message '" + message + "' received."); + } + partitionsMap.computeIfAbsent(cr.partition(), (k) -> new AtomicInteger()).incrementAndGet(); receivedCount.incrementAndGet(); };