Kontor
A network server and client library supporting arbitrary message types and generalized inbound/outbound channels. This project demonstrates some experimental Kotlin features, namely Coroutines and the Serialization Prototype.
See the example chat client, server, and cluster node, all of which support a history of chat messages. An abridged demo is given below.
This project lays out how to:
- bridge Netty and Kotlin coroutines.
- bridge Netty and the Serialization Prototype.
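As a rough illustration of the first point, the sketch below turns a callback-style future into a suspending call using only the Kotlin standard library. `CompletableFuture` stands in for a Netty future, and `awaitResult` and `runSuspending` are hypothetical names chosen for this sketch; Kontor's own `await` may be implemented differently.

```kotlin
import java.util.concurrent.CompletableFuture
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Suspends until the future completes, then resumes with its value or its failure.
// CompletableFuture is an assumption here; it stands in for a Netty future.
suspend fun <T> CompletableFuture<T>.awaitResult(): T = suspendCoroutine { cont ->
    whenComplete { value, error ->
        if (error != null) cont.resumeWithException(error) else cont.resume(value)
    }
}

// Hypothetical helper: starts a suspending block and blocks the caller until it finishes.
fun <T> runSuspending(block: suspend () -> T): T {
    val done = CompletableFuture<T>()
    block.startCoroutine(Continuation(EmptyCoroutineContext) { result ->
        result.fold({ done.complete(it) }, { done.completeExceptionally(it) })
    })
    return done.get()
}

fun main() {
    val pending = CompletableFuture<String>()
    // Complete the future from another thread, as a network callback would.
    Thread { Thread.sleep(50); pending.complete("connected") }.start()
    println(runSuspending { pending.awaitResult() })  // prints "connected"
}
```

The callback resumes the continuation exactly once, which is the contract `suspendCoroutine` expects.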
A client is initialized with the classes it may serialize. It connects to the server on the local machine; once connected, the user is prompted for a username.
fun main(args: Array<String>) = runBlocking {
val k = KontorClient(Message::class)
await(k.start("localhost", 5000))
print("Connected to server, enter username: ")
val username = readLine()!!
Once the username is entered, the chat client is fully configured. A repeating task is launched that consumes the inbound channel, picking only Message instances and printing them.
k.inbound pick { msg: Message ->
    println(msg)
}
In addition, a second task is launched that reads the user's input (consoleLines) and feeds each line into the outbound channel. Once there are no more lines, the client is stopped.
launch(CommonPool) {
    for (s in consoleLines)
        k.outbound.send(Message(username, s))
    k.stop()
}
With both tasks running, the main block awaits disconnection of the client and then shuts down the workers.
await(k.disconnect())
k.shutdown().join()
A data class is specified for the messages. It is serializable, and the appropriate methods are generated by the serialization framework.
@Serializable
data class Message(val username: String, val string: String) {
    override fun toString() = "$username: $string"
}
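Independently of serialization, a data class like this also provides destructuring, which the handlers in this README rely on. A minimal sketch, with `@Serializable` left out so that it runs without the serialization plugin:

```kotlin
// Same shape as the README's Message, minus @Serializable (omitted so the
// sketch needs no compiler plugin).
data class Message(val username: String, val string: String) {
    override fun toString() = "$username: $string"
}

fun main() {
    val msg = Message("alice", "hello")
    println(msg)                 // prints "alice: hello" via the toString override

    // Data classes generate component1() and component2(), enabling destructuring.
    val (name, text) = msg
    println("$name said $text")  // prints "alice said hello"
}
```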
The server is started and awaits connections on the given port. The user is again prompted for a username, and a history of chat messages is initialized.
fun main(args: Array<String>) = runBlocking {
val k = KontorServer(Message::class)
await(k.start(5000))
print("Started server, enter username: ")
val username = readLine()!!
val history = arrayListOf<Message>()
From the network management channel, all Connected messages are chosen and handled by sending the existing history to the new client. Of the remaining network management messages, Disconnected is logged.
k.network choose { c: Connected ->
    for (msg in history)
        c.channel.writeAndFlush(msg)
    println("Connected: ${c.channel.remoteAddress()}")
} pick { d: Disconnected ->
    println("Disconnected: ${d.channel.remoteAddress()}")
}
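The chaining of `choose` and `pick` can be modelled on plain lists. The sketch below is only an assumption about the semantics described above (handle one type, pass the rest along); Kontor applies these operators to channels, and its actual implementation is not shown in this README. The event classes here are simplified stand-ins.

```kotlin
// Hypothetical stand-ins for Kontor's network events.
data class Connected(val address: String)
data class Disconnected(val address: String)

// Handles every element of type T and returns the remaining elements
// so that another handler can be chained.
inline infix fun <reified T : Any> Iterable<Any>.choose(handler: (T) -> Unit): List<Any> {
    val rest = mutableListOf<Any>()
    for (item in this) {
        if (item is T) handler(item) else rest.add(item)
    }
    return rest
}

// Handles every element of type T and drops everything else.
inline infix fun <reified T : Any> Iterable<Any>.pick(handler: (T) -> Unit) {
    for (item in this) {
        if (item is T) handler(item)
    }
}

fun main() {
    val events = listOf<Any>(Connected("10.0.0.1"), "keep-alive", Disconnected("10.0.0.1"))
    events choose { c: Connected ->
        println("Connected: ${c.address}")        // prints "Connected: 10.0.0.1"
    } pick { d: Disconnected ->
        println("Disconnected: ${d.address}")     // prints "Disconnected: 10.0.0.1"
    }
}
```

The explicit parameter type on each lambda is what selects the message type: the reified type parameter is inferred from it.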
All incoming messages are sent to all clients except the sender; they are also stored in the history.
k.inbound pick { (msg, c): From<Message> ->
    history += msg
    println(msg)
    // Relay the message to every client except the sender
    k.outbound.sendAllExcept(msg, c)
}
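The relay-and-record behaviour can be pictured with a small in-memory model. `Hub`, `connect`, and `relay` are hypothetical names for this sketch only; clients are modelled as named inboxes rather than network channels.

```kotlin
// Hypothetical in-memory hub: each client is modelled as a named inbox.
class Hub<M> {
    private val inboxes = linkedMapOf<String, MutableList<M>>()
    val history = mutableListOf<M>()

    // Registers a client and returns a live, read-only view of its inbox.
    fun connect(client: String): List<M> =
        inboxes.getOrPut(client) { mutableListOf() }

    // Records the message, then delivers it to everyone except the sender.
    fun relay(message: M, sender: String) {
        history.add(message)
        for ((client, inbox) in inboxes) {
            if (client != sender) inbox.add(message)
        }
    }
}

fun main() {
    val hub = Hub<String>()
    val alice = hub.connect("alice")
    val bob = hub.connect("bob")
    hub.relay("alice: hi", sender = "alice")
    println(alice)        // prints "[]"
    println(bob)          // prints "[alice: hi]"
    println(hub.history)  // prints "[alice: hi]"
}
```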
The server itself can also chat. Its messages are added to the history and sent to all clients. When no more lines are available (an empty line was entered), the server terminates.
for (s in consoleLines)
    Message(username, s).apply {
        history += this
        k.outbound.sendAll(this)
    }
await(k.stop())
k.shutdown().join()
}
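The definition of `consoleLines` is not shown in this README. One way such a source could behave, ending at end of input or at the first empty line, is sketched below; `linesOf` is a hypothetical helper, and the real `consoleLines` may well be a channel rather than a sequence.

```kotlin
import java.io.BufferedReader
import java.io.StringReader

// Hypothetical reconstruction: yields lines until end of input or an empty line.
fun linesOf(reader: BufferedReader): Sequence<String> =
    generateSequence { reader.readLine()?.takeIf { it.isNotEmpty() } }

fun main() {
    // A StringReader replaces standard input so the sketch is reproducible.
    val input = BufferedReader(StringReader("hello\nworld\n\nignored"))
    for (s in linesOf(input)) println(s)  // prints "hello", then "world"
}
```

For interactive use, the same function could be given `System.in.bufferedReader()`.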
For the cluster node, a data class is specified for the history. Like the message class, it is serializable. This class is used to store and transfer the shared state of the cluster.
@Serializable
data class History(val messages: List<Message> = listOf()) {
    operator fun plus(message: Message) =
        History(messages + message)
}
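Because `History` is immutable and defines `plus`, appending with `+=` on a `var` replaces the whole value instead of mutating it. A minimal sketch, with `@Serializable` omitted so that it runs without the serialization plugin:

```kotlin
// Same shapes as in the README, minus @Serializable.
data class Message(val username: String, val string: String)

data class History(val messages: List<Message> = listOf()) {
    operator fun plus(message: Message) = History(messages + message)
}

fun main() {
    val empty = History()
    var history = empty
    // With only plus defined, += compiles to: history = history + Message(...)
    history += Message("alice", "hi")
    history += Message("bob", "hello")
    println(history.messages.size)  // prints "2"
    println(empty.messages.size)    // prints "0": the original value is untouched
}
```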
The history is held in a mutable variable. Kontor is configured to read from and write to history when transferring the state. A connection to a cluster is established by simply passing the cluster name to start.
var history = History()
fun main(args: Array<String>) = runBlocking {
val k = KontorCluster(::history.toProsumer(), History::class, Message::class)
println("Joining cluster")
k.start("Chatty").join()
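How a property reference can serve as both source and sink for state is sketched below. The `Prosumer` class and this `toProsumer` are hypothetical reimplementations written for illustration; Kontor's own types are not shown in this README and may differ.

```kotlin
import kotlin.reflect.KMutableProperty0

// Hypothetical shape: a pair of functions to read the state and to overwrite it.
class Prosumer<T>(val produce: () -> T, val consume: (T) -> Unit)

// Wraps a mutable property reference so that both directions go through the variable.
fun <T> KMutableProperty0<T>.toProsumer(): Prosumer<T> =
    Prosumer({ get() }, { value -> set(value) })

var counter = 0

fun main() {
    val p = ::counter.toProsumer()
    p.consume(5)          // incoming state overwrites the variable
    println(counter)      // prints "5"
    counter += 1          // local changes are visible when state is read back
    println(p.produce())  // prints "6"
}
```

Since the reference points at the variable rather than at a snapshot of its value, reassignments such as `history += msg` remain visible to whoever holds the wrapper.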
The previous state is requested first; the history is then printed for the user.
// Request message history
k.requestState()
for (msg in history.messages)
println(msg)
Network messages could be handled just as in the server example; that part is omitted here. Incoming chat messages are handled by appending them to the history and printing them.
// Handling of messages
k.inbound pick { (msg, _): From<Message, Address> ->
    history += msg
    println(msg)
}
The rest is similar to the client and server examples and therefore left out.