Skip to content

Commit

Permalink
GH-2715: Fix channel leak in CachingConnectionFactory
Browse files Browse the repository at this point in the history
Fixes: #2715

When connection is closed from the broker, there are some channels leak into a cache after reconnection

**Auto-cherry-pick to `3.0.x`**
  • Loading branch information
artembilan committed May 20, 2024
1 parent 105dfac commit b6409de
Showing 1 changed file with 19 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,6 @@ public enum ConfirmType {

private final Condition connectionAvailableCondition = this.connectionLock.newCondition();


private final ActiveObjectCounter<Channel> inFlightAsyncCloses = new ActiveObjectCounter<>();

private final AtomicBoolean running = new AtomicBoolean();
Expand Down Expand Up @@ -1156,7 +1155,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
return null;
}
else {
physicalClose(channelProxy);
physicalClose();
return null;
}
}
Expand Down Expand Up @@ -1237,29 +1236,16 @@ else if (txEnds.contains(methodName)) {
}
}

private void releasePermitIfNecessary(ChannelProxy proxy) {
private void releasePermitIfNecessary() {
if (CachingConnectionFactory.this.channelCheckoutTimeout > 0) {
/*
* Only release a permit if this is a normal close; if the channel is
* in the list, it means we're closing a cached channel (for which a permit
* has already been released).
*/

this.theConnection.channelListLock.lock();
try {
if (this.channelList.contains(proxy)) {
return;
}
}
finally {
this.theConnection.channelListLock.unlock();
}
Semaphore permits = CachingConnectionFactory.this.checkoutPermits.get(this.theConnection);
if (permits != null) {
permits.release();
if (logger.isDebugEnabled()) {
logger.debug("Released permit for '" + this.theConnection + "', remaining: "
+ permits.availablePermits());
if (permits.availablePermits() < CachingConnectionFactory.this.channelCacheSize) {
permits.release();
if (logger.isDebugEnabled()) {
logger.debug("Released permit for '" + this.theConnection + "', remaining: "
+ permits.availablePermits());
}
}
}
else {
Expand All @@ -1285,11 +1271,8 @@ private void logicalClose(ChannelProxy proxy) throws IOException, TimeoutExcepti
if (this.target instanceof PublisherCallbackChannel) {
this.target.close(); // emit nacks if necessary
}
if (this.channelList.contains(proxy)) {
this.channelList.remove(proxy);
}
else {
releasePermitIfNecessary(proxy);
if (!this.channelList.remove(proxy)) {
releasePermitIfNecessary();
}
this.target = null;
return;
Expand Down Expand Up @@ -1328,7 +1311,7 @@ private void returnToCache(ChannelProxy proxy) {
// The channel didn't handle confirms, so close it altogether to avoid
// memory leaks for pending confirms
try {
physicalClose(this.theConnection.channelsAwaitingAcks.remove(this.target));
physicalClose();
}
catch (@SuppressWarnings(UNUSED) Exception e) {
}
Expand All @@ -1352,7 +1335,7 @@ private void doReturnToCache(@Nullable ChannelProxy proxy) {
else {
if (proxy.isOpen()) {
try {
physicalClose(proxy);
physicalClose();
}
catch (@SuppressWarnings(UNUSED) Exception e) {
}
Expand All @@ -1372,7 +1355,7 @@ private void cacheOrClose(ChannelProxy proxy) {
logger.trace("Cache limit reached: " + this.target);
}
try {
physicalClose(proxy);
physicalClose();
}
catch (@SuppressWarnings(UNUSED) Exception e) {
}
Expand All @@ -1381,8 +1364,8 @@ else if (!alreadyCached) {
if (logger.isTraceEnabled()) {
logger.trace("Returning cached Channel: " + this.target);
}
releasePermitIfNecessary(proxy);
this.channelList.addLast(proxy);
releasePermitIfNecessary();
setHighWaterMark();
}
}
Expand All @@ -1399,7 +1382,7 @@ private void setHighWaterMark() {
}
}

private void physicalClose(ChannelProxy proxy) throws IOException, TimeoutException {
private void physicalClose() throws IOException, TimeoutException {
if (logger.isDebugEnabled()) {
logger.debug("Closing cached Channel: " + this.target);
}
Expand All @@ -1413,7 +1396,7 @@ private void physicalClose(ChannelProxy proxy) throws IOException, TimeoutExcept
(ConfirmType.CORRELATED.equals(CachingConnectionFactory.this.confirmType) ||
CachingConnectionFactory.this.publisherReturns)) {
async = true;
asyncClose(proxy);
asyncClose();
}
else {
this.target.close();
Expand All @@ -1430,12 +1413,12 @@ private void physicalClose(ChannelProxy proxy) throws IOException, TimeoutExcept
finally {
this.target = null;
if (!async) {
releasePermitIfNecessary(proxy);
releasePermitIfNecessary();
}
}
}

private void asyncClose(ChannelProxy proxy) {
private void asyncClose() {
ExecutorService executorService = getChannelsExecutor();
final Channel channel = CachedChannelInvocationHandler.this.target;
CachingConnectionFactory.this.inFlightAsyncCloses.add(channel);
Expand Down Expand Up @@ -1467,7 +1450,7 @@ private void asyncClose(ChannelProxy proxy) {
}
finally {
CachingConnectionFactory.this.inFlightAsyncCloses.release(channel);
releasePermitIfNecessary(proxy);
releasePermitIfNecessary();
}
}
});
Expand Down

0 comments on commit b6409de

Please sign in to comment.