@@ -11,9 +11,11 @@ import kotlinx.coroutines.channels.SendChannel
11
11
import kotlinx.coroutines.flow.Flow
12
12
import kotlinx.coroutines.flow.FlowCollector
13
13
import kotlinx.coroutines.flow.flow
14
- import java.util.concurrent.ConcurrentHashMap
14
+ import kotlinx.coroutines.sync.Mutex
15
+ import kotlinx.coroutines.sync.withLock
15
16
import kotlin.coroutines.CoroutineContext
16
17
import kotlin.coroutines.EmptyCoroutineContext
18
+ import kotlin.coroutines.suspendCoroutine
17
19
import kotlin.reflect.KClass
18
20
19
21
/* *
@@ -221,7 +223,8 @@ class CoroutinesSubtypeEffectHandlerBuilder<F : Any, E : Any> {
221
223
fun build (coroutineContext : CoroutineContext = EmptyCoroutineContext ) = Connectable { eventConsumer ->
222
224
val scope = CoroutineScope (coroutineContext)
223
225
val eventsChannel = Channel <E >()
224
- val subEffectChannels = ConcurrentHashMap <KClass <out F >, Channel <F >>()
226
+ val subEffectChannelsMap = mutableMapOf<KClass <out F >, Channel <F >>()
227
+ val mutex = Mutex ()
225
228
226
229
// Connects the eventConsumer
227
230
scope.launch {
@@ -233,26 +236,39 @@ class CoroutinesSubtypeEffectHandlerBuilder<F : Any, E : Any> {
233
236
object : Connection <F > {
234
237
override fun accept (effect : F ) {
235
238
scope.launch {
236
- // Creates an effectChannel if this is the first time the effect is processed
237
- val subEffectChannel = subEffectChannels.computeIfAbsent(effect::class ) {
238
- val subEffectChannel = Channel <F >()
239
- val effectHandler =
240
- effectsHandlersMap[effect::class ] ? : error(" No effectHandler for $effect " )
241
- // Connects the effectHandler if this is the first time the effect is processed
242
- scope.launch {
243
- if (isActive) effectHandler.handleEffects(subEffectChannel, eventsChannel)
239
+ val subEffectChannel = mutex.withLock {
240
+ // Prevents the creation of an effectChannel if the scope is not active
241
+ if (! isActive) return @launch
242
+
243
+ subEffectChannelsMap.getOrPut(effect::class ) {
244
+ // Creates an effectChannel the first time the effect is processed
245
+ val subEffectChannel = Channel <F >()
246
+ val effectHandler = effectsHandlersMap[effect::class ]
247
+ ? : error(" No effectHandler for $effect " )
248
+ // Connects the effectHandler the first time the effect is processed
249
+ scope.launch {
250
+ if (isActive) effectHandler.handleEffects(
251
+ subEffectChannel, eventsChannel
252
+ )
253
+ }
254
+ subEffectChannel
244
255
}
245
- subEffectChannel
246
256
}
247
257
258
+ // Prevents the processing of the effect if the scope is not active
248
259
if (isActive) subEffectChannel.send(effect)
249
260
}
250
261
}
251
262
252
263
override fun dispose () {
253
264
scope.cancel(" Effect Handler disposed" )
254
265
eventsChannel.close()
255
- subEffectChannels.forEachValue(1 ) { it.close() }
266
+ runBlocking {
267
+ mutex.withLock {
268
+ subEffectChannelsMap.values.forEach { it.close() }
269
+ subEffectChannelsMap.clear()
270
+ }
271
+ }
256
272
}
257
273
}
258
274
}
0 commit comments