Skip to content

Commit

Permalink
Fix Conflate Stale to Work with Short Circuit
Browse files Browse the repository at this point in the history
Short circuit if there is no changed state while conflating.
  • Loading branch information
steve-the-edwards committed Feb 20, 2025
1 parent 59301c7 commit ce54070
Show file tree
Hide file tree
Showing 2 changed files with 265 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,19 +161,12 @@ public fun <PropsT, OutputT, RenderingT> renderWorkflowIn(
}

/**
* If [runtimeConfig] contains [RuntimeConfigOptions.RENDER_ONLY_WHEN_STATE_CHANGES] then
* send any output, but return true which means restart the runtime loop and process another
* action.
* If [runtimeConfig] contains [RuntimeConfigOptions.RENDER_ONLY_WHEN_STATE_CHANGES] and
* we have not changed state, then return true to short circuit the render loop.
*/
suspend fun shortCircuitForUnchangedState(actionResult: ActionProcessingResult): Boolean {
if (runtimeConfig.contains(RENDER_ONLY_WHEN_STATE_CHANGES) &&
fun shouldShortCircuitForUnchangedState(actionResult: ActionProcessingResult): Boolean {
return runtimeConfig.contains(RENDER_ONLY_WHEN_STATE_CHANGES) &&
actionResult is ActionApplied<*> && !actionResult.stateChanged
) {
// Possibly send output and process more actions. No state change so no re-render.
sendOutput(actionResult, onOutput)
return true
}
return false
}

scope.launch {
Expand All @@ -183,34 +176,40 @@ public fun <PropsT, OutputT, RenderingT> renderWorkflowIn(
// launched.
var actionResult: ActionProcessingResult = runner.processAction()

if (shortCircuitForUnchangedState(actionResult)) continue
if (shouldShortCircuitForUnchangedState(actionResult)) {
sendOutput(actionResult, onOutput)
continue
}

// After resuming from runner.processAction() our coroutine could now be cancelled, check so
// we don't surprise anyone with an unexpected rendering pass. Show's over, go home.
if (!isActive) return@launch

// Next Render Pass.
var nextRenderAndSnapshot: RenderingAndSnapshot<RenderingT> = runner.nextRendering()

if (runtimeConfig.contains(CONFLATE_STALE_RENDERINGS)) {
// Only null will allow us to continue processing actions and conflating stale renderings.
// If this is not null, then we had an Output and we want to send it with the Rendering
// (stale or not).
while (actionResult is ActionApplied<*> && actionResult.output == null) {
// We have more actions we can process, so this rendering is stale.
while (isActive && actionResult is ActionApplied<*> && actionResult.output == null) {
// We may have more actions we can process, this rendering could be stale.
actionResult = runner.processAction(waitForAnAction = false)

if (!isActive) return@launch

// If no actions processed, then no new rendering needed.
// If no actions processed, then no new rendering needed. Pass on to UI.
if (actionResult == ActionsExhausted) break

// Skip rendering if we had unchanged state, keep draining actions.
if (shouldShortCircuitForUnchangedState(actionResult)) continue

// Make sure the runtime has not been cancelled from runner.processAction()
if (!isActive) return@launch

nextRenderAndSnapshot = runner.nextRendering()
}
}

// Pass on to the UI.
// Pass on the rendering to the UI.
renderingsAndSnapshots.value = nextRenderAndSnapshot
// And emit the Output.

// Emit the Output
sendOutput(actionResult, onOutput)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.map
Expand All @@ -21,6 +22,7 @@ import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest
import okio.ByteString
import kotlin.test.Test
Expand Down Expand Up @@ -1180,6 +1182,248 @@ class RenderWorkflowInTest {
}
}

@Test
fun for_render_on_change_only_and_conflate_we_drain_action_but_do_not_render_no_state_changed() {
runtimeTestRunner.runParametrizedTest(
paramSource = runtimeOptions.filter {
it.first.contains(RENDER_ONLY_WHEN_STATE_CHANGES) && it.first.contains(
CONFLATE_STALE_RENDERINGS
)
},
before = ::setup,
) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) ->
runTest(UnconfinedTestDispatcher()) {
check(runtimeConfig.contains(CONFLATE_STALE_RENDERINGS))
check(runtimeConfig.contains(RENDER_ONLY_WHEN_STATE_CHANGES))

var renderCount = 0
var childHandlerActionExecuted = 0
var workerActionExecuted = 0
val trigger = MutableSharedFlow<String>()

val childWorkflow = Workflow.stateful<String, String, String>(
initialState = "unchanging state",
render = { renderState ->
runningWorker(
trigger.asWorker()
) {
action("") {
state = it
setOutput(it)
}
}
renderState
}
)
val workflow = Workflow.stateful<String, String, String>(
initialState = "unchanging state",
render = { renderState ->
renderChild(childWorkflow) { childOutput ->
action("childHandler") {
childHandlerActionExecuted++
state = childOutput
}
}
runningWorker(
trigger.asWorker()
) {
action("") {
workerActionExecuted++
state = it
}
}
renderState.also {
renderCount++
}
}
)
val props = MutableStateFlow(Unit)
renderWorkflowIn(
workflow = workflow,
scope = backgroundScope,
props = props,
runtimeConfig = runtimeConfig,
workflowTracer = workflowTracer,
) {}

launch {
trigger.emit("changed state")
}
advanceUntilIdle()

assertEquals(2, renderCount)
assertEquals(1, childHandlerActionExecuted)
assertEquals(1, workerActionExecuted)
}
}
}

@Test
fun for_conflate_we_conflate_stacked_actions_into_one_rendering() {
runtimeTestRunner.runParametrizedTest(
paramSource = runtimeOptions
.filter {
it.first.contains(CONFLATE_STALE_RENDERINGS)
},
before = ::setup,
) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) ->
runTest(StandardTestDispatcher()) {
check(runtimeConfig.contains(CONFLATE_STALE_RENDERINGS))

var childHandlerActionExecuted = false
val trigger = MutableSharedFlow<String>()
val emitted = mutableListOf<String>()

val childWorkflow = Workflow.stateful<String, String, String>(
initialState = "unchanging state",
render = { renderState ->
runningWorker(
trigger.asWorker()
) {
action("") {
state = it
setOutput(it)
}
}
renderState
}
)
val workflow = Workflow.stateful<String, String, String>(
initialState = "unchanging state",
render = { renderState ->
renderChild(childWorkflow) { childOutput ->
action("childHandler") {
childHandlerActionExecuted = true
state = childOutput
}
}
runningWorker(
trigger.asWorker()
) {
action("") {
// Update the rendering in order to show conflation.
state = "$it+update"
}
}
renderState
}
)
val props = MutableStateFlow(Unit)
val renderings = renderWorkflowIn(
workflow = workflow,
scope = backgroundScope,
props = props,
runtimeConfig = runtimeConfig,
workflowTracer = workflowTracer,
) {}

launch {
trigger.emit("changed state")
}
val collectionJob = launch(UnconfinedTestDispatcher(testScheduler)) {
// Collect this unconfined so we can get all the renderings faster than actions can
// be processed.
renderings.collect {
emitted += it.rendering
}
}
advanceUntilIdle()
runCurrent()

collectionJob.cancel()

// 2 renderings (initial and then the update.) Not *3* renderings.
assertEquals(2, emitted.size)
assertEquals("changed state+update", emitted.last())
assertTrue(childHandlerActionExecuted)
}
}
}

