Skip to content

Commit

Permalink
Fix containerFactory SpEL Resolution
Browse files Browse the repository at this point in the history
  • Loading branch information
BenEfrati committed Dec 22, 2024
1 parent d139c98 commit e8077b3
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ protected String resolveMultiRabbitAdminName(RabbitListener rabbitListener) {
return rlcf.getBeanName() + RabbitListenerConfigUtils.MULTI_RABBIT_ADMIN_SUFFIX;
}

return containerFactory + RabbitListenerConfigUtils.MULTI_RABBIT_ADMIN_SUFFIX;
return resolved + RabbitListenerConfigUtils.MULTI_RABBIT_ADMIN_SUFFIX;
}

return RabbitListenerConfigUtils.RABBIT_ADMIN_BEAN_NAME;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void multipleSimpleMessageListeners() {

Map<String, RabbitListenerContainerTestFactory> factories = context
.getBeansOfType(RabbitListenerContainerTestFactory.class, false, false);
Assertions.assertThat(factories).hasSize(3);
Assertions.assertThat(factories).hasSize(4);

factories.values().forEach(factory -> {
Assertions.assertThat(factory.getListenerContainers().size())
Expand Down Expand Up @@ -99,34 +99,38 @@ void testDeclarablesMatchProperRabbitAdmin() {

Map<String, RabbitListenerContainerTestFactory> factories = context
.getBeansOfType(RabbitListenerContainerTestFactory.class, false, false);
Assertions.assertThat(factories).hasSize(3);
Assertions.assertThat(factories).hasSize(4);

BiFunction<RabbitAdmin, Declarable, Boolean> declares = (admin, dec) -> dec.getDeclaringAdmins().size() == 1
&& dec.getDeclaringAdmins().contains(admin.getBeanName());

Map<String, AbstractExchange> exchanges = context.getBeansOfType(AbstractExchange.class, false, false)
.values().stream().collect(Collectors.toMap(AbstractExchange::getName, v -> v));
Assertions.assertThat(exchanges).hasSize(3);
Assertions.assertThat(exchanges).hasSize(4);
Assertions.assertThat(declares.apply(MultiConfig.DEFAULT_RABBIT_ADMIN, exchanges.get("testExchange"))).isTrue();
Assertions.assertThat(declares.apply(MultiConfig.RABBIT_ADMIN_BROKER_B, exchanges.get("testExchangeB")))
.isTrue();
Assertions.assertThat(declares.apply(MultiConfig.RABBIT_ADMIN_BROKER_C, exchanges.get("testExchangeC")))
.isTrue();
Assertions.assertThat(declares.apply(MultiConfig.RABBIT_ADMIN_BROKER_D, exchanges.get("testExchangeD")))
.isTrue();

Map<String, org.springframework.amqp.core.Queue> queues = context
.getBeansOfType(org.springframework.amqp.core.Queue.class, false, false)
.values().stream().collect(Collectors.toMap(org.springframework.amqp.core.Queue::getName, v -> v));
Assertions.assertThat(queues).hasSize(3);
Assertions.assertThat(queues).hasSize(4);
Assertions.assertThat(declares.apply(MultiConfig.DEFAULT_RABBIT_ADMIN, queues.get("testQueue"))).isTrue();
Assertions.assertThat(declares.apply(MultiConfig.RABBIT_ADMIN_BROKER_B, queues.get("testQueueB"))).isTrue();
Assertions.assertThat(declares.apply(MultiConfig.RABBIT_ADMIN_BROKER_C, queues.get("testQueueC"))).isTrue();
Assertions.assertThat(declares.apply(MultiConfig.RABBIT_ADMIN_BROKER_D, queues.get("testQueueD"))).isTrue();

Map<String, Binding> bindings = context.getBeansOfType(Binding.class, false, false)
.values().stream().collect(Collectors.toMap(Binding::getRoutingKey, v -> v));
Assertions.assertThat(bindings).hasSize(3);
Assertions.assertThat(bindings).hasSize(4);
Assertions.assertThat(declares.apply(MultiConfig.DEFAULT_RABBIT_ADMIN, bindings.get("testKey"))).isTrue();
Assertions.assertThat(declares.apply(MultiConfig.RABBIT_ADMIN_BROKER_B, bindings.get("testKeyB"))).isTrue();
Assertions.assertThat(declares.apply(MultiConfig.RABBIT_ADMIN_BROKER_C, bindings.get("testKeyC"))).isTrue();
Assertions.assertThat(declares.apply(MultiConfig.RABBIT_ADMIN_BROKER_D, bindings.get("testKeyD"))).isTrue();

context.close(); // Close and stop the listeners
}
Expand Down Expand Up @@ -180,9 +184,19 @@ void testCreationOfConnections() {
Mockito.verify(MultiConfig.CONNECTION_FACTORY_BROKER_C).createConnection();
Mockito.verify(MultiConfig.CONNECTION_BROKER_C).createChannel(false);

Mockito.verify(MultiConfig.CONNECTION_FACTORY_BROKER_D, Mockito.never()).createConnection();
Mockito.verify(MultiConfig.CONNECTION_BROKER_D, Mockito.never()).createChannel(false);
SimpleResourceHolder.bind(MultiConfig.ROUTING_CONNECTION_FACTORY, "brokerD");
rabbitTemplate.convertAndSend("messageToBrokerD");
SimpleResourceHolder.unbind(MultiConfig.ROUTING_CONNECTION_FACTORY);
Mockito.verify(MultiConfig.CONNECTION_FACTORY_BROKER_D).createConnection();
Mockito.verify(MultiConfig.CONNECTION_BROKER_D).createChannel(false);

context.close(); // Close and stop the listeners
}



@Test
@DisplayName("Test assignment of RabbitAdmin in the endpoint registry")
void testAssignmentOfRabbitAdminInTheEndpointRegistry() {
Expand All @@ -192,7 +206,7 @@ void testAssignmentOfRabbitAdminInTheEndpointRegistry() {
final RabbitListenerEndpointRegistry registry = context.getBean(RabbitListenerEndpointRegistry.class);
final Collection<MessageListenerContainer> listenerContainers = registry.getListenerContainers();

Assertions.assertThat(listenerContainers).hasSize(3);
Assertions.assertThat(listenerContainers).hasSize(4);
listenerContainers.forEach(container -> {
Assertions.assertThat(container).isInstanceOf(MessageListenerTestContainer.class);
final MessageListenerTestContainer refContainer = (MessageListenerTestContainer) container;
Expand Down Expand Up @@ -228,6 +242,13 @@ public void handleItB(String body) {
key = "testKeyC"))
public void handleItC(String body) {
}

@RabbitListener(containerFactory = "${broker-name:brokerD}", bindings = @QueueBinding(
exchange = @Exchange("testExchangeD"),
value = @Queue("testQueueD"),
key = "testKeyD"))
public void handleItD(String body) {
}
}

@Component
Expand All @@ -244,6 +265,10 @@ public void handleItB(String body) {
@RabbitListener(queues = "testQueueC", containerFactory = "brokerC")
public void handleItC(String body) {
}

@RabbitListener(queues = "testQueueD", containerFactory = "${broker-name:brokerD}")
public void handleItD(String body) {
}
}

@Configuration
Expand All @@ -254,34 +279,41 @@ static class MultiConfig {
static final ConnectionFactory DEFAULT_CONNECTION_FACTORY = Mockito.mock(ConnectionFactory.class);
static final ConnectionFactory CONNECTION_FACTORY_BROKER_B = Mockito.mock(ConnectionFactory.class);
static final ConnectionFactory CONNECTION_FACTORY_BROKER_C = Mockito.mock(ConnectionFactory.class);
static final ConnectionFactory CONNECTION_FACTORY_BROKER_D = Mockito.mock(ConnectionFactory.class);

static final Connection DEFAULT_CONNECTION = Mockito.mock(Connection.class);
static final Connection CONNECTION_BROKER_B = Mockito.mock(Connection.class);
static final Connection CONNECTION_BROKER_C = Mockito.mock(Connection.class);
static final Connection CONNECTION_BROKER_D = Mockito.mock(Connection.class);

static final Channel DEFAULT_CHANNEL = Mockito.mock(Channel.class);
static final Channel CHANNEL_BROKER_B = Mockito.mock(Channel.class);
static final Channel CHANNEL_BROKER_C = Mockito.mock(Channel.class);
static final Channel CHANNEL_BROKER_D = Mockito.mock(Channel.class);

static {
final Map<Object, ConnectionFactory> targetConnectionFactories = new HashMap<>();
targetConnectionFactories.put("brokerB", CONNECTION_FACTORY_BROKER_B);
targetConnectionFactories.put("brokerC", CONNECTION_FACTORY_BROKER_C);
targetConnectionFactories.put("brokerD", CONNECTION_FACTORY_BROKER_D);
ROUTING_CONNECTION_FACTORY.setDefaultTargetConnectionFactory(DEFAULT_CONNECTION_FACTORY);
ROUTING_CONNECTION_FACTORY.setTargetConnectionFactories(targetConnectionFactories);

Mockito.when(DEFAULT_CONNECTION_FACTORY.createConnection()).thenReturn(DEFAULT_CONNECTION);
Mockito.when(CONNECTION_FACTORY_BROKER_B.createConnection()).thenReturn(CONNECTION_BROKER_B);
Mockito.when(CONNECTION_FACTORY_BROKER_C.createConnection()).thenReturn(CONNECTION_BROKER_C);
Mockito.when(CONNECTION_FACTORY_BROKER_D.createConnection()).thenReturn(CONNECTION_BROKER_D);

Mockito.when(DEFAULT_CONNECTION.createChannel(false)).thenReturn(DEFAULT_CHANNEL);
Mockito.when(CONNECTION_BROKER_B.createChannel(false)).thenReturn(CHANNEL_BROKER_B);
Mockito.when(CONNECTION_BROKER_C.createChannel(false)).thenReturn(CHANNEL_BROKER_C);
Mockito.when(CONNECTION_BROKER_D.createChannel(false)).thenReturn(CHANNEL_BROKER_D);
}

static final RabbitAdmin DEFAULT_RABBIT_ADMIN = new RabbitAdmin(DEFAULT_CONNECTION_FACTORY);
static final RabbitAdmin RABBIT_ADMIN_BROKER_B = new RabbitAdmin(CONNECTION_FACTORY_BROKER_B);
static final RabbitAdmin RABBIT_ADMIN_BROKER_C = new RabbitAdmin(CONNECTION_FACTORY_BROKER_C);
static final RabbitAdmin RABBIT_ADMIN_BROKER_D = new RabbitAdmin(CONNECTION_FACTORY_BROKER_D);

@Bean
public RabbitListenerAnnotationBeanPostProcessor postProcessor() {
Expand All @@ -307,6 +339,11 @@ public RabbitAdmin rabbitAdminBrokerC() {
return RABBIT_ADMIN_BROKER_C;
}

@Bean("brokerD-admin")
public RabbitAdmin rabbitAdminBrokerD() {
return RABBIT_ADMIN_BROKER_D;
}

@Bean("defaultContainerFactory")
public RabbitListenerContainerTestFactory defaultContainerFactory() {
return new RabbitListenerContainerTestFactory();
Expand All @@ -322,6 +359,11 @@ public RabbitListenerContainerTestFactory containerFactoryBrokerC() {
return new RabbitListenerContainerTestFactory();
}

@Bean("brokerD")
public RabbitListenerContainerTestFactory containerFactoryBrokerD() {
return new RabbitListenerContainerTestFactory();
}

@Bean
public RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry() {
return new RabbitListenerEndpointRegistry();
Expand Down

0 comments on commit e8077b3

Please sign in to comment.