Skip to content

Commit

Permalink
ReactorKafkaBinderTests race condition issue
Browse files Browse the repository at this point in the history
* There seems to be a race condition in ReactorKafkaBinderTests that causes
  tests to fail on CI occasionaly. Trying to address this by single dedicated
  topic per test.
  • Loading branch information
sobychacko committed May 17, 2024
1 parent fd9fa80 commit 13f94d2
Showing 1 changed file with 22 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2021-2023 the original author or authors.
* Copyright 2021-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,12 +27,10 @@
import java.util.concurrent.atomic.AtomicReference;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.jupiter.api.Test;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.kafka.receiver.ReceiverOffset;
import reactor.kafka.sender.SenderResult;

Expand Down Expand Up @@ -68,10 +66,11 @@

/**
* @author Gary Russell
* @author Soby Chacko
* @since 4.0
*
*/
@EmbeddedKafka(topics = { "testCa", "testCb", "testC1", "testP" })
@EmbeddedKafka(topics = { "testCa", "testCb", "testC1", "testC-Manual", "testC-AMO", "testP" })
class ReactorKafkaBinderTests {

@SuppressWarnings({ "rawtypes", "unchecked" })
Expand All @@ -90,7 +89,7 @@ void consumerBinding() throws Exception {
CountDownLatch latch = new CountDownLatch(2);

FluxMessageChannel inbound = new FluxMessageChannel();
Subscriber<Message<?>> sub = new Subscriber<Message<?>>() {
Subscriber<Message<?>> sub = new Subscriber<>() {

@Override
public void onSubscribe(Subscription s) {
Expand Down Expand Up @@ -132,16 +131,16 @@ public void onComplete() {

@Test
void concurrencyManual() throws Exception {
concurrency(false);
concurrency("testC-Manual", "concurrencyManual-group", false);
}

@Test
void concurrencyAtMostOnce() throws Exception {
concurrency(true);
concurrency("testC-AMO", "concurrencyAtMostOnce-group", true);
}

@SuppressWarnings({ "rawtypes", "unchecked" })
void concurrency(boolean atMostOnce) throws Exception {
void concurrency(String topic, String group, boolean atMostOnce) throws Exception {
KafkaProperties kafkaProperties = new KafkaProperties();
kafkaProperties.setBootstrapServers(
Collections.singletonList(EmbeddedKafkaCondition.getBroker().getBrokersAsString()));
Expand All @@ -159,7 +158,7 @@ void concurrency(boolean atMostOnce) throws Exception {
List<String> payloads = Collections.synchronizedList(new ArrayList<>());

FluxMessageChannel inbound = new FluxMessageChannel();
Subscriber<Message<?>> sub = new Subscriber<Message<?>>() {
Subscriber<Message<?>> sub = new Subscriber<>() {

@Override
public void onSubscribe(Subscription s) {
Expand Down Expand Up @@ -192,24 +191,24 @@ public void onComplete() {
KafkaConsumerProperties ext = new KafkaConsumerProperties();
ext.setReactiveAtMostOnce(atMostOnce);
ExtendedConsumerProperties<KafkaConsumerProperties> props =
new ExtendedConsumerProperties<KafkaConsumerProperties>(ext);
new ExtendedConsumerProperties<>(ext);
props.setConcurrency(2);

Binding<MessageChannel> consumer = binder.bindConsumer("testC1", "foo", inbound, props);
Binding<MessageChannel> consumer = binder.bindConsumer(topic, group, inbound, props);

assertThat(subscriptionLatch.await(10, TimeUnit.SECONDS)).isTrue();
DefaultKafkaProducerFactory pf =
new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(EmbeddedKafkaCondition.getBroker()));
KafkaTemplate kt = new KafkaTemplate<>(pf);
kt.send("testC1", 0, null, "foo").get(10, TimeUnit.SECONDS);
kt.send("testC1", 1, null, "bar").get(10, TimeUnit.SECONDS);
kt.send("testC1", 0, null, "baz").get(10, TimeUnit.SECONDS);
kt.send("testC1", 1, null, "qux").get(10, TimeUnit.SECONDS);
kt.send(topic, 0, null, "foo").get(10, TimeUnit.SECONDS);
kt.send(topic, 1, null, "bar").get(10, TimeUnit.SECONDS);
kt.send(topic, 0, null, "baz").get(10, TimeUnit.SECONDS);
kt.send(topic, 1, null, "qux").get(10, TimeUnit.SECONDS);
assertThat(messageLatch1.await(10, TimeUnit.SECONDS)).isTrue();
consumer.stop();
consumer.start();
kt.send("testC1", 0, null, "fiz").get(10, TimeUnit.SECONDS);
kt.send("testC1", 1, null, "buz").get(10, TimeUnit.SECONDS);
kt.send(topic, 0, null, "fiz").get(10, TimeUnit.SECONDS);
kt.send(topic, 1, null, "buz").get(10, TimeUnit.SECONDS);
assertThat(messageLatch2.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(partitions).hasSize(2);
consumer.unbind();
Expand All @@ -235,11 +234,10 @@ void autoCommit() throws Exception {

CountDownLatch subscriptionLatch = new CountDownLatch(1);
CountDownLatch messageLatch1 = new CountDownLatch(4);
Set<Integer> partitions = new HashSet<>();
List<String> payloads = Collections.synchronizedList(new ArrayList<>());

FluxMessageChannel inbound = new FluxMessageChannel();
Subscriber<Message<?>> sub = new Subscriber<Message<?>>() {
Subscriber<Message<?>> sub = new Subscriber<>() {

@Override
public void onSubscribe(Subscription s) {
Expand All @@ -250,11 +248,11 @@ public void onSubscribe(Subscription s) {
@Override
public void onNext(Message<?> msg) {
((Message<Flux<ConsumerRecord<?, String>>>) msg).getPayload()
.doOnNext(rec -> {
payloads.add(rec.value());
messageLatch1.countDown();
})
.subscribe();
.doOnNext(rec -> {
payloads.add(rec.value());
messageLatch1.countDown();
})
.subscribe();
}

@Override
Expand Down Expand Up @@ -339,7 +337,6 @@ public void handleMessage(Message<?> message) throws MessagingException {
props.getExtension().setRecordMetadataChannel("sendResults");

Binding<MessageChannel> bindProducer = binder.bindProducer("testP", outbound, props);
AtomicReference<Mono<RecordMetadata>> sendResult = new AtomicReference<>();
outbound.send(MessageBuilder.withPayload("foo")
.setHeader(IntegrationMessageHeaderAccessor.CORRELATION_ID, 1)
.build());
Expand Down

0 comments on commit 13f94d2

Please sign in to comment.