diff --git a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderIntegrationTests.java b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderIntegrationTests.java index d02b1d201..bb13e9ef6 100644 --- a/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderIntegrationTests.java +++ b/binders/pulsar-binder/spring-cloud-stream-binder-pulsar/src/test/java/org/springframework/cloud/stream/binder/pulsar/PulsarBinderIntegrationTests.java @@ -26,6 +26,7 @@ import java.util.function.Supplier; import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.schema.JSONSchema; import org.apache.pulsar.common.schema.KeyValue; @@ -637,8 +638,8 @@ public Consumer textLogger() { static class BinderAndBindingPropsTestConfig { @Bean - TrackingProducerFactoryBeanPostProcessor trackingProducerFactory() { - return new TrackingProducerFactoryBeanPostProcessor(); + TrackingProducerFactoryBeanPostProcessor trackingProducerFactory(PulsarClient pulsarClient) { + return new TrackingProducerFactoryBeanPostProcessor(pulsarClient); } @Bean @@ -650,10 +651,16 @@ TrackingConsumerFactoryBeanPostProcessor trackingConsumerFactory() { static class TrackingProducerFactoryBeanPostProcessor implements BeanPostProcessor { + private final PulsarClient pulsarClient; + + TrackingProducerFactoryBeanPostProcessor(PulsarClient pulsarClient) { + this.pulsarClient = pulsarClient; + } + @Override public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { if (bean instanceof DefaultPulsarProducerFactory defaultFactory) { - return new TrackingProducerFactory(defaultFactory); + return new TrackingProducerFactory(defaultFactory, this.pulsarClient); } return bean; } @@ -664,10 +671,18 @@ static class TrackingProducerFactory implements PulsarProducerFactory { private final DefaultPulsarProducerFactory trackedProducerFactory; + private final PulsarClient pulsarClient; + List> producersCreated = new ArrayList<>(); - TrackingProducerFactory(DefaultPulsarProducerFactory trackedProducerFactory) { + TrackingProducerFactory(DefaultPulsarProducerFactory trackedProducerFactory, PulsarClient pulsarClient) { this.trackedProducerFactory = trackedProducerFactory; + this.pulsarClient = pulsarClient; + } + + // This is required in PulsarProducerFactory in Spring Pulsar 1.1.x (i.e. Spring Boot 3.3.x) + public PulsarClient getPulsarClient() { + return this.pulsarClient; } @Override