Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADD replication pause and toggleOnDocumentVisible #6819

Merged
merged 7 commits into from
Feb 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@
# RxDB Changelog

<!-- CHANGELOG NEWEST -->
- ADD option `toggleOnDocumentVisible` to `replicateRxCollection()`. [See](https://github.com/pubkey/rxdb/issues/6810)
- ADD `RxReplicationState.pause()`
- ADD `RxReplicationState.isPaused()`
- ADD tests to replication to ensure running the same replication in multiple tabs at once does not fail.

<!-- ADD new changes here! -->

Expand Down
33 changes: 31 additions & 2 deletions docs-src/docs/replication.md
Original file line number Diff line number Diff line change
Expand Up @@ -510,15 +510,25 @@ setInterval(() => myRxReplicationState.reSync(), 10 * 1000);
Cancels the replication. Returns a promise that resolved when everything has been cleaned up.

```ts
await myRxReplicationState.cancel()
await myRxReplicationState.cancel();
```

### pause()

Pauses a running replication. The replication can later be resumed with `RxReplicationState.start()`.

```ts
await myRxReplicationState.pause();
await myRxReplicationState.start(); // restart
```


### remove()

Cancels the replication and deletes the metadata of the replication state. This can be used to restart the replication "from scratch". Calling `.remove()` will only delete the replication metadata, it will NOT delete the documents from the collection of the replication.

```ts
await myRxReplicationState.remove()
await myRxReplicationState.remove();
```

### isStopped()
Expand All @@ -529,6 +539,14 @@ Returns `true` if the replication is stopped. This can be if a non-live replicat
replicationState.isStopped(); // true/false
```

### isPaused()

Returns `true` if the replication is paused.

```js
replicationState.isPaused(); // true/false
```

### Setting a custom initialCheckpoint

By default, the push replication will start from the beginning of time and push all documents from there to the remote.
Expand Down Expand Up @@ -569,7 +587,18 @@ const replicationState = replicateRxCollection({
});
```

### toggleOnDocumentVisible

`(experimental)`

Set this to true to ensure the replication also runs if the tab is currently `visbile`. This fixes problem in browsers where the replicating leader-elected tab becomes stale or hibernated by the browser to save battery life. If the tab is losing visibility, the replication will be paused automatically and then restarted if either the tab becomes leader or the tab becomes visible again.

```ts
const replicationState = replicateRxCollection({
toggleOnDocumentVisible: true,
/* ... */
});
```

### Attachment replication (beta)

Expand Down
84 changes: 75 additions & 9 deletions src/plugins/replication/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ import {
preventHibernateBrowserTab
} from './replication-helper.ts';
import {
addConnectedStorageToCollection, removeConnectedStorageFromCollection
addConnectedStorageToCollection,
removeConnectedStorageFromCollection
} from '../../rx-database-internal-store.ts';
import { addRxPlugin } from '../../plugin.ts';
import { hasEncryption } from '../../rx-storage-helper.ts';
Expand Down Expand Up @@ -87,6 +88,8 @@ export class RxReplicationState<RxDocType, CheckpointType> {
readonly canceled$: Observable<any> = this.subjects.canceled.asObservable();
readonly active$: Observable<boolean> = this.subjects.active.asObservable();

wasStarted: boolean = false;

readonly metaInfoPromise: Promise<{ collectionName: string, schema: RxJsonSchema<RxDocumentData<RxStorageReplicationMeta<RxDocType, any>>> }>;

public startPromise: Promise<void>;
Expand All @@ -106,6 +109,7 @@ export class RxReplicationState<RxDocType, CheckpointType> {
public readonly live?: boolean,
public retryTime?: number,
public autoStart?: boolean,
public toggleOnDocumentVisible?: boolean
) {
this.metaInfoPromise = (async () => {
const metaInstanceCollectionName = 'rx-replication-meta-' + await collection.database.hashFunction([
Expand Down Expand Up @@ -156,15 +160,30 @@ export class RxReplicationState<RxDocType, CheckpointType> {
return;
}

preventHibernateBrowserTab(this);
if (this.internalReplicationState) {
this.internalReplicationState.events.paused.next(false);
}

/**
* If started after a pause,
* just re-sync once and continue.
*/
if (this.wasStarted) {
this.reSync();
return;
}
this.wasStarted = true;


if (!this.toggleOnDocumentVisible) {
preventHibernateBrowserTab(this);
}

// fill in defaults for pull & push
const pullModifier = this.pull && this.pull.modifier ? this.pull.modifier : DEFAULT_MODIFIER;
const pushModifier = this.push && this.push.modifier ? this.push.modifier : DEFAULT_MODIFIER;

const database = this.collection.database;

const metaInfo = await this.metaInfoPromise;

const [metaInstance] = await Promise.all([
Expand Down Expand Up @@ -231,7 +250,7 @@ export class RxReplicationState<RxDocType, CheckpointType> {
*/
let done = false;
let result: ReplicationPullHandlerResult<RxDocType, CheckpointType> = {} as any;
while (!done && !this.isStopped()) {
while (!done && !this.isStoppedOrPaused()) {
try {
result = await this.pull.handler(
checkpoint,
Expand All @@ -249,7 +268,7 @@ export class RxReplicationState<RxDocType, CheckpointType> {
}
}

if (this.isStopped()) {
if (this.isStoppedOrPaused()) {
return {
checkpoint: null,
documents: []
Expand Down Expand Up @@ -304,7 +323,7 @@ export class RxReplicationState<RxDocType, CheckpointType> {
result = [];
}

while (!done && !this.isStopped()) {
while (!done && !this.isStoppedOrPaused()) {
try {
result = await this.push.handler(useRows);
/**
Expand Down Expand Up @@ -334,7 +353,7 @@ export class RxReplicationState<RxDocType, CheckpointType> {
await awaitRetry(this.collection, ensureNotFalsy(this.retryTime));
}
}
if (this.isStopped()) {
if (this.isStoppedOrPaused()) {
return [];
}

Expand Down Expand Up @@ -375,7 +394,9 @@ export class RxReplicationState<RxDocType, CheckpointType> {
this.subs.push(
this.pull.stream$.subscribe({
next: ev => {
this.remoteEvents$.next(ev);
if (!this.isStoppedOrPaused()) {
this.remoteEvents$.next(ev);
}
},
error: err => {
this.subjects.error.next(err);
Expand All @@ -396,13 +417,25 @@ export class RxReplicationState<RxDocType, CheckpointType> {
this.callOnStart();
}

pause() {
ensureNotFalsy(this.internalReplicationState).events.paused.next(true);
}

isPaused(): boolean {
return this.internalReplicationState ? this.internalReplicationState.events.paused.getValue() : false;
}

isStopped(): boolean {
if (this.subjects.canceled.getValue()) {
return true;
}
return false;
}

isStoppedOrPaused() {
return this.isPaused() || this.isStopped();
}

async awaitInitialReplication(): Promise<void> {
await this.startPromise;
return awaitRxStorageReplicationFirstInSync(
Expand Down Expand Up @@ -510,6 +543,7 @@ export function replicateRxCollection<RxDocType, CheckpointType>(
retryTime = 1000 * 5,
waitForLeadership = true,
autoStart = true,
toggleOnDocumentVisible = false
}: ReplicationOptions<RxDocType, CheckpointType>
): RxReplicationState<RxDocType, CheckpointType> {
addRxPlugin(RxDBLeaderElectionPlugin);
Expand All @@ -536,10 +570,42 @@ export function replicateRxCollection<RxDocType, CheckpointType>(
push,
live,
retryTime,
autoStart
autoStart,
toggleOnDocumentVisible
);


if (
toggleOnDocumentVisible &&
typeof document !== 'undefined' &&
typeof document.addEventListener === 'function' &&
typeof document.visibilityState === 'string'
) {
const handler = () => {
if (replicationState.isStopped()) {
return;
}
const isVisible = document.visibilityState;
if (isVisible) {
replicationState.start();
} else {
/**
* Only pause if not the current leader.
* If no tab is visible, the elected leader should still continue
* the replication.
*/
if (!collection.database.isLeader()) {
replicationState.pause();
}
}
}
document.addEventListener('visibilitychange', handler);
replicationState.onCancel.push(
() => document.removeEventListener('visibilitychange', handler)
);
}


startReplicationOnLeaderShip(waitForLeadership, replicationState);
return replicationState as any;
}
Expand Down
1 change: 1 addition & 0 deletions src/replication-protocol/checkpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ export async function setCheckpoint<RxDocType, CheckpointType>(
}
}
}

});
await state.checkpointQueue;
}
Expand Down
1 change: 1 addition & 0 deletions src/replication-protocol/downstream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ export async function startReplicationDownstream<RxDocType, CheckpointType = any
state.stats.down.masterChangeStreamEmit = state.stats.down.masterChangeStreamEmit + 1;
addNewTask(task);
});
// unsubscribe when replication is canceled
firstValueFrom(
state.events.canceled.pipe(
filter(canceled => !!canceled)
Expand Down
2 changes: 1 addition & 1 deletion src/replication-protocol/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ export function replicateRxStorageInstance<RxDocType>(
downstreamBulkWriteFlag: checkpointKeyPromise.then(checkpointKey => 'replication-downstream-' + checkpointKey),
events: {
canceled: new BehaviorSubject<boolean>(false),
paused: new BehaviorSubject<boolean>(false),
active: {
down: new BehaviorSubject<boolean>(true),
up: new BehaviorSubject<boolean>(true)
Expand Down Expand Up @@ -202,7 +203,6 @@ export function rxStorageInstanceToReplicationHandler<RxDocType, MasterCheckpoin
undefined
);
}

return docData;
})
)
Expand Down
34 changes: 30 additions & 4 deletions src/replication-protocol/upstream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ export async function startReplicationUpstream<RxDocType, CheckpointType>(
let timer = 0;
let initialSyncStartTime = -1;

type Task = EventBulk<RxStorageChangeEvent<RxDocType>, any>;
type Task = EventBulk<RxStorageChangeEvent<RxDocType>, any> | 'RESYNC';
type TaskWithTime = {
task: Task;
time: number;
Expand All @@ -95,6 +95,11 @@ export async function startReplicationUpstream<RxDocType, CheckpointType>(

const sub = state.input.forkInstance.changeStream()
.subscribe((eventBulk) => {
if (state.events.paused.getValue()) {
return;
}


state.stats.up.forkChangeStreamEmit = state.stats.up.forkChangeStreamEmit + 1;
openTasks.push({
task: eventBulk,
Expand All @@ -110,11 +115,28 @@ export async function startReplicationUpstream<RxDocType, CheckpointType>(
return processTasks();
}
});
const subResync = replicationHandler
.masterChangeStream$
.pipe(
filter(ev => ev === 'RESYNC')
)
.subscribe(() => {
openTasks.push({
task: 'RESYNC',
time: timer++
});
processTasks();
});

// unsubscribe when replication is canceled
firstValueFrom(
state.events.canceled.pipe(
filter(canceled => !!canceled)
)
).then(() => sub.unsubscribe());
).then(() => {
sub.unsubscribe();
subResync.unsubscribe();
});


async function upstreamInitialSync() {
Expand All @@ -141,7 +163,6 @@ export async function startReplicationUpstream<RxDocType, CheckpointType>(
if (promises.size > 3) {
await Promise.race(Array.from(promises));
}

const upResult = await getChangedDocumentsSince(
state.input.forkInstance,
state.input.pushBatchSize,
Expand Down Expand Up @@ -209,6 +230,12 @@ export async function startReplicationUpstream<RxDocType, CheckpointType>(
continue;
}

if (taskWithTime.task === 'RESYNC') {
state.events.active.up.next(false);
await upstreamInitialSync();
return;
}

/**
* If the task came from the downstream, we can ignore these documents
* because we know they are replicated already.
Expand Down Expand Up @@ -276,7 +303,6 @@ export async function startReplicationUpstream<RxDocType, CheckpointType>(
* we continue at the correct position and do not have to load
* these documents from the storage again when the replication is restarted.
*/

function rememberCheckpointBeforeReturn() {
return setCheckpoint(
state,
Expand Down
3 changes: 3 additions & 0 deletions src/types/plugins/replication.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,9 @@ export type ReplicationOptions<RxDocType, CheckpointType> = {
* @default true
*/
waitForLeadership?: boolean;

toggleOnDocumentVisible?: boolean;

/**
* If this is set to `false`,
* the replication will not start automatically
Expand Down
5 changes: 5 additions & 0 deletions src/types/replication-protocol.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ export type RxStorageInstanceReplicationState<RxDocType> = {
* Emit true here to cancel the replication.
*/
canceled: BehaviorSubject<boolean>;
/**
* Contains the pause state.
* Emit true here to pause the replication.
*/
paused: BehaviorSubject<boolean>;
/**
* Contains true if the replication is doing something
* at this point in time.
Expand Down
Loading