Skip to content

Commit

Permalink
SMLC: defer counter release until fetched messages are processed
Browse files Browse the repository at this point in the history
Currently, the `SimpleMessageListenerContainer` is stopped immediately
when `cancelOK` is received.

The expectation do not stop an application context until all the fetched
messages are processed.
Therefore, move `this.activeObjectCounter.release(this);` to the `BlockingQueueConsumer.nextMessage()`
if the internal queue is empty and `cancelled` has been requested.

* Adjust all the SMLC tests for a shorter `receiveTimeout` to not have blocking for nothing
* Fix `EnableRabbitIntegrationTests.exec1` bean for `setAcceptTasksAfterContextClose(true)`.
Looks like an `Executor` can be stopped by the application context before listener container
is able to schedule its shutdown
* Also add `System.setProperty("spring.amqp.deserialization.trust.all", "true");` to be
able to run tests from IDE

**Cherry-pick to `3.0.x`**
  • Loading branch information
artembilan committed Feb 5, 2024
1 parent e7e3e14 commit 37d9641
Show file tree
Hide file tree
Showing 35 changed files with 178 additions and 107 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2019 the original author or authors.
* Copyright 2017-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -135,6 +135,7 @@ public String baz(String in) {
public SimpleMessageListenerContainer smlc1() throws IOException {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
container.setQueueNames("foo", "bar");
container.setReceiveTimeout(10);
container.setMessageListener(new MessageListenerAdapter(new Object() {

@SuppressWarnings("unused")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -558,6 +558,7 @@ public Message nextMessage(long timeout) throws InterruptedException, ShutdownSi
}
Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
if (message == null && this.cancelled.get()) {
this.activeObjectCounter.release(this);
throw new ConsumerCancelledException();
}
return message;
Expand Down Expand Up @@ -1018,7 +1019,6 @@ public void handleCancelOk(String consumerTag) {
+ "); " + BlockingQueueConsumer.this);
}
this.canceled = true;
BlockingQueueConsumer.this.activeObjectCounter.release(BlockingQueueConsumer.this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2016-2023 the original author or authors.
* Copyright 2016-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -383,17 +383,17 @@ public void testStopCancelled() throws Exception {
@Test
void ctorCoverage() {
AsyncRabbitTemplate template = new AsyncRabbitTemplate(mock(ConnectionFactory.class), "ex", "rk");
assertThat(template).extracting(t -> t.getRabbitTemplate())
assertThat(template).extracting(AsyncRabbitTemplate::getRabbitTemplate)
.extracting("exchange")
.isEqualTo("ex");
assertThat(template).extracting(t -> t.getRabbitTemplate())
assertThat(template).extracting(AsyncRabbitTemplate::getRabbitTemplate)
.extracting("routingKey")
.isEqualTo("rk");
template = new AsyncRabbitTemplate(mock(ConnectionFactory.class), "ex", "rk", "rq");
assertThat(template).extracting(t -> t.getRabbitTemplate())
assertThat(template).extracting(AsyncRabbitTemplate::getRabbitTemplate)
.extracting("exchange")
.isEqualTo("ex");
assertThat(template).extracting(t -> t.getRabbitTemplate())
assertThat(template).extracting(AsyncRabbitTemplate::getRabbitTemplate)
.extracting("routingKey")
.isEqualTo("rk");
assertThat(template)
Expand All @@ -403,10 +403,10 @@ void ctorCoverage() {
.extracting("queueNames")
.isEqualTo(new String[] { "rq" });
template = new AsyncRabbitTemplate(mock(ConnectionFactory.class), "ex", "rk", "rq", "ra");
assertThat(template).extracting(t -> t.getRabbitTemplate())
assertThat(template).extracting(AsyncRabbitTemplate::getRabbitTemplate)
.extracting("exchange")
.isEqualTo("ex");
assertThat(template).extracting(t -> t.getRabbitTemplate())
assertThat(template).extracting(AsyncRabbitTemplate::getRabbitTemplate)
.extracting("routingKey")
.isEqualTo("rk");
assertThat(template)
Expand Down Expand Up @@ -523,6 +523,7 @@ public RabbitTemplate templateForDirect(ConnectionFactory connectionFactory) {
@Primary
public SimpleMessageListenerContainer replyContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setReceiveTimeout(10);
container.setAfterReceivePostProcessors(new GUnzipPostProcessor());
container.setQueueNames(replies().getName());
return container;
Expand All @@ -545,6 +546,7 @@ public AsyncRabbitTemplate asyncDirectTemplate(
@Bean
public SimpleMessageListenerContainer remoteContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setReceiveTimeout(10);
container.setQueueNames(requests().getName());
container.setAfterReceivePostProcessors(new GUnzipPostProcessor());
MessageListenerAdapter messageListener =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2020 the original author or authors.
* Copyright 2014-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -142,6 +142,7 @@ public void testFullConfiguration(ApplicationContext context) {
// Resolve the container and invoke a message on it

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setReceiveTimeout(10);
endpoint.setupListenerContainer(container);
MessagingMessageListenerAdapter listener = (MessagingMessageListenerAdapter) container.getMessageListener();

Expand All @@ -158,9 +159,9 @@ public void testFullConfiguration(ApplicationContext context) {
}

/**
* Test for {@link CustomBean} and an manually endpoint registered
* Test for {@link CustomBean} and a manually registered endpoint
* with "myCustomEndpointId". The custom endpoint does not provide
* any factory so it's registered with the default one
* any factory, so it's registered with the default one
*/
public void testCustomConfiguration(ApplicationContext context) {
RabbitListenerContainerTestFactory defaultFactory =
Expand All @@ -171,14 +172,15 @@ public void testCustomConfiguration(ApplicationContext context) {
assertThat(customFactory.getListenerContainers()).hasSize(1);
RabbitListenerEndpoint endpoint = defaultFactory.getListenerContainers().get(0).getEndpoint();
assertThat(endpoint.getClass()).as("Wrong endpoint type").isEqualTo(SimpleRabbitListenerEndpoint.class);
assertThat(((SimpleRabbitListenerEndpoint) endpoint).getMessageListener()).as("Wrong listener set in custom endpoint").isEqualTo(context.getBean("simpleMessageListener"));
assertThat(((SimpleRabbitListenerEndpoint) endpoint).getMessageListener())
.as("Wrong listener set in custom endpoint").isEqualTo(context.getBean("simpleMessageListener"));

RabbitListenerEndpointRegistry customRegistry =
context.getBean("customRegistry", RabbitListenerEndpointRegistry.class);
assertThat(customRegistry.getListenerContainerIds().size()).as("Wrong number of containers in the registry").isEqualTo(2);
assertThat(customRegistry.getListenerContainers().size()).as("Wrong number of containers in the registry").isEqualTo(2);
assertThat(customRegistry.getListenerContainer("listenerId")).as("Container with custom id on the annotation should be found").isNotNull();
assertThat(customRegistry.getListenerContainer("myCustomEndpointId")).as("Container created with custom id should be found").isNotNull();
assertThat(customRegistry.getListenerContainerIds()).hasSize(2);
assertThat(customRegistry.getListenerContainers()).hasSize(2);
assertThat(customRegistry.getListenerContainer("listenerId")).isNotNull();
assertThat(customRegistry.getListenerContainer("myCustomEndpointId")).isNotNull();
}

/**
Expand All @@ -205,7 +207,6 @@ public void testDefaultContainerFactoryConfiguration(ApplicationContext context)
/**
* Test for {@link ValidationBean} with a validator ({@link TestValidator}) specified
* in a custom {@link org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory}.
*
* The test should throw a {@link org.springframework.amqp.rabbit.support.ListenerExecutionFailedException}
*/
public void testRabbitHandlerMethodFactoryConfiguration(ApplicationContext context) throws Exception {
Expand All @@ -216,6 +217,7 @@ public void testRabbitHandlerMethodFactoryConfiguration(ApplicationContext conte
simpleFactory.getListenerContainers().get(0).getEndpoint();

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setReceiveTimeout(10);
endpoint.setupListenerContainer(container);
MessagingMessageListenerAdapter listener = (MessagingMessageListenerAdapter) container.getMessageListener();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2014-2023 the original author or authors.
* Copyright 2014-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -41,6 +41,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -249,6 +250,7 @@ public class EnableRabbitIntegrationTests extends NeedsManagementTests {
public static void setUp() {
System.setProperty(RabbitListenerAnnotationBeanPostProcessor.RABBIT_EMPTY_STRING_ARGUMENTS_PROPERTY,
"test-empty");
System.setProperty("spring.amqp.deserialization.trust.all", "true");
RabbitAvailableCondition.getBrokerRunning().removeExchanges("auto.exch.tx",
"auto.exch",
"auto.exch.fanout",
Expand Down Expand Up @@ -827,7 +829,7 @@ public void testMeta() throws Exception {
}

@Test
public void testHeadersExchange() throws Exception {
public void testHeadersExchange() {
assertThat(rabbitTemplate.convertSendAndReceive("auto.headers", "", "foo",
message -> {
message.getMessageProperties().getHeaders().put("foo", "bar");
Expand All @@ -846,7 +848,7 @@ public void deadLetterOnDefaultExchange() {
this.rabbitTemplate.convertAndSend("amqp656", "foo");
assertThat(this.rabbitTemplate.receiveAndConvert("amqp656dlq", 10000)).isEqualTo("foo");
try {
Map<String, Object> amqp656 = await().until(() -> queueInfo("amqp656"), q -> q != null);
Map<String, Object> amqp656 = await().until(() -> queueInfo("amqp656"), Objects::nonNull);
if (amqp656 != null) {
assertThat(arguments(amqp656).get("test-empty")).isEqualTo("");
assertThat(arguments(amqp656).get("test-null")).isEqualTo("undefined");
Expand Down Expand Up @@ -961,7 +963,7 @@ public void messagingMessageReturned() throws InterruptedException {
catch (@SuppressWarnings("unused") Exception e) {
return null;
}
}, tim -> tim != null);
}, Objects::nonNull);
assertThat(timer.count()).isEqualTo(1L);
}

Expand Down Expand Up @@ -1790,6 +1792,7 @@ public SimpleRabbitListenerContainerFactory consumerBatchContainerFactory() {
factory.setBatchListener(true);
factory.setBatchSize(2);
factory.setConsumerBatchEnabled(true);
factory.setReceiveTimeout(10L);
return factory;
}

Expand Down Expand Up @@ -1865,7 +1868,7 @@ public CountDownLatch errorHandlerLatch2() {

@Bean
public AtomicReference<Throwable> errorHandlerError() {
return new AtomicReference<Throwable>();
return new AtomicReference<>();
}

@Bean
Expand Down Expand Up @@ -1947,7 +1950,9 @@ public TxService txService() {

@Bean
public TaskExecutor exec1() {
return new ThreadPoolTaskExecutor();
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setAcceptTasksAfterContextClose(true);
return threadPoolTaskExecutor;
}

// Rabbit infrastructure setup
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 the original author or authors.
* Copyright 2020-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -83,6 +83,7 @@ void multipleSimpleMessageListeners() {
Assertions.assertThat(methodEndpoint.getMethod()).isNotNull();

SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
listenerContainer.setReceiveTimeout(10);
methodEndpoint.setupListenerContainer(listenerContainer);
Assertions.assertThat(listenerContainer.getMessageListener()).isNotNull();
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -86,6 +86,7 @@ public void simpleMessageListener() {
assertThat(methodEndpoint.getMethod()).isNotNull();

SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
listenerContainer.setReceiveTimeout(10);
methodEndpoint.setupListenerContainer(listenerContainer);
assertThat(listenerContainer.getMessageListener()).isNotNull();

Expand Down Expand Up @@ -114,6 +115,7 @@ public void simpleMessageListenerWithMixedAnnotations() {
assertThat(iterator.next()).isEqualTo("secondQueue");

SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer();
listenerContainer.setReceiveTimeout(10);
methodEndpoint.setupListenerContainer(listenerContainer);
assertThat(listenerContainer.getMessageListener()).isNotNull();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2022 the original author or authors.
* Copyright 2015-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -63,11 +63,11 @@
*/
public class LocalizedQueueConnectionFactoryTests {

private final Map<String, Channel> channels = new HashMap<String, Channel>();
private final Map<String, Channel> channels = new HashMap<>();

private final Map<String, Consumer> consumers = new HashMap<String, Consumer>();
private final Map<String, Consumer> consumers = new HashMap<>();

private final Map<String, String> consumerTags = new HashMap<String, String>();
private final Map<String, String> consumerTags = new HashMap<>();

@Test
public void testFailOver() throws Exception {
Expand All @@ -83,7 +83,7 @@ public void testFailOver() throws Exception {
final AtomicBoolean firstServer = new AtomicBoolean(true);
final WebClient client1 = doCreateClient(adminUris[0], username, password, nodes[0]);
final WebClient client2 = doCreateClient(adminUris[1], username, password, nodes[1]);
final Map<String, ConnectionFactory> mockCFs = new HashMap<String, ConnectionFactory>();
final Map<String, ConnectionFactory> mockCFs = new HashMap<>();
CountDownLatch latch1 = new CountDownLatch(1);
CountDownLatch latch2 = new CountDownLatch(1);
mockCFs.put(rabbit1, mockCF(rabbit1, latch1));
Expand Down Expand Up @@ -116,6 +116,7 @@ public WebClient createClient(String username, String password) {
willAnswer(new CallsRealMethods()).given(logger).debug(anyString());
ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(lqcf);
container.setReceiveTimeout(10);
container.setQueueNames("q");
container.afterPropertiesSet();
container.start();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -170,7 +170,7 @@ protected Object determineCurrentLookupKey() {
public void testAbstractRoutingConnectionFactoryWithListenerContainer() {
ConnectionFactory connectionFactory1 = mock(ConnectionFactory.class);
ConnectionFactory connectionFactory2 = mock(ConnectionFactory.class);
Map<Object, ConnectionFactory> factories = new HashMap<Object, ConnectionFactory>(2);
Map<Object, ConnectionFactory> factories = new HashMap<>(2);
factories.put("[baz]", connectionFactory1);
factories.put("[foo,bar]", connectionFactory2);
ConnectionFactory defaultConnectionFactory = mock(ConnectionFactory.class);
Expand All @@ -181,6 +181,7 @@ public void testAbstractRoutingConnectionFactoryWithListenerContainer() {
connectionFactory.setTargetConnectionFactories(factories);

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setReceiveTimeout(10);
container.setQueueNames("foo", "bar");
container.afterPropertiesSet();
container.start();
Expand Down
Loading

0 comments on commit 37d9641

Please sign in to comment.