Skip to content

Commit

Permalink
Merge pull request #1263 from square/sedwards/fix-conflate
Browse files Browse the repository at this point in the history
Fix Conflate Stale Rendering Logic for Short Circuit
  • Loading branch information
steve-the-edwards authored Feb 21, 2025
2 parents e922f71 + 1b1aa06 commit 56e6ee0
Show file tree
Hide file tree
Showing 2 changed files with 266 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -161,19 +161,12 @@ public fun <PropsT, OutputT, RenderingT> renderWorkflowIn(
}

/**
* If [runtimeConfig] contains [RuntimeConfigOptions.RENDER_ONLY_WHEN_STATE_CHANGES] then
* send any output, but return true which means restart the runtime loop and process another
* action.
* If [runtimeConfig] contains [RuntimeConfigOptions.RENDER_ONLY_WHEN_STATE_CHANGES] and
* we have not changed state, then return true to short circuit the render loop.
*/
suspend fun shortCircuitForUnchangedState(actionResult: ActionProcessingResult): Boolean {
if (runtimeConfig.contains(RENDER_ONLY_WHEN_STATE_CHANGES) &&
fun shouldShortCircuitForUnchangedState(actionResult: ActionProcessingResult): Boolean {
return runtimeConfig.contains(RENDER_ONLY_WHEN_STATE_CHANGES) &&
actionResult is ActionApplied<*> && !actionResult.stateChanged
) {
// Possibly send output and process more actions. No state change so no re-render.
sendOutput(actionResult, onOutput)
return true
}
return false
}

scope.launch {
Expand All @@ -183,34 +176,40 @@ public fun <PropsT, OutputT, RenderingT> renderWorkflowIn(
// launched.
var actionResult: ActionProcessingResult = runner.processAction()

if (shortCircuitForUnchangedState(actionResult)) continue
if (shouldShortCircuitForUnchangedState(actionResult)) {
sendOutput(actionResult, onOutput)
continue
}

// After resuming from runner.processAction() our coroutine could now be cancelled, check so
// we don't surprise anyone with an unexpected rendering pass. Show's over, go home.
if (!isActive) return@launch

// Next Render Pass.
var nextRenderAndSnapshot: RenderingAndSnapshot<RenderingT> = runner.nextRendering()

if (runtimeConfig.contains(CONFLATE_STALE_RENDERINGS)) {
// Only null will allow us to continue processing actions and conflating stale renderings.
// If this is not null, then we had an Output and we want to send it with the Rendering
// (stale or not).
while (actionResult is ActionApplied<*> && actionResult.output == null) {
// We have more actions we can process, so this rendering is stale.
while (isActive && actionResult is ActionApplied<*> && actionResult.output == null) {
// We may have more actions we can process, this rendering could be stale.
actionResult = runner.processAction(waitForAnAction = false)

if (!isActive) return@launch

// If no actions processed, then no new rendering needed.
// If no actions processed, then no new rendering needed. Pass on to UI.
if (actionResult == ActionsExhausted) break

// Skip rendering if we had unchanged state, keep draining actions.
if (shouldShortCircuitForUnchangedState(actionResult)) continue

// Make sure the runtime has not been cancelled from runner.processAction()
if (!isActive) return@launch

nextRenderAndSnapshot = runner.nextRendering()
}
}

// Pass on to the UI.
// Pass on the rendering to the UI.
renderingsAndSnapshots.value = nextRenderAndSnapshot
// And emit the Output.

// Emit the Output
sendOutput(actionResult, onOutput)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.cancel
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.map
Expand All @@ -21,6 +22,7 @@ import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest
import okio.ByteString
import kotlin.test.Test
Expand Down Expand Up @@ -1180,6 +1182,249 @@ class RenderWorkflowInTest {
}
}

@Test
fun for_render_on_change_only_and_conflate_we_drain_action_but_do_not_render_no_state_changed() {
runtimeTestRunner.runParametrizedTest(
paramSource = runtimeOptions.filter {
it.first.contains(RENDER_ONLY_WHEN_STATE_CHANGES) && it.first.contains(
CONFLATE_STALE_RENDERINGS
)
},
before = ::setup,
) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) ->
runTest(UnconfinedTestDispatcher()) {
check(runtimeConfig.contains(CONFLATE_STALE_RENDERINGS))
check(runtimeConfig.contains(RENDER_ONLY_WHEN_STATE_CHANGES))

var renderCount = 0
var childHandlerActionExecuted = 0
var workerActionExecuted = 0
val trigger = MutableSharedFlow<String>()

val childWorkflow = Workflow.stateful<String, String, String>(
initialState = "unchanging state",
render = { renderState ->
runningWorker(
trigger.asWorker()
) {
action("") {
state = it
setOutput(it)
}
}
renderState
}
)
val workflow = Workflow.stateful<String, String, String>(
initialState = "unchanging state",
render = { renderState ->
renderChild(childWorkflow) { childOutput ->
action("childHandler") {
childHandlerActionExecuted++
state = childOutput
}
}
runningWorker(
trigger.asWorker()
) {
action("") {
workerActionExecuted++
state = it
}
}
renderState.also {
renderCount++
}
}
)
val props = MutableStateFlow(Unit)
renderWorkflowIn(
workflow = workflow,
scope = backgroundScope,
props = props,
runtimeConfig = runtimeConfig,
workflowTracer = workflowTracer,
) {}

launch {
trigger.emit("changed state")
}
advanceUntilIdle()

// 2 renderings (initial and then the update.) Not *3* renderings.
assertEquals(2, renderCount)
assertEquals(1, childHandlerActionExecuted)
assertEquals(1, workerActionExecuted)
}
}
}

@Test
fun for_conflate_we_conflate_stacked_actions_into_one_rendering() {
runtimeTestRunner.runParametrizedTest(
paramSource = runtimeOptions
.filter {
it.first.contains(CONFLATE_STALE_RENDERINGS)
},
before = ::setup,
) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) ->
runTest(StandardTestDispatcher()) {
check(runtimeConfig.contains(CONFLATE_STALE_RENDERINGS))

var childHandlerActionExecuted = false
val trigger = MutableSharedFlow<String>()
val emitted = mutableListOf<String>()

val childWorkflow = Workflow.stateful<String, String, String>(
initialState = "unchanging state",
render = { renderState ->
runningWorker(
trigger.asWorker()
) {
action("") {
state = it
setOutput(it)
}
}
renderState
}
)
val workflow = Workflow.stateful<String, String, String>(
initialState = "unchanging state",
render = { renderState ->
renderChild(childWorkflow) { childOutput ->
action("childHandler") {
childHandlerActionExecuted = true
state = childOutput
}
}
runningWorker(
trigger.asWorker()
) {
action("") {
// Update the rendering in order to show conflation.
state = "$it+update"
}
}
renderState
}
)
val props = MutableStateFlow(Unit)
val renderings = renderWorkflowIn(
workflow = workflow,
scope = backgroundScope,
props = props,
runtimeConfig = runtimeConfig,
workflowTracer = workflowTracer,
) {}

launch {
trigger.emit("changed state")
}
val collectionJob = launch(UnconfinedTestDispatcher(testScheduler)) {
// Collect this unconfined so we can get all the renderings faster than actions can
// be processed.
renderings.collect {
emitted += it.rendering
}
}
advanceUntilIdle()
runCurrent()

collectionJob.cancel()

// 2 renderings (initial and then the update.) Not *3* renderings.
assertEquals(2, emitted.size)
assertEquals("changed state+update", emitted.last())
assertTrue(childHandlerActionExecuted)
}
}
}

