From 725391804a17791a52f4e0c9d2396eb2e2795705 Mon Sep 17 00:00:00 2001 From: maca88 Date: Sun, 28 Apr 2024 19:41:24 +0200 Subject: [PATCH 1/7] Added support for client results --- signalrkore/build.gradle.kts | 1 + .../signalrkore/HttpHubConnectionBuilder.kt | 3 + .../signalrkore/HubCommunication.kt | 442 ++++++++++++-- .../signalrkore/HubConnection.kt | 59 +- .../lepicekmichal/signalrkore/HubMessage.kt | 5 +- .../signalrkore/CompletableSubject.kt | 26 + .../HubConnectionReturnResultTest.kt | 547 ++++++++++++++++++ .../signalrkore/HubConnectionTest.kt | 408 +++++++++++++ .../signalrkore/MockTransport.kt | 60 ++ .../signalrkore/SingleSubject.kt | 29 + .../lepicekmichal/signalrkore/TestLogger.kt | 30 + 11 files changed, 1551 insertions(+), 59 deletions(-) create mode 100644 signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/CompletableSubject.kt create mode 100644 signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/HubConnectionReturnResultTest.kt create mode 100644 signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/HubConnectionTest.kt create mode 100644 signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/MockTransport.kt create mode 100644 signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/SingleSubject.kt create mode 100644 signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/TestLogger.kt diff --git a/signalrkore/build.gradle.kts b/signalrkore/build.gradle.kts index 7d1bc4f..12f29ff 100644 --- a/signalrkore/build.gradle.kts +++ b/signalrkore/build.gradle.kts @@ -61,6 +61,7 @@ kotlin { val commonTest by getting { dependencies { implementation(kotlin("test")) + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.0") } } val jvmMain by getting { diff --git a/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HttpHubConnectionBuilder.kt b/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HttpHubConnectionBuilder.kt index 18f633b..f222d9b 100644 --- a/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HttpHubConnectionBuilder.kt +++ b/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HttpHubConnectionBuilder.kt @@ -12,6 +12,8 @@ class HttpHubConnectionBuilder(private val url: String) { */ var transportEnum: TransportEnum = TransportEnum.All + internal var transport: Transport? = null + /** * The [HttpClient] to be used by the [eu.lepicekmichal.signalrkore.HubConnection] */ @@ -72,6 +74,7 @@ class HttpHubConnectionBuilder(private val url: String) { if (::protocol.isInitialized) protocol else JsonHubProtocol(logger), handshakeResponseTimeout, headers.toMap(), + transport, transportEnum, json, logger, diff --git a/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubCommunication.kt b/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubCommunication.kt index 3bf8910..f71f415 100644 --- a/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubCommunication.kt +++ b/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubCommunication.kt @@ -4,7 +4,9 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.onSubscription import kotlinx.serialization.json.JsonElement import kotlin.reflect.KClass @@ -12,12 +14,20 @@ abstract class HubCommunication { protected abstract val receivedInvocations: SharedFlow + internal val subscribersWithResult: MutableMap = mutableMapOf() + protected abstract val logger: Logger protected abstract fun T.toJson(kClass: KClass): JsonElement protected abstract fun JsonElement.fromJson(kClass: KClass): T + protected abstract suspend fun handleInvocation( + message: HubMessage.Invocation, + resultType: KClass, + callback: suspend () -> TResult + ) where TResult : Any + abstract fun send(method: String, args: List, uploadStreams: List> = emptyList()) abstract suspend fun invoke(method: String, args: List, uploadStreams: List> = emptyList()) @@ -15727,21 +15737,40 @@ abstract class HubCommunication { ) } + suspend fun on(target: String, callback: () -> Unit) { + on(target) + .collect { + handleInvocation(it, Unit::class) { callback() } + } + } + suspend fun on(target: String, param1: KClass, callback: (T1) -> Unit) where T1 : Any { on(target) - .collect { callback(it.arguments[0].fromJson(param1)) } + .collect { + handleInvocation(it, Unit::class) { + callback(it.arguments[0].fromJson(param1)) + } + } } + suspend inline fun on(target: String, noinline callback: (T1) -> Unit) where T1: Any = + on(target, T1::class, callback) + suspend fun on(target: String, param1: KClass, param2: KClass, callback: (T1, T2) -> Unit) where T1 : Any, T2 : Any { on(target) .collect { - callback( - it.arguments[0].fromJson(param1), - it.arguments[1].fromJson(param2), - ) + handleInvocation(it, Unit::class) { + callback( + it.arguments[0].fromJson(param1), + it.arguments[1].fromJson(param2), + ) + } } } + suspend inline fun on(target: String, noinline callback: (T1, T2) -> Unit) where T1: Any, T2: Any = + on(target, T1::class, T2::class, callback) + suspend fun on( target: String, param1: KClass, @@ -15751,13 +15780,21 @@ abstract class HubCommunication { ) where T1 : Any, T2 : Any, T3 : Any { on(target) .collect { - callback( - it.arguments[0].fromJson(param1), - it.arguments[1].fromJson(param2), - it.arguments[2].fromJson(param3), - ) + handleInvocation(it, Unit::class) { + callback( + it.arguments[0].fromJson(param1), + it.arguments[1].fromJson(param2), + it.arguments[2].fromJson(param3), + ) + } } } + + suspend inline fun on( + target: String, + noinline callback: (T1, T2, T3) -> Unit + ) where T1: Any, T2: Any, T3: Any = + on(target, T1::class, T2::class, T3::class, callback) suspend fun on( target: String, @@ -15769,15 +15806,23 @@ abstract class HubCommunication { ) where T1 : Any, T2 : Any, T3 : Any, T4 : Any { on(target) .collect { - callback( - it.arguments[0].fromJson(param1), - it.arguments[1].fromJson(param2), - it.arguments[2].fromJson(param3), - it.arguments[3].fromJson(param4), - ) + handleInvocation(it, Unit::class) { + callback( + it.arguments[0].fromJson(param1), + it.arguments[1].fromJson(param2), + it.arguments[2].fromJson(param3), + it.arguments[3].fromJson(param4), + ) + } } } + suspend inline fun on( + target: String, + noinline callback: (T1, T2, T3, T4) -> Unit + ) where T1: Any, T2: Any, T3: Any, T4: Any = + on(target, T1::class, T2::class, T3::class, T4::class, callback) + suspend fun on( target: String, param1: KClass, @@ -15789,16 +15834,24 @@ abstract class HubCommunication { ) where T1 : Any, T2 : Any, T3 : Any, T4 : Any, T5 : Any { on(target) .collect { - callback( - it.arguments[0].fromJson(param1), - it.arguments[1].fromJson(param2), - it.arguments[2].fromJson(param3), - it.arguments[3].fromJson(param4), - it.arguments[4].fromJson(param5), - ) + handleInvocation(it, Unit::class) { + callback( + it.arguments[0].fromJson(param1), + it.arguments[1].fromJson(param2), + it.arguments[2].fromJson(param3), + it.arguments[3].fromJson(param4), + it.arguments[4].fromJson(param5), + ) + } } } + suspend inline fun on( + target: String, + noinline callback: (T1, T2, T3, T4, T5) -> Unit + ) where T1: Any, T2: Any, T3: Any, T4: Any, T5: Any = + on(target, T1::class, T2::class, T3::class, T4::class, T5::class, callback) + suspend fun on( target: String, param1: KClass, @@ -15811,17 +15864,25 @@ abstract class HubCommunication { ) where T1 : Any, T2 : Any, T3 : Any, T4 : Any, T5 : Any, T6 : Any { on(target) .collect { - callback( - it.arguments[0].fromJson(param1), - it.arguments[1].fromJson(param2), - it.arguments[2].fromJson(param3), - it.arguments[3].fromJson(param4), - it.arguments[4].fromJson(param5), - it.arguments[5].fromJson(param6), - ) + handleInvocation(it, Unit::class) { + callback( + it.arguments[0].fromJson(param1), + it.arguments[1].fromJson(param2), + it.arguments[2].fromJson(param3), + it.arguments[3].fromJson(param4), + it.arguments[4].fromJson(param5), + it.arguments[5].fromJson(param6), + ) + } } } + suspend inline fun on( + target: String, + noinline callback: (T1, T2, T3, T4, T5, T6) -> Unit + ) where T1: Any, T2: Any, T3: Any, T4: Any, T5: Any, T6: Any = + on(target, T1::class, T2::class, T3::class, T4::class, T5::class, T6::class, callback) + suspend fun on( target: String, param1: KClass, @@ -15835,18 +15896,26 @@ abstract class HubCommunication { ) where T1 : Any, T2 : Any, T3 : Any, T4 : Any, T5 : Any, T6 : Any, T7 : Any { on(target) .collect { - callback( - it.arguments[0].fromJson(param1), - it.arguments[1].fromJson(param2), - it.arguments[2].fromJson(param3), - it.arguments[3].fromJson(param4), - it.arguments[4].fromJson(param5), - it.arguments[5].fromJson(param6), - it.arguments[6].fromJson(param7), - ) + handleInvocation(it, Unit::class) { + callback( + it.arguments[0].fromJson(param1), + it.arguments[1].fromJson(param2), + it.arguments[2].fromJson(param3), + it.arguments[3].fromJson(param4), + it.arguments[4].fromJson(param5), + it.arguments[5].fromJson(param6), + it.arguments[6].fromJson(param7), + ) + } } } + suspend inline fun on( + target: String, + noinline callback: (T1, T2, T3, T4, T5, T6, T7) -> Unit + ) where T1: Any, T2: Any, T3: Any, T4: Any, T5: Any, T6: Any, T7: Any = + on(target, T1::class, T2::class, T3::class, T4::class, T5::class, T6::class, T7::class, callback) + suspend fun on( target: String, param1: KClass, @@ -15861,19 +15930,292 @@ abstract class HubCommunication { ) where T1 : Any, T2 : Any, T3 : Any, T4 : Any, T5 : Any, T6 : Any, T7 : Any, T8 : Any { on(target) .collect { - callback( - it.arguments[0].fromJson(param1), - it.arguments[1].fromJson(param2), - it.arguments[2].fromJson(param3), - it.arguments[3].fromJson(param4), - it.arguments[4].fromJson(param5), - it.arguments[5].fromJson(param6), - it.arguments[6].fromJson(param7), - it.arguments[7].fromJson(param8), - ) + handleInvocation(it, Unit::class) { + callback( + it.arguments[0].fromJson(param1), + it.arguments[1].fromJson(param2), + it.arguments[2].fromJson(param3), + it.arguments[3].fromJson(param4), + it.arguments[4].fromJson(param5), + it.arguments[5].fromJson(param6), + it.arguments[6].fromJson(param7), + it.arguments[7].fromJson(param8), + ) + } + } + } + + suspend inline fun on( + target: String, + noinline callback: (T1, T2, T3, T4, T5, T6, T7, T8) -> Unit + ) where T1: Any, T2: Any, T3: Any, T4: Any, T5: Any, T6: Any, T7: Any, T8: Any = + on(target, T1::class, T2::class, T3::class, T4::class, T5::class, T6::class, T7::class, T8::class, callback) + + private fun onWithResult(target: String): Flow { + if (subscribersWithResult[target] == true) { + throw RuntimeException("'$target' already has a value returning handler. Multiple return values are not supported.") + } + + return receivedInvocations + .onSubscription { + subscribersWithResult[target] = true + } + .onCompletion { + subscribersWithResult[target] = false + } + .filter { it.target == target } + .onEach { logger.log(Logger.Level.INFO, "Received invocation: $it") } + } + + suspend fun onWithResult( + target: String, + resultType: KClass, + callback: suspend () -> TResult + ) where TResult: Any { + onWithResult(target) + .collect { + handleInvocation(it, resultType) { + callback() + } + } + } + + suspend inline fun onWithResult( + target: String, + noinline callback: suspend () -> TResult + ) where TResult: Any = + onWithResult(target, TResult::class, callback) + + suspend fun onWithResult( + target: String, + param1: KClass, + resultType: KClass, + callback: suspend (T1) -> TResult + ) where T1 : Any, TResult: Any { + onWithResult(target) + .collect { + handleInvocation(it, resultType) { + callback(it.arguments[0].fromJson(param1)) + } + } + } + + suspend inline fun onWithResult( + target: String, + noinline callback: suspend (T1) -> TResult + ) where T1 : Any, TResult: Any = + onWithResult(target, T1::class, TResult::class, callback) + + suspend fun onWithResult( + target: String, + param1: KClass, + param2: KClass, + resultType: KClass, + callback: suspend (T1, T2) -> TResult + ) where T1 : Any, T2 : Any, TResult: Any { + onWithResult(target) + .collect { + handleInvocation(it, resultType) { + callback( + it.arguments[0].fromJson(param1), + it.arguments[1].fromJson(param2) + ) + } + } + } + + suspend inline fun onWithResult( + target: String, + noinline callback: suspend (T1, T2) -> TResult + ) where T1 : Any, T2 : Any, TResult: Any = + onWithResult(target, T1::class, T2::class, TResult::class, callback) + + suspend fun onWithResult( + target: String, + param1: KClass, + param2: KClass, + param3: KClass, + resultType: KClass, + callback: suspend (T1, T2, T3) -> TResult + ) where T1 : Any, T2 : Any, T3 : Any, TResult: Any { + onWithResult(target) + .collect { + handleInvocation(it, resultType) { + callback( + it.arguments[0].fromJson(param1), + it.arguments[1].fromJson(param2), + it.arguments[2].fromJson(param3) + ) + } + } + } + + suspend inline fun onWithResult( + target: String, + noinline callback: suspend (T1, T2, T3) -> TResult + ) where T1 : Any, T2 : Any, T3 : Any, TResult: Any = + onWithResult(target, T1::class, T2::class, T3::class, TResult::class, callback) + + suspend fun onWithResult( + target: String, + param1: KClass, + param2: KClass, + param3: KClass, + param4: KClass, + resultType: KClass, + callback: suspend (T1, T2, T3, T4) -> TResult + ) where T1 : Any, T2 : Any, T3 : Any, T4 : Any, TResult: Any { + onWithResult(target) + .collect { + handleInvocation(it, resultType) { + callback( + it.arguments[0].fromJson(param1), + it.arguments[1].fromJson(param2), + it.arguments[2].fromJson(param3), + it.arguments[3].fromJson(param4) + ) + } + } + } + + suspend inline fun onWithResult( + target: String, + noinline callback: suspend (T1, T2, T3, T4) -> TResult + ) where T1 : Any, T2 : Any, T3 : Any, T4 : Any, TResult: Any = + onWithResult(target, T1::class, T2::class, T3::class, T4::class, TResult::class, callback) + + suspend fun onWithResult( + target: String, + param1: KClass, + param2: KClass, + param3: KClass, + param4: KClass, + param5: KClass, + resultType: KClass, + callback: suspend (T1, T2, T3, T4, T5) -> TResult + ) where T1 : Any, T2 : Any, T3 : Any, T4 : Any, T5 : Any, TResult: Any { + onWithResult(target) + .collect { + handleInvocation(it, resultType) { + callback( + it.arguments[0].fromJson(param1), + it.arguments[1].fromJson(param2), + it.arguments[2].fromJson(param3), + it.arguments[3].fromJson(param4), + it.arguments[4].fromJson(param5) + ) + } + } + } + + suspend inline fun onWithResult( + target: String, + noinline callback: suspend (T1, T2, T3, T4, T5) -> TResult + ) where T1 : Any, T2 : Any, T3 : Any, T4 : Any, T5 : Any, TResult: Any = + onWithResult(target, T1::class, T2::class, T3::class, T4::class, T5::class, TResult::class, callback) + + suspend fun onWithResult( + target: String, + param1: KClass, + param2: KClass, + param3: KClass, + param4: KClass, + param5: KClass, + param6: KClass, + resultType: KClass, + callback: suspend (T1, T2, T3, T4, T5, T6) -> TResult + ) where T1 : Any, T2 : Any, T3 : Any, T4 : Any, T5 : Any, T6 : Any, TResult: Any { + onWithResult(target) + .collect { + handleInvocation(it, resultType) { + callback( + it.arguments[0].fromJson(param1), + it.arguments[1].fromJson(param2), + it.arguments[2].fromJson(param3), + it.arguments[3].fromJson(param4), + it.arguments[4].fromJson(param5), + it.arguments[5].fromJson(param6) + ) + } + } + } + + suspend inline fun onWithResult( + target: String, + noinline callback: suspend (T1, T2, T3, T4, T5, T6) -> TResult + ) where T1 : Any, T2 : Any, T3 : Any, T4 : Any, T5 : Any, T6 : Any, TResult: Any = + onWithResult(target, T1::class, T2::class, T3::class, T4::class, T5::class, T6::class, TResult::class, callback) + + suspend fun onWithResult( + target: String, + param1: KClass, + param2: KClass, + param3: KClass, + param4: KClass, + param5: KClass, + param6: KClass, + param7: KClass, + resultType: KClass, + callback: suspend (T1, T2, T3, T4, T5, T6, T7) -> TResult + ) where T1 : Any, T2 : Any, T3 : Any, T4 : Any, T5 : Any, T6 : Any, T7 : Any, TResult: Any { + onWithResult(target) + .collect { + handleInvocation(it, resultType) { + callback( + it.arguments[0].fromJson(param1), + it.arguments[1].fromJson(param2), + it.arguments[2].fromJson(param3), + it.arguments[3].fromJson(param4), + it.arguments[4].fromJson(param5), + it.arguments[5].fromJson(param6), + it.arguments[6].fromJson(param7) + ) + } } } + suspend inline fun onWithResult( + target: String, + noinline callback: suspend (T1, T2, T3, T4, T5, T6, T7) -> TResult + ) where T1 : Any, T2 : Any, T3 : Any, T4 : Any, T5 : Any, T6 : Any, T7 : Any, TResult: Any = + onWithResult(target, T1::class, T2::class, T3::class, T4::class, T5::class, T6::class, T7::class, TResult::class, callback) + + suspend fun onWithResult( + target: String, + param1: KClass, + param2: KClass, + param3: KClass, + param4: KClass, + param5: KClass, + param6: KClass, + param7: KClass, + param8: KClass, + resultType: KClass, + callback: suspend (T1, T2, T3, T4, T5, T6, T7, T8) -> TResult + ) where T1 : Any, T2 : Any, T3 : Any, T4 : Any, T5 : Any, T6 : Any, T7 : Any, T8 : Any, TResult: Any { + onWithResult(target) + .collect { + handleInvocation(it, resultType) { + callback( + it.arguments[0].fromJson(param1), + it.arguments[1].fromJson(param2), + it.arguments[2].fromJson(param3), + it.arguments[3].fromJson(param4), + it.arguments[4].fromJson(param5), + it.arguments[5].fromJson(param6), + it.arguments[6].fromJson(param7), + it.arguments[7].fromJson(param8) + ) + } + } + } + + suspend inline fun onWithResult( + target: String, + noinline callback: suspend (T1, T2, T3, T4, T5, T6, T7, T8) -> TResult + ) where T1 : Any, T2 : Any, T3 : Any, T4 : Any, T5 : Any, T6 : Any, T7 : Any, T8 : Any, TResult: Any = + onWithResult(target, T1::class, T2::class, T3::class, T4::class, T5::class, T6::class, T7::class, T8::class, TResult::class, callback) + fun stream(itemType: KClass, method: String): Flow = stream( itemType = itemType, diff --git a/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubConnection.kt b/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubConnection.kt index 58680f9..21dc42a 100644 --- a/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubConnection.kt +++ b/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubConnection.kt @@ -6,7 +6,6 @@ import eu.lepicekmichal.signalrkore.transports.WebSocketTransport import io.ktor.client.* import io.ktor.client.call.* import io.ktor.client.plugins.* -import io.ktor.client.plugins.api.createClientPlugin import io.ktor.client.plugins.contentnegotiation.* import io.ktor.client.plugins.websocket.* import io.ktor.client.request.* @@ -112,6 +111,7 @@ class HubConnection private constructor( protocol: HubProtocol, handshakeResponseTimeout: Duration, headers: Map, + transport: Transport?, transportEnum: TransportEnum, json: Json, logger: Logger, @@ -130,7 +130,11 @@ class HubConnection private constructor( automaticReconnect = automaticReconnect, json = json, logger = logger, - ) + ) { + if (transport != null) { + this.transport = transport + } + } suspend fun start(reconnectionAttempt: Boolean = false) { if (connectionState.value != HubConnectionState.DISCONNECTED && connectionState.value != HubConnectionState.RECONNECTING) return @@ -158,10 +162,12 @@ class HubConnection private constructor( Negotiation(TransportEnum.WebSockets, baseUrl) } - transport = when (negotiationTransport) { - TransportEnum.LongPolling -> LongPollingTransport(headers, httpClient) - TransportEnum.ServerSentEvents -> ServerSentEventsTransport(headers, httpClient) - else -> WebSocketTransport(headers, httpClient) + if (!::transport.isInitialized) { + transport = when (negotiationTransport) { + TransportEnum.LongPolling -> LongPollingTransport(headers, httpClient) + TransportEnum.ServerSentEvents -> ServerSentEventsTransport(headers, httpClient) + else -> WebSocketTransport(headers, httpClient) + } } try { @@ -391,7 +397,7 @@ class HubConnection private constructor( if (message.allowReconnect && automaticReconnect !is AutomaticReconnect.Inactive) reconnect(message.error) else stop(message.error) - is HubMessage.Invocation -> receivedInvocations.emit(message) + is HubMessage.Invocation -> processReceivedInvocation(message) is HubMessage.Ping -> Unit is HubMessage.CancelInvocation -> Unit // this should not happen according to standard is HubMessage.StreamItem -> receivedStreamItems.emit(message) @@ -400,12 +406,51 @@ class HubConnection private constructor( } } + private suspend fun processReceivedInvocation(message: HubMessage.Invocation) { + if (message is HubMessage.Invocation.Blocking && message.invocationId != null && subscribersWithResult[message.target] != true) { + logger.log(Logger.Level.ERROR, "Failed to find a value returning handler for ${message.target} method. Sending error to server.") + + val resultMessage = HubMessage.Completion.Error(invocationId = message.invocationId, error = "Client did not provide a result.") + sendHubMessage(resultMessage) + } + + receivedInvocations.emit(message) + } + @OptIn(InternalSerializationApi::class) override fun T.toJson(kClass: KClass): JsonElement = json.encodeToJsonElement(kClass.serializer(), this) @OptIn(InternalSerializationApi::class) override fun JsonElement.fromJson(kClass: KClass): T = json.decodeFromJsonElement(kClass.serializer(), this) + override suspend fun handleInvocation( + message: HubMessage.Invocation, + resultType: KClass, + callback: suspend () -> TResult + ) where TResult : Any { + val invocationId = if (message is HubMessage.Invocation.Blocking) message.invocationId else null + var resultMessage: HubMessage.Completion? = null + + try { + val result = callback() + if (invocationId != null) { + resultMessage = HubMessage.Completion.Resulted(invocationId = invocationId, result = result.toJson(resultType)) + } + } catch (ex: Exception) { + logger.log(Logger.Level.ERROR, "Invoking client side method '${message.target}' failed: $ex") + if (invocationId != null) { + resultMessage = HubMessage.Completion.Error(invocationId = invocationId, error = ex.message.orEmpty()) + } + } + + if (resultMessage != null) { + connectedCheck("handleInvocation") + sendHubMessage(resultMessage) + } else if (resultType != Unit::class) { + logger.log(Logger.Level.ERROR, "Result given for '${message.target}' method but server is not expecting a result.") + } + } + override fun send(method: String, args: List, uploadStreams: List>) { connectedCheck("send") diff --git a/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubMessage.kt b/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubMessage.kt index 7557d90..1adbf62 100644 --- a/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubMessage.kt +++ b/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubMessage.kt @@ -30,7 +30,7 @@ sealed class HubMessage { data class Blocking( override val target: String, override val arguments: List, - val invocationId: String, + val invocationId: String?, override val streamIds: List? = null, ) : Invocation() { @@ -116,9 +116,10 @@ sealed class HubMessage { return when (val type = jsonObject["type"]?.jsonPrimitive?.int) { HubMessageType.INVOCATION.value -> when { jsonObject["streamIds"]?.jsonArray != null -> Invocation.Blocking.serializer() - jsonObject["invocationId"]?.jsonPrimitive?.contentOrNull != null -> Invocation.Streaming.serializer() + jsonObject["invocationId"]?.jsonPrimitive?.contentOrNull != null -> Invocation.Blocking.serializer() else -> Invocation.NonBlocking.serializer() } + HubMessageType.STREAM_INVOCATION.value -> Invocation.Streaming.serializer() HubMessageType.PING.value -> Ping.serializer() HubMessageType.CLOSE.value -> Close.serializer() HubMessageType.STREAM_ITEM.value -> StreamItem.serializer() diff --git a/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/CompletableSubject.kt b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/CompletableSubject.kt new file mode 100644 index 0000000..4fc3d0f --- /dev/null +++ b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/CompletableSubject.kt @@ -0,0 +1,26 @@ +package eu.lepicekmichal.signalrkore + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.timeout +import kotlinx.coroutines.withContext +import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds + +class CompletableSubject { + private val stateFlow = MutableStateFlow(false) + + suspend fun waitForCompletion(timeout: Duration = 30.seconds) = withContext(Dispatchers.Default) { + stateFlow.filter { it }.timeout(timeout).first() + } + + fun reset() { + stateFlow.value = false + } + + fun complete() { + stateFlow.value = true + } +} \ No newline at end of file diff --git a/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/HubConnectionReturnResultTest.kt b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/HubConnectionReturnResultTest.kt new file mode 100644 index 0000000..1fa269f --- /dev/null +++ b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/HubConnectionReturnResultTest.kt @@ -0,0 +1,547 @@ +package eu.lepicekmichal.signalrkore + +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.runTest +import kotlin.test.Test +import kotlin.test.assertContentEquals +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith + +class HubConnectionReturnResultTest { + + @Test + fun `handler with no params should return a value`() = runTest { + val mockTransport = MockTransport() + val hubConnection = createHubConnection(mockTransport) + + var called = false + val job = launch { + hubConnection.onWithResult("inc") { + called = true + 10 + } + } + + testScheduler.advanceUntilIdle() + + hubConnection.start() + val sentMessage = mockTransport.nextSentMessage + mockTransport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"inc\",\"arguments\":[]}$RECORD_SEPARATOR") + val response = sentMessage.waitForResult() + val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":10}$RECORD_SEPARATOR" + + assertEquals(expected, response) + assertEquals(true, called) + + job.cancel() + hubConnection.stop() + } + + @Test + fun `handler with no return value should report an error`() = runTest { + val mockTransport = MockTransport() + val hubConnection = createHubConnection(mockTransport) + val nonResultCalled = CompletableSubject() + val job = launch { + hubConnection.on("inc") { + nonResultCalled.complete() + } + } + + testScheduler.advanceUntilIdle() + + hubConnection.start() + val sentMessage = mockTransport.nextSentMessage + mockTransport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"inc\",\"arguments\":[]}$RECORD_SEPARATOR") + val response = sentMessage.waitForResult() + val expected = "{\"type\":3,\"invocationId\":\"1\",\"error\":\"Client did not provide a result.\"}$RECORD_SEPARATOR" + + assertEquals(expected, response) + nonResultCalled.waitForCompletion() + + job.cancel() + hubConnection.stop() + } + + @Test + fun `missing handler with return value should report an error`() = runTest { + val mockTransport = MockTransport() + val hubConnection = createHubConnection(mockTransport) + + hubConnection.start() + val sentMessage = mockTransport.nextSentMessage + mockTransport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"inc\",\"arguments\":[]}$RECORD_SEPARATOR") + val response = sentMessage.waitForResult() + val expected = "{\"type\":3,\"invocationId\":\"1\",\"error\":\"Client did not provide a result.\"}$RECORD_SEPARATOR" + + assertEquals(expected, response) + + hubConnection.stop() + } + + @Test + fun `faulty handler should return an error`() = runTest { + val mockTransport = MockTransport() + val hubConnection = createHubConnection(mockTransport) + + val job = launch { + hubConnection.onWithResult("inc") { + throw RuntimeException("Custom error.") + } + } + + testScheduler.advanceUntilIdle() + + hubConnection.start() + val sentMessage = mockTransport.nextSentMessage + mockTransport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"inc\",\"arguments\":[]}$RECORD_SEPARATOR") + val response = sentMessage.waitForResult() + val expected = "{\"type\":3,\"invocationId\":\"1\",\"error\":\"Custom error.\"}$RECORD_SEPARATOR" + + assertEquals(expected, response) + + job.cancel() + hubConnection.stop() + } + + @Test + fun `registering multiple handler should raise an exception`() = runTest { + val mockTransport = MockTransport() + val hubConnection = createHubConnection(mockTransport) + + val job = launch { + hubConnection.onWithResult("inc") { "value" } + } + + testScheduler.advanceUntilIdle() + + val exception = assertFailsWith { + hubConnection.onWithResult("inc") { "value2" } + } + assertEquals("'inc' already has a value returning handler. Multiple return values are not supported.", exception.message) + + job.cancel() + } + + + @Test + fun `handler with result should log if server does not expect a return value`() = runTest { + val mockTransport = MockTransport() + val testLogger = TestLogger() + val hubConnection = createHubConnection(mockTransport) { + logger = testLogger + } + val nonResultCalled = CompletableSubject() + + val job = launch { + hubConnection.onWithResult("m") { 42 } + } + + val job2 = launch { + hubConnection.on("fin") { + nonResultCalled.complete() + } + } + + testScheduler.advanceUntilIdle() + + hubConnection.start() + mockTransport.receiveMessage("{\"type\":1,\"target\":\"m\",\"arguments\":[]}$RECORD_SEPARATOR") + // send another invocation message and wait for it to be processed to make sure the first invocation was processed + mockTransport.receiveMessage("{\"type\":1,\"target\":\"fin\",\"arguments\":[]}$RECORD_SEPARATOR") + + nonResultCalled.waitForCompletion() + + testLogger.assertLogEquals("Result given for 'm' method but server is not expecting a result.") + + job.cancel() + job2.cancel() + hubConnection.stop() + } + + @Test + fun `handler with one param should return a value`() = runTest { + val mockTransport = MockTransport() + val hubConnection = createHubConnection(mockTransport) + + var calledWith: String? = null + val job = launch { + hubConnection.onWithResult("inc") { + calledWith = it + 10 + } + } + + testScheduler.advanceUntilIdle() + + hubConnection.start() + val sentMessage = mockTransport.nextSentMessage + mockTransport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"inc\",\"arguments\":[\"1\"]}$RECORD_SEPARATOR") + val response = sentMessage.waitForResult() + val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":10}$RECORD_SEPARATOR" + + assertEquals(expected, response) + assertEquals("1", calledWith) + + job.cancel() + hubConnection.stop() + } + + @Test + fun `handler with two params should return a value`() = runTest { + val mockTransport = MockTransport() + val hubConnection = createHubConnection(mockTransport) + + var calledWith: String? = null + var calledWith2: Int? = null + val job = launch { + hubConnection.onWithResult("inc") { p1, p2 -> + calledWith = p1 + calledWith2 = p2 + "bob" + } + } + + testScheduler.advanceUntilIdle() + + hubConnection.start() + val sentMessage = mockTransport.nextSentMessage + mockTransport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"inc\",\"arguments\":[\"1\", 13]}$RECORD_SEPARATOR") + val response = sentMessage.waitForResult() + val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":\"bob\"}$RECORD_SEPARATOR" + + assertEquals(expected, response) + assertEquals("1", calledWith) + assertEquals(13, calledWith2) + + job.cancel() + hubConnection.stop() + } + + @Test + fun `handler with three params should return a value`() = runTest { + val mockTransport = MockTransport() + val hubConnection = createHubConnection(mockTransport) + + var calledWith: String? = null + var calledWith2: Int? = null + var calledWith3: IntArray? = null + val job = launch { + hubConnection.onWithResult("inc") { p1, p2, p3 -> + calledWith = p1 + calledWith2 = p2 + calledWith3 = p3 + "bob" + } + } + + testScheduler.advanceUntilIdle() + + hubConnection.start() + val sentMessage = mockTransport.nextSentMessage + mockTransport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"inc\",\"arguments\":[\"1\",13,[1,2,3]]}$RECORD_SEPARATOR") + val response = sentMessage.waitForResult() + val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":\"bob\"}$RECORD_SEPARATOR" + + assertEquals(expected, response) + assertEquals("1", calledWith) + assertEquals(13, calledWith2) + assertContentEquals(intArrayOf(1, 2, 3), calledWith3) + + job.cancel() + hubConnection.stop() + } + + @Test + fun `handler with four params should return a value`() = runTest { + val mockTransport = MockTransport() + val hubConnection = createHubConnection(mockTransport) + + var calledWith: String? = null + var calledWith2: Int? = null + var calledWith3: IntArray? = null + var calledWith4: Boolean? = null + val job = launch { + hubConnection.onWithResult("inc") { p1, p2, p3, p4 -> + calledWith = p1 + calledWith2 = p2 + calledWith3 = p3 + calledWith4 = p4 + "bob" + } + } + + testScheduler.advanceUntilIdle() + + hubConnection.start() + val sentMessage = mockTransport.nextSentMessage + mockTransport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"inc\",\"arguments\":[\"1\",13,[1,2,3],true]}$RECORD_SEPARATOR") + val response = sentMessage.waitForResult() + val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":\"bob\"}$RECORD_SEPARATOR" + + assertEquals(expected, response) + assertEquals("1", calledWith) + assertEquals(13, calledWith2) + assertContentEquals(intArrayOf(1, 2, 3), calledWith3) + assertEquals(true, calledWith4) + + job.cancel() + hubConnection.stop() + } + + @Test + fun `handler with five params should return a value`() = runTest { + val mockTransport = MockTransport() + val hubConnection = createHubConnection(mockTransport) + + var calledWith: String? = null + var calledWith2: Int? = null + var calledWith3: IntArray? = null + var calledWith4: Boolean? = null + var calledWith5: String? = null + val job = launch { + hubConnection.onWithResult("inc") { p1, p2, p3, p4, p5 -> + calledWith = p1 + calledWith2 = p2 + calledWith3 = p3 + calledWith4 = p4 + calledWith5 = p5 + "bob" + } + } + + testScheduler.advanceUntilIdle() + + hubConnection.start() + val sentMessage = mockTransport.nextSentMessage + mockTransport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"inc\",\"arguments\":[\"1\",13,[1,2,3],true,\"t\"]}$RECORD_SEPARATOR") + val response = sentMessage.waitForResult() + val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":\"bob\"}$RECORD_SEPARATOR" + + assertEquals(expected, response) + assertEquals("1", calledWith) + assertEquals(13, calledWith2) + assertContentEquals(intArrayOf(1, 2, 3), calledWith3) + assertEquals(true, calledWith4) + assertEquals("t", calledWith5) + + job.cancel() + hubConnection.stop() + } + + @Test + fun `handler with six params should return a value`() = runTest { + val mockTransport = MockTransport() + val hubConnection = createHubConnection(mockTransport) + + var calledWith: String? = null + var calledWith2: Int? = null + var calledWith3: IntArray? = null + var calledWith4: Boolean? = null + var calledWith5: String? = null + var calledWith6: Double? = null + val job = launch { + hubConnection.onWithResult("inc") { p1, p2, p3, p4, p5, p6 -> + calledWith = p1 + calledWith2 = p2 + calledWith3 = p3 + calledWith4 = p4 + calledWith5 = p5 + calledWith6 = p6 + "bob" + } + } + + testScheduler.advanceUntilIdle() + + hubConnection.start() + val sentMessage = mockTransport.nextSentMessage + mockTransport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"inc\",\"arguments\":[\"1\",13,[1,2,3],true,\"t\",1.5]}$RECORD_SEPARATOR") + val response = sentMessage.waitForResult() + val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":\"bob\"}$RECORD_SEPARATOR" + + assertEquals(expected, response) + assertEquals("1", calledWith) + assertEquals(13, calledWith2) + assertContentEquals(intArrayOf(1, 2, 3), calledWith3) + assertEquals(true, calledWith4) + assertEquals("t", calledWith5) + assertEquals(1.5, calledWith6) + + job.cancel() + hubConnection.stop() + } + + @Test + fun `handler with seven params should return a value`() = runTest { + val mockTransport = MockTransport() + val hubConnection = createHubConnection(mockTransport) + + var calledWith: String? = null + var calledWith2: Int? = null + var calledWith3: IntArray? = null + var calledWith4: Boolean? = null + var calledWith5: String? = null + var calledWith6: Double? = null + var calledWith7: String? = null + val job = launch { + hubConnection.onWithResult("inc") { p1, p2, p3, p4, p5, p6, p7 -> + calledWith = p1 + calledWith2 = p2 + calledWith3 = p3 + calledWith4 = p4 + calledWith5 = p5 + calledWith6 = p6 + calledWith7 = p7 + "bob" + } + } + + testScheduler.advanceUntilIdle() + + hubConnection.start() + val sentMessage = mockTransport.nextSentMessage + mockTransport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"inc\",\"arguments\":[\"1\",13,[1,2,3],true,\"t\",1.5,\"h\"]}$RECORD_SEPARATOR") + val response = sentMessage.waitForResult() + val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":\"bob\"}$RECORD_SEPARATOR" + + assertEquals(expected, response) + assertEquals("1", calledWith) + assertEquals(13, calledWith2) + assertContentEquals(intArrayOf(1, 2, 3), calledWith3) + assertEquals(true, calledWith4) + assertEquals("t", calledWith5) + assertEquals(1.5, calledWith6) + assertEquals("h", calledWith7) + + job.cancel() + hubConnection.stop() + } + + @Test + fun `handler with eight params should return a value`() = runTest { + val mockTransport = MockTransport() + val hubConnection = createHubConnection(mockTransport) + + var calledWith: String? = null + var calledWith2: Int? = null + var calledWith3: IntArray? = null + var calledWith4: Boolean? = null + var calledWith5: String? = null + var calledWith6: Double? = null + var calledWith7: String? = null + var calledWith8: Int? = null + val job = launch { + hubConnection.onWithResult("inc") { p1, p2, p3, p4, p5, p6, p7, p8 -> + calledWith = p1 + calledWith2 = p2 + calledWith3 = p3 + calledWith4 = p4 + calledWith5 = p5 + calledWith6 = p6 + calledWith7 = p7 + calledWith8 = p8 + "bob" + } + } + + testScheduler.advanceUntilIdle() + + hubConnection.start() + val sentMessage = mockTransport.nextSentMessage + mockTransport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"inc\",\"arguments\":[\"1\",13,[1,2,3],true,\"t\",1.5,\"h\",33]}$RECORD_SEPARATOR") + val response = sentMessage.waitForResult() + val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":\"bob\"}$RECORD_SEPARATOR" + + assertEquals(expected, response) + assertEquals("1", calledWith) + assertEquals(13, calledWith2) + assertContentEquals(intArrayOf(1, 2, 3), calledWith3) + assertEquals(true, calledWith4) + assertEquals("t", calledWith5) + assertEquals(1.5, calledWith6) + assertEquals("h", calledWith7) + assertEquals(33, calledWith8) + + job.cancel() + hubConnection.stop() + } + + @Test + fun `handler with return value should not block other handlers`() = runTest { + val mockTransport = MockTransport() + val hubConnection = createHubConnection(mockTransport) + val resultCalled = CompletableSubject() + val nonResultCalled = CompletableSubject() + val completeResult = CompletableSubject() + + val job = launch { + hubConnection.onWithResult("inc") { + resultCalled.complete() + completeResult.waitForCompletion() + "bob" + } + } + + val job2 = launch { + hubConnection.on("inc2") { + nonResultCalled.complete() + } + } + + testScheduler.advanceUntilIdle() + + hubConnection.start() + val sentMessage = mockTransport.nextSentMessage + mockTransport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"inc\",\"arguments\":[\"1\"]}$RECORD_SEPARATOR") + resultCalled.waitForCompletion() + + // Send an non-result invocation and make sure it's processed even with a blocking result invocation + mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc2\",\"arguments\":[\"1\"]}$RECORD_SEPARATOR") + nonResultCalled.waitForCompletion() + + completeResult.complete() + + val response = sentMessage.waitForResult() + val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":\"bob\"}$RECORD_SEPARATOR" + assertEquals(expected, response) + + job.cancel() + job2.cancel() + hubConnection.stop() + } + + @Test + fun `handler should return an error if cannot parse argument`() = runTest { + val mockTransport = MockTransport() + val hubConnection = createHubConnection(mockTransport) + + val job = launch { + hubConnection.onWithResult("inc") { "bob" } + } + + testScheduler.advanceUntilIdle() + + hubConnection.start() + val sentMessage = mockTransport.nextSentMessage + mockTransport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"inc\",\"arguments\":[\"not int\"]}$RECORD_SEPARATOR") + + val response = sentMessage.waitForResult() + val expected = "{\"type\":3,\"invocationId\":\"1\",\"error\":\"Failed to parse literal as 'int' value\\nJSON input: \\\"not int\\\"\"}$RECORD_SEPARATOR" + assertEquals(expected, response) + + job.cancel() + hubConnection.stop() + } + + private fun createHubConnection(customTransport: Transport, block: HttpHubConnectionBuilder.() -> Unit = {}): HubConnection { + val builder = HttpHubConnectionBuilder("http://example.com").apply { + transport = customTransport + skipNegotiate = true + transportEnum = TransportEnum.WebSockets + } + builder.apply(block) + + return builder.build() + } +} \ No newline at end of file diff --git a/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/HubConnectionTest.kt b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/HubConnectionTest.kt new file mode 100644 index 0000000..a644ab5 --- /dev/null +++ b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/HubConnectionTest.kt @@ -0,0 +1,408 @@ +package eu.lepicekmichal.signalrkore + +import kotlinx.coroutines.launch +import kotlinx.coroutines.test.runTest +import kotlin.test.Test +import kotlin.test.assertContentEquals +import kotlin.test.assertEquals + +class HubConnectionTest { + + @Test + fun `registering multiple handlers with parameter should all be triggered`() = runTest { + val mockTransport = MockTransport() + val hubConnection = createHubConnection(mockTransport) + val completable = CompletableSubject() + val value = SingleSubject() + + val action: (Double) -> Unit = { + launch { + value.setResult((value.result ?: 0.0) + it) + if (value.result == 24.0) { + completable.complete() + } + } + } + + val job = launch { hubConnection.on("add", action) } + val job2 = launch { hubConnection.on("add", action) } + + testScheduler.advanceUntilIdle() + + hubConnection.start() + mockTransport.receiveMessage("{\"type\":1,\"target\":\"add\",\"arguments\":[12]}$RECORD_SEPARATOR") + + // Confirming that our handler was called and the correct message was passed in. + completable.waitForCompletion() + assertEquals(24.0, value.result) + + job.cancel() + job2.cancel() + hubConnection.stop() + } + + @Test + fun `handler without param should be triggered`() = runTest { + val mockTransport = MockTransport() + val hubConnection = createHubConnection(mockTransport) + val completable = CompletableSubject() + + val job = launch { + hubConnection.on("inc") { + completable.complete() + } + } + + testScheduler.advanceUntilIdle() + + hubConnection.start() + mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}$RECORD_SEPARATOR") + completable.waitForCompletion() + + job.cancel() + hubConnection.stop() + } + + @Test + fun `handler with one param should be triggered`() = runTest { + val mockTransport = MockTransport() + val hubConnection = createHubConnection(mockTransport) + val completable = CompletableSubject() + + var calledWith: String? = null + val job = launch { + hubConnection.on("inc") { + calledWith = it + completable.complete() + } + } + + testScheduler.advanceUntilIdle() + + hubConnection.start() + mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"1\"]}$RECORD_SEPARATOR") + completable.waitForCompletion() + + assertEquals("1", calledWith) + + job.cancel() + hubConnection.stop() + } + + @Test + fun `handler with two params should be triggered`() = runTest { + val mockTransport = MockTransport() + val hubConnection = createHubConnection(mockTransport) + val completable = CompletableSubject() + + var calledWith: String? = null + var calledWith2: Int? = null + val job = launch { + hubConnection.on("inc") { p1, p2 -> + calledWith = p1 + calledWith2 = p2 + completable.complete() + } + } + + testScheduler.advanceUntilIdle() + + hubConnection.start() + mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"1\",13]}$RECORD_SEPARATOR") + completable.waitForCompletion() + + assertEquals("1", calledWith) + assertEquals(13, calledWith2) + + job.cancel() + hubConnection.stop() + } + + @Test + fun `handler with three params should be triggered`() = runTest { + val mockTransport = MockTransport() + val hubConnection = createHubConnection(mockTransport) + val completable = CompletableSubject() + + var calledWith: String? = null + var calledWith2: Int? = null + var calledWith3: IntArray? = null + val job = launch { + hubConnection.on("inc") { p1, p2, p3 -> + calledWith = p1 + calledWith2 = p2 + calledWith3 = p3 + completable.complete() + } + } + + testScheduler.advanceUntilIdle() + + hubConnection.start() + mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"1\",13,[1,2,3]]}$RECORD_SEPARATOR") + completable.waitForCompletion() + + assertEquals("1", calledWith) + assertEquals(13, calledWith2) + assertContentEquals(intArrayOf(1, 2, 3), calledWith3) + + job.cancel() + hubConnection.stop() + } + + @Test + fun `handler with four params should be triggered`() = runTest { + val mockTransport = MockTransport() + val hubConnection = createHubConnection(mockTransport) + val completable = CompletableSubject() + + var calledWith: String? = null + var calledWith2: Int? = null + var calledWith3: IntArray? = null + var calledWith4: Boolean? = null + val job = launch { + hubConnection.on("inc") { p1, p2, p3, p4 -> + calledWith = p1 + calledWith2 = p2 + calledWith3 = p3 + calledWith4 = p4 + completable.complete() + } + } + + testScheduler.advanceUntilIdle() + + hubConnection.start() + mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"1\",13,[1,2,3],true]}$RECORD_SEPARATOR") + completable.waitForCompletion() + + assertEquals("1", calledWith) + assertEquals(13, calledWith2) + assertContentEquals(intArrayOf(1, 2, 3), calledWith3) + assertEquals(true, calledWith4) + + job.cancel() + hubConnection.stop() + } + + @Test + fun `handler with five params should be triggered`() = runTest { + val mockTransport = MockTransport() + val hubConnection = createHubConnection(mockTransport) + val completable = CompletableSubject() + + var calledWith: String? = null + var calledWith2: Int? = null + var calledWith3: IntArray? = null + var calledWith4: Boolean? = null + var calledWith5: String? = null + val job = launch { + hubConnection.on("inc") { p1, p2, p3, p4, p5 -> + calledWith = p1 + calledWith2 = p2 + calledWith3 = p3 + calledWith4 = p4 + calledWith5 = p5 + completable.complete() + } + } + + testScheduler.advanceUntilIdle() + + hubConnection.start() + mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"1\",13,[1,2,3],true,\"t\"]}$RECORD_SEPARATOR") + completable.waitForCompletion() + + assertEquals("1", calledWith) + assertEquals(13, calledWith2) + assertContentEquals(intArrayOf(1, 2, 3), calledWith3) + assertEquals(true, calledWith4) + assertEquals("t", calledWith5) + + job.cancel() + hubConnection.stop() + } + + @Test + fun `handler with six params should be triggered`() = runTest { + val mockTransport = MockTransport() + val hubConnection = createHubConnection(mockTransport) + val completable = CompletableSubject() + + var calledWith: String? = null + var calledWith2: Int? = null + var calledWith3: IntArray? = null + var calledWith4: Boolean? = null + var calledWith5: String? = null + var calledWith6: Double? = null + val job = launch { + hubConnection.on("inc") { p1, p2, p3, p4, p5, p6 -> + calledWith = p1 + calledWith2 = p2 + calledWith3 = p3 + calledWith4 = p4 + calledWith5 = p5 + calledWith6 = p6 + completable.complete() + } + } + + testScheduler.advanceUntilIdle() + + hubConnection.start() + mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"1\",13,[1,2,3],true,\"t\",1.5]}$RECORD_SEPARATOR") + completable.waitForCompletion() + + assertEquals("1", calledWith) + assertEquals(13, calledWith2) + assertContentEquals(intArrayOf(1, 2, 3), calledWith3) + assertEquals(true, calledWith4) + assertEquals("t", calledWith5) + assertEquals(1.5, calledWith6) + + job.cancel() + hubConnection.stop() + } + + @Test + fun `handler with seven params should be triggered`() = runTest { + val mockTransport = MockTransport() + val hubConnection = createHubConnection(mockTransport) + val completable = CompletableSubject() + + var calledWith: String? = null + var calledWith2: Int? = null + var calledWith3: IntArray? = null + var calledWith4: Boolean? = null + var calledWith5: String? = null + var calledWith6: Double? = null + var calledWith7: String? = null + val job = launch { + hubConnection.on("inc") { p1, p2, p3, p4, p5, p6, p7 -> + calledWith = p1 + calledWith2 = p2 + calledWith3 = p3 + calledWith4 = p4 + calledWith5 = p5 + calledWith6 = p6 + calledWith7 = p7 + completable.complete() + } + } + + testScheduler.advanceUntilIdle() + + hubConnection.start() + mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"1\",13,[1,2,3],true,\"t\",1.5,\"h\"]}$RECORD_SEPARATOR") + completable.waitForCompletion() + + assertEquals("1", calledWith) + assertEquals(13, calledWith2) + assertContentEquals(intArrayOf(1, 2, 3), calledWith3) + assertEquals(true, calledWith4) + assertEquals("t", calledWith5) + assertEquals(1.5, calledWith6) + assertEquals("h", calledWith7) + + job.cancel() + hubConnection.stop() + } + + @Test + fun `handler with eight params should be triggered`() = runTest { + val mockTransport = MockTransport() + val hubConnection = createHubConnection(mockTransport) + val completable = CompletableSubject() + + var calledWith: String? = null + var calledWith2: Int? = null + var calledWith3: IntArray? = null + var calledWith4: Boolean? = null + var calledWith5: String? = null + var calledWith6: Double? = null + var calledWith7: String? = null + var calledWith8: Int? = null + val job = launch { + hubConnection.on("inc") { p1, p2, p3, p4, p5, p6, p7, p8 -> + calledWith = p1 + calledWith2 = p2 + calledWith3 = p3 + calledWith4 = p4 + calledWith5 = p5 + calledWith6 = p6 + calledWith7 = p7 + calledWith8 = p8 + completable.complete() + } + } + + testScheduler.advanceUntilIdle() + + hubConnection.start() + mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"1\",13,[1,2,3],true,\"t\",1.5,\"h\",33]}$RECORD_SEPARATOR") + completable.waitForCompletion() + + assertEquals("1", calledWith) + assertEquals(13, calledWith2) + assertContentEquals(intArrayOf(1, 2, 3), calledWith3) + assertEquals(true, calledWith4) + assertEquals("t", calledWith5) + assertEquals(1.5, calledWith6) + assertEquals("h", calledWith7) + assertEquals(33, calledWith8) + + job.cancel() + hubConnection.stop() + } + + @Test + fun `throwing from one handler should not stop triggering other handlers`() = runTest { + val mockTransport = MockTransport() + val testLogger = TestLogger() + val hubConnection = createHubConnection(mockTransport) { + logger = testLogger + } + val value1 = SingleSubject() + val value2 = SingleSubject() + + val job = launch { + hubConnection.on("inc") { p1 -> + value1.setResult(p1) + throw RuntimeException("throw from on handler") + } + } + + val job2 = launch { + hubConnection.on("inc") { p1 -> + value2.setResult(p1) + } + } + + testScheduler.advanceUntilIdle() + + hubConnection.start() + mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"Hello World\"]}$RECORD_SEPARATOR") + + // Confirming that our handler was called and the correct message was passed in. + assertEquals("Hello World", value1.waitForResult()) + assertEquals("Hello World", value2.waitForResult()) + + val log = testLogger.assertLogEquals("Invoking client side method 'inc' failed: java.lang.RuntimeException: throw from on handler") + assertEquals(Logger.Level.ERROR, log.level) + + job.cancel() + job2.cancel() + hubConnection.stop() + } + + private fun createHubConnection(customTransport: Transport, block: HttpHubConnectionBuilder.() -> Unit = {}): HubConnection { + val builder = HttpHubConnectionBuilder("http://example.com").apply { + transport = customTransport + skipNegotiate = true + transportEnum = TransportEnum.WebSockets + } + builder.apply(block) + + return builder.build() + } +} \ No newline at end of file diff --git a/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/MockTransport.kt b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/MockTransport.kt new file mode 100644 index 0000000..5db12cd --- /dev/null +++ b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/MockTransport.kt @@ -0,0 +1,60 @@ +package eu.lepicekmichal.signalrkore + +import io.ktor.utils.io.core.toByteArray +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.IO +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.map +import kotlinx.coroutines.launch + +class MockTransport( + private val ignorePings: Boolean = true, + private val autoHandshake: Boolean = true +) : Transport { + + private val receivedMessages: MutableSharedFlow = MutableSharedFlow() + + var nextSentMessage: SingleSubject = SingleSubject() + private set + + lateinit var url: String + private set + + override suspend fun start(url: String) { + this.url = url + if (autoHandshake) { + CoroutineScope(Dispatchers.IO).launch { + receivedMessages.subscriptionCount.filter { it > 0 }.first() + receivedMessages.emit("{}$RECORD_SEPARATOR") + } + } + } + + override suspend fun send(message: ByteArray) { + if (!ignorePings || !isPing(message)) { + nextSentMessage.setResult(io.ktor.utils.io.core.String(message)) + nextSentMessage = SingleSubject() + } + } + + override fun receive(): Flow = receivedMessages.map { it.toByteArray() } + + override suspend fun stop() { } + + suspend fun receiveMessage(message: String, waitForSubscriber: Boolean = true) { + if (waitForSubscriber) { + receivedMessages.subscriptionCount.filter { it > 0 }.first() + } + + receivedMessages.emit(message) + } + + private fun isPing(message: ByteArray): Boolean { + return io.ktor.utils.io.core.String(message) == "{\"type\":6}$RECORD_SEPARATOR" || + (message[0].toInt() == 2 && message[1].toInt() == -111 && message[2].toInt() == 6) + } +} \ No newline at end of file diff --git a/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/SingleSubject.kt b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/SingleSubject.kt new file mode 100644 index 0000000..6d69d72 --- /dev/null +++ b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/SingleSubject.kt @@ -0,0 +1,29 @@ +package eu.lepicekmichal.signalrkore + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.flow.timeout +import kotlinx.coroutines.withContext +import kotlin.time.Duration +import kotlin.time.Duration.Companion.seconds + +class SingleSubject { + private val stateFlow = MutableStateFlow(null) + + val result get() = stateFlow.value + + suspend fun waitForResult(timeout: Duration = 30.seconds) : T = + withContext(Dispatchers.Default) { + stateFlow.filter { it != null }.timeout(timeout).first() as T + } + + fun reset() { + stateFlow.value = null + } + + fun setResult(value: T) { + stateFlow.value = value + } +} \ No newline at end of file diff --git a/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/TestLogger.kt b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/TestLogger.kt new file mode 100644 index 0000000..3aa9b09 --- /dev/null +++ b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/TestLogger.kt @@ -0,0 +1,30 @@ +package eu.lepicekmichal.signalrkore + +import kotlin.test.assertNotNull + +data class LogEvent(val level: Logger.Level, val message: String) + +class TestLogger : Logger { + + private val logs: MutableList = mutableListOf() + + override fun log(level: Logger.Level, message: String) { + logs.add(LogEvent(level, message)) + } + + fun assertLogEquals(message: String): LogEvent { + val log = logs.firstOrNull { it.message == message } + + assertNotNull(log, "Log message '$message' not found") + + return log + } + + fun assertLogContains(message: String): LogEvent { + val log = logs.firstOrNull { it.message.contains(message) } + + assertNotNull(log, "Log message containing '$message' not found") + + return log + } +} \ No newline at end of file From 274ac1976b47f56e4ee9fb95a81a903a74e1a19a Mon Sep 17 00:00:00 2001 From: maca88 Date: Tue, 30 Apr 2024 15:02:18 +0200 Subject: [PATCH 2/7] removed unneeded code --- .../signalrkore/HubCommunication.kt | 24 ++++--------------- .../signalrkore/HubConnection.kt | 2 +- 2 files changed, 5 insertions(+), 21 deletions(-) diff --git a/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubCommunication.kt b/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubCommunication.kt index 749aeed..390768d 100644 --- a/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubCommunication.kt +++ b/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubCommunication.kt @@ -15651,30 +15651,14 @@ abstract class HubCommunication { return receivedInvocations .run { - if (!hasResult) { - this - .filter { it.target == target } - .onEach { - if (it is HubMessage.Invocation.Blocking) { - logger.log( - level = Logger.Level.WARNING, - message = "There is no result provider for ${it.target} despite server expecting it.", - ) - complete( - HubMessage.Completion.Error( - invocationId = it.invocationId, - error = "Client did not provide a result." - ), - ) - } - } - } else { - this + when (hasResult) { + true -> this .onSubscription { resultProviderRegistry.add(target) } .onCompletion { resultProviderRegistry.remove(target) } - .filter { it.target == target } + else -> this } } + .filter { it.target == target } .onEach { logger.log(Logger.Level.INFO, "Received invocation: $it") } } diff --git a/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubConnection.kt b/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubConnection.kt index 74ba569..0b5752c 100644 --- a/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubConnection.kt +++ b/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubConnection.kt @@ -409,7 +409,7 @@ class HubConnection private constructor( is HubMessage.Invocation -> { if (message is HubMessage.Invocation.Blocking && !resultProviderRegistry.contains(message.target)) { - logger.log(Logger.Level.ERROR, "Failed to find a value returning handler for ${message.target} method. Sending error to server.") + logger.log(Logger.Level.ERROR, "There is no result provider for '${message.target}' despite server expecting it.") complete(HubMessage.Completion.Error( invocationId = message.invocationId, From 5be5301f51d61b5cb92fa079635358b3db518c65 Mon Sep 17 00:00:00 2001 From: maca88 Date: Wed, 1 May 2024 01:01:09 +0200 Subject: [PATCH 3/7] Added missing inline function with no parameters --- .../eu/lepicekmichal/signalrkore/HubCommunication.kt | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubCommunication.kt b/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubCommunication.kt index 390768d..08c2cdf 100644 --- a/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubCommunication.kt +++ b/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubCommunication.kt @@ -15818,6 +15818,9 @@ abstract class HubCommunication { } } + suspend inline fun on(target: String, noinline callback: suspend () -> T) where T: Any = + on(target, T::class, callback) + suspend inline fun on(target: String, noinline callback: suspend (T1) -> T) where T: Any, T1: Any = on(target, T::class, T1::class, callback) @@ -16059,14 +16062,6 @@ abstract class HubCommunication { ) where T: Any, T1: Any, T2: Any, T3: Any, T4: Any, T5: Any, T6: Any, T7: Any, T8: Any = on(target, T::class, T1::class, T2::class, T3::class, T4::class, T5::class, T6::class, T7::class, T8::class, callback) - suspend fun on(target: String, callback: suspend () -> Unit) { - on( - target = target, - resultType = Unit::class, - callback = callback, - ) - } - suspend fun on(target: String, param1: KClass, callback: suspend (T1) -> Unit) where T1 : Any { on( target = target, From 2e83b4e50079707cbbb3e36926ac24239541f992 Mon Sep 17 00:00:00 2001 From: maca88 Date: Wed, 1 May 2024 01:18:36 +0200 Subject: [PATCH 4/7] Revert "Added missing inline function with no parameters" This reverts commit 5be5301f51d61b5cb92fa079635358b3db518c65. --- .../eu/lepicekmichal/signalrkore/HubCommunication.kt | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubCommunication.kt b/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubCommunication.kt index 08c2cdf..390768d 100644 --- a/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubCommunication.kt +++ b/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubCommunication.kt @@ -15818,9 +15818,6 @@ abstract class HubCommunication { } } - suspend inline fun on(target: String, noinline callback: suspend () -> T) where T: Any = - on(target, T::class, callback) - suspend inline fun on(target: String, noinline callback: suspend (T1) -> T) where T: Any, T1: Any = on(target, T::class, T1::class, callback) @@ -16062,6 +16059,14 @@ abstract class HubCommunication { ) where T: Any, T1: Any, T2: Any, T3: Any, T4: Any, T5: Any, T6: Any, T7: Any, T8: Any = on(target, T::class, T1::class, T2::class, T3::class, T4::class, T5::class, T6::class, T7::class, T8::class, callback) + suspend fun on(target: String, callback: suspend () -> Unit) { + on( + target = target, + resultType = Unit::class, + callback = callback, + ) + } + suspend fun on(target: String, param1: KClass, callback: suspend (T1) -> Unit) where T1 : Any { on( target = target, From ded6136f46a3320d06759a067fb3c073bbb19520 Mon Sep 17 00:00:00 2001 From: maca88 Date: Wed, 1 May 2024 01:53:52 +0200 Subject: [PATCH 5/7] Added required inline on functions to work with no parameters --- .../kotlin/eu/lepicekmichal/signalrkore/HubCommunication.kt | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubCommunication.kt b/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubCommunication.kt index 390768d..913e1c6 100644 --- a/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubCommunication.kt +++ b/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubCommunication.kt @@ -10,6 +10,12 @@ import kotlinx.coroutines.flow.onSubscription import kotlinx.serialization.json.JsonElement import kotlin.reflect.KClass +suspend inline fun HubCommunication.on(target: String, noinline callback: suspend () -> T) where T: Any = + on(target, T::class, callback) + +suspend inline fun HubCommunication.on(target: String, noinline callback: () -> T) where T: Any = + on(target, T::class, callback) + abstract class HubCommunication { protected abstract val receivedInvocations: SharedFlow From 6c81385808ee22466b6b1e23217c7641872bfe86 Mon Sep 17 00:00:00 2001 From: maca88 Date: Thu, 9 May 2024 21:25:51 +0200 Subject: [PATCH 6/7] Fixed failing unit tests - Added the ability to use the `on` method with a callback also for `Unit` result type - Move the logic for sending `Client did not provide a result.` before the message is emitted - Perform the `resultProviderRegistry.contains` check when `on` method is called - Always include the catched exception when rethrowing `RuntimeException` - Simplify tests by using a base class and Unconfined dispatcher --- .../signalrkore/HubCommunicationTask.kt | 2 +- .../signalrkore/HttpHubConnectionBuilder.kt | 11 + .../signalrkore/HubCommunicationLink.kt | 64 +- .../signalrkore/HubConnection.kt | 27 +- .../{CompletableSubject.kt => Completable.kt} | 6 +- .../HubConnectionReturnResultTest.kt | 599 ------------------ .../signalrkore/HubConnectionTest.kt | 448 ------------- .../eu/lepicekmichal/signalrkore/HubTest.kt | 39 ++ .../signalrkore/MockTransport.kt | 5 +- .../eu/lepicekmichal/signalrkore/OnTest.kt | 344 ++++++++++ .../signalrkore/OnWithResultTest.kt | 432 +++++++++++++ .../{SingleSubject.kt => Single.kt} | 15 +- .../lepicekmichal/signalrkore/TestLogger.kt | 38 +- 13 files changed, 911 insertions(+), 1119 deletions(-) rename signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/{CompletableSubject.kt => Completable.kt} (75%) delete mode 100644 signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/HubConnectionReturnResultTest.kt delete mode 100644 signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/HubConnectionTest.kt create mode 100644 signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/HubTest.kt create mode 100644 signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/OnTest.kt create mode 100644 signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/OnWithResultTest.kt rename signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/{SingleSubject.kt => Single.kt} (60%) diff --git a/buildSrc/src/main/kotlin/eu/lepicekmichal/signalrkore/HubCommunicationTask.kt b/buildSrc/src/main/kotlin/eu/lepicekmichal/signalrkore/HubCommunicationTask.kt index 84bd3dc..7bafea8 100644 --- a/buildSrc/src/main/kotlin/eu/lepicekmichal/signalrkore/HubCommunicationTask.kt +++ b/buildSrc/src/main/kotlin/eu/lepicekmichal/signalrkore/HubCommunicationTask.kt @@ -307,7 +307,7 @@ open class HubCommunicationTask : DefaultTask() { returns = Unit::class.asTypeName(), body = { if (!reified) { - addStatement("%L", "on(target = target, hasResult = true)") + addStatement("%L", "on(target = target, hasResult = resultType != Unit::class)") .addCode( format = "%L", """ diff --git a/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HttpHubConnectionBuilder.kt b/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HttpHubConnectionBuilder.kt index 5c2145a..0d3f074 100644 --- a/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HttpHubConnectionBuilder.kt +++ b/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HttpHubConnectionBuilder.kt @@ -1,6 +1,8 @@ package eu.lepicekmichal.signalrkore import io.ktor.client.HttpClient +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.Dispatchers import kotlinx.serialization.json.Json import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds @@ -12,8 +14,16 @@ class HttpHubConnectionBuilder(private val url: String) { */ var transportEnum: TransportEnum = TransportEnum.All + /** + * The [Transport] to be used by the [eu.lepicekmichal.signalrkore.HubConnection] + */ internal var transport: Transport? = null + /** + * The [CoroutineDispatcher] to be used by the [eu.lepicekmichal.signalrkore.HubConnection] + */ + internal var dispatcher: CoroutineDispatcher = Dispatchers.IO + /** * The [HttpClient] to be used by the [eu.lepicekmichal.signalrkore.HubConnection] */ @@ -78,5 +88,6 @@ class HttpHubConnectionBuilder(private val url: String) { transportEnum, json, logger, + dispatcher ) } \ No newline at end of file diff --git a/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubCommunicationLink.kt b/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubCommunicationLink.kt index 6de2d76..5debe37 100644 --- a/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubCommunicationLink.kt +++ b/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubCommunicationLink.kt @@ -4,7 +4,7 @@ import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.cancel import kotlinx.coroutines.channels.awaitClose import kotlinx.coroutines.flow.Flow -import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.callbackFlow import kotlinx.coroutines.flow.catch import kotlinx.coroutines.flow.filter @@ -13,7 +13,6 @@ import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onCompletion import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onStart -import kotlinx.coroutines.flow.onSubscription import kotlinx.coroutines.launch import kotlinx.serialization.InternalSerializationApi import kotlinx.serialization.SerializationException @@ -26,9 +25,9 @@ abstract class HubCommunicationLink(private val json: Json) : HubCommunication() protected abstract val scope: CoroutineScope - protected abstract val receivedInvocations: SharedFlow - protected abstract val receivedCompletions: SharedFlow - protected abstract val receivedStreamItems: SharedFlow + private val receivedInvocations = MutableSharedFlow() + private val receivedCompletions = MutableSharedFlow() + private val receivedStreamItems = MutableSharedFlow() private val resultProviderRegistry: MutableSet = mutableSetOf() @@ -96,9 +95,9 @@ abstract class HubCommunicationLink(private val json: Json) : HubCommunication() try { it.result.fromJson(resultType) } catch (ex: SerializationException) { - throw RuntimeException("Completion result could not be parsed as ${resultType.simpleName}: ${it.result}") + throw RuntimeException("Completion result could not be parsed as ${resultType.simpleName}: ${it.result}", ex) } catch (ex: IllegalArgumentException) { - throw RuntimeException("${resultType.simpleName} could not be initialized from the completion result: ${it.result}") + throw RuntimeException("${resultType.simpleName} could not be initialized from the completion result: ${it.result}", ex) } }, ) @@ -162,9 +161,9 @@ abstract class HubCommunicationLink(private val json: Json) : HubCommunication() try { it.item.fromJson(itemType) } catch (ex: SerializationException) { - throw RuntimeException("Completion result could not be parsed as ${itemType.simpleName}: ${it.item}") + throw RuntimeException("Completion result could not be parsed as ${itemType.simpleName}: ${it.item}", ex) } catch (ex: IllegalArgumentException) { - throw RuntimeException("${itemType.simpleName} could not be initialized from the completion result: ${it.item}") + throw RuntimeException("${itemType.simpleName} could not be initialized from the completion result: ${it.item}", ex) } } .collect { if (!isClosedForSend) send(it) } @@ -189,6 +188,31 @@ abstract class HubCommunicationLink(private val json: Json) : HubCommunication() } } + protected suspend fun processReceivedInvocation(message: HubMessage.Invocation) { + if (message is HubMessage.Invocation.Blocking && !resultProviderRegistry.contains(message.target)) { + logger.log( + severity = Logger.Severity.WARNING, + message = "There is no result provider for '${message.target}' despite server expecting it.", + cause = null, + ) + + complete( + HubMessage.Completion.Error( + invocationId = message.invocationId, + error = "Client did not provide a result." + ), + ) + } + + receivedInvocations.emit(message) + } + + protected suspend fun processReceivedStreamItem(message: HubMessage.StreamItem) = + receivedStreamItems.emit(message) + + protected suspend fun processReceivedCompletion(message: HubMessage.Completion) = + receivedCompletions.emit(message) + final override fun Flow.handleIncomingInvocation( resultType: KClass, callback: suspend (HubMessage.Invocation) -> T, @@ -245,36 +269,16 @@ abstract class HubCommunicationLink(private val json: Json) : HubCommunication() } final override fun on(target: String, hasResult: Boolean): Flow { - if (hasResult && resultProviderRegistry.contains(target)) { + if (hasResult && !resultProviderRegistry.add(target)) { throw IllegalStateException("There can be only one function for returning result on blocking invocation (method: $target)") } return receivedInvocations .run { if (!hasResult) this else this - .onSubscription { resultProviderRegistry.add(target) } .onCompletion { resultProviderRegistry.remove(target) } } .filter { it.target == target } - .run { - if (hasResult) this - else this.onEach { - if (it is HubMessage.Invocation.Blocking) { - logger.log( - severity = Logger.Severity.WARNING, - message = "There is no result provider for ${it.target} despite server expecting it.", - cause = null, - ) - - complete( - HubMessage.Completion.Error( - invocationId = it.invocationId, - error = "Client did not provide a result." - ), - ) - } - } - } .onEach { logger.log(Logger.Severity.INFO, "Received invocation: $it", null) } } } \ No newline at end of file diff --git a/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubConnection.kt b/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubConnection.kt index 64a6462..27a6b92 100644 --- a/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubConnection.kt +++ b/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubConnection.kt @@ -15,6 +15,7 @@ import io.ktor.serialization.kotlinx.json.* import io.ktor.util.* import io.ktor.utils.io.* import io.ktor.utils.io.core.* +import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.ExperimentalCoroutinesApi @@ -55,10 +56,11 @@ class HubConnection private constructor( private val automaticReconnect: AutomaticReconnect, override val logger: Logger, json: Json, + dispatcher: CoroutineDispatcher, ) : HubCommunicationLink(json) { private val job = SupervisorJob() - override val scope = CoroutineScope(job + Dispatchers.IO) + override val scope = CoroutineScope(job + dispatcher) private val pingReset = MutableSharedFlow(extraBufferCapacity = 1) private val pingTicker = pingReset @@ -82,10 +84,6 @@ class HubConnection private constructor( } } - override val receivedInvocations = MutableSharedFlow() - override val receivedStreamItems = MutableSharedFlow() - override val receivedCompletions = MutableSharedFlow() - private val _connectionState: MutableStateFlow = MutableStateFlow(HubConnectionState.DISCONNECTED) val connectionState: StateFlow = _connectionState.asStateFlow() @@ -103,6 +101,7 @@ class HubConnection private constructor( transportEnum: TransportEnum, json: Json, logger: Logger, + dispatcher: CoroutineDispatcher, ) : this( baseUrl = url.takeIf { it.isNotBlank() } ?: throw IllegalArgumentException("A valid url is required."), protocol = protocol, @@ -118,6 +117,7 @@ class HubConnection private constructor( automaticReconnect = automaticReconnect, json = json, logger = logger, + dispatcher = dispatcher, ) { if (transport != null) { this.transport = transport @@ -394,23 +394,12 @@ class HubConnection private constructor( else stop(message.error) } - is HubMessage.Invocation -> { - if (message is HubMessage.Invocation.Blocking && !resultProviderRegistry.contains(message.target)) { - logger.log(Logger.Level.ERROR, "There is no result provider for '${message.target}' despite server expecting it.") - - complete(HubMessage.Completion.Error( - invocationId = message.invocationId, - error = "Client did not provide a result."), - ) - } - - receivedInvocations.emit(message) - } + is HubMessage.Invocation -> processReceivedInvocation(message) is HubMessage.StreamInvocation -> Unit // not supported yet is HubMessage.Ping -> Unit is HubMessage.CancelInvocation -> Unit // this should not happen according to standard - is HubMessage.StreamItem -> receivedStreamItems.emit(message) - is HubMessage.Completion -> receivedCompletions.emit(message) + is HubMessage.StreamItem -> processReceivedStreamItem(message) + is HubMessage.Completion -> processReceivedCompletion(message) } } } diff --git a/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/CompletableSubject.kt b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/Completable.kt similarity index 75% rename from signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/CompletableSubject.kt rename to signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/Completable.kt index 4fc3d0f..523b16a 100644 --- a/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/CompletableSubject.kt +++ b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/Completable.kt @@ -1,6 +1,7 @@ package eu.lepicekmichal.signalrkore import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first @@ -9,10 +10,11 @@ import kotlinx.coroutines.withContext import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds -class CompletableSubject { +class Completable { private val stateFlow = MutableStateFlow(false) - suspend fun waitForCompletion(timeout: Duration = 30.seconds) = withContext(Dispatchers.Default) { + @OptIn(FlowPreview::class) + suspend fun waitForCompletion(timeout: Duration = 5.seconds) = withContext(Dispatchers.Default) { stateFlow.filter { it }.timeout(timeout).first() } diff --git a/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/HubConnectionReturnResultTest.kt b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/HubConnectionReturnResultTest.kt deleted file mode 100644 index dc194cc..0000000 --- a/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/HubConnectionReturnResultTest.kt +++ /dev/null @@ -1,599 +0,0 @@ -package eu.lepicekmichal.signalrkore - -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.runTest -import kotlin.test.Test -import kotlin.test.assertContentEquals -import kotlin.test.assertEquals -import kotlin.test.assertFailsWith - -class HubConnectionReturnResultTest { - - @Test - fun `handler with no params should return a value`() = runTest { - val mockTransport = MockTransport() - val hubConnection = createHubConnection(mockTransport) - - var called = false - val job = launch { - hubConnection.on("inc", resultType = Int::class) { - called = true - 10 - } - } - - testScheduler.advanceUntilIdle() - - hubConnection.start() - val sentMessage = mockTransport.nextSentMessage - mockTransport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"inc\",\"arguments\":[]}$RECORD_SEPARATOR") - val response = sentMessage.waitForResult() - val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":10}$RECORD_SEPARATOR" - - assertEquals(expected, response) - assertEquals(true, called) - - job.cancel() - hubConnection.stop() - } - - @Test - fun `handler with no return value should report an error`() = runTest { - val mockTransport = MockTransport() - val hubConnection = createHubConnection(mockTransport) - val nonResultCalled = CompletableSubject() - val job = launch { - hubConnection.on("inc") { nonResultCalled.complete() } - } - - testScheduler.advanceUntilIdle() - - hubConnection.start() - val sentMessage = mockTransport.nextSentMessage - mockTransport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"inc\",\"arguments\":[]}$RECORD_SEPARATOR") - val response = sentMessage.waitForResult() - val expected = "{\"type\":3,\"invocationId\":\"1\",\"error\":\"Client did not provide a result.\"}$RECORD_SEPARATOR" - - assertEquals(expected, response) - nonResultCalled.waitForCompletion() - - job.cancel() - hubConnection.stop() - } - - @Test - fun `missing handler with return value should report an error`() = runTest { - val mockTransport = MockTransport() - val hubConnection = createHubConnection(mockTransport) - - hubConnection.start() - val sentMessage = mockTransport.nextSentMessage - mockTransport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"inc\",\"arguments\":[]}$RECORD_SEPARATOR") - val response = sentMessage.waitForResult() - val expected = "{\"type\":3,\"invocationId\":\"1\",\"error\":\"Client did not provide a result.\"}$RECORD_SEPARATOR" - - assertEquals(expected, response) - - hubConnection.stop() - } - - @Test - fun `faulty handler should return an error`() = runTest { - val mockTransport = MockTransport() - val hubConnection = createHubConnection(mockTransport) - - val job = launch { - hubConnection.on("inc", resultType = Int::class) { - throw RuntimeException("Custom error.") - } - } - - testScheduler.advanceUntilIdle() - - hubConnection.start() - val sentMessage = mockTransport.nextSentMessage - mockTransport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"inc\",\"arguments\":[]}$RECORD_SEPARATOR") - val response = sentMessage.waitForResult() - val expected = "{\"type\":3,\"invocationId\":\"1\",\"error\":\"Custom error.\"}$RECORD_SEPARATOR" - - assertEquals(expected, response) - - job.cancel() - hubConnection.stop() - } - - @Test - fun `registering multiple handler should raise an exception`() = runTest { - val mockTransport = MockTransport() - val hubConnection = createHubConnection(mockTransport) - - val job = launch { - hubConnection.on("inc", resultType = String::class) { "value" } - } - - testScheduler.advanceUntilIdle() - - val exception = assertFailsWith { - hubConnection.on("inc", resultType = String::class) { "value2" } - } - assertEquals("There can be only one function for returning result on blocking invocation (method: inc)", exception.message) - - job.cancel() - } - - - @Test - fun `handler with result should log if server does not expect a return value`() = runTest { - val mockTransport = MockTransport() - val testLogger = TestLogger() - val hubConnection = createHubConnection(mockTransport) { - logger = testLogger - } - val nonResultCalled = CompletableSubject() - - val job = launch { - hubConnection.on("m", resultType = Int::class) { 42 } - } - - val job2 = launch { - hubConnection.on("fin") { nonResultCalled.complete() } - } - - testScheduler.advanceUntilIdle() - - hubConnection.start() - mockTransport.receiveMessage("{\"type\":1,\"target\":\"m\",\"arguments\":[]}$RECORD_SEPARATOR") - // send another invocation message and wait for it to be processed to make sure the first invocation was processed - mockTransport.receiveMessage("{\"type\":1,\"target\":\"fin\",\"arguments\":[]}$RECORD_SEPARATOR") - - nonResultCalled.waitForCompletion() - - testLogger.assertLogEquals("Result was returned for 'm' method but server is not expecting any result.") - - job.cancel() - job2.cancel() - hubConnection.stop() - } - - @Test - fun `handler with one param should return a value`() = runTest { - val mockTransport = MockTransport() - val hubConnection = createHubConnection(mockTransport) - - var calledWith: String? = null - val job = launch { - hubConnection.on("inc", resultType = Int::class, param1 = String::class) { p -> - calledWith = p - 10 - } - } - - testScheduler.advanceUntilIdle() - - hubConnection.start() - val sentMessage = mockTransport.nextSentMessage - mockTransport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"inc\",\"arguments\":[\"1\"]}$RECORD_SEPARATOR") - val response = sentMessage.waitForResult() - val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":10}$RECORD_SEPARATOR" - - assertEquals(expected, response) - assertEquals("1", calledWith) - - job.cancel() - hubConnection.stop() - } - - @Test - fun `handler with two params should return a value`() = runTest { - val mockTransport = MockTransport() - val hubConnection = createHubConnection(mockTransport) - - var calledWith: String? = null - var calledWith2: Int? = null - val job = launch { - hubConnection.on( - "inc", - resultType = String::class, - param1 = String::class, - param2 = Int::class, - ) { p1, p2 -> - calledWith = p1 - calledWith2 = p2 - "bob" - } - } - - testScheduler.advanceUntilIdle() - - hubConnection.start() - val sentMessage = mockTransport.nextSentMessage - mockTransport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"inc\",\"arguments\":[\"1\", 13]}$RECORD_SEPARATOR") - val response = sentMessage.waitForResult() - val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":\"bob\"}$RECORD_SEPARATOR" - - assertEquals(expected, response) - assertEquals("1", calledWith) - assertEquals(13, calledWith2) - - job.cancel() - hubConnection.stop() - } - - @Test - fun `handler with three params should return a value`() = runTest { - val mockTransport = MockTransport() - val hubConnection = createHubConnection(mockTransport) - - var calledWith: String? = null - var calledWith2: Int? = null - var calledWith3: IntArray? = null - val job = launch { - hubConnection.on( - "inc", - resultType = String::class, - param1 = String::class, - param2 = Int::class, - param3 = IntArray::class, - ) { p1, p2, p3 -> - calledWith = p1 - calledWith2 = p2 - calledWith3 = p3 - "bob" - } - } - - testScheduler.advanceUntilIdle() - - hubConnection.start() - val sentMessage = mockTransport.nextSentMessage - mockTransport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"inc\",\"arguments\":[\"1\",13,[1,2,3]]}$RECORD_SEPARATOR") - val response = sentMessage.waitForResult() - val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":\"bob\"}$RECORD_SEPARATOR" - - assertEquals(expected, response) - assertEquals("1", calledWith) - assertEquals(13, calledWith2) - assertContentEquals(intArrayOf(1, 2, 3), calledWith3) - - job.cancel() - hubConnection.stop() - } - - @Test - fun `handler with four params should return a value`() = runTest { - val mockTransport = MockTransport() - val hubConnection = createHubConnection(mockTransport) - - var calledWith: String? = null - var calledWith2: Int? = null - var calledWith3: IntArray? = null - var calledWith4: Boolean? = null - val job = launch { - hubConnection.on( - "inc", - resultType = String::class, - param1 = String::class, - param2 = Int::class, - param3 = IntArray::class, - param4 = Boolean::class, - ) { p1, p2, p3, p4 -> - calledWith = p1 - calledWith2 = p2 - calledWith3 = p3 - calledWith4 = p4 - "bob" - } - } - - testScheduler.advanceUntilIdle() - - hubConnection.start() - val sentMessage = mockTransport.nextSentMessage - mockTransport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"inc\",\"arguments\":[\"1\",13,[1,2,3],true]}$RECORD_SEPARATOR") - val response = sentMessage.waitForResult() - val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":\"bob\"}$RECORD_SEPARATOR" - - assertEquals(expected, response) - assertEquals("1", calledWith) - assertEquals(13, calledWith2) - assertContentEquals(intArrayOf(1, 2, 3), calledWith3) - assertEquals(true, calledWith4) - - job.cancel() - hubConnection.stop() - } - - @Test - fun `handler with five params should return a value`() = runTest { - val mockTransport = MockTransport() - val hubConnection = createHubConnection(mockTransport) - - var calledWith: String? = null - var calledWith2: Int? = null - var calledWith3: IntArray? = null - var calledWith4: Boolean? = null - var calledWith5: String? = null - val job = launch { - hubConnection.on( - "inc", - resultType = String::class, - param1 = String::class, - param2 = Int::class, - param3 = IntArray::class, - param4 = Boolean::class, - param5 = String::class, - ) { p1, p2, p3, p4, p5 -> - calledWith = p1 - calledWith2 = p2 - calledWith3 = p3 - calledWith4 = p4 - calledWith5 = p5 - "bob" - } - } - - testScheduler.advanceUntilIdle() - - hubConnection.start() - val sentMessage = mockTransport.nextSentMessage - mockTransport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"inc\",\"arguments\":[\"1\",13,[1,2,3],true,\"t\"]}$RECORD_SEPARATOR") - val response = sentMessage.waitForResult() - val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":\"bob\"}$RECORD_SEPARATOR" - - assertEquals(expected, response) - assertEquals("1", calledWith) - assertEquals(13, calledWith2) - assertContentEquals(intArrayOf(1, 2, 3), calledWith3) - assertEquals(true, calledWith4) - assertEquals("t", calledWith5) - - job.cancel() - hubConnection.stop() - } - - @Test - fun `handler with six params should return a value`() = runTest { - val mockTransport = MockTransport() - val hubConnection = createHubConnection(mockTransport) - - var calledWith: String? = null - var calledWith2: Int? = null - var calledWith3: IntArray? = null - var calledWith4: Boolean? = null - var calledWith5: String? = null - var calledWith6: Double? = null - val job = launch { - hubConnection.on( - "inc", - resultType = String::class, - param1 = String::class, - param2 = Int::class, - param3 = IntArray::class, - param4 = Boolean::class, - param5 = String::class, - param6 = Double::class, - ) { p1, p2, p3, p4, p5, p6 -> - calledWith = p1 - calledWith2 = p2 - calledWith3 = p3 - calledWith4 = p4 - calledWith5 = p5 - calledWith6 = p6 - "bob" - } - } - - testScheduler.advanceUntilIdle() - - hubConnection.start() - val sentMessage = mockTransport.nextSentMessage - mockTransport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"inc\",\"arguments\":[\"1\",13,[1,2,3],true,\"t\",1.5]}$RECORD_SEPARATOR") - val response = sentMessage.waitForResult() - val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":\"bob\"}$RECORD_SEPARATOR" - - assertEquals(expected, response) - assertEquals("1", calledWith) - assertEquals(13, calledWith2) - assertContentEquals(intArrayOf(1, 2, 3), calledWith3) - assertEquals(true, calledWith4) - assertEquals("t", calledWith5) - assertEquals(1.5, calledWith6) - - job.cancel() - hubConnection.stop() - } - - @Test - fun `handler with seven params should return a value`() = runTest { - val mockTransport = MockTransport() - val hubConnection = createHubConnection(mockTransport) - - var calledWith: String? = null - var calledWith2: Int? = null - var calledWith3: IntArray? = null - var calledWith4: Boolean? = null - var calledWith5: String? = null - var calledWith6: Double? = null - var calledWith7: String? = null - val job = launch { - hubConnection.on( - "inc", - resultType = String::class, - param1 = String::class, - param2 = Int::class, - param3 = IntArray::class, - param4 = Boolean::class, - param5 = String::class, - param6 = Double::class, - param7 = String::class, - ) { p1, p2, p3, p4, p5, p6, p7 -> - calledWith = p1 - calledWith2 = p2 - calledWith3 = p3 - calledWith4 = p4 - calledWith5 = p5 - calledWith6 = p6 - calledWith7 = p7 - "bob" - } - } - - testScheduler.advanceUntilIdle() - - hubConnection.start() - val sentMessage = mockTransport.nextSentMessage - mockTransport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"inc\",\"arguments\":[\"1\",13,[1,2,3],true,\"t\",1.5,\"h\"]}$RECORD_SEPARATOR") - val response = sentMessage.waitForResult() - val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":\"bob\"}$RECORD_SEPARATOR" - - assertEquals(expected, response) - assertEquals("1", calledWith) - assertEquals(13, calledWith2) - assertContentEquals(intArrayOf(1, 2, 3), calledWith3) - assertEquals(true, calledWith4) - assertEquals("t", calledWith5) - assertEquals(1.5, calledWith6) - assertEquals("h", calledWith7) - - job.cancel() - hubConnection.stop() - } - - @Test - fun `handler with eight params should return a value`() = runTest { - val mockTransport = MockTransport() - val hubConnection = createHubConnection(mockTransport) - - var calledWith: String? = null - var calledWith2: Int? = null - var calledWith3: IntArray? = null - var calledWith4: Boolean? = null - var calledWith5: String? = null - var calledWith6: Double? = null - var calledWith7: String? = null - var calledWith8: Int? = null - val job = launch { - hubConnection.on( - "inc", - resultType = String::class, - param1 = String::class, - param2 = Int::class, - param3 = IntArray::class, - param4 = Boolean::class, - param5 = String::class, - param6 = Double::class, - param7 = String::class, - param8 = Int::class, - ) { p1, p2, p3, p4, p5, p6, p7, p8 -> - calledWith = p1 - calledWith2 = p2 - calledWith3 = p3 - calledWith4 = p4 - calledWith5 = p5 - calledWith6 = p6 - calledWith7 = p7 - calledWith8 = p8 - "bob" - } - } - - testScheduler.advanceUntilIdle() - - hubConnection.start() - val sentMessage = mockTransport.nextSentMessage - mockTransport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"inc\",\"arguments\":[\"1\",13,[1,2,3],true,\"t\",1.5,\"h\",33]}$RECORD_SEPARATOR") - val response = sentMessage.waitForResult() - val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":\"bob\"}$RECORD_SEPARATOR" - - assertEquals(expected, response) - assertEquals("1", calledWith) - assertEquals(13, calledWith2) - assertContentEquals(intArrayOf(1, 2, 3), calledWith3) - assertEquals(true, calledWith4) - assertEquals("t", calledWith5) - assertEquals(1.5, calledWith6) - assertEquals("h", calledWith7) - assertEquals(33, calledWith8) - - job.cancel() - hubConnection.stop() - } - - @Test - fun `handler with return value should not block other handlers`() = runTest { - val mockTransport = MockTransport() - val hubConnection = createHubConnection(mockTransport) - val resultCalled = CompletableSubject() - val nonResultCalled = CompletableSubject() - val completeResult = CompletableSubject() - - val job = launch { - hubConnection.on("inc", resultType = String::class) { - resultCalled.complete() - completeResult.waitForCompletion() - "bob" - } - } - - val job2 = launch { - hubConnection.on("inc2") { - nonResultCalled.complete() - } - } - - testScheduler.advanceUntilIdle() - - hubConnection.start() - val sentMessage = mockTransport.nextSentMessage - mockTransport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"inc\",\"arguments\":[\"1\"]}$RECORD_SEPARATOR") - resultCalled.waitForCompletion() - - // Send an non-result invocation and make sure it's processed even with a blocking result invocation - mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc2\",\"arguments\":[\"1\"]}$RECORD_SEPARATOR") - nonResultCalled.waitForCompletion() - - completeResult.complete() - - val response = sentMessage.waitForResult() - val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":\"bob\"}$RECORD_SEPARATOR" - assertEquals(expected, response) - - job.cancel() - job2.cancel() - hubConnection.stop() - } - - @Test - fun `handler should return an error if cannot parse argument`() = runTest { - val mockTransport = MockTransport() - val hubConnection = createHubConnection(mockTransport) - - val job = launch { - hubConnection.on("inc", String::class, Int::class) { _ -> "bob" } - } - - testScheduler.advanceUntilIdle() - - hubConnection.start() - val sentMessage = mockTransport.nextSentMessage - mockTransport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"inc\",\"arguments\":[\"not int\"]}$RECORD_SEPARATOR") - - val response = sentMessage.waitForResult() - val expected = "{\"type\":3,\"invocationId\":\"1\",\"error\":\"Failed to parse literal as 'int' value\\nJSON input: \\\"not int\\\"\"}$RECORD_SEPARATOR" - assertEquals(expected, response) - - job.cancel() - hubConnection.stop() - } - - private fun createHubConnection(customTransport: Transport, block: HttpHubConnectionBuilder.() -> Unit = {}): HubConnection { - val builder = HttpHubConnectionBuilder("http://example.com").apply { - transport = customTransport - skipNegotiate = true - transportEnum = TransportEnum.WebSockets - } - builder.apply(block) - - return builder.build() - } -} \ No newline at end of file diff --git a/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/HubConnectionTest.kt b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/HubConnectionTest.kt deleted file mode 100644 index 208120e..0000000 --- a/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/HubConnectionTest.kt +++ /dev/null @@ -1,448 +0,0 @@ -package eu.lepicekmichal.signalrkore - -import kotlinx.coroutines.launch -import kotlinx.coroutines.test.runTest -import kotlin.test.Test -import kotlin.test.assertContentEquals -import kotlin.test.assertEquals - -class HubConnectionTest { - - @Test - fun `registering multiple handlers with parameter should all be triggered`() = runTest { - val mockTransport = MockTransport() - val hubConnection = createHubConnection(mockTransport) - val completable = CompletableSubject() - val value = SingleSubject() - - val action: (Double) -> Unit = { - value.setResult((value.result ?: 0.0) + it) - if (value.result == 24.0) { - completable.complete() - } - } - val job = launch { hubConnection.on("add", action) } - val job2 = launch { hubConnection.on("add", action) } - - testScheduler.advanceUntilIdle() - - hubConnection.start() - mockTransport.receiveMessage("{\"type\":1,\"target\":\"add\",\"arguments\":[12]}$RECORD_SEPARATOR") - - // Confirming that our handler was called and the correct message was passed in. - completable.waitForCompletion() - assertEquals(24.0, value.result) - - job.cancel() - job2.cancel() - hubConnection.stop() - } - - @Test - fun `handler without param should be triggered`() = runTest { - val mockTransport = MockTransport() - val hubConnection = createHubConnection(mockTransport) - - val completable = CompletableSubject() - val job = launch { - hubConnection.on("inc") { completable.complete() } - } - - testScheduler.advanceUntilIdle() - - hubConnection.start() - mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[]}$RECORD_SEPARATOR") - completable.waitForCompletion() - - job.cancel() - hubConnection.stop() - } - - @Test - fun `handler with one param should be triggered`() = runTest { - val mockTransport = MockTransport() - val hubConnection = createHubConnection(mockTransport) - val completable = CompletableSubject() - - var calledWith: String? = null - val job = launch { - hubConnection.on("inc", param1 = String::class) { p1 -> - calledWith = p1 - completable.complete() - } - } - - testScheduler.advanceUntilIdle() - - hubConnection.start() - mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"1\"]}$RECORD_SEPARATOR") - completable.waitForCompletion() - - assertEquals("1", calledWith) - - job.cancel() - hubConnection.stop() - } - - @Test - fun `handler with two params should be triggered`() = runTest { - val mockTransport = MockTransport() - val hubConnection = createHubConnection(mockTransport) - val completable = CompletableSubject() - - var calledWith: String? = null - var calledWith2: Int? = null - val job = launch { - hubConnection.on("inc", param1 = String::class, param2 = Int::class) { p1, p2 -> - calledWith = p1 - calledWith2 = p2 - completable.complete() - } - } - - testScheduler.advanceUntilIdle() - - hubConnection.start() - mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"1\",13]}$RECORD_SEPARATOR") - completable.waitForCompletion() - - assertEquals("1", calledWith) - assertEquals(13, calledWith2) - - job.cancel() - hubConnection.stop() - } - - @Test - fun `handler with three params should be triggered`() = runTest { - val mockTransport = MockTransport() - val hubConnection = createHubConnection(mockTransport) - val completable = CompletableSubject() - - var calledWith: String? = null - var calledWith2: Int? = null - var calledWith3: IntArray? = null - val job = launch { - hubConnection.on( - "inc", - param1 = String::class, - param2 = Int::class, - param3 = IntArray::class, - ) { p1, p2, p3 -> - calledWith = p1 - calledWith2 = p2 - calledWith3 = p3 - completable.complete() - } - } - - testScheduler.advanceUntilIdle() - - hubConnection.start() - mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"1\",13,[1,2,3]]}$RECORD_SEPARATOR") - completable.waitForCompletion() - - assertEquals("1", calledWith) - assertEquals(13, calledWith2) - assertContentEquals(intArrayOf(1, 2, 3), calledWith3) - - job.cancel() - hubConnection.stop() - } - - @Test - fun `handler with four params should be triggered`() = runTest { - val mockTransport = MockTransport() - val hubConnection = createHubConnection(mockTransport) - val completable = CompletableSubject() - - var calledWith: String? = null - var calledWith2: Int? = null - var calledWith3: IntArray? = null - var calledWith4: Boolean? = null - val job = launch { - hubConnection.on( - "inc", - param1 = String::class, - param2 = Int::class, - param3 = IntArray::class, - param4 = Boolean::class, - ) { p1, p2, p3, p4 -> - calledWith = p1 - calledWith2 = p2 - calledWith3 = p3 - calledWith4 = p4 - completable.complete() - } - } - - testScheduler.advanceUntilIdle() - - hubConnection.start() - mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"1\",13,[1,2,3],true]}$RECORD_SEPARATOR") - completable.waitForCompletion() - - assertEquals("1", calledWith) - assertEquals(13, calledWith2) - assertContentEquals(intArrayOf(1, 2, 3), calledWith3) - assertEquals(true, calledWith4) - - job.cancel() - hubConnection.stop() - } - - @Test - fun `handler with five params should be triggered`() = runTest { - val mockTransport = MockTransport() - val hubConnection = createHubConnection(mockTransport) - val completable = CompletableSubject() - - var calledWith: String? = null - var calledWith2: Int? = null - var calledWith3: IntArray? = null - var calledWith4: Boolean? = null - var calledWith5: String? = null - val job = launch { - hubConnection.on( - "inc", - param1 = String::class, - param2 = Int::class, - param3 = IntArray::class, - param4 = Boolean::class, - param5 = String::class, - ) { p1, p2, p3, p4, p5 -> - calledWith = p1 - calledWith2 = p2 - calledWith3 = p3 - calledWith4 = p4 - calledWith5 = p5 - completable.complete() - } - } - - testScheduler.advanceUntilIdle() - - hubConnection.start() - mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"1\",13,[1,2,3],true,\"t\"]}$RECORD_SEPARATOR") - completable.waitForCompletion() - - assertEquals("1", calledWith) - assertEquals(13, calledWith2) - assertContentEquals(intArrayOf(1, 2, 3), calledWith3) - assertEquals(true, calledWith4) - assertEquals("t", calledWith5) - - job.cancel() - hubConnection.stop() - } - - @Test - fun `handler with six params should be triggered`() = runTest { - val mockTransport = MockTransport() - val hubConnection = createHubConnection(mockTransport) - val completable = CompletableSubject() - - var calledWith: String? = null - var calledWith2: Int? = null - var calledWith3: IntArray? = null - var calledWith4: Boolean? = null - var calledWith5: String? = null - var calledWith6: Double? = null - val job = launch { - hubConnection.on( - "inc", - param1 = String::class, - param2 = Int::class, - param3 = IntArray::class, - param4 = Boolean::class, - param5 = String::class, - param6 = Double::class, - ) { p1, p2, p3, p4, p5, p6 -> - calledWith = p1 - calledWith2 = p2 - calledWith3 = p3 - calledWith4 = p4 - calledWith5 = p5 - calledWith6 = p6 - completable.complete() - } - } - - testScheduler.advanceUntilIdle() - - hubConnection.start() - mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"1\",13,[1,2,3],true,\"t\",1.5]}$RECORD_SEPARATOR") - completable.waitForCompletion() - - assertEquals("1", calledWith) - assertEquals(13, calledWith2) - assertContentEquals(intArrayOf(1, 2, 3), calledWith3) - assertEquals(true, calledWith4) - assertEquals("t", calledWith5) - assertEquals(1.5, calledWith6) - - job.cancel() - hubConnection.stop() - } - - @Test - fun `handler with seven params should be triggered`() = runTest { - val mockTransport = MockTransport() - val hubConnection = createHubConnection(mockTransport) - val completable = CompletableSubject() - - var calledWith: String? = null - var calledWith2: Int? = null - var calledWith3: IntArray? = null - var calledWith4: Boolean? = null - var calledWith5: String? = null - var calledWith6: Double? = null - var calledWith7: String? = null - val job = launch { - hubConnection.on( - "inc", - param1 = String::class, - param2 = Int::class, - param3 = IntArray::class, - param4 = Boolean::class, - param5 = String::class, - param6 = Double::class, - param7 = String::class, - ) { p1, p2, p3, p4, p5, p6, p7 -> - calledWith = p1 - calledWith2 = p2 - calledWith3 = p3 - calledWith4 = p4 - calledWith5 = p5 - calledWith6 = p6 - calledWith7 = p7 - completable.complete() - } - } - - testScheduler.advanceUntilIdle() - - hubConnection.start() - mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"1\",13,[1,2,3],true,\"t\",1.5,\"h\"]}$RECORD_SEPARATOR") - completable.waitForCompletion() - - assertEquals("1", calledWith) - assertEquals(13, calledWith2) - assertContentEquals(intArrayOf(1, 2, 3), calledWith3) - assertEquals(true, calledWith4) - assertEquals("t", calledWith5) - assertEquals(1.5, calledWith6) - assertEquals("h", calledWith7) - - job.cancel() - hubConnection.stop() - } - - @Test - fun `handler with eight params should be triggered`() = runTest { - val mockTransport = MockTransport() - val hubConnection = createHubConnection(mockTransport) - val completable = CompletableSubject() - - var calledWith: String? = null - var calledWith2: Int? = null - var calledWith3: IntArray? = null - var calledWith4: Boolean? = null - var calledWith5: String? = null - var calledWith6: Double? = null - var calledWith7: String? = null - var calledWith8: Int? = null - val job = launch { - hubConnection.on( - "inc", - param1 = String::class, - param2 = Int::class, - param3 = IntArray::class, - param4 = Boolean::class, - param5 = String::class, - param6 = Double::class, - param7 = String::class, - param8 = Int::class, - ) { p1, p2, p3, p4, p5, p6, p7, p8 -> - calledWith = p1 - calledWith2 = p2 - calledWith3 = p3 - calledWith4 = p4 - calledWith5 = p5 - calledWith6 = p6 - calledWith7 = p7 - calledWith8 = p8 - completable.complete() - } - } - - testScheduler.advanceUntilIdle() - - hubConnection.start() - mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"1\",13,[1,2,3],true,\"t\",1.5,\"h\",33]}$RECORD_SEPARATOR") - completable.waitForCompletion() - - assertEquals("1", calledWith) - assertEquals(13, calledWith2) - assertContentEquals(intArrayOf(1, 2, 3), calledWith3) - assertEquals(true, calledWith4) - assertEquals("t", calledWith5) - assertEquals(1.5, calledWith6) - assertEquals("h", calledWith7) - assertEquals(33, calledWith8) - - job.cancel() - hubConnection.stop() - } - - @Test - fun `throwing from one handler should not stop triggering other handlers`() = runTest { - val mockTransport = MockTransport() - val testLogger = TestLogger() - val hubConnection = createHubConnection(mockTransport) { - logger = testLogger - } - val value1 = SingleSubject() - val value2 = SingleSubject() - - val job = launch { - hubConnection.on("inc", param1 = String::class) { p1 -> - value1.setResult(p1) - throw RuntimeException("throw from on handler") - } - } - - val job2 = launch { - hubConnection.on("inc", param1 = String::class) { p1 -> - value2.setResult(p1) - } - } - - testScheduler.advanceUntilIdle() - - hubConnection.start() - mockTransport.receiveMessage("{\"type\":1,\"target\":\"inc\",\"arguments\":[\"Hello World\"]}$RECORD_SEPARATOR") - - // Confirming that our handler was called and the correct message was passed in. - assertEquals("Hello World", value1.waitForResult()) - assertEquals("Hello World", value2.waitForResult()) - - val log = testLogger.assertLogEquals("Invoking client side method 'inc' failed: java.lang.RuntimeException: throw from on handler") - assertEquals(Logger.Level.ERROR, log.level) - - job.cancel() - job2.cancel() - hubConnection.stop() - } - - private fun createHubConnection(customTransport: Transport, block: HttpHubConnectionBuilder.() -> Unit = {}): HubConnection { - val builder = HttpHubConnectionBuilder("http://example.com").apply { - transport = customTransport - skipNegotiate = true - transportEnum = TransportEnum.WebSockets - } - builder.apply(block) - - return builder.build() - } -} \ No newline at end of file diff --git a/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/HubTest.kt b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/HubTest.kt new file mode 100644 index 0000000..0ed4a91 --- /dev/null +++ b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/HubTest.kt @@ -0,0 +1,39 @@ +package eu.lepicekmichal.signalrkore + +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.test.runTest +import kotlin.test.BeforeTest +import kotlin.test.AfterTest + +abstract class HubTest { + protected lateinit var hubConnection: HubConnection + protected lateinit var transport: MockTransport + protected lateinit var logger: TestLogger + + @BeforeTest + fun setup() { + transport = MockTransport() + logger = TestLogger() + hubConnection = createHubConnection(transport) { + logger = this@HubTest.logger + // By using Unconfined dispatcher, we assure that on method will start collecting before the connection is started + dispatcher = Dispatchers.Unconfined + } + } + + @AfterTest + fun cleanup() = runTest { + hubConnection.stop() + } + + private fun createHubConnection(customTransport: Transport, block: HttpHubConnectionBuilder.() -> Unit = {}): HubConnection { + val builder = HttpHubConnectionBuilder("http://example.com").apply { + transport = customTransport + skipNegotiate = true + transportEnum = TransportEnum.WebSockets + } + builder.apply(block) + + return builder.build() + } +} \ No newline at end of file diff --git a/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/MockTransport.kt b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/MockTransport.kt index 5db12cd..d55fddc 100644 --- a/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/MockTransport.kt +++ b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/MockTransport.kt @@ -3,7 +3,6 @@ package eu.lepicekmichal.signalrkore import io.ktor.utils.io.core.toByteArray import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.IO import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.filter @@ -18,7 +17,7 @@ class MockTransport( private val receivedMessages: MutableSharedFlow = MutableSharedFlow() - var nextSentMessage: SingleSubject = SingleSubject() + var nextSentMessage: Single = Single() private set lateinit var url: String @@ -37,7 +36,7 @@ class MockTransport( override suspend fun send(message: ByteArray) { if (!ignorePings || !isPing(message)) { nextSentMessage.setResult(io.ktor.utils.io.core.String(message)) - nextSentMessage = SingleSubject() + nextSentMessage = Single() } } diff --git a/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/OnTest.kt b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/OnTest.kt new file mode 100644 index 0000000..452b89c --- /dev/null +++ b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/OnTest.kt @@ -0,0 +1,344 @@ +package eu.lepicekmichal.signalrkore + +import kotlinx.coroutines.test.runTest +import kotlin.test.Test +import kotlin.test.assertContentEquals +import kotlin.test.assertEquals +import kotlin.test.assertNotNull + +class OnTest : HubTest() { + + @Test + fun `multiple handlers with one parameter should all be triggered`() = runTest { + val completable = Completable() + val value = Single() + val add: suspend (Int) -> Unit = { + value.updateResult { value -> (value ?: 0) + it } + if (value.result == 20) { + completable.complete() + } + } + + hubConnection.on("add", add) + hubConnection.on("add", add) + hubConnection.start() + + transport.receiveMessage("{\"type\":1,\"target\":\"add\",\"arguments\":[10]}$RECORD_SEPARATOR") + + completable.waitForCompletion() + assertEquals(20, value.result) + } + + @Test + fun `handler without parameter should be triggered`() = runTest { + val completable = Completable() + hubConnection.on("test") { completable.complete() } + hubConnection.start() + + transport.receiveMessage("{\"type\":1,\"target\":\"test\",\"arguments\":[]}$RECORD_SEPARATOR") + completable.waitForCompletion() + } + + @Test + fun `handler with one parameter should be triggered`() = runTest { + val completable = Completable() + var calledWith: String? = null + + hubConnection.on("process", + resultType = Unit::class, + paramType1 = String::class, + ) { p1 -> + calledWith = p1 + completable.complete() + } + hubConnection.start() + + transport.receiveMessage("{\"type\":1,\"target\":\"process\",\"arguments\":[\"10\"]}$RECORD_SEPARATOR") + completable.waitForCompletion() + + assertEquals("10", calledWith) + } + + @Test + fun `handler with two parameters should be triggered`() = runTest { + val completable = Completable() + var calledWith: String? = null + var calledWith2: Int? = null + + hubConnection.on("process", + resultType = Unit::class, + paramType1 = String::class, + paramType2 = Int::class, + ) { p1, p2 -> + calledWith = p1 + calledWith2 = p2 + completable.complete() + } + hubConnection.start() + + transport.receiveMessage("{\"type\":1,\"target\":\"process\",\"arguments\":[\"10\",11]}$RECORD_SEPARATOR") + completable.waitForCompletion() + + assertEquals("10", calledWith) + assertEquals(11, calledWith2) + } + + @Test + fun `handler with three parameters should be triggered`() = runTest { + val completable = Completable() + var calledWith: String? = null + var calledWith2: Int? = null + var calledWith3: IntArray? = null + + hubConnection.on( + "process", + resultType = Unit::class, + paramType1 = String::class, + paramType2 = Int::class, + paramType3 = IntArray::class, + ) { p1, p2, p3 -> + calledWith = p1 + calledWith2 = p2 + calledWith3 = p3 + completable.complete() + } + hubConnection.start() + + transport.receiveMessage("{\"type\":1,\"target\":\"process\",\"arguments\":[\"10\",11,[1,2,3]]}$RECORD_SEPARATOR") + completable.waitForCompletion() + + assertEquals("10", calledWith) + assertEquals(11, calledWith2) + assertContentEquals(intArrayOf(1, 2, 3), calledWith3) + } + + @Test + fun `handler with four parameters should be triggered`() = runTest { + val completable = Completable() + var calledWith: String? = null + var calledWith2: Int? = null + var calledWith3: IntArray? = null + var calledWith4: Boolean? = null + + hubConnection.on( + "process", + resultType = Unit::class, + paramType1 = String::class, + paramType2 = Int::class, + paramType3 = IntArray::class, + paramType4 = Boolean::class, + ) { p1, p2, p3, p4 -> + calledWith = p1 + calledWith2 = p2 + calledWith3 = p3 + calledWith4 = p4 + completable.complete() + } + hubConnection.start() + + transport.receiveMessage("{\"type\":1,\"target\":\"process\",\"arguments\":[\"10\",11,[1,2,3],true]}$RECORD_SEPARATOR") + completable.waitForCompletion() + + assertEquals("10", calledWith) + assertEquals(11, calledWith2) + assertContentEquals(intArrayOf(1, 2, 3), calledWith3) + assertEquals(true, calledWith4) + } + + @Test + fun `handler with five parameters should be triggered`() = runTest { + val completable = Completable() + var calledWith: String? = null + var calledWith2: Int? = null + var calledWith3: IntArray? = null + var calledWith4: Boolean? = null + var calledWith5: String? = null + + hubConnection.on( + "process", + resultType = Unit::class, + paramType1 = String::class, + paramType2 = Int::class, + paramType3 = IntArray::class, + paramType4 = Boolean::class, + paramType5 = String::class, + ) { p1, p2, p3, p4, p5 -> + calledWith = p1 + calledWith2 = p2 + calledWith3 = p3 + calledWith4 = p4 + calledWith5 = p5 + completable.complete() + } + hubConnection.start() + + transport.receiveMessage("{\"type\":1,\"target\":\"process\",\"arguments\":[\"10\",11,[1,2,3],true,\"yey\"]}$RECORD_SEPARATOR") + completable.waitForCompletion() + + assertEquals("10", calledWith) + assertEquals(11, calledWith2) + assertContentEquals(intArrayOf(1, 2, 3), calledWith3) + assertEquals(true, calledWith4) + assertEquals("yey", calledWith5) + } + + @Test + fun `handler with six parameters should be triggered`() = runTest { + val completable = Completable() + var calledWith: String? = null + var calledWith2: Int? = null + var calledWith3: IntArray? = null + var calledWith4: Boolean? = null + var calledWith5: String? = null + var calledWith6: Double? = null + + hubConnection.on( + "process", + resultType = Unit::class, + paramType1 = String::class, + paramType2 = Int::class, + paramType3 = IntArray::class, + paramType4 = Boolean::class, + paramType5 = String::class, + paramType6 = Double::class, + ) { p1, p2, p3, p4, p5, p6 -> + calledWith = p1 + calledWith2 = p2 + calledWith3 = p3 + calledWith4 = p4 + calledWith5 = p5 + calledWith6 = p6 + completable.complete() + } + hubConnection.start() + + transport.receiveMessage("{\"type\":1,\"target\":\"process\",\"arguments\":[\"10\",11,[1,2,3],true,\"yey\",1.337]}$RECORD_SEPARATOR") + completable.waitForCompletion() + + assertEquals("10", calledWith) + assertEquals(11, calledWith2) + assertContentEquals(intArrayOf(1, 2, 3), calledWith3) + assertEquals(true, calledWith4) + assertEquals("yey", calledWith5) + assertEquals(1.337, calledWith6) + } + + @Test + fun `handler with seven parameters should be triggered`() = runTest { + val completable = Completable() + var calledWith: String? = null + var calledWith2: Int? = null + var calledWith3: IntArray? = null + var calledWith4: Boolean? = null + var calledWith5: String? = null + var calledWith6: Double? = null + var calledWith7: String? = null + + hubConnection.on( + "process", + resultType = Unit::class, + paramType1 = String::class, + paramType2 = Int::class, + paramType3 = IntArray::class, + paramType4 = Boolean::class, + paramType5 = String::class, + paramType6 = Double::class, + paramType7 = String::class, + ) { p1, p2, p3, p4, p5, p6, p7 -> + calledWith = p1 + calledWith2 = p2 + calledWith3 = p3 + calledWith4 = p4 + calledWith5 = p5 + calledWith6 = p6 + calledWith7 = p7 + completable.complete() + } + hubConnection.start() + + transport.receiveMessage("{\"type\":1,\"target\":\"process\",\"arguments\":[\"10\",11,[1,2,3],true,\"yey\",1.337,\"hey\"]}$RECORD_SEPARATOR") + completable.waitForCompletion() + + assertEquals("10", calledWith) + assertEquals(11, calledWith2) + assertContentEquals(intArrayOf(1, 2, 3), calledWith3) + assertEquals(true, calledWith4) + assertEquals("yey", calledWith5) + assertEquals(1.337, calledWith6) + assertEquals("hey", calledWith7) + } + + @Test + fun `handler with eight parameters should be triggered`() = runTest { + val completable = Completable() + var calledWith: String? = null + var calledWith2: Int? = null + var calledWith3: IntArray? = null + var calledWith4: Boolean? = null + var calledWith5: String? = null + var calledWith6: Double? = null + var calledWith7: String? = null + var calledWith8: Int? = null + + hubConnection.on( + "process", + resultType = Unit::class, + paramType1 = String::class, + paramType2 = Int::class, + paramType3 = IntArray::class, + paramType4 = Boolean::class, + paramType5 = String::class, + paramType6 = Double::class, + paramType7 = String::class, + paramType8 = Int::class, + ) { p1, p2, p3, p4, p5, p6, p7, p8 -> + calledWith = p1 + calledWith2 = p2 + calledWith3 = p3 + calledWith4 = p4 + calledWith5 = p5 + calledWith6 = p6 + calledWith7 = p7 + calledWith8 = p8 + completable.complete() + } + hubConnection.start() + + transport.receiveMessage("{\"type\":1,\"target\":\"process\",\"arguments\":[\"10\",11,[1,2,3],true,\"yey\",1.337,\"hey\",123]}$RECORD_SEPARATOR") + completable.waitForCompletion() + + assertEquals("10", calledWith) + assertEquals(11, calledWith2) + assertContentEquals(intArrayOf(1, 2, 3), calledWith3) + assertEquals(true, calledWith4) + assertEquals("yey", calledWith5) + assertEquals(1.337, calledWith6) + assertEquals("hey", calledWith7) + assertEquals(123, calledWith8) + } + + @Test + fun `one faulty handler should not stop triggering other handlers`() = runTest { + val value1 = Single() + val value2 = Single() + + hubConnection.on("process", resultType = Unit::class, paramType1 = String::class) { p1 -> + value1.setResult(p1) + throw RuntimeException("Not ok") + } + hubConnection.on("process", resultType = Unit::class, paramType1 = String::class) { p1 -> + value2.setResult(p1) + } + hubConnection.start() + + transport.receiveMessage("{\"type\":1,\"target\":\"process\",\"arguments\":[\"Hello World\"]}$RECORD_SEPARATOR") + + assertEquals("Hello World", value1.waitForResult()) + assertEquals("Hello World", value2.waitForResult()) + + val log = logger.assertLogEquals("Getting result for non-blocking invocation of 'process' method has thrown an exception") + assertEquals(Logger.Severity.ERROR, log.severity) + assertNotNull(log.cause) + assertEquals("Not ok", log.cause.message) + } +} \ No newline at end of file diff --git a/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/OnWithResultTest.kt b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/OnWithResultTest.kt new file mode 100644 index 0000000..5550725 --- /dev/null +++ b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/OnWithResultTest.kt @@ -0,0 +1,432 @@ +package eu.lepicekmichal.signalrkore + +import kotlinx.coroutines.test.runTest +import kotlin.test.Test +import kotlin.test.assertContentEquals +import kotlin.test.assertEquals +import kotlin.test.assertFailsWith + +class OnWithResultTest : HubTest() { + + @Test + fun `handler with no parameters should return a value`() = runTest { + var called = false + + hubConnection.on("process", resultType = Boolean::class) { + called = true + true + } + hubConnection.start() + + val sentMessage = transport.nextSentMessage + transport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"process\",\"arguments\":[]}$RECORD_SEPARATOR") + val response = sentMessage.waitForResult() + val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":true}$RECORD_SEPARATOR" + + assertEquals(expected, response) + assertEquals(true, called) + } + + @Test + fun `handler with no return value should report an error`() = runTest { + val nonResultCalled = Completable() + + hubConnection.on("process") { nonResultCalled.complete() } + hubConnection.start() + + val sentMessage = transport.nextSentMessage + transport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"process\",\"arguments\":[]}$RECORD_SEPARATOR") + val response = sentMessage.waitForResult() + val expected = "{\"type\":3,\"invocationId\":\"1\",\"error\":\"Client did not provide a result.\"}$RECORD_SEPARATOR" + + assertEquals(expected, response) + nonResultCalled.waitForCompletion() + } + + @Test + fun `missing handler with return value should report an error`() = runTest { + hubConnection.start() + + val sentMessage = transport.nextSentMessage + transport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"process\",\"arguments\":[]}$RECORD_SEPARATOR") + val response = sentMessage.waitForResult() + val expected = "{\"type\":3,\"invocationId\":\"1\",\"error\":\"Client did not provide a result.\"}$RECORD_SEPARATOR" + + assertEquals(expected, response) + } + + @Test + fun `faulty handler should return an error`() = runTest { + hubConnection.on("process", resultType = Int::class) { + throw RuntimeException("Custom error.") + } + hubConnection.start() + + val sentMessage = transport.nextSentMessage + transport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"process\",\"arguments\":[]}$RECORD_SEPARATOR") + val response = sentMessage.waitForResult() + val expected = "{\"type\":3,\"invocationId\":\"1\",\"error\":\"Custom error.\"}$RECORD_SEPARATOR" + + assertEquals(expected, response) + } + + @Test + fun `registering multiple handlers for same target should raise an exception`() = runTest { + hubConnection.on("process", resultType = Int::class) { 1 } + + val exception = assertFailsWith { + hubConnection.on("process", resultType = Int::class) { 2 } + } + + assertEquals("There can be only one function for returning result on blocking invocation (method: process)", exception.message) + } + + @Test + fun `handler with result should log if server does not expect a return value`() = runTest { + val value = Single() + + hubConnection.on("process", resultType = Int::class) { value.updateResult { 42 }!! } + hubConnection.start() + + transport.receiveMessage("{\"type\":1,\"target\":\"process\",\"arguments\":[]}$RECORD_SEPARATOR") + value.waitForResult() + + logger.assertLogEquals("Result was returned for 'process' method but server is not expecting any result.") + } + + @Test + fun `handler with one param should return a value`() = runTest { + var calledWith: String? = null + + hubConnection.on("ping", resultType = String::class, paramType1 = String::class) { p -> + calledWith = p + "pong" + } + hubConnection.start() + + val sentMessage = transport.nextSentMessage + transport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"ping\",\"arguments\":[\"10\"]}$RECORD_SEPARATOR") + val response = sentMessage.waitForResult() + val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":\"pong\"}$RECORD_SEPARATOR" + + assertEquals(expected, response) + assertEquals("10", calledWith) + } + + @Test + fun `handler with two params should return a value`() = runTest { + var calledWith: String? = null + var calledWith2: Int? = null + + hubConnection.on( + "ping", + resultType = String::class, + paramType1 = String::class, + paramType2 = Int::class, + ) { p1, p2 -> + calledWith = p1 + calledWith2 = p2 + "pong" + } + hubConnection.start() + + val sentMessage = transport.nextSentMessage + transport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"ping\",\"arguments\":[\"10\",11]}$RECORD_SEPARATOR") + val response = sentMessage.waitForResult() + val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":\"pong\"}$RECORD_SEPARATOR" + + assertEquals(expected, response) + assertEquals("10", calledWith) + assertEquals(11, calledWith2) + } + + @Test + fun `handler with three params should return a value`() = runTest { + var calledWith: String? = null + var calledWith2: Int? = null + var calledWith3: IntArray? = null + + hubConnection.on( + "ping", + resultType = String::class, + paramType1 = String::class, + paramType2 = Int::class, + paramType3 = IntArray::class, + ) { p1, p2, p3 -> + calledWith = p1 + calledWith2 = p2 + calledWith3 = p3 + "pong" + } + hubConnection.start() + + val sentMessage = transport.nextSentMessage + transport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"ping\",\"arguments\":[\"10\",11,[1,2,3]]}$RECORD_SEPARATOR") + val response = sentMessage.waitForResult() + val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":\"pong\"}$RECORD_SEPARATOR" + + assertEquals(expected, response) + assertEquals("10", calledWith) + assertEquals(11, calledWith2) + assertContentEquals(intArrayOf(1, 2, 3), calledWith3) + } + + @Test + fun `handler with four params should return a value`() = runTest { + var calledWith: String? = null + var calledWith2: Int? = null + var calledWith3: IntArray? = null + var calledWith4: Boolean? = null + + hubConnection.on( + "ping", + resultType = String::class, + paramType1 = String::class, + paramType2 = Int::class, + paramType3 = IntArray::class, + paramType4 = Boolean::class, + ) { p1, p2, p3, p4 -> + calledWith = p1 + calledWith2 = p2 + calledWith3 = p3 + calledWith4 = p4 + "pong" + } + hubConnection.start() + + val sentMessage = transport.nextSentMessage + transport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"ping\",\"arguments\":[\"10\",11,[1,2,3],true]}$RECORD_SEPARATOR") + val response = sentMessage.waitForResult() + val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":\"pong\"}$RECORD_SEPARATOR" + + assertEquals(expected, response) + assertEquals("10", calledWith) + assertEquals(11, calledWith2) + assertContentEquals(intArrayOf(1, 2, 3), calledWith3) + assertEquals(true, calledWith4) + } + + @Test + fun `handler with five params should return a value`() = runTest { + var calledWith: String? = null + var calledWith2: Int? = null + var calledWith3: IntArray? = null + var calledWith4: Boolean? = null + var calledWith5: String? = null + + hubConnection.on( + "ping", + resultType = String::class, + paramType1 = String::class, + paramType2 = Int::class, + paramType3 = IntArray::class, + paramType4 = Boolean::class, + paramType5 = String::class, + ) { p1, p2, p3, p4, p5 -> + calledWith = p1 + calledWith2 = p2 + calledWith3 = p3 + calledWith4 = p4 + calledWith5 = p5 + "pong" + } + hubConnection.start() + + val sentMessage = transport.nextSentMessage + transport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"ping\",\"arguments\":[\"10\",11,[1,2,3],true,\"yey\"]}$RECORD_SEPARATOR") + val response = sentMessage.waitForResult() + val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":\"pong\"}$RECORD_SEPARATOR" + + assertEquals(expected, response) + assertEquals("10", calledWith) + assertEquals(11, calledWith2) + assertContentEquals(intArrayOf(1, 2, 3), calledWith3) + assertEquals(true, calledWith4) + assertEquals("yey", calledWith5) + } + + @Test + fun `handler with six params should return a value`() = runTest { + var calledWith: String? = null + var calledWith2: Int? = null + var calledWith3: IntArray? = null + var calledWith4: Boolean? = null + var calledWith5: String? = null + var calledWith6: Double? = null + + hubConnection.on( + "ping", + resultType = String::class, + paramType1 = String::class, + paramType2 = Int::class, + paramType3 = IntArray::class, + paramType4 = Boolean::class, + paramType5 = String::class, + paramType6 = Double::class, + ) { p1, p2, p3, p4, p5, p6 -> + calledWith = p1 + calledWith2 = p2 + calledWith3 = p3 + calledWith4 = p4 + calledWith5 = p5 + calledWith6 = p6 + "pong" + } + hubConnection.start() + + val sentMessage = transport.nextSentMessage + transport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"ping\",\"arguments\":[\"10\",11,[1,2,3],true,\"yey\",1.337]}$RECORD_SEPARATOR") + val response = sentMessage.waitForResult() + val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":\"pong\"}$RECORD_SEPARATOR" + + assertEquals(expected, response) + assertEquals("10", calledWith) + assertEquals(11, calledWith2) + assertContentEquals(intArrayOf(1, 2, 3), calledWith3) + assertEquals(true, calledWith4) + assertEquals("yey", calledWith5) + assertEquals(1.337, calledWith6) + } + + @Test + fun `handler with seven params should return a value`() = runTest { + var calledWith: String? = null + var calledWith2: Int? = null + var calledWith3: IntArray? = null + var calledWith4: Boolean? = null + var calledWith5: String? = null + var calledWith6: Double? = null + var calledWith7: String? = null + + hubConnection.on( + "ping", + resultType = String::class, + paramType1 = String::class, + paramType2 = Int::class, + paramType3 = IntArray::class, + paramType4 = Boolean::class, + paramType5 = String::class, + paramType6 = Double::class, + paramType7 = String::class, + ) { p1, p2, p3, p4, p5, p6, p7 -> + calledWith = p1 + calledWith2 = p2 + calledWith3 = p3 + calledWith4 = p4 + calledWith5 = p5 + calledWith6 = p6 + calledWith7 = p7 + "pong" + } + hubConnection.start() + + val sentMessage = transport.nextSentMessage + transport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"ping\",\"arguments\":[\"10\",11,[1,2,3],true,\"yey\",1.337,\"hey\"]}$RECORD_SEPARATOR") + val response = sentMessage.waitForResult() + val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":\"pong\"}$RECORD_SEPARATOR" + + assertEquals(expected, response) + assertEquals("10", calledWith) + assertEquals(11, calledWith2) + assertContentEquals(intArrayOf(1, 2, 3), calledWith3) + assertEquals(true, calledWith4) + assertEquals("yey", calledWith5) + assertEquals(1.337, calledWith6) + assertEquals("hey", calledWith7) + } + + @Test + fun `handler with eight params should return a value`() = runTest { + var calledWith: String? = null + var calledWith2: Int? = null + var calledWith3: IntArray? = null + var calledWith4: Boolean? = null + var calledWith5: String? = null + var calledWith6: Double? = null + var calledWith7: String? = null + var calledWith8: Int? = null + + hubConnection.on( + "ping", + resultType = String::class, + paramType1 = String::class, + paramType2 = Int::class, + paramType3 = IntArray::class, + paramType4 = Boolean::class, + paramType5 = String::class, + paramType6 = Double::class, + paramType7 = String::class, + paramType8 = Int::class, + ) { p1, p2, p3, p4, p5, p6, p7, p8 -> + calledWith = p1 + calledWith2 = p2 + calledWith3 = p3 + calledWith4 = p4 + calledWith5 = p5 + calledWith6 = p6 + calledWith7 = p7 + calledWith8 = p8 + "pong" + } + hubConnection.start() + + val sentMessage = transport.nextSentMessage + transport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"ping\",\"arguments\":[\"10\",11,[1,2,3],true,\"yey\",1.337,\"hey\",123]}$RECORD_SEPARATOR") + val response = sentMessage.waitForResult() + val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":\"pong\"}$RECORD_SEPARATOR" + + assertEquals(expected, response) + assertEquals("10", calledWith) + assertEquals(11, calledWith2) + assertContentEquals(intArrayOf(1, 2, 3), calledWith3) + assertEquals(true, calledWith4) + assertEquals("yey", calledWith5) + assertEquals(1.337, calledWith6) + assertEquals("hey", calledWith7) + assertEquals(123, calledWith8) + } + + @Test + fun `handler with return value should not block other handlers`() = runTest { + val resultCalled = Completable() + val nonResultCalled = Completable() + val completeResult = Completable() + + hubConnection.on("ping", resultType = String::class) { + resultCalled.complete() + completeResult.waitForCompletion() + "pong" + } + hubConnection.on("process") { + nonResultCalled.complete() + } + hubConnection.start() + + val sentMessage = transport.nextSentMessage + transport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"ping\",\"arguments\":[\"1\"]}$RECORD_SEPARATOR") + resultCalled.waitForCompletion() + + // Send an non-result invocation and make sure it's processed even with a blocking result invocation + transport.receiveMessage("{\"type\":1,\"target\":\"process\",\"arguments\":[\"1\"]}$RECORD_SEPARATOR") + nonResultCalled.waitForCompletion() + + completeResult.complete() + + val response = sentMessage.waitForResult() + val expected = "{\"type\":3,\"invocationId\":\"1\",\"result\":\"pong\"}$RECORD_SEPARATOR" + assertEquals(expected, response) + } + + @Test + fun `handler should return an error if cannot parse argument`() = runTest { + hubConnection.on("process", resultType = String::class, paramType1 = Int::class) { _ -> "bob" } + hubConnection.start() + + val sentMessage = transport.nextSentMessage + transport.receiveMessage("{\"type\":1,\"invocationId\":\"1\",\"target\":\"process\",\"arguments\":[\"corrupt int\"]}$RECORD_SEPARATOR") + + val response = sentMessage.waitForResult() + val expected = "{\"type\":3,\"invocationId\":\"1\",\"error\":\"Failed to parse literal as 'int' value\\nJSON input: \\\"corrupt int\\\"\"}$RECORD_SEPARATOR" + assertEquals(expected, response) + } +} \ No newline at end of file diff --git a/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/SingleSubject.kt b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/Single.kt similarity index 60% rename from signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/SingleSubject.kt rename to signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/Single.kt index 6d69d72..4420b62 100644 --- a/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/SingleSubject.kt +++ b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/Single.kt @@ -1,20 +1,26 @@ package eu.lepicekmichal.signalrkore import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.FlowPreview import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.timeout +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import kotlinx.coroutines.withContext import kotlin.time.Duration import kotlin.time.Duration.Companion.seconds -class SingleSubject { +class Single { private val stateFlow = MutableStateFlow(null) + private val mutex = Mutex() val result get() = stateFlow.value - suspend fun waitForResult(timeout: Duration = 30.seconds) : T = + @Suppress("UNCHECKED_CAST") + @OptIn(FlowPreview::class) + suspend fun waitForResult(timeout: Duration = 5.seconds) : T = withContext(Dispatchers.Default) { stateFlow.filter { it != null }.timeout(timeout).first() as T } @@ -26,4 +32,9 @@ class SingleSubject { fun setResult(value: T) { stateFlow.value = value } + + suspend fun updateResult(updater: (T?) -> T?): T? = mutex.withLock { + stateFlow.value = updater(stateFlow.value) + stateFlow.value + } } \ No newline at end of file diff --git a/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/TestLogger.kt b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/TestLogger.kt index 3aa9b09..9cc57ac 100644 --- a/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/TestLogger.kt +++ b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/TestLogger.kt @@ -1,30 +1,38 @@ package eu.lepicekmichal.signalrkore +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.FlowPreview +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.catch +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.firstOrNull +import kotlinx.coroutines.flow.timeout +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext import kotlin.test.assertNotNull +import kotlin.time.Duration.Companion.seconds -data class LogEvent(val level: Logger.Level, val message: String) +data class LogEvent(val severity: Logger.Severity, val message: String, val cause: Throwable?) class TestLogger : Logger { - private val logs: MutableList = mutableListOf() + private val scope = CoroutineScope(Dispatchers.Unconfined) - override fun log(level: Logger.Level, message: String) { - logs.add(LogEvent(level, message)) - } - - fun assertLogEquals(message: String): LogEvent { - val log = logs.firstOrNull { it.message == message } + private val logs: MutableSharedFlow = MutableSharedFlow(replay = 10) - assertNotNull(log, "Log message '$message' not found") - - return log + override fun log(severity: Logger.Severity, message: String, cause: Throwable?) { + scope.launch { + logs.emit(LogEvent(severity, message, cause)) + } } - fun assertLogContains(message: String): LogEvent { - val log = logs.firstOrNull { it.message.contains(message) } + @OptIn(FlowPreview::class) + suspend fun assertLogEquals(message: String): LogEvent = withContext(Dispatchers.Default) { + val log = logs.filter { it.message == message }.timeout(5.seconds).catch { }.firstOrNull() - assertNotNull(log, "Log message containing '$message' not found") + assertNotNull(log, "Log message '$message' not found") - return log + log } } \ No newline at end of file From 80cf84c8ea00bdeb71f27c1ce06f899f0bcf3d35 Mon Sep 17 00:00:00 2001 From: maca88 Date: Sun, 12 May 2024 16:02:37 +0200 Subject: [PATCH 7/7] Implemented the concept from C# official library for on method --- .../signalrkore/HubCommunicationLink.kt | 145 ++++++++++++++++++ .../signalrkore/OnWithResultTest.kt | 44 +++++- 2 files changed, 181 insertions(+), 8 deletions(-) diff --git a/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubCommunicationLink.kt b/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubCommunicationLink.kt index b172139..2d5d9fd 100644 --- a/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubCommunicationLink.kt +++ b/signalrkore/src/commonMain/kotlin/eu/lepicekmichal/signalrkore/HubCommunicationLink.kt @@ -281,4 +281,149 @@ abstract class HubCommunicationLink(private val json: Json) : HubCommunication() .filter { it.target == target } .onEach { logger.log(Logger.Severity.INFO, "Received invocation: $it", null) } } + + + fun on2(target: String, callback: suspend () -> Unit) { + on2( + target = target, + paramTypes = emptyList(), + callback = { + callback() + }, + ) + } + + @Suppress("UNCHECKED_CAST") + fun on2(target: String, paramType1: KClass, callback: suspend (P1) -> Unit) { + on2( + target = target, + paramTypes = listOf(paramType1), + callback = { + callback(it[0] as P1) + }, + ) + } + + inline fun on2(target: String, noinline callback: suspend (P1) -> Unit) { + on2( + target = target, + paramType1 = P1::class, + callback = callback, + ) + } + + @Suppress("UNCHECKED_CAST") + fun on2(target: String, paramType1: KClass, paramType2: KClass, callback: suspend (P1, P2) -> Unit) { + on2( + target = target, + paramTypes = listOf(paramType1, paramType2), + callback = { + callback(it[0] as P1, it[1] as P2) + }, + ) + } + + inline fun on2(target: String, noinline callback: suspend (P1, P2) -> Unit) { + on2( + target = target, + paramType1 = P1::class, + paramType2 = P2::class, + callback = callback, + ) + } + + fun onWithResult2(target: String, resultType: KClass, callback: suspend () -> RESULT) { + onWithResult2( + target = target, + resultType = resultType, + paramTypes = emptyList(), + callback = { + callback() + }, + ) + } + + inline fun onWithResult2(target: String, noinline callback: suspend () -> RESULT) { + onWithResult2( + target = target, + resultType = RESULT::class, + callback = callback, + ) + } + + @Suppress("UNCHECKED_CAST") + fun onWithResult2(target: String, paramType1: KClass, resultType: KClass, callback: suspend (P1) -> RESULT) { + onWithResult2( + target = target, + resultType = resultType, + paramTypes = listOf(paramType1), + callback = { + callback(it[0] as P1) + }, + ) + } + + inline fun onWithResult2(target: String, noinline callback: suspend (P1) -> RESULT) { + onWithResult2( + target = target, + paramType1 = P1::class, + resultType = RESULT::class, + callback = callback, + ) + } + + @Suppress("UNCHECKED_CAST") + fun onWithResult2(target: String, paramType1: KClass, paramType2: KClass, resultType: KClass, callback: suspend (P1, P2) -> RESULT) { + onWithResult2( + target = target, + resultType = resultType, + paramTypes = listOf(paramType1, paramType2), + callback = { + callback(it[0] as P1, it[1] as P2) + }, + ) + } + + inline fun onWithResult2(target: String, noinline callback: suspend (P1, P2) -> RESULT) { + onWithResult2( + target = target, + paramType1 = P1::class, + paramType2 = P2::class, + resultType = RESULT::class, + callback = callback, + ) + } + + fun onWithResult2(target: String, resultType: KClass, paramTypes: List>, callback: suspend (List) -> RESULT) { + if (!resultProviderRegistry.add(target)) { + throw IllegalStateException("There can be only one function for returning result on blocking invocation (method: $target)") + } + + return receivedInvocations + .onCompletion { resultProviderRegistry.remove(target) } + .filter { it.target == target } + .onEach { logger.log(Logger.Severity.INFO, "Received invocation: $it", null) } + .handleIncomingInvocation( + resultType = resultType, + callback = { + callback( + it.arguments.mapIndexed { index, arg -> arg.fromJson(paramTypes[index]) } + ) + }, + ) + } + + fun on2(target: String, paramTypes: List>, callback: suspend (List) -> Unit) { + return receivedInvocations + .filter { it.target == target } + .onEach { logger.log(Logger.Severity.INFO, "Received invocation: $it", null) } + .handleIncomingInvocation( + resultType = Unit::class, + callback = { + callback( + it.arguments.mapIndexed { index, arg -> arg.fromJson(paramTypes[index]) } + ) + }, + ) + } } \ No newline at end of file diff --git a/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/OnWithResultTest.kt b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/OnWithResultTest.kt index 5550725..0a6a554 100644 --- a/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/OnWithResultTest.kt +++ b/signalrkore/src/commonTest/kotlin/eu/lepicekmichal/signalrkore/OnWithResultTest.kt @@ -8,11 +8,39 @@ import kotlin.test.assertFailsWith class OnWithResultTest : HubTest() { + @Test + fun `test syntax`() = runTest { + + hubConnection.onWithResult2("withResultZeroParam", resultType = Boolean::class) { true } + hubConnection.onWithResult2("withResultZeroParamInline") { -> true } + + hubConnection.onWithResult2("withResultOneParam", paramType1 = Int::class, resultType = Int::class) { p1 -> p1 * p1 } + hubConnection.onWithResult2("withResultOneParamInline") { p1: Int -> p1 * p1 } + + hubConnection.onWithResult2("withResultTwoParam", paramType1 = Int::class, paramType2 = Int::class, resultType = Int::class) { p1, p2 -> p1 * p2 } + hubConnection.onWithResult2("withResultTwoParamInline") { p1: Int, p2: Int -> p1 * p2 } + + + hubConnection.onWithResult2("withResult", resultType = Boolean::class) { true } + hubConnection.onWithResult2("withResultInline") { -> true } + + + hubConnection.on2("unitZeroParam") { println("test") } + + hubConnection.on2("unitOneParam", paramType1 = Int::class) { p1 -> println(p1) } + hubConnection.on2("unitOneParamInline") { p1: Int -> println(p1) } + + hubConnection.on2("unitTwoParam", paramType1 = Int::class, paramType2 = String::class) { p1, p2 -> println(p2 + p1) } + hubConnection.on2("unitTwoParamInline") { p1: Int, p2: String -> println(p2 + p1) } + + hubConnection.start() + } + @Test fun `handler with no parameters should return a value`() = runTest { var called = false - hubConnection.on("process", resultType = Boolean::class) { + hubConnection.onWithResult2("process", resultType = Boolean::class) { called = true true } @@ -31,7 +59,7 @@ class OnWithResultTest : HubTest() { fun `handler with no return value should report an error`() = runTest { val nonResultCalled = Completable() - hubConnection.on("process") { nonResultCalled.complete() } + hubConnection.on2("process") { nonResultCalled.complete() } hubConnection.start() val sentMessage = transport.nextSentMessage @@ -57,7 +85,7 @@ class OnWithResultTest : HubTest() { @Test fun `faulty handler should return an error`() = runTest { - hubConnection.on("process", resultType = Int::class) { + hubConnection.onWithResult2("process", resultType = Int::class) { throw RuntimeException("Custom error.") } hubConnection.start() @@ -72,10 +100,10 @@ class OnWithResultTest : HubTest() { @Test fun `registering multiple handlers for same target should raise an exception`() = runTest { - hubConnection.on("process", resultType = Int::class) { 1 } + hubConnection.onWithResult2("process", resultType = Int::class) { 1 } val exception = assertFailsWith { - hubConnection.on("process", resultType = Int::class) { 2 } + hubConnection.onWithResult2("process", resultType = Int::class) { 2 } } assertEquals("There can be only one function for returning result on blocking invocation (method: process)", exception.message) @@ -85,7 +113,7 @@ class OnWithResultTest : HubTest() { fun `handler with result should log if server does not expect a return value`() = runTest { val value = Single() - hubConnection.on("process", resultType = Int::class) { value.updateResult { 42 }!! } + hubConnection.onWithResult2("process", resultType = Int::class) { value.updateResult { 42 }!! } hubConnection.start() transport.receiveMessage("{\"type\":1,\"target\":\"process\",\"arguments\":[]}$RECORD_SEPARATOR") @@ -98,7 +126,7 @@ class OnWithResultTest : HubTest() { fun `handler with one param should return a value`() = runTest { var calledWith: String? = null - hubConnection.on("ping", resultType = String::class, paramType1 = String::class) { p -> + hubConnection.onWithResult2("ping", resultType = String::class, paramType1 = String::class) { p -> calledWith = p "pong" } @@ -118,7 +146,7 @@ class OnWithResultTest : HubTest() { var calledWith: String? = null var calledWith2: Int? = null - hubConnection.on( + hubConnection.onWithResult2( "ping", resultType = String::class, paramType1 = String::class,