Skip to content

Commit

Permalink
GH-2715: Fix channel leak in CachingConnectionFactory
Browse files Browse the repository at this point in the history
Fixes: #2715

When connection is closed from the broker, there are some channels leak into a cache after reconnection
  • Loading branch information
artembilan committed May 20, 2024
1 parent 602dcaf commit bb73fb8
Showing 1 changed file with 19 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1121,7 +1121,7 @@ else if (methodName.equals("close")) {
return null;
}
else {
physicalClose(proxy);
physicalClose();
return null;
}
}
Expand Down Expand Up @@ -1193,24 +1193,16 @@ else if (txEnds.contains(methodName)) {
}
}

private void releasePermitIfNecessary(Object proxy) {
private void releasePermitIfNecessary() {
if (CachingConnectionFactory.this.channelCheckoutTimeout > 0) {
/*
* Only release a permit if this is a normal close; if the channel is
* in the list, it means we're closing a cached channel (for which a permit
* has already been released).
*/
synchronized (this.channelList) {
if (this.channelList.contains(proxy)) {
return;
}
}
Semaphore permits = CachingConnectionFactory.this.checkoutPermits.get(this.theConnection);
if (permits != null) {
permits.release();
if (logger.isDebugEnabled()) {
logger.debug("Released permit for '" + this.theConnection + "', remaining: "
+ permits.availablePermits());
if (permits.availablePermits() < CachingConnectionFactory.this.channelCacheSize) {
permits.release();
if (logger.isDebugEnabled()) {
logger.debug("Released permit for '" + this.theConnection + "', remaining: "
+ permits.availablePermits());
}
}
}
else {
Expand All @@ -1235,11 +1227,8 @@ private void logicalClose(ChannelProxy proxy) throws IOException, TimeoutExcepti
if (this.target instanceof PublisherCallbackChannel) {
this.target.close(); // emit nacks if necessary
}
if (this.channelList.contains(proxy)) {
this.channelList.remove(proxy);
}
else {
releasePermitIfNecessary(proxy);
if (!this.channelList.remove(proxy)) {
releasePermitIfNecessary();
}
this.target = null;
return;
Expand Down Expand Up @@ -1275,7 +1264,7 @@ private void returnToCache(ChannelProxy proxy) {
// The channel didn't handle confirms, so close it altogether to avoid
// memory leaks for pending confirms
try {
physicalClose(this.theConnection.channelsAwaitingAcks.remove(this.target));
physicalClose();
}
catch (@SuppressWarnings(UNUSED) Exception e) {
}
Expand All @@ -1298,7 +1287,7 @@ private void doReturnToCache(Channel proxy) {
else {
if (proxy.isOpen()) {
try {
physicalClose(proxy);
physicalClose();
}
catch (@SuppressWarnings(UNUSED) Exception e) {
}
Expand All @@ -1315,7 +1304,7 @@ private void cacheOrClose(Channel proxy) {
logger.trace("Cache limit reached: " + this.target);
}
try {
physicalClose(proxy);
physicalClose();
}
catch (@SuppressWarnings(UNUSED) Exception e) {
}
Expand All @@ -1324,8 +1313,8 @@ else if (!alreadyCached) {
if (logger.isTraceEnabled()) {
logger.trace("Returning cached Channel: " + this.target);
}
releasePermitIfNecessary(proxy);
this.channelList.addLast((ChannelProxy) proxy);
releasePermitIfNecessary();
setHighWaterMark();
}
}
Expand All @@ -1342,7 +1331,7 @@ private void setHighWaterMark() {
}
}

private void physicalClose(Object proxy) throws IOException, TimeoutException {
private void physicalClose() throws IOException, TimeoutException {
if (logger.isDebugEnabled()) {
logger.debug("Closing cached Channel: " + this.target);
}
Expand All @@ -1356,7 +1345,7 @@ private void physicalClose(Object proxy) throws IOException, TimeoutException {
(ConfirmType.CORRELATED.equals(CachingConnectionFactory.this.confirmType) ||
CachingConnectionFactory.this.publisherReturns)) {
async = true;
asyncClose(proxy);
asyncClose();
}
else {
this.target.close();
Expand All @@ -1373,12 +1362,12 @@ private void physicalClose(Object proxy) throws IOException, TimeoutException {
finally {
this.target = null;
if (!async) {
releasePermitIfNecessary(proxy);
releasePermitIfNecessary();
}
}
}

private void asyncClose(Object proxy) {
private void asyncClose() {
ExecutorService executorService = getChannelsExecutor();
final Channel channel = CachedChannelInvocationHandler.this.target;
CachingConnectionFactory.this.inFlightAsyncCloses.add(channel);
Expand Down Expand Up @@ -1414,7 +1403,7 @@ private void asyncClose(Object proxy) {
}
finally {
CachingConnectionFactory.this.inFlightAsyncCloses.release(channel);
releasePermitIfNecessary(proxy);
releasePermitIfNecessary();
}
}
});
Expand Down

0 comments on commit bb73fb8

Please sign in to comment.