diff --git a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/AsyncRabbitTemplate.java b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/AsyncRabbitTemplate.java index 5bc5f44c8..4a4861174 100644 --- a/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/AsyncRabbitTemplate.java +++ b/spring-rabbit/src/main/java/org/springframework/amqp/rabbit/AsyncRabbitTemplate.java @@ -49,6 +49,7 @@ import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.ChannelHolder; import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener; +import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.amqp.support.converter.SmartMessageConverter; import org.springframework.amqp.utils.JavaUtils; @@ -89,6 +90,7 @@ * @author Artem Bilan * @author FengYang Su * @author Ngoc Nhan + * @author Ben Efrati * * @since 1.6 */ @@ -604,12 +606,17 @@ public void onMessage(Message message, Channel channel) { if (future instanceof RabbitConverterFuture) { MessageConverter messageConverter = this.template.getMessageConverter(); RabbitConverterFuture rabbitFuture = (RabbitConverterFuture) future; - Object converted = rabbitFuture.getReturnType() != null + try { + Object converted = rabbitFuture.getReturnType() != null && messageConverter instanceof SmartMessageConverter smart ? smart.fromMessage(message, rabbitFuture.getReturnType()) : messageConverter.fromMessage(message); - rabbitFuture.complete(converted); + rabbitFuture.complete(converted); + } + catch (MessageConversionException e) { + rabbitFuture.completeExceptionally(e); + } } else { ((RabbitMessageFuture) future).complete(message); diff --git a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/AsyncRabbitTemplateTests.java b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/AsyncRabbitTemplateTests.java index b0cb6d92c..770e6178d 100644 --- a/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/AsyncRabbitTemplateTests.java +++ b/spring-rabbit/src/test/java/org/springframework/amqp/rabbit/AsyncRabbitTemplateTests.java @@ -57,6 +57,7 @@ import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; import org.springframework.amqp.rabbit.listener.adapter.ReplyingMessageListener; +import org.springframework.amqp.support.converter.MessageConversionException; import org.springframework.amqp.support.converter.SimpleMessageConverter; import org.springframework.amqp.support.postprocessor.GUnzipPostProcessor; import org.springframework.amqp.support.postprocessor.GZipPostProcessor; @@ -72,6 +73,7 @@ /** * @author Gary Russell * @author Artem Bilan + * @author Ben Efrati * * @since 1.6 */ @@ -394,6 +396,29 @@ public void testStopCancelled() throws Exception { assertThat(callback.result).isNull(); } + @Test + @DirtiesContext + public void testConversionException() throws InterruptedException { + this.asyncTemplate.getRabbitTemplate().setMessageConverter(new SimpleMessageConverter() { + @Override + public Object fromMessage(Message message) throws MessageConversionException { + throw new MessageConversionException("Failed to convert message"); + } + }); + + RabbitConverterFuture replyFuture = this.asyncTemplate.convertSendAndReceive("conversionException"); + + final CountDownLatch cdl = new CountDownLatch(1); + final AtomicReference resultRef = new AtomicReference<>(); + replyFuture.whenComplete((result, ex) -> { + resultRef.set(result); + cdl.countDown(); + }); + assertThat(cdl.await(10, TimeUnit.SECONDS)).isTrue(); + assertThat(replyFuture).isCompletedExceptionally(); + assertThat(resultRef.get()).isNull(); + } + @Test void ctorCoverage() { AsyncRabbitTemplate template = new AsyncRabbitTemplate(mock(ConnectionFactory.class), "ex", "rk");