Skip to content

Commit

Permalink
change collecting
Browse files Browse the repository at this point in the history
  • Loading branch information
DatL4g committed Jul 20, 2024
1 parent 803be57 commit ea4de6b
Showing 1 changed file with 28 additions and 22 deletions.
50 changes: 28 additions & 22 deletions src/commonMain/kotlin/dev/datlag/k2k/connect/ConnectionServer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import dev.datlag.k2k.NetInterface
import dev.datlag.tooling.async.suspendCatching
import io.ktor.network.selector.SelectorManager
import io.ktor.network.sockets.InetSocketAddress
import io.ktor.network.sockets.ServerSocket
import io.ktor.network.sockets.Socket
import io.ktor.network.sockets.aSocket
import io.ktor.network.sockets.openReadChannel
Expand All @@ -29,6 +30,7 @@ internal class ConnectionServer(
}
}

private var serverSocket: ServerSocket? = null
private var connectedSocket: Socket? = null

fun receive(
Expand All @@ -41,30 +43,41 @@ internal class ConnectionServer(
receiveJob = scope.launch(Dispatcher.IO) {
val socketAddress = InetSocketAddress(NetInterface.getLocalAddress(), port)

connectedSocket = socket.bind(socketAddress) {
serverSocket = socket.bind(socketAddress) {
reuseAddress = true
}.accept().also {
it.use { boundSocket ->
suspendCatching {
val readChannel = boundSocket.openReadChannel()
val buffer = ByteArray(readChannel.availableForRead)
while (currentCoroutineContext().isActive) {
val bytesRead = readChannel.readAvailable(buffer)
if (bytesRead <= 0) {
continue
}
}

listener(buffer)
while(currentCoroutineContext().isActive) {
connectedSocket?.close()
connectedSocket = serverSocket?.accept()?.also {
it.use { boundSocket ->
suspendCatching {
val readChannel = boundSocket.openReadChannel()
val buffer = ByteArray(readChannel.availableForRead)
while (true) {
val bytesRead = readChannel.readAvailable(buffer)
if (bytesRead <= 0) {
break
}

listener(buffer)
}
}.onFailure {
boundSocket.close()
}
}.onFailure {
boundSocket.close()
}
}
}
}
}

private fun closeSocket() {
override fun close() {
receiveJob?.cancel()
receiveJob = null

serverSocket?.close()
serverSocket = null

connectedSocket?.close()
connectedSocket = null

Expand All @@ -76,11 +89,4 @@ internal class ConnectionServer(
}
}
}

override fun close() {
receiveJob?.cancel()
receiveJob = null

closeSocket()
}
}

0 comments on commit ea4de6b

Please sign in to comment.