diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/AsyncRabbitTemplate.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/AsyncRabbitTemplate.java index 5bc5f44c8..994620c02 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/AsyncRabbitTemplate.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/AsyncRabbitTemplate.java @@ -604,12 +604,16 @@ public void onMessage(Message message, Channel channel) { if (future instanceof RabbitConverterFuture) { MessageConverter messageConverter = this.template.getMessageConverter(); RabbitConverterFuture rabbitFuture = (RabbitConverterFuture) future; - Object converted = rabbitFuture.getReturnType() != null + try { + Object converted = rabbitFuture.getReturnType() != null && messageConverter instanceof SmartMessageConverter smart ? smart.fromMessage(message, rabbitFuture.getReturnType()) : messageConverter.fromMessage(message); - rabbitFuture.complete(converted); + rabbitFuture.complete(converted); + } catch (MessageConversionException e) { + rabbitFuture.completeExceptionally(e); + } } else { ((RabbitMessageFuture) future).complete(message); diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/AsyncRabbitTemplateTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/AsyncRabbitTemplateTests.java index b0cb6d92c..928c29235 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/AsyncRabbitTemplateTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/AsyncRabbitTemplateTests.java @@ -57,6 +57,7 @@ import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.amqp.rabbit.listener.adapter.ReplyingMessageListener; +import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.amqp.support.postprocessor.GUnzipPostProcessor; import org.springframework.amqp.support.postprocessor.GZipPostProcessor; @@ -394,6 +395,33 @@ public void testStopCancelled() throws Exception { assertThat(callback.result).isNull(); } + @Test + public void testConversionException() throws InterruptedException { + CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost"); + connectionFactory.setChannelCacheSize(1); + RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); + rabbitTemplate.setMessageConverter(new SimpleMessageConverter(){ + @Override + public Object fromMessage(Message message) throws MessageConversionException { + throw new MessageConversionException("Failed to convert message"); + } + }); + AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate); + asyncRabbitTemplate.start(); + + RabbitConverterFuture replyFuture = asyncRabbitTemplate.convertSendAndReceive("conversionException"); + + CountDownLatch cdl = new CountDownLatch(1); + replyFuture.whenComplete((result, ex) -> { + cdl.countDown(); + }); + assertThat(cdl.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(replyFuture).isCompletedExceptionally(); + + asyncRabbitTemplate.stop(); + connectionFactory.destroy(); + } + @Test void ctorCoverage() { AsyncRabbitTemplate template = new AsyncRabbitTemplate(mock(ConnectionFactory.class), "ex", "rk");