Skip to content

Improve socketChannel closing in NioConnection #10895

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 23, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 44 additions & 20 deletions utils/src/main/java/com/cloud/utils/nio/NioConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,11 @@
import java.nio.channels.SocketChannel;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -80,7 +80,7 @@
protected ExecutorService _executor;
protected ExecutorService _sslHandshakeExecutor;
protected CAService caService;
protected Set<SocketChannel> socketChannels = new HashSet<>();
protected Set<SocketChannel> socketChannels = ConcurrentHashMap.newKeySet();
protected Integer sslHandshakeTimeout = null;
private final int factoryMaxNewConnectionsCount;
protected boolean blockNewConnections;
Expand Down Expand Up @@ -219,7 +219,7 @@
return true;
}

abstract void init() throws IOException;
protected abstract void init() throws IOException;

abstract void registerLink(InetSocketAddress saddr, Link link);

Expand Down Expand Up @@ -489,16 +489,47 @@
}

protected void closeConnection(final SelectionKey key) {
if (key != null) {
final SocketChannel channel = (SocketChannel)key.channel();
key.cancel();
if (key == null) {
return;

Check warning on line 493 in utils/src/main/java/com/cloud/utils/nio/NioConnection.java

View check run for this annotation

Codecov / codecov/patch

utils/src/main/java/com/cloud/utils/nio/NioConnection.java#L493

Added line #L493 was not covered by tests
}

SocketChannel channel = null;
try {

Check warning on line 497 in utils/src/main/java/com/cloud/utils/nio/NioConnection.java

View check run for this annotation

Codecov / codecov/patch

utils/src/main/java/com/cloud/utils/nio/NioConnection.java#L496-L497

Added lines #L496 - L497 were not covered by tests
// 1. Check type and handle potential CancelledKeyException
if (key.isValid() && key.channel() instanceof SocketChannel) {
channel = (SocketChannel) key.channel();

Check warning on line 500 in utils/src/main/java/com/cloud/utils/nio/NioConnection.java

View check run for this annotation

Codecov / codecov/patch

utils/src/main/java/com/cloud/utils/nio/NioConnection.java#L500

Added line #L500 was not covered by tests
}
} catch (CancelledKeyException e) {
logger.trace("Key already cancelled when trying to get channel in closeConnection.");
}

Check warning on line 504 in utils/src/main/java/com/cloud/utils/nio/NioConnection.java

View check run for this annotation

Codecov / codecov/patch

utils/src/main/java/com/cloud/utils/nio/NioConnection.java#L502-L504

Added lines #L502 - L504 were not covered by tests

// 2. Cancel the key (safe to call even if already cancelled)
key.cancel();

Check warning on line 507 in utils/src/main/java/com/cloud/utils/nio/NioConnection.java

View check run for this annotation

Codecov / codecov/patch

utils/src/main/java/com/cloud/utils/nio/NioConnection.java#L507

Added line #L507 was not covered by tests

if (channel == null) {
logger.trace("Channel was null, invalid, or not a SocketChannel for key: " + key);
return;

Check warning on line 511 in utils/src/main/java/com/cloud/utils/nio/NioConnection.java

View check run for this annotation

Codecov / codecov/patch

utils/src/main/java/com/cloud/utils/nio/NioConnection.java#L510-L511

Added lines #L510 - L511 were not covered by tests
}

// 3. Try to close the channel if we obtained it
if (channel != null) {
closeChannel(channel);

Check warning on line 516 in utils/src/main/java/com/cloud/utils/nio/NioConnection.java

View check run for this annotation

Codecov / codecov/patch

utils/src/main/java/com/cloud/utils/nio/NioConnection.java#L516

Added line #L516 was not covered by tests
} else {
logger.trace("Channel was null, invalid, or not a SocketChannel for key: " + key);

Check warning on line 518 in utils/src/main/java/com/cloud/utils/nio/NioConnection.java

View check run for this annotation

Codecov / codecov/patch

utils/src/main/java/com/cloud/utils/nio/NioConnection.java#L518

Added line #L518 was not covered by tests
}
}

Check warning on line 520 in utils/src/main/java/com/cloud/utils/nio/NioConnection.java

View check run for this annotation

Codecov / codecov/patch

utils/src/main/java/com/cloud/utils/nio/NioConnection.java#L520

Added line #L520 was not covered by tests

private void closeChannel(SocketChannel channel) {

Check warning on line 522 in utils/src/main/java/com/cloud/utils/nio/NioConnection.java

View check run for this annotation

Codecov / codecov/patch

utils/src/main/java/com/cloud/utils/nio/NioConnection.java#L522

Added line #L522 was not covered by tests
if (channel != null && channel.isOpen()) {
try {
if (channel != null) {
logger.debug("Closing socket {}", channel.socket());
channel.close();
}
} catch (final IOException ignore) {
logger.info("[ignored] channel");
logger.debug("Closing socket " + channel.socket());
channel.close();
} catch (IOException ignore) {
logger.warn(String.format("[ignored] Exception closing channel: %s, due to %s", channel, ignore.getMessage()));
} catch (Exception e) {
logger.warn(String.format("Unexpected exception in closing channel %s", channel), e);

Check warning on line 530 in utils/src/main/java/com/cloud/utils/nio/NioConnection.java

View check run for this annotation

Codecov / codecov/patch

utils/src/main/java/com/cloud/utils/nio/NioConnection.java#L525-L530

Added lines #L525 - L530 were not covered by tests
} finally {
socketChannels.remove(channel);

Check warning on line 532 in utils/src/main/java/com/cloud/utils/nio/NioConnection.java

View check run for this annotation

Codecov / codecov/patch

utils/src/main/java/com/cloud/utils/nio/NioConnection.java#L532

Added line #L532 was not covered by tests
}
}
}
Expand Down Expand Up @@ -530,14 +561,7 @@
/* Release the resource used by the instance */
public void cleanUp() throws IOException {
for (SocketChannel channel : socketChannels) {
if (channel != null && channel.isOpen()) {
try {
logger.info("Closing connection: {}", channel.getRemoteAddress());
channel.close();
} catch (IOException e) {
logger.warn("Unable to close connection due to {}", e.getMessage());
}
}
closeChannel(channel);

Check warning on line 564 in utils/src/main/java/com/cloud/utils/nio/NioConnection.java

View check run for this annotation

Codecov / codecov/patch

utils/src/main/java/com/cloud/utils/nio/NioConnection.java#L564

Added line #L564 was not covered by tests
}
if (_selector != null) {
_selector.close();
Expand Down
8 changes: 4 additions & 4 deletions utils/src/main/java/com/cloud/utils/nio/NioServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.cloudstack.framework.ca.CAService;

Expand All @@ -34,15 +34,15 @@ public class NioServer extends NioConnection {
protected InetSocketAddress localAddress;
private ServerSocketChannel serverSocket;

protected WeakHashMap<InetSocketAddress, Link> links;
protected ConcurrentHashMap<InetSocketAddress, Link> links;

public NioServer(final String name, final int port, final int workers, final HandlerFactory factory,
final CAService caService, final Integer sslHandShakeTimeout) {
super(name, port, workers,factory);
super(name, port, workers, factory);
setCAService(caService);
setSslHandshakeTimeout(sslHandShakeTimeout);
localAddress = null;
links = new WeakHashMap<>(1024);
links = new ConcurrentHashMap<>(1024);
}

public int getPort() {
Expand Down
Loading