Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent zombie presence events #208

Merged
merged 6 commits into from
Jun 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ jobs:
- uses: actions/download-artifact@v2
- uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
fail_ci_if_error: true
verbose: false
token: ${{ secrets.CODECOV_TOKEN }}

microbenchmarks:
name: Microbenchmarks
Expand Down
47 changes: 46 additions & 1 deletion yorkie/src/androidTest/kotlin/dev/yorkie/core/PresenceTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,6 @@ class PresenceTest {
}
}

assertEquals(3, d1Events.size)
assertIs<Others.Watched>(d1Events.first())
assertIs<Others.PresenceChanged>(d1Events[1])
assertIs<Others.Unwatched>(d1Events.last())
Expand Down Expand Up @@ -827,5 +826,51 @@ class PresenceTest {
}
}

@Test
fun test_whether_presence_event_queue_is_empty_after_consecutive_presence_changes() {
withTwoClientsAndDocuments { c1, _, d1, d2, _ ->
val d1PresenceEvents = mutableListOf<MyPresence.PresenceChanged>()
val d2PresenceEvents = mutableListOf<Others.PresenceChanged>()
val jobs = listOf(
launch(start = CoroutineStart.UNDISPATCHED) {
d1.events.filterIsInstance<MyPresence.PresenceChanged>()
.collect(d1PresenceEvents::add)
},
launch(start = CoroutineStart.UNDISPATCHED) {
d2.events.filterIsInstance<Others.PresenceChanged>()
.collect(d2PresenceEvents::add)
},
)

d1.updateAsync { _, presence ->
repeat(10) {
presence.put(mapOf("a" to "${it + 1}"))
}
}.await()

val lastD1PresenceEvent = MyPresence.PresenceChanged(
PresenceInfo(c1.requireClientId(), mapOf("a" to "10")),
)
val lastD2PresenceEvent = Others.PresenceChanged(
PresenceInfo(c1.requireClientId(), mapOf("a" to "10")),
)

withTimeout(GENERAL_TIMEOUT) {
while (lastD1PresenceEvent !in d1PresenceEvents ||
lastD2PresenceEvent !in d2PresenceEvents
) {
delay(50)
}
}

assertEquals(lastD1PresenceEvent, d1PresenceEvents.last())
assertTrue(d1.presenceEventQueue.isEmpty())
assertEquals(lastD2PresenceEvent, d2PresenceEvents.last())
assertTrue(d2.presenceEventQueue.isEmpty())

jobs.forEach(Job::cancel)
}
}

