diff --git a/src/commonMain/kotlin/dev/datlag/k2k/connect/ConnectionClient.kt b/src/commonMain/kotlin/dev/datlag/k2k/connect/ConnectionClient.kt index 1a1b860..6cdb314 100644 --- a/src/commonMain/kotlin/dev/datlag/k2k/connect/ConnectionClient.kt +++ b/src/commonMain/kotlin/dev/datlag/k2k/connect/ConnectionClient.kt @@ -4,6 +4,8 @@ import io.ktor.network.selector.SelectorManager import io.ktor.network.sockets.aSocket import dev.datlag.k2k.Dispatcher import dev.datlag.k2k.Host +import dev.datlag.tooling.async.scopeCatching +import dev.datlag.tooling.async.suspendCatching import io.ktor.network.sockets.InetSocketAddress import io.ktor.network.sockets.Socket import io.ktor.network.sockets.openWriteChannel @@ -15,13 +17,15 @@ internal class ConnectionClient( private val immediate: Boolean ) : AutoCloseable { - private var socket = aSocket(SelectorManager(Dispatcher.IO)).let { - if (immediate) { - it.tcpNoDelay().tcp() - } else { - it.tcp() + private var socket = scopeCatching { + aSocket(SelectorManager(Dispatcher.IO)).let { + if (immediate) { + it.tcpNoDelay().tcp() + } else { + it.tcp() + } } - } + }.getOrNull() private var connectedSocket: Socket? = null @@ -29,9 +33,19 @@ internal class ConnectionClient( byteArray: ByteArray, host: Host, port: Int - ) { + ) = suspendCatching { val socketAddress = InetSocketAddress(host.hostAddress, port) - connectedSocket = socket.connect(socketAddress) { + val useSocket = socket ?: suspendCatching { + aSocket(SelectorManager(Dispatcher.IO)).let { + if (immediate) { + it.tcpNoDelay().tcp() + } else { + it.tcp() + } + } + }.getOrNull()?.also { socket = it } ?: return@suspendCatching + + connectedSocket = useSocket.connect(socketAddress) { reuseAddress = true }.also { val channel = it.openWriteChannel(autoFlush = true) @@ -45,12 +59,14 @@ internal class ConnectionClient( connectedSocket?.close() connectedSocket = null - socket = aSocket(SelectorManager(Dispatcher.IO)).let { - if (immediate) { - it.tcpNoDelay().tcp() - } else { - it.tcp() + socket = scopeCatching { + aSocket(SelectorManager(Dispatcher.IO)).let { + if (immediate) { + it.tcpNoDelay().tcp() + } else { + it.tcp() + } } - } + }.getOrNull() } } \ No newline at end of file diff --git a/src/commonMain/kotlin/dev/datlag/k2k/connect/ConnectionServer.kt b/src/commonMain/kotlin/dev/datlag/k2k/connect/ConnectionServer.kt index 4abac8b..ce825fa 100644 --- a/src/commonMain/kotlin/dev/datlag/k2k/connect/ConnectionServer.kt +++ b/src/commonMain/kotlin/dev/datlag/k2k/connect/ConnectionServer.kt @@ -2,6 +2,7 @@ package dev.datlag.k2k.connect import dev.datlag.k2k.Dispatcher import dev.datlag.k2k.NetInterface +import dev.datlag.tooling.async.scopeCatching import dev.datlag.tooling.async.suspendCatching import io.ktor.network.selector.SelectorManager import io.ktor.network.sockets.InetSocketAddress @@ -22,13 +23,15 @@ internal class ConnectionServer( private val immediate: Boolean ) : AutoCloseable { private var receiveJob: Job? = null - private var socket = aSocket(SelectorManager(Dispatcher.IO)).let { - if (immediate) { - it.tcpNoDelay().tcp() - } else { - it.tcp() + private var socket = scopeCatching { + aSocket(SelectorManager(Dispatcher.IO)).let { + if (immediate) { + it.tcpNoDelay().tcp() + } else { + it.tcp() + } } - } + }.getOrNull() private var serverSocket: ServerSocket? = null private var connectedSocket: Socket? = null @@ -38,18 +41,37 @@ internal class ConnectionServer( scope: CoroutineScope, listener: suspend (ByteArray) -> Unit ) { - close() - receiveJob = scope.launch(Dispatcher.IO) { - val socketAddress = InetSocketAddress(NetInterface.getLocalAddress(), port) + receive(port, listener) + } + } - serverSocket = socket.bind(socketAddress) { - reuseAddress = true + suspend fun receive( + port: Int, + listener: suspend (ByteArray) -> Unit + ) = suspendCatching { + close() + + val socketAddress = InetSocketAddress(NetInterface.getLocalAddress(), port) + val useSocket = socket ?: suspendCatching { + aSocket(SelectorManager(Dispatcher.IO)).let { + if (immediate) { + it.tcpNoDelay().tcp() + } else { + it.tcp() + } } + }.getOrNull() ?: return@suspendCatching + + serverSocket = useSocket.bind(socketAddress) { + reuseAddress = true + } + + while(currentCoroutineContext().isActive) { + connectedSocket?.close() - while(currentCoroutineContext().isActive) { - connectedSocket?.close() - connectedSocket = serverSocket?.accept()?.also { + connectedSocket = suspendCatching { + serverSocket?.accept()?.also { it.use { boundSocket -> suspendCatching { val readChannel = boundSocket.openReadChannel() @@ -67,7 +89,7 @@ internal class ConnectionServer( } } } - } + }.getOrNull() } } @@ -81,12 +103,14 @@ internal class ConnectionServer( connectedSocket?.close() connectedSocket = null - socket = aSocket(SelectorManager(Dispatcher.IO)).let { - if (immediate) { - it.tcpNoDelay().tcp() - } else { - it.tcp() + socket = scopeCatching { + aSocket(SelectorManager(Dispatcher.IO)).let { + if (immediate) { + it.tcpNoDelay().tcp() + } else { + it.tcp() + } } - } + }.getOrNull() } } \ No newline at end of file diff --git a/src/commonMain/kotlin/dev/datlag/k2k/discover/DiscoveryClient.kt b/src/commonMain/kotlin/dev/datlag/k2k/discover/DiscoveryClient.kt index 1d7de07..94098c6 100644 --- a/src/commonMain/kotlin/dev/datlag/k2k/discover/DiscoveryClient.kt +++ b/src/commonMain/kotlin/dev/datlag/k2k/discover/DiscoveryClient.kt @@ -5,6 +5,7 @@ import io.ktor.network.selector.SelectorManager import io.ktor.network.sockets.aSocket import dev.datlag.k2k.Dispatcher import dev.datlag.k2k.NetInterface +import dev.datlag.tooling.async.scopeCatching import dev.datlag.tooling.async.suspendCatching import io.ktor.network.sockets.InetSocketAddress import io.ktor.network.sockets.openWriteChannel @@ -19,7 +20,9 @@ import kotlinx.coroutines.launch internal class DiscoveryClient : AutoCloseable { private var broadcastJob: Job? = null - private var socket = aSocket(SelectorManager(Dispatcher.IO)).udp() + private var socket = scopeCatching { + aSocket(SelectorManager(Dispatcher.IO)).udp() + }.getOrNull() fun broadcast( port: Int, @@ -41,7 +44,11 @@ internal class DiscoveryClient : AutoCloseable { data: ByteArray ) { suspend fun writeToSocket(address: String, port: Int) = suspendCatching { - val socketConnection = socket.connect(InetSocketAddress(address, port)) { + val useSocket = socket ?: suspendCatching { + aSocket(SelectorManager(Dispatcher.IO)).udp() + }.getOrNull()?.also { socket = it } ?: return@suspendCatching + + val socketConnection = useSocket.connect(InetSocketAddress(address, port)) { broadcast = true reuseAddress = true } @@ -62,6 +69,8 @@ internal class DiscoveryClient : AutoCloseable { override fun close() { broadcastJob?.cancel() broadcastJob = null - socket = aSocket(SelectorManager(Dispatcher.IO)).udp() + socket = scopeCatching { + aSocket(SelectorManager(Dispatcher.IO)).udp() + }.getOrNull() } } \ No newline at end of file diff --git a/src/commonMain/kotlin/dev/datlag/k2k/discover/DiscoveryServer.kt b/src/commonMain/kotlin/dev/datlag/k2k/discover/DiscoveryServer.kt index 5cfc6de..f36a599 100644 --- a/src/commonMain/kotlin/dev/datlag/k2k/discover/DiscoveryServer.kt +++ b/src/commonMain/kotlin/dev/datlag/k2k/discover/DiscoveryServer.kt @@ -4,6 +4,7 @@ import dev.datlag.k2k.Constants import dev.datlag.k2k.Dispatcher import dev.datlag.k2k.Host import dev.datlag.k2k.NetInterface +import dev.datlag.tooling.async.scopeCatching import dev.datlag.tooling.async.suspendCatching import io.ktor.network.selector.SelectorManager import io.ktor.network.sockets.InetSocketAddress @@ -26,7 +27,9 @@ import kotlinx.serialization.decodeFromByteArray internal class DiscoveryServer : AutoCloseable { private var listenJob: Job? = null - private var socket = aSocket(SelectorManager(Dispatcher.IO)).udp() + private var socket = scopeCatching { + aSocket(SelectorManager(Dispatcher.IO)).udp() + }.getOrNull() internal val hosts = MutableStateFlow>(persistentSetOf()) fun listen( @@ -47,9 +50,13 @@ internal class DiscoveryServer : AutoCloseable { ping: Long, filter: Regex, hostIsClient: Boolean - ) { + ) = suspendCatching { val socketAddress = InetSocketAddress(Constants.BROADCAST_SOCKET, port) - val serverSocket = socket.bind(socketAddress) { + val useSocket = socket ?: suspendCatching { + aSocket(SelectorManager(Dispatcher.IO)).udp() + }.getOrNull()?.also { socket = it } ?: return@suspendCatching + + val serverSocket = useSocket.bind(socketAddress) { broadcast = true reuseAddress = true } @@ -90,7 +97,9 @@ internal class DiscoveryServer : AutoCloseable { override fun close() { listenJob?.cancel() listenJob = null - socket = aSocket(SelectorManager(Dispatcher.IO)).udp() + socket = scopeCatching { + aSocket(SelectorManager(Dispatcher.IO)).udp() + }.getOrNull() hosts.update { persistentSetOf() } } } \ No newline at end of file