Skip to content

Commit

Permalink
GH-2593: Reliably shutdown SMLC
Browse files Browse the repository at this point in the history
Fixes: #2593

* Add `activeObjectCounter` release into the `BlockingQueueConsumer.handleCancelOk()`
in reply to the `basicCancel()` call
* Adjust `BlockingQueueConsumer.basicCancel()` to call `RabbitUtils.closeMessageConsumer()`
to setisfy transactional context
* Adjust `SimpleMessageListenerContainerIntegrationTests` to eventually setisfy to the transaction rollback
when container is shuted down
* Add new tests into the `ContainerShutDownTests` to verify the listener containers are not blocked
waiting on the `cancelationLock`

**Cherry-pick to `3.0.x`**
  • Loading branch information
artembilan committed Jan 9, 2024
1 parent 6cc91b5 commit 70ba65f
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -1177,7 +1177,7 @@ protected boolean isForceStop() {
/**
* Set to true to stop the container after the current message(s) are processed and
* requeue any prefetched. Useful when using exclusive or single-active consumers.
* @param forceStop true to stop when current messsage(s) are processed.
* @param forceStop true to stop when current message(s) are processed.
* @since 2.4.14
*/
public void setForceStop(boolean forceStop) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -465,11 +465,10 @@ protected void basicCancel() {

protected void basicCancel(boolean expected) {
this.normalCancel = expected;
getConsumerTags().forEach(consumerTag -> {
if (this.channel.isOpen()) {
RabbitUtils.cancel(this.channel, consumerTag);
}
});
Collection<String> consumerTags = getConsumerTags();
if (!CollectionUtils.isEmpty(consumerTags)) {
RabbitUtils.closeMessageConsumer(this.channel, consumerTags, this.transactional);
}
this.cancelled.set(true);
this.abortStarted = System.currentTimeMillis();
}
Expand Down Expand Up @@ -1007,6 +1006,7 @@ public void handleCancelOk(String consumerTag) {
+ "); " + BlockingQueueConsumer.this);
}
this.canceled = true;
BlockingQueueConsumer.this.activeObjectCounter.release(BlockingQueueConsumer.this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2019 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,11 +30,13 @@
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.junit.RabbitAvailable;
import org.springframework.amqp.utils.test.TestUtils;
import org.springframework.util.StopWatch;

import com.rabbitmq.client.AMQP.BasicProperties;

/**
* @author Gary Russell
* @author Artem Bilan
* @since 2.0
*
*/
Expand All @@ -56,7 +58,6 @@ public void testUninterruptibleListenerDMLC() throws Exception {
public void testUninterruptibleListener(AbstractMessageListenerContainer container) throws Exception {
CachingConnectionFactory cf = new CachingConnectionFactory("localhost");
container.setConnectionFactory(cf);
container.setShutdownTimeout(500);
container.setQueueNames("test.shutdown");
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch testEnded = new CountDownLatch(1);
Expand Down Expand Up @@ -91,11 +92,49 @@ public void testUninterruptibleListener(AbstractMessageListenerContainer contain
assertThat(channels).hasSize(2);
}
finally {
testEnded.countDown();
container.stop();
assertThat(channels).hasSize(1);
cf.destroy();
}
}

@Test
public void consumersCorrectlyCancelledOnShutdownSMLC() throws Exception {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
consumersCorrectlyCancelledOnShutdown(container);
}

@Test
public void consumersCorrectlyCancelledOnShutdownDMLC() throws Exception {
DirectMessageListenerContainer container = new DirectMessageListenerContainer();
consumersCorrectlyCancelledOnShutdown(container);
}

private void consumersCorrectlyCancelledOnShutdown(AbstractMessageListenerContainer container)
throws InterruptedException {

CachingConnectionFactory cf = new CachingConnectionFactory("localhost");
container.setConnectionFactory(cf);
container.setQueueNames("test.shutdown");
container.setMessageListener(m -> {
});
final CountDownLatch startLatch = new CountDownLatch(1);
container.setApplicationEventPublisher(e -> {
if (e instanceof AsyncConsumerStartedEvent) {
startLatch.countDown();
}
});
container.start();
try {
assertThat(startLatch.await(30, TimeUnit.SECONDS)).isTrue();
StopWatch stopWatch = new StopWatch();
stopWatch.start();
container.shutdown();
stopWatch.stop();
assertThat(stopWatch.getTotalTimeMillis()).isLessThan(3000);
}
finally {
cf.destroy();
testEnded.countDown();
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -18,6 +18,7 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException;
import static org.awaitility.Awaitility.await;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -314,7 +315,7 @@ private void doListenerWithExceptionTest(CountDownLatch latch, MessageListener l
container.shutdown();
}
if (acknowledgeMode.isTransactionAllowed()) {
assertThat(template.receiveAndConvert(queue.getName())).isNotNull();
await().untilAsserted(() -> assertThat(template.receiveAndConvert(queue.getName())).isNotNull());
}
else {
assertThat(template.receiveAndConvert(queue.getName())).isNull();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,6 +50,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -517,7 +518,7 @@ public void testWithConnectionPerListenerThread() throws Exception {
waitForConsumersToStop(consumers);
Set<?> allocatedConnections = TestUtils.getPropertyValue(ccf, "allocatedConnections", Set.class);
assertThat(allocatedConnections).hasSize(2);
assertThat(ccf.getCacheProperties().get("openConnections")).isEqualTo("1");
assertThat(ccf.getCacheProperties().get("openConnections")).isEqualTo("2");
}

@Test
Expand Down Expand Up @@ -807,15 +808,15 @@ private Answer<Object> messageToConsumer(final Channel mockChannel, final Simple

}

private void waitForConsumersToStop(Set<?> consumers) throws Exception {
private void waitForConsumersToStop(Set<?> consumers) {
with().pollInterval(Duration.ofMillis(10)).atMost(Duration.ofSeconds(10))
.until(() -> consumers.stream()
.map(consumer -> TestUtils.getPropertyValue(consumer, "consumer"))
.allMatch(c -> c == null));
.allMatch(Objects::isNull));
}

@SuppressWarnings("serial")
private class TestTransactionManager extends AbstractPlatformTransactionManager {
private static class TestTransactionManager extends AbstractPlatformTransactionManager {

@Override
protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException {
Expand Down

0 comments on commit 70ba65f

Please sign in to comment.