Skip to content

Commit

Permalink
GH-2640: Fix leak for non-confirmed channel
Browse files Browse the repository at this point in the history
Fixes: #2640

Rabbit server was unstable for a while.
Once restored, we were unable to publish new confirmed messages to it
(the max number of channel on connection was reached and the existing channels were ignored).
Essentially `PublisherCallbackChannel` instances ara waiting for acks on their confirms
which never going to happen.
Therefore, these channels are not closed and cache state is not reset.

* Fix `CachingConnectionFactory.CachedChannelInvocationHandler.returnToCache()`
to schedule `waitForConfirms()` in the separate thread.
If `TimeoutException` happens, perform `physicalClose()` to avoid any possible
memory leaks
* Adjust `RabbitTemplatePublisherCallbacksIntegrationTests.testPublisherConfirmNotReceived()`
to ensure that "unconfirmed" channel is closed and `CachingConnectionFactory` can produce
a new channel

(cherry picked from commit 9a8d741)
  • Loading branch information
artembilan authored and spring-builds committed Feb 28, 2024
1 parent 6e6d9d1 commit d90b175
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 9 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2023 the original author or authors.
* Copyright 2002-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -1250,13 +1250,38 @@ private void logicalClose(ChannelProxy proxy) throws IOException, TimeoutExcepti
}

private void returnToCache(ChannelProxy proxy) {
if (CachingConnectionFactory.this.active && this.publisherConfirms
&& proxy instanceof PublisherCallbackChannel) {
if (CachingConnectionFactory.this.active
&& this.publisherConfirms
&& proxy instanceof PublisherCallbackChannel publisherCallbackChannel) {

this.theConnection.channelsAwaitingAcks.put(this.target, proxy);
((PublisherCallbackChannel) proxy)
.setAfterAckCallback(c ->
doReturnToCache(this.theConnection.channelsAwaitingAcks.remove(c)));
AtomicBoolean ackCallbackCalledImmediately = new AtomicBoolean();
publisherCallbackChannel
.setAfterAckCallback(c -> {
ackCallbackCalledImmediately.set(true);
doReturnToCache(this.theConnection.channelsAwaitingAcks.remove(c));
});

if (!ackCallbackCalledImmediately.get()) {
getChannelsExecutor()
.execute(() -> {
try {
publisherCallbackChannel.waitForConfirms(ASYNC_CLOSE_TIMEOUT);
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
catch (TimeoutException ex) {
// The channel didn't handle confirms, so close it altogether to avoid
// memory leaks for pending confirms
try {
physicalClose(this.theConnection.channelsAwaitingAcks.remove(this.target));
}
catch (@SuppressWarnings(UNUSED) Exception e) {
}
}
});
}
}
else {
doReturnToCache(proxy);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.willAnswer;
Expand All @@ -43,6 +44,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -326,6 +328,14 @@ public void testPublisherConfirmNotReceived() throws Exception {
given(mockChannel.isOpen()).willReturn(true);
given(mockChannel.getNextPublishSeqNo()).willReturn(1L);

CountDownLatch timeoutExceptionLatch = new CountDownLatch(1);

given(mockChannel.waitForConfirms(anyLong()))
.willAnswer(invocation -> {
timeoutExceptionLatch.await(10, TimeUnit.SECONDS);
throw new TimeoutException();
});

given(mockConnectionFactory.newConnection(any(ExecutorService.class), anyString())).willReturn(mockConnection);
given(mockConnection.isOpen()).willReturn(true);

Expand All @@ -334,20 +344,26 @@ public void testPublisherConfirmNotReceived() throws Exception {
.createChannel();

CachingConnectionFactory ccf = new CachingConnectionFactory(mockConnectionFactory);
ccf.setExecutor(mock(ExecutorService.class));
ccf.setExecutor(Executors.newCachedThreadPool());
ccf.setPublisherConfirmType(ConfirmType.CORRELATED);
ccf.setChannelCacheSize(1);
ccf.setChannelCheckoutTimeout(10000);
final RabbitTemplate template = new RabbitTemplate(ccf);

final AtomicBoolean confirmed = new AtomicBoolean();
template.setConfirmCallback((correlationData, ack, cause) -> confirmed.set(true));
template.convertAndSend(ROUTE, (Object) "message", new CorrelationData("abc"));
Thread.sleep(5);

assertThat(template.getUnconfirmedCount()).isEqualTo(1);
Collection<CorrelationData> unconfirmed = template.getUnconfirmed(-1);
assertThat(template.getUnconfirmedCount()).isEqualTo(0);
assertThat(unconfirmed).hasSize(1);
assertThat(unconfirmed.iterator().next().getId()).isEqualTo("abc");
assertThat(confirmed.get()).isFalse();

timeoutExceptionLatch.countDown();

assertThat(ccf.createConnection().createChannel(false)).isNotNull();
}

@Test
Expand Down Expand Up @@ -563,7 +579,7 @@ public void testPublisherConfirmMultipleWithTwoListeners() throws Exception {
* time as adding a new pending ack to the map. Test verifies we don't
* get a {@link ConcurrentModificationException}.
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
@SuppressWarnings({"rawtypes", "unchecked"})
@Test
public void testConcurrentConfirms() throws Exception {
ConnectionFactory mockConnectionFactory = mock(ConnectionFactory.class);
Expand Down

0 comments on commit d90b175

Please sign in to comment.