diff --git a/utils/src/main/java/com/cloud/utils/nio/NioConnection.java b/utils/src/main/java/com/cloud/utils/nio/NioConnection.java index 7e8fe32af767..e2ecbb3210e4 100644 --- a/utils/src/main/java/com/cloud/utils/nio/NioConnection.java +++ b/utils/src/main/java/com/cloud/utils/nio/NioConnection.java @@ -34,11 +34,11 @@ import java.nio.channels.SocketChannel; import java.security.GeneralSecurityException; import java.util.ArrayList; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -80,7 +80,7 @@ public abstract class NioConnection implements Callable { protected ExecutorService _executor; protected ExecutorService _sslHandshakeExecutor; protected CAService caService; - protected Set socketChannels = new HashSet<>(); + protected Set socketChannels = ConcurrentHashMap.newKeySet(); protected Integer sslHandshakeTimeout = null; private final int factoryMaxNewConnectionsCount; protected boolean blockNewConnections; @@ -219,7 +219,7 @@ public Boolean call() throws NioConnectionException { return true; } - abstract void init() throws IOException; + protected abstract void init() throws IOException; abstract void registerLink(InetSocketAddress saddr, Link link); @@ -489,16 +489,47 @@ protected void write(final SelectionKey key) throws IOException { } protected void closeConnection(final SelectionKey key) { - if (key != null) { - final SocketChannel channel = (SocketChannel)key.channel(); - key.cancel(); + if (key == null) { + return; + } + + SocketChannel channel = null; + try { + // 1. Check type and handle potential CancelledKeyException + if (key.isValid() && key.channel() instanceof SocketChannel) { + channel = (SocketChannel) key.channel(); + } + } catch (CancelledKeyException e) { + logger.trace("Key already cancelled when trying to get channel in closeConnection."); + } + + // 2. Cancel the key (safe to call even if already cancelled) + key.cancel(); + + if (channel == null) { + logger.trace("Channel was null, invalid, or not a SocketChannel for key: " + key); + return; + } + + // 3. Try to close the channel if we obtained it + if (channel != null) { + closeChannel(channel); + } else { + logger.trace("Channel was null, invalid, or not a SocketChannel for key: " + key); + } + } + + private void closeChannel(SocketChannel channel) { + if (channel != null && channel.isOpen()) { try { - if (channel != null) { - logger.debug("Closing socket {}", channel.socket()); - channel.close(); - } - } catch (final IOException ignore) { - logger.info("[ignored] channel"); + logger.debug("Closing socket " + channel.socket()); + channel.close(); + } catch (IOException ignore) { + logger.warn(String.format("[ignored] Exception closing channel: %s, due to %s", channel, ignore.getMessage())); + } catch (Exception e) { + logger.warn(String.format("Unexpected exception in closing channel %s", channel), e); + } finally { + socketChannels.remove(channel); } } } @@ -530,14 +561,7 @@ public void close(final SelectionKey key) { /* Release the resource used by the instance */ public void cleanUp() throws IOException { for (SocketChannel channel : socketChannels) { - if (channel != null && channel.isOpen()) { - try { - logger.info("Closing connection: {}", channel.getRemoteAddress()); - channel.close(); - } catch (IOException e) { - logger.warn("Unable to close connection due to {}", e.getMessage()); - } - } + closeChannel(channel); } if (_selector != null) { _selector.close(); diff --git a/utils/src/main/java/com/cloud/utils/nio/NioServer.java b/utils/src/main/java/com/cloud/utils/nio/NioServer.java index fd5af516badd..c4f44afeccec 100644 --- a/utils/src/main/java/com/cloud/utils/nio/NioServer.java +++ b/utils/src/main/java/com/cloud/utils/nio/NioServer.java @@ -25,7 +25,7 @@ import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.nio.channels.spi.SelectorProvider; -import java.util.WeakHashMap; +import java.util.concurrent.ConcurrentHashMap; import org.apache.cloudstack.framework.ca.CAService; @@ -34,15 +34,15 @@ public class NioServer extends NioConnection { protected InetSocketAddress localAddress; private ServerSocketChannel serverSocket; - protected WeakHashMap links; + protected ConcurrentHashMap links; public NioServer(final String name, final int port, final int workers, final HandlerFactory factory, final CAService caService, final Integer sslHandShakeTimeout) { - super(name, port, workers,factory); + super(name, port, workers, factory); setCAService(caService); setSslHandshakeTimeout(sslHandShakeTimeout); localAddress = null; - links = new WeakHashMap<>(1024); + links = new ConcurrentHashMap<>(1024); } public int getPort() {