Skip to content

Commit

Permalink
GH-2870: EmbeddedKafka: register BeanDefinition
Browse files Browse the repository at this point in the history
Fixes #2870

The `EmbeddedKafkaContextCustomizer` uses `beanFactory.initializeBean()`
which is too early according to the `ApplicationContext` lifecycle
since it is not refreshed yet for `ContextCustomizer`

* Rework the logic in the `EmbeddedKafkaContextCustomizer` to register a `BeanDefinition`
for an `EmbeddedKafkaBroker` to include it into standard `ApplicationContext` lifecycle
* Also mark `@EmbeddedKafka` with a `@DisabledInAotMode` to disallow this kind of tests
in native images
  • Loading branch information
artembilan committed Oct 31, 2023
1 parent 9794c58 commit 3a9e611
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 48 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ ext {
springBootVersion = '3.0.9' // docs module
springDataVersion = '2023.1.0-RC1'
springRetryVersion = '2.0.4'
springVersion = '6.1.0-RC1'
springVersion = '6.1.0-SNAPSHOT'
zookeeperVersion = '3.6.4'

idPrefix = 'kafka'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.EmbeddedKafkaZKBroker;
import org.springframework.kafka.test.condition.EmbeddedKafkaCondition;
import org.springframework.test.context.aot.DisabledInAotMode;

/**
* Annotation that can be specified on a test class that runs Spring for Apache Kafka
Expand Down Expand Up @@ -72,6 +73,7 @@
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@DisabledInAotMode
public @interface EmbeddedKafka {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import java.util.Properties;

import org.springframework.beans.factory.config.ConfigurableListableBeanFactory;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.DefaultSingletonBeanRegistry;
import org.springframework.beans.factory.support.RootBeanDefinition;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.core.io.Resource;
Expand Down Expand Up @@ -124,9 +126,8 @@ public void customizeContext(ConfigurableApplicationContext context, MergedConte
embeddedKafkaBroker.brokerListProperty(this.embeddedKafka.bootstrapServersProperty());
}

beanFactory.initializeBean(embeddedKafkaBroker, EmbeddedKafkaBroker.BEAN_NAME);
beanFactory.registerSingleton(EmbeddedKafkaBroker.BEAN_NAME, embeddedKafkaBroker);
((DefaultSingletonBeanRegistry) beanFactory).registerDisposableBean(EmbeddedKafkaBroker.BEAN_NAME, embeddedKafkaBroker);
((BeanDefinitionRegistry) beanFactory).registerBeanDefinition(EmbeddedKafkaBroker.BEAN_NAME,
new RootBeanDefinition(EmbeddedKafkaBroker.class, () -> embeddedKafkaBroker));
}

private int[] setupPorts() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,13 @@
package org.springframework.kafka.test.context;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.springframework.beans.BeansException;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.env.ConfigurableEnvironment;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.utils.KafkaTestUtils;

Expand Down Expand Up @@ -75,15 +70,14 @@ void testPorts() {
EmbeddedKafka annotationWithPorts =
AnnotationUtils.findAnnotation(TestWithEmbeddedKafkaPorts.class, EmbeddedKafka.class);
EmbeddedKafkaContextCustomizer customizer = new EmbeddedKafkaContextCustomizer(annotationWithPorts);
ConfigurableApplicationContext context = mock(ConfigurableApplicationContext.class);
BeanFactoryStub factoryStub = new BeanFactoryStub();
given(context.getBeanFactory()).willReturn(factoryStub);
given(context.getEnvironment()).willReturn(mock(ConfigurableEnvironment.class));
ConfigurableApplicationContext context = new GenericApplicationContext();
customizer.customizeContext(context, null);
context.refresh();

assertThat(factoryStub.getBroker().getBrokersAsString())
EmbeddedKafkaBroker embeddedKafkaBroker = context.getBean(EmbeddedKafkaBroker.class);
assertThat(embeddedKafkaBroker.getBrokersAsString())
.isEqualTo("127.0.0.1:" + annotationWithPorts.ports()[0]);
assertThat(KafkaTestUtils.getPropertyValue(factoryStub.getBroker(), "brokerListProperty"))
assertThat(KafkaTestUtils.getPropertyValue(embeddedKafkaBroker, "brokerListProperty"))
.isEqualTo("my.bss.prop");
}

Expand All @@ -92,14 +86,12 @@ void testMulti() {
EmbeddedKafka annotationWithPorts =
AnnotationUtils.findAnnotation(TestWithEmbeddedKafkaMulti.class, EmbeddedKafka.class);
EmbeddedKafkaContextCustomizer customizer = new EmbeddedKafkaContextCustomizer(annotationWithPorts);
ConfigurableApplicationContext context = mock(ConfigurableApplicationContext.class);
BeanFactoryStub factoryStub = new BeanFactoryStub();
given(context.getBeanFactory()).willReturn(factoryStub);
given(context.getEnvironment()).willReturn(mock(ConfigurableEnvironment.class));
ConfigurableApplicationContext context = new GenericApplicationContext();
customizer.customizeContext(context, null);
context.refresh();

assertThat(factoryStub.getBroker().getBrokersAsString())
.matches("127.0.0.1:[0-9]+,127.0.0.1:[0-9]+");
assertThat(context.getBean(EmbeddedKafkaBroker.class).getBrokersAsString())
.matches("127.0.0.1:[0-9]+,127.0.0.1:[0-9]+");
}


Expand All @@ -122,31 +114,5 @@ private class TestWithEmbeddedKafkaPorts {
private class TestWithEmbeddedKafkaMulti {

}
@SuppressWarnings("serial")
private class BeanFactoryStub extends DefaultListableBeanFactory {

private Object bean;

public EmbeddedKafkaBroker getBroker() {
return (EmbeddedKafkaBroker) bean;
}

@Override
public Object initializeBean(Object existingBean, String beanName) throws BeansException {
this.bean = existingBean;
return bean;
}

@Override
public void registerSingleton(String beanName, Object singletonObject) {

}

@Override
public void registerDisposableBean(String beanName, DisposableBean bean) {

}

}

}

0 comments on commit 3a9e611

Please sign in to comment.