Skip to content

Commit

Permalink
* Call embeddedKafkaBroker.afterPropertiesSet() explicitly in the `…
Browse files Browse the repository at this point in the history
…EmbeddedKafkaContextCustomizer`

since we need to have broker started and system props initialized before the rest of `ApplicationContext`
  • Loading branch information
artembilan committed Oct 31, 2023
1 parent 3a9e611 commit 7cbd1ea
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -92,6 +93,8 @@ public class EmbeddedKafkaKraftBroker implements EmbeddedKafkaBroker {

private final Properties brokerProperties = new Properties();

private final AtomicBoolean initialized = new AtomicBoolean();

private KafkaClusterTestKit cluster;

private int[] kafkaPorts;
Expand Down Expand Up @@ -191,9 +194,11 @@ public void setAdminTimeout(int adminTimeout) {

@Override
public void afterPropertiesSet() {
overrideExitMethods();
addDefaultBrokerPropsIfAbsent(this.brokerProperties, this.count);
start();
if (this.initialized.compareAndSet(false, true)) {
overrideExitMethods();
addDefaultBrokerPropsIfAbsent(this.brokerProperties, this.count);
start();
}
}


Expand Down Expand Up @@ -232,7 +237,6 @@ private void start() {
System.setProperty(this.brokerListProperty, getBrokersAsString());
}
System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, getBrokersAsString());
System.setProperty(this.brokerListProperty, getBrokersAsString());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -111,6 +112,8 @@ public class EmbeddedKafkaZKBroker implements EmbeddedKafkaBroker {

private final Map<String, Object> brokerProperties = new HashMap<>();

private final AtomicBoolean initialized = new AtomicBoolean();

private EmbeddedZookeeper zookeeper;

private String zkConnect;
Expand Down Expand Up @@ -289,45 +292,47 @@ public synchronized EmbeddedKafkaZKBroker zkSessionTimeout(int zkSessionTimeout)

@Override
public void afterPropertiesSet() {
overrideExitMethods();
try {
this.zookeeper = new EmbeddedZookeeper(this.zkPort);
}
catch (IOException | InterruptedException e) {
throw new IllegalStateException("Failed to create embedded Zookeeper", e);
}
this.zkConnect = LOOPBACK + ":" + this.zookeeper.getPort();
this.kafkaServers.clear();
boolean userLogDir = this.brokerProperties.get(KafkaConfig.LogDirProp()) != null && this.count == 1;
for (int i = 0; i < this.count; i++) {
Properties brokerConfigProperties = createBrokerProperties(i);
brokerConfigProperties.setProperty(KafkaConfig.ReplicaSocketTimeoutMsProp(), "1000");
brokerConfigProperties.setProperty(KafkaConfig.ControllerSocketTimeoutMsProp(), "1000");
brokerConfigProperties.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
brokerConfigProperties.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp(),
String.valueOf(Long.MAX_VALUE));
this.brokerProperties.forEach(brokerConfigProperties::put);
if (!this.brokerProperties.containsKey(KafkaConfig.NumPartitionsProp())) {
brokerConfigProperties.setProperty(KafkaConfig.NumPartitionsProp(), "" + this.partitionsPerTopic);
if (this.initialized.compareAndSet(false, true)) {
overrideExitMethods();
try {
this.zookeeper = new EmbeddedZookeeper(this.zkPort);
}
catch (IOException | InterruptedException e) {
throw new IllegalStateException("Failed to create embedded Zookeeper", e);
}
if (!userLogDir) {
logDir(brokerConfigProperties);
this.zkConnect = LOOPBACK + ":" + this.zookeeper.getPort();
this.kafkaServers.clear();
boolean userLogDir = this.brokerProperties.get(KafkaConfig.LogDirProp()) != null && this.count == 1;
for (int i = 0; i < this.count; i++) {
Properties brokerConfigProperties = createBrokerProperties(i);
brokerConfigProperties.setProperty(KafkaConfig.ReplicaSocketTimeoutMsProp(), "1000");
brokerConfigProperties.setProperty(KafkaConfig.ControllerSocketTimeoutMsProp(), "1000");
brokerConfigProperties.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
brokerConfigProperties.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp(),
String.valueOf(Long.MAX_VALUE));
this.brokerProperties.forEach(brokerConfigProperties::put);
if (!this.brokerProperties.containsKey(KafkaConfig.NumPartitionsProp())) {
brokerConfigProperties.setProperty(KafkaConfig.NumPartitionsProp(), "" + this.partitionsPerTopic);
}
if (!userLogDir) {
logDir(brokerConfigProperties);
}
KafkaServer server = TestUtils.createServer(new KafkaConfig(brokerConfigProperties), Time.SYSTEM);
this.kafkaServers.add(server);
if (this.kafkaPorts[i] == 0) {
this.kafkaPorts[i] = TestUtils.boundPort(server, SecurityProtocol.PLAINTEXT);
}
}
KafkaServer server = TestUtils.createServer(new KafkaConfig(brokerConfigProperties), Time.SYSTEM);
this.kafkaServers.add(server);
if (this.kafkaPorts[i] == 0) {
this.kafkaPorts[i] = TestUtils.boundPort(server, SecurityProtocol.PLAINTEXT);
createKafkaTopics(this.topics);
if (this.brokerListProperty == null) {
this.brokerListProperty = System.getProperty(BROKER_LIST_PROPERTY);
}
if (this.brokerListProperty != null) {
System.setProperty(this.brokerListProperty, getBrokersAsString());
}
System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, getBrokersAsString());
System.setProperty(SPRING_EMBEDDED_ZOOKEEPER_CONNECT, getZookeeperConnectionString());
}
createKafkaTopics(this.topics);
if (this.brokerListProperty == null) {
this.brokerListProperty = System.getProperty(BROKER_LIST_PROPERTY);
}
if (this.brokerListProperty != null) {
System.setProperty(this.brokerListProperty, getBrokersAsString());
}
System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, getBrokersAsString());
System.setProperty(SPRING_EMBEDDED_ZOOKEEPER_CONNECT, getZookeeperConnectionString());
}

private void logDir(Properties brokerConfigProperties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ public void customizeContext(ConfigurableApplicationContext context, MergedConte
embeddedKafkaBroker.brokerListProperty(this.embeddedKafka.bootstrapServersProperty());
}

// Safe to start an embedded broker eagerly before context refresh
embeddedKafkaBroker.afterPropertiesSet();

((BeanDefinitionRegistry) beanFactory).registerBeanDefinition(EmbeddedKafkaBroker.BEAN_NAME,
new RootBeanDefinition(EmbeddedKafkaBroker.class, () -> embeddedKafkaBroker));
}
Expand Down

0 comments on commit 7cbd1ea

Please sign in to comment.