Skip to content

Commit

Permalink
GH-2478: Handle conversion exception in AsyncRabbitTemplate
Browse files Browse the repository at this point in the history
Fixes: #2478

Previously, conversion errors in `AsyncRabbitTemplate` have led to `AmqpReplyTimeoutException`

* Fix `AsyncRabbitTemplate` to catch conversion errors on receiving reply and call `rabbitFuture.completeExceptionally()`, respectively.
* Use AssertJ `CompletableFuture` assertions for exception verification
  • Loading branch information
BenEfrati authored Dec 19, 2024
1 parent 7ea51de commit 410b584
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.springframework.amqp.rabbit.listener.DirectReplyToMessageListenerContainer.ChannelHolder;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SmartMessageConverter;
import org.springframework.amqp.utils.JavaUtils;
Expand Down Expand Up @@ -89,6 +90,7 @@
* @author Artem Bilan
* @author FengYang Su
* @author Ngoc Nhan
* @author Ben Efrati
*
* @since 1.6
*/
Expand Down Expand Up @@ -604,12 +606,17 @@ public void onMessage(Message message, Channel channel) {
if (future instanceof RabbitConverterFuture) {
MessageConverter messageConverter = this.template.getMessageConverter();
RabbitConverterFuture<Object> rabbitFuture = (RabbitConverterFuture<Object>) future;
Object converted = rabbitFuture.getReturnType() != null
try {
Object converted = rabbitFuture.getReturnType() != null
&& messageConverter instanceof SmartMessageConverter smart
? smart.fromMessage(message,
rabbitFuture.getReturnType())
: messageConverter.fromMessage(message);
rabbitFuture.complete(converted);
rabbitFuture.complete(converted);
}
catch (MessageConversionException e) {
rabbitFuture.completeExceptionally(e);
}
}
else {
((RabbitMessageFuture) future).complete(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.fail;
import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.mock;

Expand Down Expand Up @@ -57,6 +56,7 @@
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.rabbit.listener.adapter.ReplyingMessageListener;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.amqp.support.postprocessor.GUnzipPostProcessor;
import org.springframework.amqp.support.postprocessor.GZipPostProcessor;
Expand All @@ -72,6 +72,7 @@
/**
* @author Gary Russell
* @author Artem Bilan
* @author Ben Efrati
*
* @since 1.6
*/
Expand Down Expand Up @@ -100,7 +101,7 @@ static void setup() {
}

@Test
public void testConvert1Arg() throws Exception {
public void testConvert1Arg() {
final AtomicBoolean mppCalled = new AtomicBoolean();
CompletableFuture<String> future = this.asyncTemplate.convertSendAndReceive("foo", m -> {
mppCalled.set(true);
Expand All @@ -111,7 +112,7 @@ public void testConvert1Arg() throws Exception {
}

@Test
public void testConvert1ArgDirect() throws Exception {
public void testConvert1ArgDirect() {
this.latch.set(new CountDownLatch(1));
CompletableFuture<String> future1 = this.asyncDirectTemplate.convertSendAndReceive("foo");
CompletableFuture<String> future2 = this.asyncDirectTemplate.convertSendAndReceive("bar");
Expand Down Expand Up @@ -139,19 +140,19 @@ public void testConvert1ArgDirect() throws Exception {
}

@Test
public void testConvert2Args() throws Exception {
public void testConvert2Args() {
CompletableFuture<String> future = this.asyncTemplate.convertSendAndReceive(this.requests.getName(), "foo");
checkConverterResult(future, "FOO");
}

@Test
public void testConvert3Args() throws Exception {
public void testConvert3Args() {
CompletableFuture<String> future = this.asyncTemplate.convertSendAndReceive("", this.requests.getName(), "foo");
checkConverterResult(future, "FOO");
}

@Test
public void testConvert4Args() throws Exception {
public void testConvert4Args() {
CompletableFuture<String> future = this.asyncTemplate.convertSendAndReceive("", this.requests.getName(), "foo",
message -> {
String body = new String(message.getBody());
Expand Down Expand Up @@ -187,7 +188,7 @@ public void testMessage1ArgDirect() throws Exception {
assertThat(TestUtils
.getPropertyValue(this.asyncDirectTemplate, "directReplyToContainer.consumerCount",
AtomicInteger.class).get())
.isEqualTo(0);
.isZero();
}

private void waitForZeroInUseConsumers() {
Expand All @@ -214,7 +215,7 @@ public void testMessage3Args() throws Exception {
public void testCancel() {
CompletableFuture<String> future = this.asyncTemplate.convertSendAndReceive("foo");
future.cancel(false);
assertThat(TestUtils.getPropertyValue(asyncTemplate, "pending", Map.class)).hasSize(0);
assertThat(TestUtils.getPropertyValue(asyncTemplate, "pending", Map.class)).isEmpty();
}

@Test
Expand All @@ -234,44 +235,46 @@ private Message getFooMessage() {

@Test
@DirtiesContext
public void testReturn() throws Exception {
public void testReturn() {
this.asyncTemplate.setMandatory(true);
CompletableFuture<String> future = this.asyncTemplate.convertSendAndReceive(this.requests.getName() + "x",
"foo");
try {
future.get(10, TimeUnit.SECONDS);
fail("Expected exception");
}
catch (ExecutionException e) {
assertThat(e.getCause()).isInstanceOf(AmqpMessageReturnedException.class);
assertThat(((AmqpMessageReturnedException) e.getCause()).getRoutingKey()).isEqualTo(this.requests.getName() + "x");
}
assertThat(future)
.as("Expected exception")
.failsWithin(Duration.ofSeconds(10))
.withThrowableOfType(ExecutionException.class)
.havingCause()
.isInstanceOf(AmqpMessageReturnedException.class)
.extracting(cause -> ((AmqpMessageReturnedException) cause).getRoutingKey())
.isEqualTo(this.requests.getName() + "x");
}

@Test
@DirtiesContext
public void testReturnDirect() throws Exception {
public void testReturnDirect() {
this.asyncDirectTemplate.setMandatory(true);
CompletableFuture<String> future = this.asyncDirectTemplate.convertSendAndReceive(this.requests.getName() + "x",
"foo");
try {
future.get(10, TimeUnit.SECONDS);
fail("Expected exception");
}
catch (ExecutionException e) {
assertThat(e.getCause()).isInstanceOf(AmqpMessageReturnedException.class);
assertThat(((AmqpMessageReturnedException) e.getCause()).getRoutingKey()).isEqualTo(this.requests.getName() + "x");
}

assertThat(future)
.as("Expected exception")
.failsWithin(Duration.ofSeconds(10))
.withThrowableOfType(ExecutionException.class)
.havingCause()
.isInstanceOf(AmqpMessageReturnedException.class)
.extracting(cause -> ((AmqpMessageReturnedException) cause).getRoutingKey())
.isEqualTo(this.requests.getName() + "x");
}

@Test
@DirtiesContext
public void testConvertWithConfirm() throws Exception {
public void testConvertWithConfirm() {
this.asyncTemplate.setEnableConfirms(true);
RabbitConverterFuture<String> future = this.asyncTemplate.convertSendAndReceive("sleep");
CompletableFuture<Boolean> confirm = future.getConfirm();
assertThat(confirm).isNotNull();
assertThat(confirm.get(10, TimeUnit.SECONDS)).isTrue();
assertThat(confirm).isNotNull()
.succeedsWithin(Duration.ofSeconds(10))
.isEqualTo(true);
checkConverterResult(future, "SLEEP");
}

Expand All @@ -282,19 +285,21 @@ public void testMessageWithConfirm() throws Exception {
RabbitMessageFuture future = this.asyncTemplate
.sendAndReceive(new SimpleMessageConverter().toMessage("sleep", new MessageProperties()));
CompletableFuture<Boolean> confirm = future.getConfirm();
assertThat(confirm).isNotNull();
assertThat(confirm.get(10, TimeUnit.SECONDS)).isTrue();
assertThat(confirm).isNotNull()
.succeedsWithin(Duration.ofSeconds(10))
.isEqualTo(true);
checkMessageResult(future, "SLEEP");
}

@Test
@DirtiesContext
public void testConvertWithConfirmDirect() throws Exception {
public void testConvertWithConfirmDirect() {
this.asyncDirectTemplate.setEnableConfirms(true);
RabbitConverterFuture<String> future = this.asyncDirectTemplate.convertSendAndReceive("sleep");
CompletableFuture<Boolean> confirm = future.getConfirm();
assertThat(confirm).isNotNull();
assertThat(confirm.get(10, TimeUnit.SECONDS)).isTrue();
assertThat(confirm).isNotNull()
.succeedsWithin(Duration.ofSeconds(10))
.isEqualTo(true);
checkConverterResult(future, "SLEEP");
}

Expand All @@ -305,8 +310,9 @@ public void testMessageWithConfirmDirect() throws Exception {
RabbitMessageFuture future = this.asyncDirectTemplate
.sendAndReceive(new SimpleMessageConverter().toMessage("sleep", new MessageProperties()));
CompletableFuture<Boolean> confirm = future.getConfirm();
assertThat(confirm).isNotNull();
assertThat(confirm.get(10, TimeUnit.SECONDS)).isTrue();
assertThat(confirm).isNotNull()
.succeedsWithin(Duration.ofSeconds(10))
.isEqualTo(true);
checkMessageResult(future, "SLEEP");
}

Expand All @@ -319,14 +325,12 @@ public void testReceiveTimeout() throws Exception {
TheCallback callback = new TheCallback();
future.whenComplete(callback);
assertThat(TestUtils.getPropertyValue(this.asyncTemplate, "pending", Map.class)).hasSize(1);
try {
future.get(10, TimeUnit.SECONDS);
fail("Expected ExecutionException");
}
catch (ExecutionException e) {
assertThat(e.getCause()).isInstanceOf(AmqpReplyTimeoutException.class);
}
assertThat(TestUtils.getPropertyValue(this.asyncTemplate, "pending", Map.class)).hasSize(0);
assertThat(future)
.as("Expected ExecutionException")
.failsWithin(Duration.ofSeconds(10))
.withThrowableOfType(ExecutionException.class)
.withCauseInstanceOf(AmqpReplyTimeoutException.class);
assertThat(TestUtils.getPropertyValue(this.asyncTemplate, "pending", Map.class)).isEmpty();
assertThat(callback.latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(callback.ex).isInstanceOf(AmqpReplyTimeoutException.class);
}
Expand All @@ -340,14 +344,13 @@ public void testReplyAfterReceiveTimeout() throws Exception {
TheCallback callback = new TheCallback();
future.whenComplete(callback);
assertThat(TestUtils.getPropertyValue(this.asyncTemplate, "pending", Map.class)).hasSize(1);
try {
future.get(10, TimeUnit.SECONDS);
fail("Expected ExecutionException");
}
catch (ExecutionException e) {
assertThat(e.getCause()).isInstanceOf(AmqpReplyTimeoutException.class);
}
assertThat(TestUtils.getPropertyValue(this.asyncTemplate, "pending", Map.class)).hasSize(0);

assertThat(future)
.as("Expected ExecutionException")
.failsWithin(Duration.ofSeconds(10))
.withThrowableOfType(ExecutionException.class)
.withCauseInstanceOf(AmqpReplyTimeoutException.class);
assertThat(TestUtils.getPropertyValue(this.asyncTemplate, "pending", Map.class)).isEmpty();
assertThat(callback.latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(callback.ex).isInstanceOf(AmqpReplyTimeoutException.class);

Expand All @@ -373,16 +376,17 @@ public void testStopCancelled() throws Exception {
this.asyncTemplate.stop();
// Second stop() to be sure that it is idempotent
this.asyncTemplate.stop();
try {
future.get(10, TimeUnit.SECONDS);
fail("Expected CancellationException");
}
catch (CancellationException e) {
assertThat(future.getNackCause()).isEqualTo("AsyncRabbitTemplate was stopped while waiting for reply");
}
assertThat(TestUtils.getPropertyValue(this.asyncTemplate, "pending", Map.class)).hasSize(0);
assertThat(future)
.as("Expected CancellationException")
.failsWithin(Duration.ofSeconds(10))
.withThrowableOfType(CancellationException.class)
.satisfies(e -> {
assertThat(future.getNackCause()).isEqualTo("AsyncRabbitTemplate was stopped while waiting for reply");
assertThat(future).isCancelled();
});

assertThat(TestUtils.getPropertyValue(this.asyncTemplate, "pending", Map.class)).isEmpty();
assertThat(callback.latch.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(future.isCancelled()).isTrue();
assertThat(TestUtils.getPropertyValue(this.asyncTemplate, "taskScheduler")).isNull();

/*
Expand All @@ -394,6 +398,23 @@ public void testStopCancelled() throws Exception {
assertThat(callback.result).isNull();
}

@Test
@DirtiesContext
public void testConversionException() {
this.asyncTemplate.getRabbitTemplate().setMessageConverter(new SimpleMessageConverter() {
@Override
public Object fromMessage(Message message) throws MessageConversionException {
throw new MessageConversionException("Failed to convert message");
}
});

RabbitConverterFuture<String> replyFuture = this.asyncTemplate.convertSendAndReceive("conversionException");

assertThat(replyFuture).failsWithin(Duration.ofSeconds(10))
.withThrowableThat()
.withCauseInstanceOf(MessageConversionException.class);
}

@Test
void ctorCoverage() {
AsyncRabbitTemplate template = new AsyncRabbitTemplate(mock(ConnectionFactory.class), "ex", "rk");
Expand Down Expand Up @@ -461,15 +482,10 @@ public void limitedChannelsAreReleasedOnTimeout() {
connectionFactory.destroy();
}

private void checkConverterResult(CompletableFuture<String> future, String expected) throws InterruptedException {
final CountDownLatch cdl = new CountDownLatch(1);
final AtomicReference<String> resultRef = new AtomicReference<>();
future.whenComplete((result, ex) -> {
resultRef.set(result);
cdl.countDown();
});
assertThat(cdl.await(10, TimeUnit.SECONDS)).isTrue();
assertThat(resultRef.get()).isEqualTo(expected);
private void checkConverterResult(CompletableFuture<String> future, String expected) {
assertThat(future)
.succeedsWithin(Duration.ofSeconds(10))
.isEqualTo(expected);
}

private Message checkMessageResult(CompletableFuture<Message> future, String expected) throws InterruptedException {
Expand Down

0 comments on commit 410b584

Please sign in to comment.