@Test
fun for_conflate_we_do_not_conflate_stacked_actions_into_one_rendering_if_output() {
runtimeTestRunner.runParametrizedTest(
paramSource = runtimeOptions
.filter {
it.first.contains(CONFLATE_STALE_RENDERINGS)
},
before = ::setup,
) { (runtimeConfig: RuntimeConfig, workflowTracer: WorkflowTracer?) ->
runTest(StandardTestDispatcher()) {
check(runtimeConfig.contains(CONFLATE_STALE_RENDERINGS))

var childHandlerActionExecuted = false
val trigger = MutableSharedFlow<String>()
val emitted = mutableListOf<String>()

val childWorkflow = Workflow.stateful<String, String, String>(
initialState = "unchanging state",
render = { renderState ->
runningWorker(
trigger.asWorker()
) {
action("") {
state = it
setOutput(it)
}
}
renderState
}
)
val workflow = Workflow.stateful<String, String, String>(
initialState = "unchanging state",
render = { renderState ->
renderChild(childWorkflow) { childOutput ->
action("childHandler") {
childHandlerActionExecuted = true
state = childOutput
setOutput(childOutput)
}
}
runningWorker(
trigger.asWorker()
) {
action("") {
// Update the rendering in order to show conflation.
state = "$it+update"
setOutput("$it+update")
}
}
renderState
}
)
val props = MutableStateFlow(Unit)
val renderings = renderWorkflowIn(
workflow = workflow,
scope = backgroundScope,
props = props,
runtimeConfig = runtimeConfig,
workflowTracer = workflowTracer,
) {}

launch {
trigger.emit("changed state")
}
val collectionJob = launch(UnconfinedTestDispatcher(testScheduler)) {
// Collect this unconfined so we can get all the renderings faster than actions can
// be processed.
renderings.collect {
emitted += it.rendering
}
}
advanceUntilIdle()
runCurrent()

collectionJob.cancel()

// 3 renderings because each had output.
assertEquals(3, emitted.size)
assertEquals("changed state+update", emitted.last())
assertTrue(childHandlerActionExecuted)
}
}
}

private class ExpectedException : RuntimeException()

private fun <T1, T2> cartesianProduct(
Expand Down

0 comments on commit 56e6ee0

Please sign in to comment.