private data class Cursor(val x: Int, val y: Int)
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import dev.yorkie.TreeTest
import dev.yorkie.core.Client.SyncMode.Manual
import dev.yorkie.core.withTwoClientsAndDocuments
import dev.yorkie.document.json.JsonTreeTest.Companion.rootTree
import dev.yorkie.document.json.JsonTreeTest.Companion.updateAndSync
import kotlin.test.assertEquals
import org.junit.Test
import org.junit.runner.RunWith
Expand All @@ -16,7 +17,7 @@ class JsonTreeSplitMergeTest {
@Test
fun test_contained_split_and_split_at_the_same_position() {
withTwoClientsAndDocuments(syncMode = Manual) { c1, c2, d1, d2, _ ->
JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.setNewTree(
"t",
Expand All @@ -31,7 +32,7 @@ class JsonTreeSplitMergeTest {
)
JsonTreeTest.assertTreesXmlEquals("<r><p>ab</p></r>", d1, d2)

JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.rootTree().edit(2, 2, 1)
},
Expand All @@ -48,7 +49,7 @@ class JsonTreeSplitMergeTest {
@Test
fun test_contained_split_and_split_at_different_positions_on_the_same_node() {
withTwoClientsAndDocuments(syncMode = Manual) { c1, c2, d1, d2, _ ->
JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.setNewTree(
"t",
Expand All @@ -63,7 +64,7 @@ class JsonTreeSplitMergeTest {
)
JsonTreeTest.assertTreesXmlEquals("<r><p>abc</p></r>", d1, d2)

JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.rootTree().edit(2, 2, 1)
},
Expand All @@ -81,7 +82,7 @@ class JsonTreeSplitMergeTest {
@Test
fun test_contained_split_and_insert_into_the_split_position() {
withTwoClientsAndDocuments(syncMode = Manual) { c1, c2, d1, d2, _ ->
JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.setNewTree(
"t",
Expand All @@ -96,7 +97,7 @@ class JsonTreeSplitMergeTest {
)
JsonTreeTest.assertTreesXmlEquals("<r><p>ab</p></r>", d1, d2)

JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.rootTree().edit(2, 2, 1)
},
Expand All @@ -114,7 +115,7 @@ class JsonTreeSplitMergeTest {
@Test
fun test_contained_split_and_insert_into_original_node() {
withTwoClientsAndDocuments(syncMode = Manual) { c1, c2, d1, d2, _ ->
JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.setNewTree(
"t",
Expand All @@ -129,7 +130,7 @@ class JsonTreeSplitMergeTest {
)
JsonTreeTest.assertTreesXmlEquals("<r><p>ab</p></r>", d1, d2)

JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.rootTree().edit(2, 2, 1)
},
Expand All @@ -147,7 +148,7 @@ class JsonTreeSplitMergeTest {
@Test
fun test_contained_split_and_insert_into_split_node() {
withTwoClientsAndDocuments(syncMode = Manual) { c1, c2, d1, d2, _ ->
JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.setNewTree(
"t",
Expand All @@ -162,7 +163,7 @@ class JsonTreeSplitMergeTest {
)
JsonTreeTest.assertTreesXmlEquals("<r><p>ab</p></r>", d1, d2)

JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.rootTree().edit(2, 2, 1)
},
Expand All @@ -180,7 +181,7 @@ class JsonTreeSplitMergeTest {
@Test
fun test_contained_split_and_delete_contents_in_split_node() {
withTwoClientsAndDocuments(syncMode = Manual) { c1, c2, d1, d2, _ ->
JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.setNewTree(
"t",
Expand All @@ -195,7 +196,7 @@ class JsonTreeSplitMergeTest {
)
JsonTreeTest.assertTreesXmlEquals("<r><p>ab</p></r>", d1, d2)

JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.rootTree().edit(2, 2, 1)
},
Expand Down Expand Up @@ -232,7 +233,7 @@ class JsonTreeSplitMergeTest {
}.await()
JsonTreeTest.assertTreesXmlEquals("<doc><p></p><p>ab</p></doc>", d1)

JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.rootTree().edit(1, 3)
},
Expand Down Expand Up @@ -268,7 +269,7 @@ class JsonTreeSplitMergeTest {
d1.getRoot().rootTree().toXml(),
)

JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.rootTree().edit(2, 6)
},
Expand Down Expand Up @@ -309,7 +310,7 @@ class JsonTreeSplitMergeTest {
}.await()
assertEquals("<doc><p>a</p><p>c</p><p>b</p></doc>", d1.getRoot().rootTree().toXml())

JsonTreeTest.updateAndSync(
updateAndSync(
JsonTreeTest.Companion.Updater(c1, d1) { root, _ ->
root.rootTree().edit(2, 7)
},
Expand Down
14 changes: 11 additions & 3 deletions yorkie/src/main/kotlin/dev/yorkie/document/Document.kt
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ public class Document(
public val garbageLength: Int
get() = root.garbageLength

private val presenceEventQueue = mutableListOf<PresenceChanged>()
@VisibleForTesting
internal val presenceEventQueue = mutableListOf<PresenceChanged>()
private val pendingPresenceEvents = mutableListOf<PresenceChanged>()

private val onlineClients = MutableStateFlow(setOf<ActorID>())
Expand Down Expand Up @@ -360,6 +361,7 @@ public class Document(
*/
private suspend fun publishPresenceEvent(presences: Presences) {
val iterator = presenceEventQueue.listIterator()
var clearPresenceEventQueue = false
while (iterator.hasNext()) {
val event = iterator.next()
if (event is Others && event.changed.actorID == changeID.actor) {
Expand All @@ -368,10 +370,16 @@ public class Document(
}

if (presenceEventReadyToBePublished(event, presences)) {
if (presenceEventQueue.first() != event) {
clearPresenceEventQueue = true
}
eventStream.emit(event)
iterator.remove()
}
}
if (clearPresenceEventQueue) {
presenceEventQueue.clear()
}
}

private fun presenceEventReadyToBePublished(
Expand All @@ -382,14 +390,14 @@ public class Document(
is MyPresence.Initialized -> presences.keys.containsAll(event.initialized.keys)
is MyPresence.PresenceChanged -> {
val actorID = event.changed.actorID
actorID !in presences || event.changed.presence == presences[actorID]
event.changed.presence == presences[actorID]
}

is Others.Watched -> event.changed.actorID in presences
is Others.Unwatched -> event.changed.actorID !in presences
is Others.PresenceChanged -> {
val actorID = event.changed.actorID
actorID !in presences || event.changed.presence == presences[actorID]
event.changed.presence == presences[actorID]
}
}
}
Expand Down
Loading