Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-2870: EmbeddedKafka: register BeanDefinition #2872

Merged
merged 2 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ ext {
springBootVersion = '3.0.9' // docs module
springDataVersion = '2023.1.0-RC1'
springRetryVersion = '2.0.4'
springVersion = '6.1.0-RC1'
springVersion = '6.1.0-SNAPSHOT'
zookeeperVersion = '3.6.4'

idPrefix = 'kafka'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -92,6 +93,8 @@ public class EmbeddedKafkaKraftBroker implements EmbeddedKafkaBroker {

private final Properties brokerProperties = new Properties();

private final AtomicBoolean initialized = new AtomicBoolean();

private KafkaClusterTestKit cluster;

private int[] kafkaPorts;
Expand Down Expand Up @@ -191,9 +194,11 @@ public void setAdminTimeout(int adminTimeout) {

@Override
public void afterPropertiesSet() {
overrideExitMethods();
addDefaultBrokerPropsIfAbsent(this.brokerProperties, this.count);
start();
if (this.initialized.compareAndSet(false, true)) {
overrideExitMethods();
addDefaultBrokerPropsIfAbsent(this.brokerProperties, this.count);
start();
}
}


Expand Down Expand Up @@ -232,7 +237,6 @@ private void start() {
System.setProperty(this.brokerListProperty, getBrokersAsString());
}
System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, getBrokersAsString());
System.setProperty(this.brokerListProperty, getBrokersAsString());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -111,6 +112,8 @@ public class EmbeddedKafkaZKBroker implements EmbeddedKafkaBroker {

private final Map<String, Object> brokerProperties = new HashMap<>();

private final AtomicBoolean initialized = new AtomicBoolean();

private EmbeddedZookeeper zookeeper;

private String zkConnect;
Expand Down Expand Up @@ -289,45 +292,47 @@ public synchronized EmbeddedKafkaZKBroker zkSessionTimeout(int zkSessionTimeout)

@Override
public void afterPropertiesSet() {
overrideExitMethods();
try {
this.zookeeper = new EmbeddedZookeeper(this.zkPort);
}
catch (IOException | InterruptedException e) {
throw new IllegalStateException("Failed to create embedded Zookeeper", e);
}
this.zkConnect = LOOPBACK + ":" + this.zookeeper.getPort();
this.kafkaServers.clear();
boolean userLogDir = this.brokerProperties.get(KafkaConfig.LogDirProp()) != null && this.count == 1;
for (int i = 0; i < this.count; i++) {
Properties brokerConfigProperties = createBrokerProperties(i);
brokerConfigProperties.setProperty(KafkaConfig.ReplicaSocketTimeoutMsProp(), "1000");
brokerConfigProperties.setProperty(KafkaConfig.ControllerSocketTimeoutMsProp(), "1000");
brokerConfigProperties.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
brokerConfigProperties.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp(),
String.valueOf(Long.MAX_VALUE));
this.brokerProperties.forEach(brokerConfigProperties::put);
if (!this.brokerProperties.containsKey(KafkaConfig.NumPartitionsProp())) {
brokerConfigProperties.setProperty(KafkaConfig.NumPartitionsProp(), "" + this.partitionsPerTopic);
if (this.initialized.compareAndSet(false, true)) {
overrideExitMethods();
try {
this.zookeeper = new EmbeddedZookeeper(this.zkPort);
}
catch (IOException | InterruptedException e) {
throw new IllegalStateException("Failed to create embedded Zookeeper", e);
}
if (!userLogDir) {
logDir(brokerConfigProperties);
this.zkConnect = LOOPBACK + ":" + this.zookeeper.getPort();
this.kafkaServers.clear();
boolean userLogDir = this.brokerProperties.get(KafkaConfig.LogDirProp()) != null && this.count == 1;
for (int i = 0; i < this.count; i++) {
Properties brokerConfigProperties = createBrokerProperties(i);
brokerConfigProperties.setProperty(KafkaConfig.ReplicaSocketTimeoutMsProp(), "1000");
brokerConfigProperties.setProperty(KafkaConfig.ControllerSocketTimeoutMsProp(), "1000");
brokerConfigProperties.setProperty(KafkaConfig.OffsetsTopicReplicationFactorProp(), "1");
brokerConfigProperties.setProperty(KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp(),
String.valueOf(Long.MAX_VALUE));
this.brokerProperties.forEach(brokerConfigProperties::put);
if (!this.brokerProperties.containsKey(KafkaConfig.NumPartitionsProp())) {
brokerConfigProperties.setProperty(KafkaConfig.NumPartitionsProp(), "" + this.partitionsPerTopic);
}
if (!userLogDir) {
logDir(brokerConfigProperties);
}
KafkaServer server = TestUtils.createServer(new KafkaConfig(brokerConfigProperties), Time.SYSTEM);
this.kafkaServers.add(server);
if (this.kafkaPorts[i] == 0) {
this.kafkaPorts[i] = TestUtils.boundPort(server, SecurityProtocol.PLAINTEXT);
}
}
KafkaServer server = TestUtils.createServer(new KafkaConfig(brokerConfigProperties), Time.SYSTEM);
this.kafkaServers.add(server);
if (this.kafkaPorts[i] == 0) {
this.kafkaPorts[i] = TestUtils.boundPort(server, SecurityProtocol.PLAINTEXT);
createKafkaTopics(this.topics);
if (this.brokerListProperty == null) {
this.brokerListProperty = System.getProperty(BROKER_LIST_PROPERTY);
}
if (this.brokerListProperty != null) {
System.setProperty(this.brokerListProperty, getBrokersAsString());
}
System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, getBrokersAsString());
System.setProperty(SPRING_EMBEDDED_ZOOKEEPER_CONNECT, getZookeeperConnectionString());
}
createKafkaTopics(this.topics);
if (this.brokerListProperty == null) {
this.brokerListProperty = System.getProperty(BROKER_LIST_PROPERTY);
}
if (this.brokerListProperty != null) {
System.setProperty(this.brokerListProperty, getBrokersAsString());
}
System.setProperty(SPRING_EMBEDDED_KAFKA_BROKERS, getBrokersAsString());
System.setProperty(SPRING_EMBEDDED_ZOOKEEPER_CONNECT, getZookeeperConnectionString());
}

private void logDir(Properties brokerConfigProperties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.EmbeddedKafkaZKBroker;
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
import org.springframework.test.context.aot.DisabledInAotMode;

/**
* Annotation that can be specified on a test class that runs Spring for Apache Kafka
Expand Down Expand Up @@ -72,6 +73,7 @@
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@DisabledInAotMode
public @interface EmbeddedKafka {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import java.util.Properties;

import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.DefaultSingletonBeanRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.io.Resource;
Expand Down Expand Up @@ -124,9 +126,11 @@ public void customizeContext(ConfigurableApplicationContext context, MergedConte
embeddedKafkaBroker.brokerListProperty(this.embeddedKafka.bootstrapServersProperty());
}

beanFactory.initializeBean(embeddedKafkaBroker, EmbeddedKafkaBroker.BEAN_NAME);
beanFactory.registerSingleton(EmbeddedKafkaBroker.BEAN_NAME, embeddedKafkaBroker);
((DefaultSingletonBeanRegistry) beanFactory).registerDisposableBean(EmbeddedKafkaBroker.BEAN_NAME, embeddedKafkaBroker);
// Safe to start an embedded broker eagerly before context refresh
embeddedKafkaBroker.afterPropertiesSet();

((BeanDefinitionRegistry) beanFactory).registerBeanDefinition(EmbeddedKafkaBroker.BEAN_NAME,
new RootBeanDefinition(EmbeddedKafkaBroker.class, () -> embeddedKafkaBroker));
}

private int[] setupPorts() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,13 @@
package org.springframework.kafka.test.context;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.utils.KafkaTestUtils;

Expand Down Expand Up @@ -75,15 +70,14 @@ void testPorts() {
EmbeddedKafka annotationWithPorts =
AnnotationUtils.findAnnotation(TestWithEmbeddedKafkaPorts.class, EmbeddedKafka.class);
EmbeddedKafkaContextCustomizer customizer = new EmbeddedKafkaContextCustomizer(annotationWithPorts);
ConfigurableApplicationContext context = mock(ConfigurableApplicationContext.class);
BeanFactoryStub factoryStub = new BeanFactoryStub();
given(context.getBeanFactory()).willReturn(factoryStub);
given(context.getEnvironment()).willReturn(mock(ConfigurableEnvironment.class));
ConfigurableApplicationContext context = new GenericApplicationContext();
customizer.customizeContext(context, null);
context.refresh();

assertThat(factoryStub.getBroker().getBrokersAsString())
EmbeddedKafkaBroker embeddedKafkaBroker = context.getBean(EmbeddedKafkaBroker.class);
assertThat(embeddedKafkaBroker.getBrokersAsString())
.isEqualTo("127.0.0.1:" + annotationWithPorts.ports()[0]);
assertThat(KafkaTestUtils.getPropertyValue(factoryStub.getBroker(), "brokerListProperty"))
assertThat(KafkaTestUtils.getPropertyValue(embeddedKafkaBroker, "brokerListProperty"))
.isEqualTo("my.bss.prop");
}

Expand All @@ -92,14 +86,12 @@ void testMulti() {
EmbeddedKafka annotationWithPorts =
AnnotationUtils.findAnnotation(TestWithEmbeddedKafkaMulti.class, EmbeddedKafka.class);
EmbeddedKafkaContextCustomizer customizer = new EmbeddedKafkaContextCustomizer(annotationWithPorts);
ConfigurableApplicationContext context = mock(ConfigurableApplicationContext.class);
BeanFactoryStub factoryStub = new BeanFactoryStub();
given(context.getBeanFactory()).willReturn(factoryStub);
given(context.getEnvironment()).willReturn(mock(ConfigurableEnvironment.class));
ConfigurableApplicationContext context = new GenericApplicationContext();
customizer.customizeContext(context, null);
context.refresh();

assertThat(factoryStub.getBroker().getBrokersAsString())
.matches("127.0.0.1:[0-9]+,127.0.0.1:[0-9]+");
assertThat(context.getBean(EmbeddedKafkaBroker.class).getBrokersAsString())
.matches("127.0.0.1:[0-9]+,127.0.0.1:[0-9]+");
}


Expand All @@ -122,31 +114,5 @@ private class TestWithEmbeddedKafkaPorts {
private class TestWithEmbeddedKafkaMulti {

}
@SuppressWarnings("serial")
private class BeanFactoryStub extends DefaultListableBeanFactory {

private Object bean;

public EmbeddedKafkaBroker getBroker() {
return (EmbeddedKafkaBroker) bean;
}

@Override
public Object initializeBean(Object existingBean, String beanName) throws BeansException {
this.bean = existingBean;
return bean;
}

@Override
public void registerSingleton(String beanName, Object singletonObject) {

}

@Override
public void registerDisposableBean(String beanName, DisposableBean bean) {

}

}

}