diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/annotation/MultiRabbitListenerAnnotationBeanPostProcessor.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/annotation/MultiRabbitListenerAnnotationBeanPostProcessor.java index 0764691e1..521adc21e 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/annotation/MultiRabbitListenerAnnotationBeanPostProcessor.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/annotation/MultiRabbitListenerAnnotationBeanPostProcessor.java @@ -95,7 +95,7 @@ protected String resolveMultiRabbitAdminName(RabbitListener rabbitListener) { return rlcf.getBeanName() + RabbitListenerConfigUtils.MULTI_RABBIT_ADMIN_SUFFIX; } - return containerFactory + RabbitListenerConfigUtils.MULTI_RABBIT_ADMIN_SUFFIX; + return resolved + RabbitListenerConfigUtils.MULTI_RABBIT_ADMIN_SUFFIX; } return RabbitListenerConfigUtils.RABBIT_ADMIN_BEAN_NAME; diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/MockMultiRabbitTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/MockMultiRabbitTests.java index cda0362fe..22956228d 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/MockMultiRabbitTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/annotation/MockMultiRabbitTests.java @@ -68,7 +68,7 @@ void multipleSimpleMessageListeners() { Map factories = context .getBeansOfType(RabbitListenerContainerTestFactory.class, false, false); - Assertions.assertThat(factories).hasSize(3); + Assertions.assertThat(factories).hasSize(4); factories.values().forEach(factory -> { Assertions.assertThat(factory.getListenerContainers().size()) @@ -99,34 +99,38 @@ void testDeclarablesMatchProperRabbitAdmin() { Map factories = context .getBeansOfType(RabbitListenerContainerTestFactory.class, false, false); - Assertions.assertThat(factories).hasSize(3); + Assertions.assertThat(factories).hasSize(4); BiFunction declares = (admin, dec) -> dec.getDeclaringAdmins().size() == 1 && dec.getDeclaringAdmins().contains(admin.getBeanName()); Map exchanges = context.getBeansOfType(AbstractExchange.class, false, false) .values().stream().collect(Collectors.toMap(AbstractExchange::getName, v -> v)); - Assertions.assertThat(exchanges).hasSize(3); + Assertions.assertThat(exchanges).hasSize(4); Assertions.assertThat(declares.apply(MultiConfig.DEFAULT_RABBIT_ADMIN, exchanges.get("testExchange"))).isTrue(); Assertions.assertThat(declares.apply(MultiConfig.RABBIT_ADMIN_BROKER_B, exchanges.get("testExchangeB"))) .isTrue(); Assertions.assertThat(declares.apply(MultiConfig.RABBIT_ADMIN_BROKER_C, exchanges.get("testExchangeC"))) .isTrue(); + Assertions.assertThat(declares.apply(MultiConfig.RABBIT_ADMIN_BROKER_D, exchanges.get("testExchangeD"))) + .isTrue(); Map queues = context .getBeansOfType(org.springframework.amqp.core.Queue.class, false, false) .values().stream().collect(Collectors.toMap(org.springframework.amqp.core.Queue::getName, v -> v)); - Assertions.assertThat(queues).hasSize(3); + Assertions.assertThat(queues).hasSize(4); Assertions.assertThat(declares.apply(MultiConfig.DEFAULT_RABBIT_ADMIN, queues.get("testQueue"))).isTrue(); Assertions.assertThat(declares.apply(MultiConfig.RABBIT_ADMIN_BROKER_B, queues.get("testQueueB"))).isTrue(); Assertions.assertThat(declares.apply(MultiConfig.RABBIT_ADMIN_BROKER_C, queues.get("testQueueC"))).isTrue(); + Assertions.assertThat(declares.apply(MultiConfig.RABBIT_ADMIN_BROKER_D, queues.get("testQueueD"))).isTrue(); Map bindings = context.getBeansOfType(Binding.class, false, false) .values().stream().collect(Collectors.toMap(Binding::getRoutingKey, v -> v)); - Assertions.assertThat(bindings).hasSize(3); + Assertions.assertThat(bindings).hasSize(4); Assertions.assertThat(declares.apply(MultiConfig.DEFAULT_RABBIT_ADMIN, bindings.get("testKey"))).isTrue(); Assertions.assertThat(declares.apply(MultiConfig.RABBIT_ADMIN_BROKER_B, bindings.get("testKeyB"))).isTrue(); Assertions.assertThat(declares.apply(MultiConfig.RABBIT_ADMIN_BROKER_C, bindings.get("testKeyC"))).isTrue(); + Assertions.assertThat(declares.apply(MultiConfig.RABBIT_ADMIN_BROKER_D, bindings.get("testKeyD"))).isTrue(); context.close(); // Close and stop the listeners } @@ -180,9 +184,19 @@ void testCreationOfConnections() { Mockito.verify(MultiConfig.CONNECTION_FACTORY_BROKER_C).createConnection(); Mockito.verify(MultiConfig.CONNECTION_BROKER_C).createChannel(false); + Mockito.verify(MultiConfig.CONNECTION_FACTORY_BROKER_D, Mockito.never()).createConnection(); + Mockito.verify(MultiConfig.CONNECTION_BROKER_D, Mockito.never()).createChannel(false); + SimpleResourceHolder.bind(MultiConfig.ROUTING_CONNECTION_FACTORY, "brokerD"); + rabbitTemplate.convertAndSend("messageToBrokerD"); + SimpleResourceHolder.unbind(MultiConfig.ROUTING_CONNECTION_FACTORY); + Mockito.verify(MultiConfig.CONNECTION_FACTORY_BROKER_D).createConnection(); + Mockito.verify(MultiConfig.CONNECTION_BROKER_D).createChannel(false); + context.close(); // Close and stop the listeners } + + @Test @DisplayName("Test assignment of RabbitAdmin in the endpoint registry") void testAssignmentOfRabbitAdminInTheEndpointRegistry() { @@ -192,7 +206,7 @@ void testAssignmentOfRabbitAdminInTheEndpointRegistry() { final RabbitListenerEndpointRegistry registry = context.getBean(RabbitListenerEndpointRegistry.class); final Collection listenerContainers = registry.getListenerContainers(); - Assertions.assertThat(listenerContainers).hasSize(3); + Assertions.assertThat(listenerContainers).hasSize(4); listenerContainers.forEach(container -> { Assertions.assertThat(container).isInstanceOf(MessageListenerTestContainer.class); final MessageListenerTestContainer refContainer = (MessageListenerTestContainer) container; @@ -228,6 +242,13 @@ public void handleItB(String body) { key = "testKeyC")) public void handleItC(String body) { } + + @RabbitListener(containerFactory = "${broker-name:brokerD}", bindings = @QueueBinding( + exchange = @Exchange("testExchangeD"), + value = @Queue("testQueueD"), + key = "testKeyD")) + public void handleItD(String body) { + } } @Component @@ -244,6 +265,10 @@ public void handleItB(String body) { @RabbitListener(queues = "testQueueC", containerFactory = "brokerC") public void handleItC(String body) { } + + @RabbitListener(queues = "testQueueD", containerFactory = "${broker-name:brokerD}") + public void handleItD(String body) { + } } @Configuration @@ -254,34 +279,41 @@ static class MultiConfig { static final ConnectionFactory DEFAULT_CONNECTION_FACTORY = Mockito.mock(ConnectionFactory.class); static final ConnectionFactory CONNECTION_FACTORY_BROKER_B = Mockito.mock(ConnectionFactory.class); static final ConnectionFactory CONNECTION_FACTORY_BROKER_C = Mockito.mock(ConnectionFactory.class); + static final ConnectionFactory CONNECTION_FACTORY_BROKER_D = Mockito.mock(ConnectionFactory.class); static final Connection DEFAULT_CONNECTION = Mockito.mock(Connection.class); static final Connection CONNECTION_BROKER_B = Mockito.mock(Connection.class); static final Connection CONNECTION_BROKER_C = Mockito.mock(Connection.class); + static final Connection CONNECTION_BROKER_D = Mockito.mock(Connection.class); static final Channel DEFAULT_CHANNEL = Mockito.mock(Channel.class); static final Channel CHANNEL_BROKER_B = Mockito.mock(Channel.class); static final Channel CHANNEL_BROKER_C = Mockito.mock(Channel.class); + static final Channel CHANNEL_BROKER_D = Mockito.mock(Channel.class); static { final Map targetConnectionFactories = new HashMap<>(); targetConnectionFactories.put("brokerB", CONNECTION_FACTORY_BROKER_B); targetConnectionFactories.put("brokerC", CONNECTION_FACTORY_BROKER_C); + targetConnectionFactories.put("brokerD", CONNECTION_FACTORY_BROKER_D); ROUTING_CONNECTION_FACTORY.setDefaultTargetConnectionFactory(DEFAULT_CONNECTION_FACTORY); ROUTING_CONNECTION_FACTORY.setTargetConnectionFactories(targetConnectionFactories); Mockito.when(DEFAULT_CONNECTION_FACTORY.createConnection()).thenReturn(DEFAULT_CONNECTION); Mockito.when(CONNECTION_FACTORY_BROKER_B.createConnection()).thenReturn(CONNECTION_BROKER_B); Mockito.when(CONNECTION_FACTORY_BROKER_C.createConnection()).thenReturn(CONNECTION_BROKER_C); + Mockito.when(CONNECTION_FACTORY_BROKER_D.createConnection()).thenReturn(CONNECTION_BROKER_D); Mockito.when(DEFAULT_CONNECTION.createChannel(false)).thenReturn(DEFAULT_CHANNEL); Mockito.when(CONNECTION_BROKER_B.createChannel(false)).thenReturn(CHANNEL_BROKER_B); Mockito.when(CONNECTION_BROKER_C.createChannel(false)).thenReturn(CHANNEL_BROKER_C); + Mockito.when(CONNECTION_BROKER_D.createChannel(false)).thenReturn(CHANNEL_BROKER_D); } static final RabbitAdmin DEFAULT_RABBIT_ADMIN = new RabbitAdmin(DEFAULT_CONNECTION_FACTORY); static final RabbitAdmin RABBIT_ADMIN_BROKER_B = new RabbitAdmin(CONNECTION_FACTORY_BROKER_B); static final RabbitAdmin RABBIT_ADMIN_BROKER_C = new RabbitAdmin(CONNECTION_FACTORY_BROKER_C); + static final RabbitAdmin RABBIT_ADMIN_BROKER_D = new RabbitAdmin(CONNECTION_FACTORY_BROKER_D); @Bean public RabbitListenerAnnotationBeanPostProcessor postProcessor() { @@ -307,6 +339,11 @@ public RabbitAdmin rabbitAdminBrokerC() { return RABBIT_ADMIN_BROKER_C; } + @Bean("brokerD-admin") + public RabbitAdmin rabbitAdminBrokerD() { + return RABBIT_ADMIN_BROKER_D; + } + @Bean("defaultContainerFactory") public RabbitListenerContainerTestFactory defaultContainerFactory() { return new RabbitListenerContainerTestFactory(); @@ -322,6 +359,11 @@ public RabbitListenerContainerTestFactory containerFactoryBrokerC() { return new RabbitListenerContainerTestFactory(); } + @Bean("brokerD") + public RabbitListenerContainerTestFactory containerFactoryBrokerD() { + return new RabbitListenerContainerTestFactory(); + } + @Bean public RabbitListenerEndpointRegistry rabbitListenerEndpointRegistry() { return new RabbitListenerEndpointRegistry();