Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proposal for chunked pulls #1866

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions babel.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const plugins = [
'@babel/plugin-proposal-nullish-coalescing-operator',
'@babel/plugin-transform-shorthand-properties',
'@babel/plugin-transform-spread',
'@babel/plugin-transform-async-generator-functions',
[
'@babel/plugin-proposal-object-rest-spread',
{
Expand Down
8 changes: 4 additions & 4 deletions examples/typescript/yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
# yarn lockfile v1


"@babel/runtime@7.24.7":
version "7.24.7"
resolved "https://registry.yarnpkg.com/@babel/runtime/-/runtime-7.24.7.tgz#f4f0d5530e8dbdf59b3451b9b3e594b6ba082e12"
integrity sha512-UwgBRMjJP+xv857DCngvqXI3Iq6J4v0wXmwc6sapg+zyhbwmQX67LUEFrkK5tbyJ30jGuG3ZvWpBiB9LCy1kWw==
"@babel/runtime@7.26.0":
version "7.26.0"
resolved "https://registry.yarnpkg.com/@babel/runtime/-/runtime-7.26.0.tgz#8600c2f595f277c60815256418b85356a65173c1"
integrity sha512-FDSOghenHTiToteC/QRlv2q3DhPZ/oOXTBoirfWNx1Cx3TMVcGWQtMMmQcSvb/JjpNeGzx8Pq/b4fKEJuWm1sw==
dependencies:
regenerator-runtime "^0.14.0"

Expand Down
9 changes: 5 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
"@babel/plugin-syntax-flow": "^7.24.7",
"@babel/plugin-syntax-jsx": "^7.24.7",
"@babel/plugin-transform-arrow-functions": "^7.24.7",
"@babel/plugin-transform-async-generator-functions": "^7.25.9",
"@babel/plugin-transform-async-to-generator": "^7.24.7",
"@babel/plugin-transform-block-scoping": "^7.24.7",
"@babel/plugin-transform-classes": "^7.24.7",
Expand All @@ -113,6 +114,10 @@
"@babel/plugin-transform-template-literals": "^7.24.7",
"@babel/plugin-transform-unicode-regex": "^7.24.7",
"@nozbe/watermelondb_expect": "npm:[email protected]",
"@react-native/babel-preset": "0.73.21",
"@react-native/eslint-config": "0.73.2",
"@react-native/metro-config": "0.73.5",
"@react-native/typescript-config": "0.73.1",
"@testing-library/react-hooks": "^8.0.1",
"@types/hoist-non-react-statics": "^3.3.5",
"@types/react": "^16.8.6",
Expand Down Expand Up @@ -161,10 +166,6 @@
"react": "18.3.1",
"react-dom": "18.3.1",
"react-native": "0.73.11",
"@react-native/metro-config": "0.73.5",
"@react-native/babel-preset": "0.73.21",
"@react-native/eslint-config": "0.73.2",
"@react-native/typescript-config": "0.73.1",
"react-test-renderer": "18.3.1",
"rimraf": "^4.1.2",
"semver": "^7.6.2",
Expand Down
13 changes: 1 addition & 12 deletions src/sync/impl/synchronize.d.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,3 @@
import type { SyncArgs } from '../index'

export default function synchronize({
database,
pullChanges,
onDidPullChanges,
pushChanges,
sendCreatedAsUpdated,
migrationsEnabledAtVersion,
log,
conflictResolver,
_unsafeBatchPerCollection,
unsafeTurbo,
}: SyncArgs): Promise<void>
export default function synchronize(params: SyncArgs): Promise<void>
164 changes: 92 additions & 72 deletions src/sync/impl/synchronize.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,23 @@ import {
import { ensureSameDatabase, isChangeSetEmpty, changeSetCount } from './helpers'
import type { SyncArgs, Timestamp, SyncPullStrategy } from '../index'

export default async function synchronize({
database,
pullChanges,
onWillApplyRemoteChanges,
onDidPullChanges,
pushChanges,
sendCreatedAsUpdated = false,
migrationsEnabledAtVersion,
log,
conflictResolver,
_unsafeBatchPerCollection,
unsafeTurbo,
}: SyncArgs): Promise<void> {
async function* liftToAsyncGenerator<T>(promise: Promise<T>): AsyncGenerator<T, void, void> {
yield await promise
}

export default async function synchronize(params: SyncArgs): Promise<void> {
const {
database,
onWillApplyRemoteChanges,
onDidPullChanges,
pushChanges,
sendCreatedAsUpdated = false,
migrationsEnabledAtVersion,
log,
conflictResolver,
_unsafeBatchPerCollection,
unsafeTurbo,
} = params
const resetCount = database._resetCount
log && (log.startedAt = new Date())
log && (log.phase = 'starting')
Expand All @@ -46,78 +50,94 @@ export default async function synchronize({
log && (log.phase = 'ready to pull')

// $FlowFixMe
const pullResult = await pullChanges({
lastPulledAt,
schemaVersion,
migration,
})
log && (log.phase = 'pulled')
const pullChunks = params.useUnsafeChunkedAsyncGenerator
? params.pullChanges({ lastPulledAt, schemaVersion, migration })
: liftToAsyncGenerator(params.pullChanges({ lastPulledAt, schemaVersion, migration }))


let newLastPulledAt: Timestamp = (pullResult: any).timestamp
const remoteChangeCount = pullResult.changes ? changeSetCount(pullResult.changes) : NaN
let newLastPulledAt: Timestamp | null = null
let result = await pullChunks.next()
log && (log.phase = 'pulled')

if (onWillApplyRemoteChanges) {
await onWillApplyRemoteChanges({ remoteChangeCount })
}
// To answer your question - yes it would be nice to use a for await loop here
// however, because of the use of 'fast-async' (see babel config), this syntax is *not* supported and will cause the code to hang...
while(!result.done) {
const pullResult = result.value
let chunkNewLastPulledAt: Timestamp = (pullResult: any).timestamp
newLastPulledAt = chunkNewLastPulledAt
const remoteChangeCount = pullResult.changes ? changeSetCount(pullResult.changes) : NaN

await database.write(async () => {
ensureSameDatabase(database, resetCount)
invariant(
lastPulledAt === (await getLastPulledAt(database)),
'[Sync] Concurrent synchronization is not allowed. More than one synchronize() call was running at the same time, and the later one was aborted before committing results to local database.',
)
if (onWillApplyRemoteChanges) {
await onWillApplyRemoteChanges({ remoteChangeCount })
}

if (unsafeTurbo) {
await database.write(async () => {
ensureSameDatabase(database, resetCount)
invariant(
!_unsafeBatchPerCollection,
'unsafeTurbo must not be used with _unsafeBatchPerCollection',
lastPulledAt === (await getLastPulledAt(database)),
'[Sync] Concurrent synchronization is not allowed. More than one synchronize() call was running at the same time, and the later one was aborted before committing results to local database.',
)

if (unsafeTurbo) {
invariant(
!_unsafeBatchPerCollection,
'unsafeTurbo must not be used with _unsafeBatchPerCollection',
)
invariant(
'syncJson' in pullResult || 'syncJsonId' in pullResult,
'missing syncJson/syncJsonId',
)
invariant(lastPulledAt === null, 'unsafeTurbo can only be used as the first sync')

const syncJsonId = pullResult.syncJsonId || Math.floor(Math.random() * 1000000000)

if (pullResult.syncJson) {
await database.adapter.provideSyncJson(syncJsonId, pullResult.syncJson)
}

const resultRest = await database.adapter.unsafeLoadFromSync(syncJsonId)
chunkNewLastPulledAt = resultRest.timestamp
onDidPullChanges && onDidPullChanges(resultRest)
}

log && (log.newLastPulledAt = chunkNewLastPulledAt)
invariant(
'syncJson' in pullResult || 'syncJsonId' in pullResult,
'missing syncJson/syncJsonId',
typeof chunkNewLastPulledAt === 'number' && chunkNewLastPulledAt > 0,
`pullChanges() returned invalid timestamp ${chunkNewLastPulledAt}. timestamp must be a non-zero number`,
)
invariant(lastPulledAt === null, 'unsafeTurbo can only be used as the first sync')

const syncJsonId = pullResult.syncJsonId || Math.floor(Math.random() * 1000000000)

if (pullResult.syncJson) {
await database.adapter.provideSyncJson(syncJsonId, pullResult.syncJson)
if (!unsafeTurbo) {
// $FlowFixMe
const { changes: remoteChanges, ...resultRest } = pullResult
log && (log.remoteChangeCount = remoteChangeCount)
// $FlowFixMe
await applyRemoteChanges(remoteChanges, {
db: database,
strategy: ((pullResult: any).experimentalStrategy: ?SyncPullStrategy),
sendCreatedAsUpdated,
log,
conflictResolver,
_unsafeBatchPerCollection,
})
onDidPullChanges && onDidPullChanges(resultRest)
}

const resultRest = await database.adapter.unsafeLoadFromSync(syncJsonId)
newLastPulledAt = resultRest.timestamp
onDidPullChanges && onDidPullChanges(resultRest)
}

log && (log.newLastPulledAt = newLastPulledAt)
invariant(
typeof newLastPulledAt === 'number' && newLastPulledAt > 0,
`pullChanges() returned invalid timestamp ${newLastPulledAt}. timestamp must be a non-zero number`,
)

if (!unsafeTurbo) {
// $FlowFixMe
const { changes: remoteChanges, ...resultRest } = pullResult
log && (log.remoteChangeCount = remoteChangeCount)
// $FlowFixMe
await applyRemoteChanges(remoteChanges, {
db: database,
strategy: ((pullResult: any).experimentalStrategy: ?SyncPullStrategy),
sendCreatedAsUpdated,
log,
conflictResolver,
_unsafeBatchPerCollection,
})
onDidPullChanges && onDidPullChanges(resultRest)
}
log && (log.phase = 'applied remote changes')
await setLastPulledAt(database, chunkNewLastPulledAt)

log && (log.phase = 'applied remote changes')
await setLastPulledAt(database, newLastPulledAt)
if (shouldSaveSchemaVersion) {
await setLastPulledSchemaVersion(database, schemaVersion)
}
}, 'sync-synchronize-apply')
result = await pullChunks.next()
}

if (shouldSaveSchemaVersion) {
await setLastPulledSchemaVersion(database, schemaVersion)
}
}, 'sync-synchronize-apply')
if (newLastPulledAt === null) {
invariant(
typeof newLastPulledAt === 'number',
`A pullChanges() function must yield at least one result`,
)
}

// push phase
if (pushChanges) {
Expand Down
78 changes: 47 additions & 31 deletions src/sync/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,37 +56,53 @@ export type SyncConflictResolver = (
resolved: DirtyRaw,
) => DirtyRaw

export type SyncArgs = $Exact<{
database: Database
pullChanges: (_: SyncPullArgs) => Promise<SyncPullResult>
pushChanges?: (_: SyncPushArgs) => Promise<SyncPushResult | undefined | void>
// version at which support for migration syncs was added - the version BEFORE first syncable migration
migrationsEnabledAtVersion?: SchemaVersion
sendCreatedAsUpdated?: boolean
log?: SyncLog
// Advanced (unsafe) customization point. Useful when you have subtle invariants between multiple
// columns and want to have them updated consistently, or to implement partial sync
// It's called for every record being updated locally, so be sure that this function is FAST.
// If you don't want to change default behavior for a given record, return `resolved` as is
// Note that it's safe to mutate `resolved` object, so you can skip copying it for performance.
conflictResolver?: SyncConflictResolver
// commits changes in multiple batches, and not one - temporary workaround for memory issue
_unsafeBatchPerCollection?: boolean
// Advanced optimization - pullChanges must return syncJson or syncJsonId to be processed by native code.
// This can only be used on initial (login) sync, not for incremental syncs.
// This can only be used with SQLiteAdapter with JSI enabled.
// The exact API may change between versions of WatermelonDB.
// See documentation for more details.
unsafeTurbo?: boolean
// Called after pullChanges with whatever was returned by pullChanges, minus `changes`. Useful
// when using turbo mode
onDidPullChanges?: (_: Object) => Promise<void>
// Called after pullChanges is done, but before these changes are applied. Some stats about the pulled
// changes are passed as arguments. An advanced user can use this for example to show some UI to the user
// when processing a very large sync (could be useful for replacement syncs). Note that remote change count
// is NaN in turbo mode.
onWillApplyRemoteChanges?: (info: $Exact<{ remoteChangeCount: number }>) => Promise<void>
}>
export type SyncArgs =
| $Exact<{
database: Database
pushChanges?: (_: SyncPushArgs) => Promise<SyncPushResult | undefined | void>
// version at which support for migration syncs was added - the version BEFORE first syncable migration
migrationsEnabledAtVersion?: SchemaVersion
sendCreatedAsUpdated?: boolean
log?: SyncLog
// Advanced (unsafe) customization point. Useful when you have subtle invariants between multiple
// columns and want to have them updated consistently, or to implement partial sync
// It's called for every record being updated locally, so be sure that this function is FAST.
// If you don't want to change default behavior for a given record, return `resolved` as is
// Note that it's safe to mutate `resolved` object, so you can skip copying it for performance.
conflictResolver?: SyncConflictResolver
// commits changes in multiple batches, and not one - temporary workaround for memory issue
_unsafeBatchPerCollection?: boolean
// Advanced optimization - pullChanges must return syncJson or syncJsonId to be processed by native code.
// This can only be used on initial (login) sync, not for incremental syncs.
// This can only be used with SQLiteAdapter with JSI enabled.
// The exact API may change between versions of WatermelonDB.
// See documentation for more details.
unsafeTurbo?: boolean
// Called after pullChanges with whatever was returned by pullChanges, minus `changes`. Useful
// when using turbo mode
onDidPullChanges?: (_: Object) => Promise<void>
// Called after pullChanges is done, but before these changes are applied. Some stats about the pulled
// changes are passed as arguments. An advanced user can use this for example to show some UI to the user
// when processing a very large sync (could be useful for replacement syncs). Note that remote change count
// is NaN in turbo mode.
onWillApplyRemoteChanges?: (info: $Exact<{ remoteChangeCount: number }>) => Promise<void>
useUnsafeChunkedAsyncGenerator: true
pullChanges: (_: SyncPullArgs) => AsyncGenerator<SyncPullResult, void, void>
}>
| $Exact<{
database: Database
pushChanges?: (_: SyncPushArgs) => Promise<SyncPushResult | undefined | void>
migrationsEnabledAtVersion?: SchemaVersion
sendCreatedAsUpdated?: boolean
log?: SyncLog
conflictResolver?: SyncConflictResolver
_unsafeBatchPerCollection?: boolean
unsafeTurbo?: boolean
onDidPullChanges?: (_: Object) => Promise<void>
onWillApplyRemoteChanges?: (info: $Exact<{ remoteChangeCount: number }>) => Promise<void>
useUnsafeChunkedAsyncGenerator?: false
pullChanges: (_: SyncPullArgs) => Promise<SyncPullResult>
}>

export function synchronize(args: SyncArgs): Promise<void>

Expand Down
Loading