Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Commit

Permalink
Reduce replication traffic due to reflected cache stream POSITION (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
erikjohnston authored Oct 27, 2023
1 parent c02406a commit 0680d76
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 1 deletion.
1 change: 1 addition & 0 deletions changelog.d/16557.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a long-standing, exceedingly rare edge case where the first event persisted by a new event persister worker might not be sent down `/sync`.
19 changes: 18 additions & 1 deletion synapse/replication/tcp/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from synapse.replication.tcp.commands import PositionCommand
from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol
from synapse.replication.tcp.streams import EventsStream
from synapse.replication.tcp.streams._base import StreamRow, Token
from synapse.replication.tcp.streams._base import CachesStream, StreamRow, Token
from synapse.util.metrics import Measure

if TYPE_CHECKING:
Expand Down Expand Up @@ -204,6 +204,23 @@ async def _run_notifier_loop(self) -> None:
# The token has advanced but there is no data to
# send, so we send a `POSITION` to inform other
# workers of the updated position.
#
# There are two reasons for this: 1) this instance
# requested a stream ID but didn't use it, or 2)
# this instance advanced its own stream position due
# to receiving notifications about other instances
# advancing their stream position.

# We skip sending `POSITION` for the `caches` stream
# for the second case as a) it generates a lot of
# traffic as every worker would echo each write, and
# b) nothing cares if a given worker's caches stream
# position lags.
if stream.NAME == CachesStream.NAME:
# If there haven't been any writes since the
# `last_token` then we're in the second case.
if stream.minimal_local_current_token() <= last_token:
continue

# Note: `last_token` may not *actually* be the
# last token we sent out in a RDATA or POSITION.
Expand Down

0 comments on commit 0680d76

Please sign in to comment.