@Test
fun for_conflate_we_do_not_conflate_stacked_actions_into_one_rendering_if_output() {
runtimeTestRunner.runParametrizedTest(
paramSource = runtimeOptions
.filter {
it.first.contains(CONFLATE_STALE_RENDERINGS)
},
before = ::setup,
) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) ->
runTest(StandardTestDispatcher()) {
check(runtimeConfig.contains(CONFLATE_STALE_RENDERINGS))

var childHandlerActionExecuted = false
val trigger = MutableSharedFlow<String>()
val emitted = mutableListOf<String>()

val childWorkflow = Workflow.stateful<String, String, String>(
initialState = "unchanging state",
render = { renderState ->
runningWorker(
trigger.asWorker()
) {
action("") {
state = it
setOutput(it)
}
}
renderState
}
)
val workflow = Workflow.stateful<String, String, String>(
initialState = "unchanging state",
render = { renderState ->
renderChild(childWorkflow) { childOutput ->
action("childHandler") {
childHandlerActionExecuted = true
state = childOutput
setOutput(childOutput)
}
}
runningWorker(
trigger.asWorker()
) {
action("") {
// Update the rendering in order to show conflation.
state = "$it+update"
setOutput("$it+update")
}
}
renderState
}
)
val props = MutableStateFlow(Unit)
val renderings = renderWorkflowIn(
workflow = workflow,
scope = backgroundScope,
props = props,
runtimeConfig = runtimeConfig,
workflowTracer = workflowTracer,
) {}

launch {
trigger.emit("changed state")
}
val collectionJob = launch(UnconfinedTestDispatcher(testScheduler)) {
// Collect this unconfined so we can get all the renderings faster than actions can
// be processed.
renderings.collect {
emitted += it.rendering
}
}
advanceUntilIdle()
runCurrent()

collectionJob.cancel()

// 3 renderings because each had output.
assertEquals(3, emitted.size)
assertEquals("changed state+update", emitted.last())
assertTrue(childHandlerActionExecuted)
}
}
}

private class ExpectedException : RuntimeException()

private fun <T1, T2> cartesianProduct(
Expand Down

0 comments on commit ce54070

Please sign in to comment.