Skip to content

Commit

Permalink
Fix.
Browse files Browse the repository at this point in the history
  • Loading branch information
onukristo committed Dec 15, 2023
1 parent dc041a7 commit f6a7cfc
Showing 1 changed file with 8 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -251,13 +251,19 @@ void testExactlyOnceDelivery(int scenario) throws Exception {
void testThatMessagesWithSameKeyEndUpInOnePartition(boolean deferUntilCommit) {
setupConfig(deferUntilCommit);

String message = "Hello World!";
String protoMessage = "Hello Estonia!";
String message = StringUtils.repeat(protoMessage, 100);
String key = "GrailsRocks";
int n = 20;
int n = 200;
ConcurrentHashMap<Integer, AtomicInteger> partitionsMap = new ConcurrentHashMap<>();
AtomicInteger receivedCount = new AtomicInteger();

Consumer<ConsumerRecord<String, String>> messageCounter = cr -> {
var testEvent = ExceptionUtils.doUnchecked(() -> objectMapper.readValue(cr.value(), TestEvent.class));
if (!message.equals(testEvent.getMessage())) {
throw new IllegalStateException("Unexpected message '" + message + "' received.");
}

partitionsMap.computeIfAbsent(cr.partition(), (k) -> new AtomicInteger()).incrementAndGet();
receivedCount.incrementAndGet();
};
Expand Down

0 comments on commit f6a7cfc

Please sign in to comment.