Skip to content

Reducer snippet concurrency

Victor Oliveira edited this page Dec 6, 2022 · 1 revision

/** Tracks the [Job] launched for each intent so running jobs can be looked up and cancelled. */
private val multimap = Multimap<TIntent, Job>()

/** Held while a state transform is applied in `reduce`, so transforms run one at a time. */
private val mutex = Mutex(locked = false)

/** Current state, starting at `initialState`; `reduce` writes each transformed state here. */
public val state: MutableStateFlow<TState> = MutableStateFlow(initialState)

/**
 * Logs [intent], then launches a coroutine in `coroutineScope` that runs it.
 *
 * Inside the coroutine, a [UniqueIntent] first cancels the other tracked jobs of the same
 * intent class via [cleanIntentJobsOfType]. Every value collected from
 * `intentExecutor.executeIntent(intent)` is then passed to `reduce`.
 *
 * A failure is reported through `logger` only while `coroutineScope` is still active and
 * the failure is not a [TerminatedIntentException]; it is never rethrown.
 *
 * The launched job is registered in `multimap` and unregistered again when it completes.
 *
 * @param intent the intent to execute.
 */
public fun executeIntent(intent: TIntent) {
    logger?.logIntent(intent)

    val intentJob = coroutineScope.launch {
        if (intent is UniqueIntent) {
            cleanIntentJobsOfType(intent::class)
        }

        try {
            intentExecutor.executeIntent(intent).collect { transform ->
                reduce(transform)
            }
        } catch (failure: Throwable) {
            // Stay silent when the scope is gone or when this job was deliberately terminated.
            val shouldReport = coroutineScope.isActive && failure !is TerminatedIntentException
            if (shouldReport) {
                logger?.logFailedIntent(intent, failure)
            }
        }
    }

    // Register the job so it can be found later; drop the registration once it finishes.
    val registration = multimap.put(intent, intentJob)
    intentJob.invokeOnCompletion { multimap.remove(registration) }
}

/**
 * Cancels every tracked job registered under [intentClass] except the job of the calling
 * coroutine, using a [TerminatedIntentException] as the cancellation cause.
 *
 * @param intentClass the intent class whose tracked jobs are cancelled.
 */
public suspend fun <T : TIntent> cleanIntentJobsOfType(intentClass: KClass<T>) {
    // The caller's own job is left running: it is the one replacing the others.
    val callerJob = coroutineContext[Job]
    multimap[intentClass].forEach { entry ->
        val trackedJob = entry.value
        if (trackedJob != callerJob) {
            trackedJob.cancel(TerminatedIntentException())
        }
    }
}

/**
 * Returns the current value of [state] cast to [T].
 *
 * @throws ClassCastException if the current state is not an instance of [T].
 */
public inline fun <reified T : TState> requireState(): T {
    return state.value as T
}

/**
 * Applies [transform] to the current state and publishes the result to `state`.
 *
 * The whole read-transform-write sequence runs while holding `mutex`, so only one
 * transform is applied at a time. `defaultDispatcher` is handed to the transform
 * (presumably the dispatcher the transform runs its work on; confirm against
 * `StateTransform.reduce`).
 *
 * A failing transform is logged and swallowed, leaving `state` unchanged. The exception
 * is [TerminatedIntentException]: it is the cancellation cause used when an intent job
 * is terminated, so it is rethrown without being logged as a failed transform. This
 * matches `executeIntent`, which also does not treat it as a failure.
 *
 * @param transform the state transform to apply.
 */
private suspend fun reduce(transform: StateTransform<TState>) {
    mutex.withLock {
        val previousState = state.value
        try {
            val newState = transform.reduce(previousState, defaultDispatcher).also { transformed ->
                logger?.logTransformedNewState(transform, previousState, transformed)
            }

            state.value = newState
        } catch (terminated: TerminatedIntentException) {
            // Deliberate termination of the intent job, not a transform failure: let it
            // propagate so the job stops instead of being reported and swallowed here.
            throw terminated
        } catch (throwable: Throwable) {
            logger?.logFailedTransformNewState(transform, previousState, throwable)
        }
    }
}
Clone this wiki locally