Skip to content

Commit

Permalink
Add optional "useUnsafeChunkedAsyncGenerator"
Browse files Browse the repository at this point in the history
If this flag is set to true, pullChanges is expected to be an
AsyncGenerator that yields multiple SyncPullResults.
  • Loading branch information
tlonny committed Dec 5, 2024
1 parent f5c34eb commit 09f4f00
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 97 deletions.
13 changes: 1 addition & 12 deletions src/sync/impl/synchronize.d.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,3 @@
import type { SyncArgs } from '../index'

export default function synchronize({
database,
pullChanges,
onDidPullChanges,
pushChanges,
sendCreatedAsUpdated,
migrationsEnabledAtVersion,
log,
conflictResolver,
_unsafeBatchPerCollection,
unsafeTurbo,
}: SyncArgs): Promise<void>
/**
 * Type declaration for the default-exported `synchronize` function: it takes a
 * single `SyncArgs` object and returns a promise that resolves with no value.
 */
export default function synchronize(params: SyncArgs): Promise<void>
154 changes: 82 additions & 72 deletions src/sync/impl/synchronize.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,23 @@ import {
import { ensureSameDatabase, isChangeSetEmpty, changeSetCount } from './helpers'
import type { SyncArgs, Timestamp, SyncPullStrategy } from '../index'

export default async function synchronize({
database,
pullChanges,
onWillApplyRemoteChanges,
onDidPullChanges,
pushChanges,
sendCreatedAsUpdated = false,
migrationsEnabledAtVersion,
log,
conflictResolver,
_unsafeBatchPerCollection,
unsafeTurbo,
}: SyncArgs): Promise<void> {
/**
 * Adapts a single promise to the async-generator shape.
 *
 * The returned generator yields the promise's resolved value exactly once and
 * then completes. If the promise rejects, the rejection surfaces from the
 * first `next()` call (i.e. from the consuming `for await` loop).
 */
async function* liftToAsyncGenerator<T>(promise : Promise<T>) : AsyncGenerator<T, void, void> {
  // Resolve first, then hand the settled value to the consumer as the only item.
  const resolved = await promise
  yield resolved
}

export default async function synchronize(params : SyncArgs) : Promise<void> {
const {
database,
onWillApplyRemoteChanges,
onDidPullChanges,
pushChanges,
sendCreatedAsUpdated = false,
migrationsEnabledAtVersion,
log,
conflictResolver,
_unsafeBatchPerCollection,
unsafeTurbo,
} = params
const resetCount = database._resetCount
log && (log.startedAt = new Date())
log && (log.phase = 'starting')
Expand All @@ -46,78 +50,84 @@ export default async function synchronize({
log && (log.phase = 'ready to pull')

// $FlowFixMe
const pullResult = await pullChanges({
lastPulledAt,
schemaVersion,
migration,
})
const pullChunks = params.useUnsafeChunkedAsyncGenerator
? params.pullChanges({ lastPulledAt, schemaVersion, migration })
: liftToAsyncGenerator(params.pullChanges({ lastPulledAt, schemaVersion, migration }))
log && (log.phase = 'pulled')

let newLastPulledAt: Timestamp = (pullResult: any).timestamp
const remoteChangeCount = pullResult.changes ? changeSetCount(pullResult.changes) : NaN
let newLastPulledAt: Timestamp | null = null
for await (const pullResult of pullChunks) {
let newLastPulledAt: Timestamp = (pullResult: any).timestamp
const remoteChangeCount = pullResult.changes ? changeSetCount(pullResult.changes) : NaN

if (onWillApplyRemoteChanges) {
await onWillApplyRemoteChanges({ remoteChangeCount })
}

await database.write(async () => {
ensureSameDatabase(database, resetCount)
invariant(
lastPulledAt === (await getLastPulledAt(database)),
'[Sync] Concurrent synchronization is not allowed. More than one synchronize() call was running at the same time, and the later one was aborted before committing results to local database.',
)
if (onWillApplyRemoteChanges) {
await onWillApplyRemoteChanges({ remoteChangeCount })
}

if (unsafeTurbo) {
await database.write(async () => {
ensureSameDatabase(database, resetCount)
invariant(
!_unsafeBatchPerCollection,
'unsafeTurbo must not be used with _unsafeBatchPerCollection',
lastPulledAt === (await getLastPulledAt(database)),
'[Sync] Concurrent synchronization is not allowed. More than one synchronize() call was running at the same time, and the later one was aborted before committing results to local database.',
)

if (unsafeTurbo) {
invariant(
!_unsafeBatchPerCollection,
'unsafeTurbo must not be used with _unsafeBatchPerCollection',
)
invariant(
'syncJson' in pullResult || 'syncJsonId' in pullResult,
'missing syncJson/syncJsonId',
)
invariant(lastPulledAt === null, 'unsafeTurbo can only be used as the first sync')

const syncJsonId = pullResult.syncJsonId || Math.floor(Math.random() * 1000000000)

if (pullResult.syncJson) {
await database.adapter.provideSyncJson(syncJsonId, pullResult.syncJson)
}

const resultRest = await database.adapter.unsafeLoadFromSync(syncJsonId)
newLastPulledAt = resultRest.timestamp
onDidPullChanges && onDidPullChanges(resultRest)
}

log && (log.newLastPulledAt = newLastPulledAt)
invariant(
'syncJson' in pullResult || 'syncJsonId' in pullResult,
'missing syncJson/syncJsonId',
typeof newLastPulledAt === 'number' && newLastPulledAt > 0,
`pullChanges() returned invalid timestamp ${newLastPulledAt}. timestamp must be a non-zero number`,
)
invariant(lastPulledAt === null, 'unsafeTurbo can only be used as the first sync')

const syncJsonId = pullResult.syncJsonId || Math.floor(Math.random() * 1000000000)

if (pullResult.syncJson) {
await database.adapter.provideSyncJson(syncJsonId, pullResult.syncJson)
if (!unsafeTurbo) {
// $FlowFixMe
const { changes: remoteChanges, ...resultRest } = pullResult
log && (log.remoteChangeCount = remoteChangeCount)
// $FlowFixMe
await applyRemoteChanges(remoteChanges, {
db: database,
strategy: ((pullResult: any).experimentalStrategy: ?SyncPullStrategy),
sendCreatedAsUpdated,
log,
conflictResolver,
_unsafeBatchPerCollection,
})
onDidPullChanges && onDidPullChanges(resultRest)
}

const resultRest = await database.adapter.unsafeLoadFromSync(syncJsonId)
newLastPulledAt = resultRest.timestamp
onDidPullChanges && onDidPullChanges(resultRest)
}
log && (log.phase = 'applied remote changes')
await setLastPulledAt(database, newLastPulledAt)

log && (log.newLastPulledAt = newLastPulledAt)
invariant(
typeof newLastPulledAt === 'number' && newLastPulledAt > 0,
`pullChanges() returned invalid timestamp ${newLastPulledAt}. timestamp must be a non-zero number`,
)

if (!unsafeTurbo) {
// $FlowFixMe
const { changes: remoteChanges, ...resultRest } = pullResult
log && (log.remoteChangeCount = remoteChangeCount)
// $FlowFixMe
await applyRemoteChanges(remoteChanges, {
db: database,
strategy: ((pullResult: any).experimentalStrategy: ?SyncPullStrategy),
sendCreatedAsUpdated,
log,
conflictResolver,
_unsafeBatchPerCollection,
})
onDidPullChanges && onDidPullChanges(resultRest)
}
if (shouldSaveSchemaVersion) {
await setLastPulledSchemaVersion(database, schemaVersion)
}
}, 'sync-synchronize-apply')

log && (log.phase = 'applied remote changes')
await setLastPulledAt(database, newLastPulledAt)
}

if (shouldSaveSchemaVersion) {
await setLastPulledSchemaVersion(database, schemaVersion)
}
}, 'sync-synchronize-apply')
if(newLastPulledAt === null) {
throw new Error('An empty generator was used')
}

// push phase
if (pushChanges) {
Expand Down Expand Up @@ -145,4 +155,4 @@ export default async function synchronize({

log && (log.finishedAt = new Date())
log && (log.phase = 'done')
}
}
38 changes: 26 additions & 12 deletions src/sync/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,35 +57,49 @@ export type SyncConflictResolver = (
) => DirtyRaw

export type SyncArgs = $Exact<{
database: Database
pullChanges: (_: SyncPullArgs) => Promise<SyncPullResult>
pushChanges?: (_: SyncPushArgs) => Promise<SyncPushResult | undefined | void>
database: Database,
pushChanges?: (SyncPushArgs) => Promise<?SyncPushResult>,
// version at which support for migration syncs was added - the version BEFORE first syncable migration
migrationsEnabledAtVersion?: SchemaVersion
sendCreatedAsUpdated?: boolean
log?: SyncLog
migrationsEnabledAtVersion?: SchemaVersion,
sendCreatedAsUpdated?: boolean,
log?: SyncLog,
// Advanced (unsafe) customization point. Useful when you have subtle invariants between multiple
// columns and want to have them updated consistently, or to implement partial sync
// It's called for every record being updated locally, so be sure that this function is FAST.
// If you don't want to change default behavior for a given record, return `resolved` as is
// Note that it's safe to mutate `resolved` object, so you can skip copying it for performance.
conflictResolver?: SyncConflictResolver
conflictResolver?: SyncConflictResolver,
// commits changes in multiple batches, and not one - temporary workaround for memory issue
_unsafeBatchPerCollection?: boolean
_unsafeBatchPerCollection?: boolean,
// Advanced optimization - pullChanges must return syncJson or syncJsonId to be processed by native code.
// This can only be used on initial (login) sync, not for incremental syncs.
// This can only be used with SQLiteAdapter with JSI enabled.
// The exact API may change between versions of WatermelonDB.
// See documentation for more details.
unsafeTurbo?: boolean
// Called after pullChanges with whatever was returned by pullChanges, minus `changes`. Useful
unsafeTurbo?: boolean,
// Called after changes are pulled with whatever was returned by pullChanges, minus `changes`. Useful
// when using turbo mode
onDidPullChanges?: (_: Object) => Promise<void>
onDidPullChanges?: (Object) => Promise<void>,
// Called after pullChanges is done, but before these changes are applied. Some stats about the pulled
// changes are passed as arguments. An advanced user can use this for example to show some UI to the user
// when processing a very large sync (could be useful for replacement syncs). Note that remote change count
// is NaN in turbo mode.
onWillApplyRemoteChanges?: (info: $Exact<{ remoteChangeCount: number }>) => Promise<void>
onWillApplyRemoteChanges?: (info: $Exact<{ remoteChangeCount: number }>) => Promise<void>,
useUnsafeChunkedAsyncGenerator: true,
pullChanges: (SyncPullArgs) => AsyncGenerator<SyncPullResult, void, void>,
}> | $Exact<{
database: Database,
pushChanges?: (SyncPushArgs) => Promise<?SyncPushResult>,
migrationsEnabledAtVersion?: SchemaVersion,
sendCreatedAsUpdated?: boolean,
log?: SyncLog,
conflictResolver?: SyncConflictResolver,
_unsafeBatchPerCollection?: boolean,
unsafeTurbo?: boolean,
onDidPullChanges?: (Object) => Promise<void>,
onWillApplyRemoteChanges?: (info: $Exact<{ remoteChangeCount: number }>) => Promise<void>,
useUnsafeChunkedAsyncGenerator: false,
pullChanges: (SyncPullArgs) => Promise<SyncPullResult>,
}>

export function synchronize(args: SyncArgs): Promise<void>
Expand Down
17 changes: 16 additions & 1 deletion src/sync/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ export type SyncConflictResolver = (
// TODO: JSDoc'ify this
export type SyncArgs = $Exact<{
database: Database,
pullChanges: (SyncPullArgs) => Promise<SyncPullResult>,
pushChanges?: (SyncPushArgs) => Promise<?SyncPushResult>,
// version at which support for migration syncs was added - the version BEFORE first syncable migration
migrationsEnabledAtVersion?: SchemaVersion,
Expand Down Expand Up @@ -113,6 +112,22 @@ export type SyncArgs = $Exact<{
// when processing a very large sync (could be useful for replacement syncs). Note that remote change count
// is NaN in turbo mode.
onWillApplyRemoteChanges?: (info: $Exact<{ remoteChangeCount: number }>) => Promise<void>,
// If this flag is set to true, then pullChanges is expected to return an async generator that yields multiple SyncPullResults. This allows WatermelonDB to process incoming sync data in chunks, which can be useful for syncs that end up being particularly large and might otherwise cause an OOM if held in memory all at once. It is not recommended to use this flag unless you are sure - should a sync fail midway through, the database will be in a "partially synced" and potentially inconsistent state. Although it will eventually become consistent after a subsequent sync is completed, it might cause unexpected issues depending on your particular use case.
useUnsafeChunkedAsyncGenerator?: false,
pullChanges: (SyncPullArgs) => Promise<SyncPullResult>,
}> | $Exact<{
database: Database,
pushChanges?: (SyncPushArgs) => Promise<?SyncPushResult>,
migrationsEnabledAtVersion?: SchemaVersion,
sendCreatedAsUpdated?: boolean,
log?: SyncLog,
conflictResolver?: SyncConflictResolver,
_unsafeBatchPerCollection?: boolean,
unsafeTurbo?: boolean,
onDidPullChanges?: (Object) => Promise<void>,
onWillApplyRemoteChanges?: (info: $Exact<{ remoteChangeCount: number }>) => Promise<void>,
useUnsafeChunkedAsyncGenerator: true,
pullChanges: (SyncPullArgs) => AsyncGenerator<SyncPullResult, void, void>,
}>

/**
Expand Down

0 comments on commit 09f4f00

Please sign in to comment.