Skip to content

Commit

Permalink
fix some crashes
Browse files Browse the repository at this point in the history
  • Loading branch information
DatL4g committed Jul 21, 2024
1 parent c81ec2f commit 8d06025
Show file tree
Hide file tree
Showing 4 changed files with 100 additions and 42 deletions.
44 changes: 30 additions & 14 deletions src/commonMain/kotlin/dev/datlag/k2k/connect/ConnectionClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import io.ktor.network.selector.SelectorManager
import io.ktor.network.sockets.aSocket
import dev.datlag.k2k.Dispatcher
import dev.datlag.k2k.Host
import dev.datlag.tooling.async.scopeCatching
import dev.datlag.tooling.async.suspendCatching
import io.ktor.network.sockets.InetSocketAddress
import io.ktor.network.sockets.Socket
import io.ktor.network.sockets.openWriteChannel
Expand All @@ -15,23 +17,35 @@ internal class ConnectionClient(
private val immediate: Boolean
) : AutoCloseable {

private var socket = aSocket(SelectorManager(Dispatcher.IO)).let {
if (immediate) {
it.tcpNoDelay().tcp()
} else {
it.tcp()
private var socket = scopeCatching {
aSocket(SelectorManager(Dispatcher.IO)).let {
if (immediate) {
it.tcpNoDelay().tcp()
} else {
it.tcp()
}
}
}
}.getOrNull()

private var connectedSocket: Socket? = null

suspend fun send(
byteArray: ByteArray,
host: Host,
port: Int
) {
) = suspendCatching {
val socketAddress = InetSocketAddress(host.hostAddress, port)
connectedSocket = socket.connect(socketAddress) {
val useSocket = socket ?: suspendCatching {
aSocket(SelectorManager(Dispatcher.IO)).let {
if (immediate) {
it.tcpNoDelay().tcp()
} else {
it.tcp()
}
}
}.getOrNull()?.also { socket = it } ?: return@suspendCatching

connectedSocket = useSocket.connect(socketAddress) {
reuseAddress = true
}.also {
val channel = it.openWriteChannel(autoFlush = true)
Expand All @@ -45,12 +59,14 @@ internal class ConnectionClient(
connectedSocket?.close()
connectedSocket = null

socket = aSocket(SelectorManager(Dispatcher.IO)).let {
if (immediate) {
it.tcpNoDelay().tcp()
} else {
it.tcp()
socket = scopeCatching {
aSocket(SelectorManager(Dispatcher.IO)).let {
if (immediate) {
it.tcpNoDelay().tcp()
} else {
it.tcp()
}
}
}
}.getOrNull()
}
}
66 changes: 45 additions & 21 deletions src/commonMain/kotlin/dev/datlag/k2k/connect/ConnectionServer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dev.datlag.k2k.connect

import dev.datlag.k2k.Dispatcher
import dev.datlag.k2k.NetInterface
import dev.datlag.tooling.async.scopeCatching
import dev.datlag.tooling.async.suspendCatching
import io.ktor.network.selector.SelectorManager
import io.ktor.network.sockets.InetSocketAddress
Expand All @@ -22,13 +23,15 @@ internal class ConnectionServer(
private val immediate: Boolean
) : AutoCloseable {
private var receiveJob: Job? = null
private var socket = aSocket(SelectorManager(Dispatcher.IO)).let {
if (immediate) {
it.tcpNoDelay().tcp()
} else {
it.tcp()
private var socket = scopeCatching {
aSocket(SelectorManager(Dispatcher.IO)).let {
if (immediate) {
it.tcpNoDelay().tcp()
} else {
it.tcp()
}
}
}
}.getOrNull()

private var serverSocket: ServerSocket? = null
private var connectedSocket: Socket? = null
Expand All @@ -38,18 +41,37 @@ internal class ConnectionServer(
scope: CoroutineScope,
listener: suspend (ByteArray) -> Unit
) {
close()

receiveJob = scope.launch(Dispatcher.IO) {
val socketAddress = InetSocketAddress(NetInterface.getLocalAddress(), port)
receive(port, listener)
}
}

serverSocket = socket.bind(socketAddress) {
reuseAddress = true
suspend fun receive(
port: Int,
listener: suspend (ByteArray) -> Unit
) = suspendCatching {
close()

val socketAddress = InetSocketAddress(NetInterface.getLocalAddress(), port)
val useSocket = socket ?: suspendCatching {
aSocket(SelectorManager(Dispatcher.IO)).let {
if (immediate) {
it.tcpNoDelay().tcp()
} else {
it.tcp()
}
}
}.getOrNull() ?: return@suspendCatching

serverSocket = useSocket.bind(socketAddress) {
reuseAddress = true
}

while(currentCoroutineContext().isActive) {
connectedSocket?.close()

while(currentCoroutineContext().isActive) {
connectedSocket?.close()
connectedSocket = serverSocket?.accept()?.also {
connectedSocket = suspendCatching {
serverSocket?.accept()?.also {
it.use { boundSocket ->
suspendCatching {
val readChannel = boundSocket.openReadChannel()
Expand All @@ -67,7 +89,7 @@ internal class ConnectionServer(
}
}
}
}
}.getOrNull()
}
}

Expand All @@ -81,12 +103,14 @@ internal class ConnectionServer(
connectedSocket?.close()
connectedSocket = null

socket = aSocket(SelectorManager(Dispatcher.IO)).let {
if (immediate) {
it.tcpNoDelay().tcp()
} else {
it.tcp()
socket = scopeCatching {
aSocket(SelectorManager(Dispatcher.IO)).let {
if (immediate) {
it.tcpNoDelay().tcp()
} else {
it.tcp()
}
}
}
}.getOrNull()
}
}
15 changes: 12 additions & 3 deletions src/commonMain/kotlin/dev/datlag/k2k/discover/DiscoveryClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import io.ktor.network.selector.SelectorManager
import io.ktor.network.sockets.aSocket
import dev.datlag.k2k.Dispatcher
import dev.datlag.k2k.NetInterface
import dev.datlag.tooling.async.scopeCatching
import dev.datlag.tooling.async.suspendCatching
import io.ktor.network.sockets.InetSocketAddress
import io.ktor.network.sockets.openWriteChannel
Expand All @@ -19,7 +20,9 @@ import kotlinx.coroutines.launch

internal class DiscoveryClient : AutoCloseable {
private var broadcastJob: Job? = null
private var socket = aSocket(SelectorManager(Dispatcher.IO)).udp()
private var socket = scopeCatching {
aSocket(SelectorManager(Dispatcher.IO)).udp()
}.getOrNull()

fun broadcast(
port: Int,
Expand All @@ -41,7 +44,11 @@ internal class DiscoveryClient : AutoCloseable {
data: ByteArray
) {
suspend fun writeToSocket(address: String, port: Int) = suspendCatching {
val socketConnection = socket.connect(InetSocketAddress(address, port)) {
val useSocket = socket ?: suspendCatching {
aSocket(SelectorManager(Dispatcher.IO)).udp()
}.getOrNull()?.also { socket = it } ?: return@suspendCatching

val socketConnection = useSocket.connect(InetSocketAddress(address, port)) {
broadcast = true
reuseAddress = true
}
Expand All @@ -62,6 +69,8 @@ internal class DiscoveryClient : AutoCloseable {
override fun close() {
broadcastJob?.cancel()
broadcastJob = null
socket = aSocket(SelectorManager(Dispatcher.IO)).udp()
socket = scopeCatching {
aSocket(SelectorManager(Dispatcher.IO)).udp()
}.getOrNull()
}
}
17 changes: 13 additions & 4 deletions src/commonMain/kotlin/dev/datlag/k2k/discover/DiscoveryServer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import dev.datlag.k2k.Constants
import dev.datlag.k2k.Dispatcher
import dev.datlag.k2k.Host
import dev.datlag.k2k.NetInterface
import dev.datlag.tooling.async.scopeCatching
import dev.datlag.tooling.async.suspendCatching
import io.ktor.network.selector.SelectorManager
import io.ktor.network.sockets.InetSocketAddress
Expand All @@ -26,7 +27,9 @@ import kotlinx.serialization.decodeFromByteArray

internal class DiscoveryServer : AutoCloseable {
private var listenJob: Job? = null
private var socket = aSocket(SelectorManager(Dispatcher.IO)).udp()
private var socket = scopeCatching {
aSocket(SelectorManager(Dispatcher.IO)).udp()
}.getOrNull()
internal val hosts = MutableStateFlow<ImmutableSet<Host>>(persistentSetOf())

fun listen(
Expand All @@ -47,9 +50,13 @@ internal class DiscoveryServer : AutoCloseable {
ping: Long,
filter: Regex,
hostIsClient: Boolean
) {
) = suspendCatching {
val socketAddress = InetSocketAddress(Constants.BROADCAST_SOCKET, port)
val serverSocket = socket.bind(socketAddress) {
val useSocket = socket ?: suspendCatching {
aSocket(SelectorManager(Dispatcher.IO)).udp()
}.getOrNull()?.also { socket = it } ?: return@suspendCatching

val serverSocket = useSocket.bind(socketAddress) {
broadcast = true
reuseAddress = true
}
Expand Down Expand Up @@ -90,7 +97,9 @@ internal class DiscoveryServer : AutoCloseable {
override fun close() {
listenJob?.cancel()
listenJob = null
socket = aSocket(SelectorManager(Dispatcher.IO)).udp()
socket = scopeCatching {
aSocket(SelectorManager(Dispatcher.IO)).udp()
}.getOrNull()
hosts.update { persistentSetOf() }
}
}

0 comments on commit 8d06025

Please sign in to comment.