From cea388bbacf1af3377718c8e6fb9164b513842aa Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 15 May 2025 09:24:04 +0200 Subject: [PATCH 01/14] Start integrating Rust extension --- .../lib/src/sync/bucket_storage.dart | 16 ++ .../lib/src/sync/instruction.dart | 147 ++++++++++++++++++ .../lib/src/sync/mutable_sync_status.dart | 15 ++ .../lib/src/sync/streaming_sync.dart | 103 +++++++++++- .../lib/src/sync/sync_status.dart | 3 + .../lib/src/web/web_bucket_storage.dart | 5 + 6 files changed, 286 insertions(+), 3 deletions(-) create mode 100644 packages/powersync_core/lib/src/sync/instruction.dart diff --git a/packages/powersync_core/lib/src/sync/bucket_storage.dart b/packages/powersync_core/lib/src/sync/bucket_storage.dart index 5f6e15ff..d6145918 100644 --- a/packages/powersync_core/lib/src/sync/bucket_storage.dart +++ b/packages/powersync_core/lib/src/sync/bucket_storage.dart @@ -365,6 +365,22 @@ UPDATE ps_buckets SET count_since_last = 0, count_at_last = ?1->name }); } + Future control(String op, [Object? payload]) async { + return await writeTransaction( + (tx) async { + final [row] = + await tx.execute('SELECT powersync_control(?, ?)', [op, payload]); + return row.columnAt(0) as String; + }, + // We flush when powersync_control yields an instruction to do so. + flush: false, + ); + } + + Future flushFileSystem() async { + // Noop outside of web. + } + /// Note: The asynchronous nature of this is due to this needing a global /// lock. The actual database operations are still synchronous, and it /// is assumed that multiple functions on this instance won't be called diff --git a/packages/powersync_core/lib/src/sync/instruction.dart b/packages/powersync_core/lib/src/sync/instruction.dart new file mode 100644 index 00000000..c5c7d32a --- /dev/null +++ b/packages/powersync_core/lib/src/sync/instruction.dart @@ -0,0 +1,147 @@ +import 'sync_status.dart'; + +/// An internal instruction emitted by the sync client in the core extension in +/// response to the Dart SDK passing sync data into the extension. +sealed class Instruction { + factory Instruction.fromJson(Map json) { + return switch (json) { + {'LogLine': final logLine} => + LogLine.fromJson(logLine as Map), + {'UpdateSyncStatus': final updateStatus} => + UpdateSyncStatus.fromJson(updateStatus as Map), + {'EstablishSyncStream': final establish} => + EstablishSyncStream.fromJson(establish as Map), + {'FetchCredentials': final creds} => + FetchCredentials.fromJson(creds as Map), + {'CloseSyncStream': _} => const CloseSyncStream(), + {'FlushFileSystem': _} => const FlushFileSystem(), + {'DidCompleteSync': _} => const DidCompleteSync(), + _ => UnknownSyncInstruction(json) + }; + } +} + +final class LogLine implements Instruction { + final String severity; + final String line; + + LogLine({required this.severity, required this.line}); + + factory LogLine.fromJson(Map json) { + return LogLine( + severity: json['severity'] as String, + line: json['line'] as String, + ); + } +} + +final class EstablishSyncStream implements Instruction { + final Map request; + + EstablishSyncStream(this.request); + + factory EstablishSyncStream.fromJson(Map json) { + return EstablishSyncStream(json['request'] as Map); + } +} + +final class UpdateSyncStatus implements Instruction { + final CoreSyncStatus status; + + UpdateSyncStatus({required this.status}); + + factory UpdateSyncStatus.fromJson(Map json) { + return UpdateSyncStatus( + status: + CoreSyncStatus.fromJson(json['status'] as Map)); + } +} + +final class CoreSyncStatus { + final bool connected; + final bool connecting; + final List priorityStatus; + final DownloadProgress? downloading; + + CoreSyncStatus({ + required this.connected, + required this.connecting, + required this.priorityStatus, + required this.downloading, + }); + + factory CoreSyncStatus.fromJson(Map json) { + return CoreSyncStatus( + connected: json['connected'] as bool, + connecting: json['connecting'] as bool, + priorityStatus: [ + for (final entry in json['priority_status'] as List) + _priorityStatusFromJson(entry as Map) + ], + downloading: switch (json['downloading']) { + null => null, + final raw as Map => DownloadProgress.fromJson(raw), + }, + ); + } + + static SyncPriorityStatus _priorityStatusFromJson(Map json) { + return ( + priority: BucketPriority(json['priority'] as int), + hasSynced: json['has_synced'] as bool?, + lastSyncedAt: switch (json['last_synced_at']) { + null => null, + final lastSyncedAt as int => + DateTime.fromMillisecondsSinceEpoch(lastSyncedAt * 1000), + }, + ); + } +} + +final class DownloadProgress { + final Map progress; + + DownloadProgress(this.progress); + + factory DownloadProgress.fromJson(Map line) { + return DownloadProgress(line.map((k, v) => + MapEntry(k, _bucketProgressFromJson(v as Map)))); + } + + static BucketProgress _bucketProgressFromJson(Map json) { + return ( + priority: BucketPriority(json['priority'] as int), + atLast: json['at_last'] as int, + sinceLast: json['since_last'] as int, + targetCount: json['target_count'] as int, + ); + } +} + +final class FetchCredentials implements Instruction { + final bool didExpire; + + FetchCredentials(this.didExpire); + + factory FetchCredentials.fromJson(Map line) { + return FetchCredentials(line['did_expire'] as bool); + } +} + +final class CloseSyncStream implements Instruction { + const CloseSyncStream(); +} + +final class FlushFileSystem implements Instruction { + const FlushFileSystem(); +} + +final class DidCompleteSync implements Instruction { + const DidCompleteSync(); +} + +final class UnknownSyncInstruction implements Instruction { + final Map source; + + UnknownSyncInstruction(this.source); +} diff --git a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart index 6da63654..51d010d4 100644 --- a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart +++ b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart @@ -1,6 +1,7 @@ import 'dart:async'; import 'package:collection/collection.dart'; +import 'package:powersync_core/src/sync/instruction.dart'; import 'sync_status.dart'; import 'bucket_storage.dart'; @@ -79,6 +80,20 @@ final class MutableSyncStatus { } } + void applyFromCore(CoreSyncStatus status) { + connected = status.connected; + connecting = status.connecting; + downloading = status.downloading != null; + priorityStatusEntries = status.priorityStatus; + downloadProgress = switch (status.downloading) { + null => null, + final downloading => InternalSyncDownloadProgress(downloading.progress), + }; + lastSyncedAt = status.priorityStatus + .firstWhereOrNull((s) => s.priority == BucketPriority.fullSyncPriority) + ?.lastSyncedAt; + } + SyncStatus immutableSnapshot() { return SyncStatus( connected: connected, diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index 24284751..a0c3e250 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -1,8 +1,10 @@ import 'dart:async'; import 'dart:convert' as convert; +import 'dart:typed_data'; import 'package:http/http.dart' as http; import 'package:logging/logging.dart'; +import 'package:logging/logging.dart'; import 'package:meta/meta.dart'; import 'package:powersync_core/src/abort_controller.dart'; import 'package:powersync_core/src/exceptions.dart'; @@ -13,7 +15,7 @@ import 'package:sqlite_async/mutex.dart'; import 'bucket_storage.dart'; import '../crud.dart'; - +import 'instruction.dart'; import 'internal_connector.dart'; import 'mutable_sync_status.dart'; import 'stream_utils.dart'; @@ -137,7 +139,12 @@ class StreamingSyncImplementation implements StreamingSync { } // Protect sync iterations with exclusivity (if a valid Mutex is provided) await syncMutex.lock(() { - return _streamingSyncIteration(); + switch (options.source.syncImplementation) { + case SyncClientImplementation.dart: + return _dartStreamingSyncIteration(); + case SyncClientImplementation.rust: + return _rustStreamingSyncIteration(); + } }, timeout: _retryDelay); } catch (e, stacktrace) { if (aborted && e is http.ClientException) { @@ -238,6 +245,7 @@ class StreamingSyncImplementation implements StreamingSync { } assert(identical(_activeCrudUpload, completer)); + _nonLineSyncEvents.add(const UploadCompleted()); _activeCrudUpload = null; completer.complete(); }); @@ -281,6 +289,10 @@ class StreamingSyncImplementation implements StreamingSync { }); } + Future _rustStreamingSyncIteration() async { + await _ActiveRustStreamingIteration(this).syncIteration(); + } + Future<(List, Map)> _collectLocalBucketState() async { final bucketEntries = await adapter.getBucketStates(); @@ -295,7 +307,7 @@ class StreamingSyncImplementation implements StreamingSync { return (initialRequests, localDescriptions); } - Future _streamingSyncIteration() async { + Future _dartStreamingSyncIteration() async { var (bucketRequests, bucketMap) = await _collectLocalBucketState(); if (aborted) { return; @@ -565,6 +577,91 @@ typedef BucketDescription = ({ int priority, }); +final class _ActiveRustStreamingIteration { + final StreamingSyncImplementation sync; + + StreamSubscription? _completedUploads; + final Completer _completedStream = Completer(); + + _ActiveRustStreamingIteration(this.sync); + + Future syncIteration() async { + try { + await _control('start', convert.json.encode(sync.options.params)); + assert(_completedStream.isCompleted, 'Should have started streaming'); + await _completedStream.future; + } finally { + _completedUploads?.cancel(); + await _stop(); + } + } + + Stream _receiveLines(Object? data) { + return sync._rawStreamingSyncRequest(data).map(ReceivedLine.new); + } + + Future _handleLines(EstablishSyncStream request) async { + final events = addBroadcast( + _receiveLines(request.request), sync._nonLineSyncEvents.stream); + + listen: + await for (final event in events) { + switch (event) { + case ReceivedLine(line: final Uint8List line): + await _control('line_binary', line); + case ReceivedLine(line: final line as String): + await _control('line_text', line); + case UploadCompleted(): + await _control('completed_upload'); + case TokenRefreshComplete(): + await _control('refreshed_token'); + case AbortRequested(): + break listen; + } + } + } + + Future _stop() => _control('stop'); + + Future _control(String operation, [Object? payload]) async { + final rawResponse = await sync.adapter.control(operation, payload); + final instructions = convert.json.decode(rawResponse) as List; + + for (final instruction in instructions) { + await _handleInstruction( + Instruction.fromJson(instruction as Map)); + } + } + + Future _handleInstruction(Instruction instruction) async { + switch (instruction) { + case LogLine(:final severity, :final line): + sync.logger.log( + switch (severity) { + 'DEBUG' => Level.FINE, + 'INFO' => Level.INFO, + _ => Level.WARNING, + }, + line); + case EstablishSyncStream(): + _completedStream.complete(_handleLines(instruction)); + case UpdateSyncStatus(:final status): + sync._state.updateStatus((m) => m.applyFromCore(status)); + case FetchCredentials(): + // TODO: Handle this case. + throw UnimplementedError(); + case CloseSyncStream(): + sync._nonLineSyncEvents.add(AbortRequested()); + case FlushFileSystem(): + await sync.adapter.flushFileSystem(); + case DidCompleteSync(): + sync._state.updateStatus((m) => m.downloadError = null); + case UnknownSyncInstruction(:final source): + sync.logger.warning('Unknown instruction: $source'); + } + } +} + sealed class SyncEvent {} final class ReceivedLine implements SyncEvent { diff --git a/packages/powersync_core/lib/src/sync/sync_status.dart b/packages/powersync_core/lib/src/sync/sync_status.dart index fceece50..9c1fcfba 100644 --- a/packages/powersync_core/lib/src/sync/sync_status.dart +++ b/packages/powersync_core/lib/src/sync/sync_status.dart @@ -189,6 +189,9 @@ extension type const BucketPriority._(int priorityNumber) { /// A [Comparator] instance suitable for comparing [BucketPriority] values. static int comparator(BucketPriority a, BucketPriority b) => -a.priorityNumber.compareTo(b.priorityNumber); + + /// The priority used by PowerSync to indicate that a full sync was completed. + static const fullSyncPriority = BucketPriority._(2147483647); } /// Partial information about the synchronization status for buckets within a diff --git a/packages/powersync_core/lib/src/web/web_bucket_storage.dart b/packages/powersync_core/lib/src/web/web_bucket_storage.dart index 4ba46b07..a430d569 100644 --- a/packages/powersync_core/lib/src/web/web_bucket_storage.dart +++ b/packages/powersync_core/lib/src/web/web_bucket_storage.dart @@ -17,4 +17,9 @@ class WebBucketStorage extends BucketStorage { return _webDb.writeTransaction(callback, lockTimeout: lockTimeout, flush: flush); } + + @override + Future flushFileSystem() { + return _webDb.flush(); + } } From 1421bb61c476b20de8ab0bb9ad4baa64a928a27b Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 15 May 2025 14:29:19 +0200 Subject: [PATCH 02/14] Add new sync implementation --- .../powersync_core/lib/src/sync/options.dart | 30 ++++++++++++++++++ .../lib/src/sync/streaming_sync.dart | 31 ++++++++++++++----- .../lib/src/web/sync_worker.dart | 4 +++ .../lib/src/web/sync_worker_protocol.dart | 5 ++- 4 files changed, 61 insertions(+), 9 deletions(-) diff --git a/packages/powersync_core/lib/src/sync/options.dart b/packages/powersync_core/lib/src/sync/options.dart index f13f7e38..f9017dec 100644 --- a/packages/powersync_core/lib/src/sync/options.dart +++ b/packages/powersync_core/lib/src/sync/options.dart @@ -24,13 +24,43 @@ final class SyncOptions { /// When set to null, PowerSync defaults to a delay of 5 seconds. final Duration? retryDelay; + /// The [SyncClientImplementation] to use. + final SyncClientImplementation syncImplementation; + const SyncOptions({ this.crudThrottleTime, this.retryDelay, this.params, + this.syncImplementation = SyncClientImplementation.defaultClient, }); } +/// The PowerSync SDK offers two different implementations for receiving sync +/// lines: One handling most logic in Dart, and a newer one offloading that work +/// to the native PowerSync extension. +enum SyncClientImplementation { + /// A sync implementation that decodes and handles sync lines in Dart. + @Deprecated( + "Don't use SyncClientImplementation.dart directly, " + "use SyncClientImplementation.defaultClient instead.", + ) + dart, + + /// An experimental sync implementation that parses and handles sync lines in + /// the native PowerSync core extensions. + /// + /// This implementation can be more performant than the Dart implementation, + /// and supports receiving sync lines in a more efficient format. + /// + /// Note that this option is currently experimental. + @experimental + rust; + + /// The default sync client implementation to use. + // ignore: deprecated_member_use_from_same_package + static const defaultClient = dart; +} + @internal extension type ResolvedSyncOptions(SyncOptions source) { Duration get crudThrottleTime => diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index a0c3e250..d793fdf4 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -4,7 +4,6 @@ import 'dart:typed_data'; import 'package:http/http.dart' as http; import 'package:logging/logging.dart'; -import 'package:logging/logging.dart'; import 'package:meta/meta.dart'; import 'package:powersync_core/src/abort_controller.dart'; import 'package:powersync_core/src/exceptions.dart'; @@ -444,6 +443,7 @@ class StreamingSyncImplementation implements StreamingSync { case UploadCompleted(): // Only relevant for the Rust sync implementation. break; + case AbortCurrentIteration(): case TokenRefreshComplete(): // We have a new token, so stop the iteration. haveInvalidated = true; @@ -579,6 +579,7 @@ typedef BucketDescription = ({ final class _ActiveRustStreamingIteration { final StreamingSyncImplementation sync; + var _isActive = true; StreamSubscription? _completedUploads; final Completer _completedStream = Completer(); @@ -591,6 +592,7 @@ final class _ActiveRustStreamingIteration { assert(_completedStream.isCompleted, 'Should have started streaming'); await _completedStream.future; } finally { + _isActive = true; _completedUploads?.cancel(); await _stop(); } @@ -604,7 +606,7 @@ final class _ActiveRustStreamingIteration { final events = addBroadcast( _receiveLines(request.request), sync._nonLineSyncEvents.stream); - listen: + loop: await for (final event in events) { switch (event) { case ReceivedLine(line: final Uint8List line): @@ -613,10 +615,10 @@ final class _ActiveRustStreamingIteration { await _control('line_text', line); case UploadCompleted(): await _control('completed_upload'); + case AbortCurrentIteration(): + break loop; case TokenRefreshComplete(): await _control('refreshed_token'); - case AbortRequested(): - break listen; } } } @@ -647,11 +649,20 @@ final class _ActiveRustStreamingIteration { _completedStream.complete(_handleLines(instruction)); case UpdateSyncStatus(:final status): sync._state.updateStatus((m) => m.applyFromCore(status)); - case FetchCredentials(): - // TODO: Handle this case. - throw UnimplementedError(); + case FetchCredentials(:final didExpire): + if (didExpire) { + await sync.connector.prefetchCredentials(invalidate: true); + } else { + sync.connector.prefetchCredentials().then((_) { + if (_isActive && !sync.aborted) { + sync._nonLineSyncEvents.add(const TokenRefreshComplete()); + } + }, onError: (Object e, StackTrace s) { + sync.logger.warning('Could not prefetch credentials', e, s); + }); + } case CloseSyncStream(): - sync._nonLineSyncEvents.add(AbortRequested()); + sync._nonLineSyncEvents.add(const AbortCurrentIteration()); case FlushFileSystem(): await sync.adapter.flushFileSystem(); case DidCompleteSync(): @@ -677,3 +688,7 @@ final class UploadCompleted implements SyncEvent { final class TokenRefreshComplete implements SyncEvent { const TokenRefreshComplete(); } + +final class AbortCurrentIteration implements SyncEvent { + const AbortCurrentIteration(); +} diff --git a/packages/powersync_core/lib/src/web/sync_worker.dart b/packages/powersync_core/lib/src/web/sync_worker.dart index e2443e23..b5e8ed63 100644 --- a/packages/powersync_core/lib/src/web/sync_worker.dart +++ b/packages/powersync_core/lib/src/web/sync_worker.dart @@ -80,6 +80,10 @@ class _ConnectedClient { final encodedParams => jsonDecode(encodedParams) as Map, }, + syncImplementation: switch (request.implementationName) { + null => SyncClientImplementation.defaultClient, + final name => SyncClientImplementation.values.byName(name), + }, ); _runner = _worker.referenceSyncTask( diff --git a/packages/powersync_core/lib/src/web/sync_worker_protocol.dart b/packages/powersync_core/lib/src/web/sync_worker_protocol.dart index f4de6cc9..2b859e53 100644 --- a/packages/powersync_core/lib/src/web/sync_worker_protocol.dart +++ b/packages/powersync_core/lib/src/web/sync_worker_protocol.dart @@ -69,7 +69,8 @@ extension type StartSynchronization._(JSObject _) implements JSObject { required String databaseName, required int crudThrottleTimeMs, required int requestId, - required int? retryDelayMs, + required int retryDelayMs, + required String implementationName, String? syncParamsEncoded, }); @@ -77,6 +78,7 @@ extension type StartSynchronization._(JSObject _) implements JSObject { external int get requestId; external int get crudThrottleTimeMs; external int? get retryDelayMs; + external String? get implementationName; external String? get syncParamsEncoded; } @@ -417,6 +419,7 @@ final class WorkerCommunicationChannel { crudThrottleTimeMs: options.crudThrottleTime.inMilliseconds, retryDelayMs: options.retryDelay.inMilliseconds, requestId: id, + implementationName: options.source.syncImplementation.name, syncParamsEncoded: switch (options.source.params) { null => null, final params => jsonEncode(params), From 88923e9917065ad77457d166d33a69018ca918b5 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Thu, 15 May 2025 16:20:45 +0200 Subject: [PATCH 03/14] Use in-memory sync tests --- .../lib/src/sync/instruction.dart | 14 +- .../lib/src/sync/mutable_sync_status.dart | 2 +- .../lib/src/sync/streaming_sync.dart | 18 ++- .../test/in_memory_sync_test.dart | 136 ++++++++++-------- 4 files changed, 104 insertions(+), 66 deletions(-) diff --git a/packages/powersync_core/lib/src/sync/instruction.dart b/packages/powersync_core/lib/src/sync/instruction.dart index c5c7d32a..f0146e8e 100644 --- a/packages/powersync_core/lib/src/sync/instruction.dart +++ b/packages/powersync_core/lib/src/sync/instruction.dart @@ -99,13 +99,19 @@ final class CoreSyncStatus { } final class DownloadProgress { - final Map progress; + final Map buckets; - DownloadProgress(this.progress); + DownloadProgress(this.buckets); factory DownloadProgress.fromJson(Map line) { - return DownloadProgress(line.map((k, v) => - MapEntry(k, _bucketProgressFromJson(v as Map)))); + final rawBuckets = line['buckets'] as Map; + + return DownloadProgress(rawBuckets.map((k, v) { + return MapEntry( + k, + _bucketProgressFromJson(v as Map), + ); + })); } static BucketProgress _bucketProgressFromJson(Map json) { diff --git a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart index 51d010d4..df49e1c2 100644 --- a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart +++ b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart @@ -87,7 +87,7 @@ final class MutableSyncStatus { priorityStatusEntries = status.priorityStatus; downloadProgress = switch (status.downloading) { null => null, - final downloading => InternalSyncDownloadProgress(downloading.progress), + final downloading => InternalSyncDownloadProgress(downloading.buckets), }; lastSyncedAt = status.priorityStatus .firstWhereOrNull((s) => s.priority == BucketPriority.fullSyncPriority) diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index d793fdf4..c81f9929 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -139,6 +139,7 @@ class StreamingSyncImplementation implements StreamingSync { // Protect sync iterations with exclusivity (if a valid Mutex is provided) await syncMutex.lock(() { switch (options.source.syncImplementation) { + // ignore: deprecated_member_use_from_same_package case SyncClientImplementation.dart: return _dartStreamingSyncIteration(); case SyncClientImplementation.rust: @@ -568,7 +569,7 @@ String _syncErrorMessage(Object? error) { } else if (error is PowerSyncProtocolException) { return 'Protocol error'; } else { - return '${error.runtimeType}'; + return '${error.runtimeType}: $error'; } } @@ -592,7 +593,7 @@ final class _ActiveRustStreamingIteration { assert(_completedStream.isCompleted, 'Should have started streaming'); await _completedStream.future; } finally { - _isActive = true; + _isActive = false; _completedUploads?.cancel(); await _stop(); } @@ -608,6 +609,10 @@ final class _ActiveRustStreamingIteration { loop: await for (final event in events) { + if (!_isActive || sync.aborted) { + break; + } + switch (event) { case ReceivedLine(line: final Uint8List line): await _control('line_binary', line); @@ -623,7 +628,9 @@ final class _ActiveRustStreamingIteration { } } - Future _stop() => _control('stop'); + Future _stop() { + return _control('stop'); + } Future _control(String operation, [Object? payload]) async { final rawResponse = await sync.adapter.control(operation, payload); @@ -662,7 +669,10 @@ final class _ActiveRustStreamingIteration { }); } case CloseSyncStream(): - sync._nonLineSyncEvents.add(const AbortCurrentIteration()); + if (!sync.aborted) { + _isActive = false; + sync._nonLineSyncEvents.add(const AbortCurrentIteration()); + } case FlushFileSystem(): await sync.adapter.flushFileSystem(); case DidCompleteSync(): diff --git a/packages/powersync_core/test/in_memory_sync_test.dart b/packages/powersync_core/test/in_memory_sync_test.dart index 3ab8ae76..9455bc94 100644 --- a/packages/powersync_core/test/in_memory_sync_test.dart +++ b/packages/powersync_core/test/in_memory_sync_test.dart @@ -16,9 +16,26 @@ import 'utils/in_memory_http.dart'; import 'utils/test_utils_impl.dart'; void main() { + _declareTests( + 'dart sync client', + SyncOptions( + // ignore: deprecated_member_use_from_same_package + syncImplementation: SyncClientImplementation.dart, + ), + ); + + _declareTests( + 'rust sync client', + SyncOptions( + syncImplementation: SyncClientImplementation.rust, + ), + ); +} + +void _declareTests(String name, SyncOptions options) { final ignoredLogger = Logger.detached('powersync.test')..level = Level.OFF; - group('in-memory sync tests', () { + group(name, () { late final testUtils = TestUtils(); late TestPowerSyncFactory factory; @@ -44,6 +61,7 @@ void main() { expiresAt: DateTime.now(), ); }, uploadData: (db) => uploadData(db)), + options: options, ); } @@ -107,6 +125,7 @@ void main() { }); await expectLater( status, emits(isSyncStatus(downloading: false, hasSynced: true))); + await syncClient.abort(); final independentDb = factory.wrapRaw(raw, logger: ignoredLogger); addTearDown(independentDb.close); @@ -122,65 +141,68 @@ void main() { isTrue); }); - test('can save independent buckets in same transaction', () async { - final status = await waitForConnection(); - - syncService.addLine({ - 'checkpoint': Checkpoint( - lastOpId: '0', - writeCheckpoint: null, - checksums: [ - BucketChecksum(bucket: 'a', checksum: 0, priority: 3), - BucketChecksum(bucket: 'b', checksum: 0, priority: 3), - ], - ) - }); - await expectLater(status, emits(isSyncStatus(downloading: true))); - - var commits = 0; - raw.commits.listen((_) => commits++); + // ignore: deprecated_member_use_from_same_package + if (options.syncImplementation == SyncClientImplementation.dart) { + test('can save independent buckets in same transaction', () async { + final status = await waitForConnection(); - syncService - ..addLine({ - 'data': { - 'bucket': 'a', - 'data': >[ - { - 'op_id': '1', - 'op': 'PUT', - 'object_type': 'a', - 'object_id': '1', - 'checksum': 0, - 'data': {}, - } - ], - } - }) - ..addLine({ - 'data': { - 'bucket': 'b', - 'data': >[ - { - 'op_id': '2', - 'op': 'PUT', - 'object_type': 'b', - 'object_id': '1', - 'checksum': 0, - 'data': {}, - } + syncService.addLine({ + 'checkpoint': Checkpoint( + lastOpId: '0', + writeCheckpoint: null, + checksums: [ + BucketChecksum(bucket: 'a', checksum: 0, priority: 3), + BucketChecksum(bucket: 'b', checksum: 0, priority: 3), ], - } + ) }); + await expectLater(status, emits(isSyncStatus(downloading: true))); - // Wait for the operations to be inserted. - while (raw.select('SELECT * FROM ps_oplog;').length < 2) { - await pumpEventQueue(); - } + var commits = 0; + raw.commits.listen((_) => commits++); - // The two buckets should have been inserted in a single transaction - // because the messages were received in quick succession. - expect(commits, 1); - }); + syncService + ..addLine({ + 'data': { + 'bucket': 'a', + 'data': >[ + { + 'op_id': '1', + 'op': 'PUT', + 'object_type': 'a', + 'object_id': '1', + 'checksum': 0, + 'data': {}, + } + ], + } + }) + ..addLine({ + 'data': { + 'bucket': 'b', + 'data': >[ + { + 'op_id': '2', + 'op': 'PUT', + 'object_type': 'b', + 'object_id': '1', + 'checksum': 0, + 'data': {}, + } + ], + } + }); + + // Wait for the operations to be inserted. + while (raw.select('SELECT * FROM ps_oplog;').length < 2) { + await pumpEventQueue(); + } + + // The two buckets should have been inserted in a single transaction + // because the messages were received in quick succession. + expect(commits, 1); + }); + } group('partial sync', () { test('updates sync state incrementally', () async { @@ -281,6 +303,7 @@ void main() { }); await database.waitForFirstSync(priority: BucketPriority(1)); expect(database.currentStatus.hasSynced, isFalse); + await syncClient.abort(); final independentDb = factory.wrapRaw(raw, logger: ignoredLogger); addTearDown(independentDb.close); @@ -485,7 +508,7 @@ void main() { }) async { await expectLater( status, - emits(isSyncStatus( + emitsThrough(isSyncStatus( downloading: true, downloadProgress: isSyncDownloadProgress( progress: total, @@ -644,7 +667,6 @@ void main() { await checkProgress(progress(8, 8), progress(10, 14)); addCheckpointComplete(0); - await checkProgress(progress(8, 8), progress(10, 14)); addDataLine('b', 4); await checkProgress(progress(8, 8), progress(14, 14)); From 814baded86b3416bc92e4011c3ea280ad48e264e Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Tue, 20 May 2025 18:15:42 +0200 Subject: [PATCH 04/14] Properly copy options --- .../native/native_powersync_database.dart | 7 +++--- .../powersync_database_impl_stub.dart | 2 +- .../lib/src/database/powersync_db_mixin.dart | 11 ++++----- .../database/web/web_powersync_database.dart | 10 ++++---- .../powersync_core/lib/src/sync/options.dart | 23 +++++++++++++++++++ 5 files changed, 36 insertions(+), 17 deletions(-) diff --git a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart index 01fddb76..2f846fd5 100644 --- a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart @@ -120,7 +120,7 @@ class PowerSyncDatabaseImpl @internal Future connectInternal({ required PowerSyncBackendConnector connector, - required SyncOptions options, + required ResolvedSyncOptions options, required AbortController abort, required Zone asyncWorkZone, }) async { @@ -135,7 +135,6 @@ class PowerSyncDatabaseImpl SendPort? initPort; final hasInitPort = Completer(); final receivedIsolateExit = Completer(); - final resolved = ResolvedSyncOptions(options); Future waitForShutdown() async { // Only complete the abortion signal after the isolate shuts down. This @@ -183,7 +182,7 @@ class PowerSyncDatabaseImpl final port = initPort = data[1] as SendPort; hasInitPort.complete(); var crudStream = database - .onChange(['ps_crud'], throttle: resolved.crudThrottleTime); + .onChange(['ps_crud'], throttle: options.crudThrottleTime); crudUpdateSubscription = crudStream.listen((event) { port.send(['update']); }); @@ -245,7 +244,7 @@ class PowerSyncDatabaseImpl _PowerSyncDatabaseIsolateArgs( receiveMessages.sendPort, dbRef, - resolved, + options, crudMutex.shared, syncMutex.shared, ), diff --git a/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart b/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart index 83498b17..2a795497 100644 --- a/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart +++ b/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart @@ -115,7 +115,7 @@ class PowerSyncDatabaseImpl required PowerSyncBackendConnector connector, required AbortController abort, required Zone asyncWorkZone, - required SyncOptions options, + required ResolvedSyncOptions options, }) { throw UnimplementedError(); } diff --git a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart index 6db00e0d..62e2bec5 100644 --- a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart +++ b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart @@ -289,11 +289,10 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { // the lock for the connection. await initialize(); - final resolvedOptions = SyncOptions( - crudThrottleTime: options?.crudThrottleTime ?? crudThrottleTime, - // ignore: deprecated_member_use_from_same_package - retryDelay: options?.retryDelay ?? retryDelay, - params: options?.params ?? params, + final resolvedOptions = ResolvedSyncOptions.resolve( + options, + crudThrottleTime: crudThrottleTime, + params: params, ); // ignore: deprecated_member_use_from_same_package @@ -362,7 +361,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { @internal Future connectInternal({ required PowerSyncBackendConnector connector, - required SyncOptions options, + required ResolvedSyncOptions options, required AbortController abort, required Zone asyncWorkZone, }); diff --git a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart index 44fa01d9..6b40a6a2 100644 --- a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart @@ -118,10 +118,8 @@ class PowerSyncDatabaseImpl required PowerSyncBackendConnector connector, required AbortController abort, required Zone asyncWorkZone, - required SyncOptions options, + required ResolvedSyncOptions options, }) async { - final resolved = ResolvedSyncOptions(options); - final storage = BucketStorage(database); StreamingSync sync; // Try using a shared worker for the synchronization implementation to avoid @@ -130,7 +128,7 @@ class PowerSyncDatabaseImpl sync = await SyncWorkerHandle.start( database: this, connector: connector, - options: options, + options: options.source, workerUri: Uri.base.resolve('/powersync_sync.worker.js'), ); } catch (e) { @@ -139,13 +137,13 @@ class PowerSyncDatabaseImpl e, ); final crudStream = - database.onChange(['ps_crud'], throttle: resolved.crudThrottleTime); + database.onChange(['ps_crud'], throttle: options.crudThrottleTime); sync = StreamingSyncImplementation( adapter: storage, connector: InternalConnector.wrap(connector, this), crudUpdateTriggerStream: crudStream, - options: resolved, + options: options, client: BrowserClient(), // Only allows 1 sync implementation to run at a time per database // This should be global (across tabs) when using Navigator locks. diff --git a/packages/powersync_core/lib/src/sync/options.dart b/packages/powersync_core/lib/src/sync/options.dart index f9017dec..130161f9 100644 --- a/packages/powersync_core/lib/src/sync/options.dart +++ b/packages/powersync_core/lib/src/sync/options.dart @@ -33,6 +33,18 @@ final class SyncOptions { this.params, this.syncImplementation = SyncClientImplementation.defaultClient, }); + + SyncOptions _copyWith({ + Duration? crudThrottleTime, + Map? params, + }) { + return SyncOptions( + crudThrottleTime: crudThrottleTime ?? this.crudThrottleTime, + retryDelay: retryDelay, + params: params ?? this.params, + syncImplementation: syncImplementation, + ); + } } /// The PowerSync SDK offers two different implementations for receiving sync @@ -63,6 +75,17 @@ enum SyncClientImplementation { @internal extension type ResolvedSyncOptions(SyncOptions source) { + factory ResolvedSyncOptions.resolve( + SyncOptions? source, { + Duration? crudThrottleTime, + Map? params, + }) { + return ResolvedSyncOptions((source ?? SyncOptions())._copyWith( + crudThrottleTime: crudThrottleTime, + params: params, + )); + } + Duration get crudThrottleTime => source.crudThrottleTime ?? const Duration(milliseconds: 10); From d104fd2029968b8505a347a39c6b3711db3f9140 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 21 May 2025 09:43:08 +0200 Subject: [PATCH 05/14] Revert intended changes --- .../powersync_core/lib/src/database/powersync_db_mixin.dart | 2 ++ packages/powersync_core/lib/src/sync/mutable_sync_status.dart | 2 +- packages/powersync_core/lib/src/sync/options.dart | 3 +++ packages/powersync_core/lib/src/sync/streaming_sync.dart | 1 - 4 files changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart index 62e2bec5..808efc71 100644 --- a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart +++ b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart @@ -292,6 +292,8 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { final resolvedOptions = ResolvedSyncOptions.resolve( options, crudThrottleTime: crudThrottleTime, + // ignore: deprecated_member_use_from_same_package + retryDelay: retryDelay, params: params, ); diff --git a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart index df49e1c2..23e3becb 100644 --- a/packages/powersync_core/lib/src/sync/mutable_sync_status.dart +++ b/packages/powersync_core/lib/src/sync/mutable_sync_status.dart @@ -1,8 +1,8 @@ import 'dart:async'; import 'package:collection/collection.dart'; -import 'package:powersync_core/src/sync/instruction.dart'; +import 'instruction.dart'; import 'sync_status.dart'; import 'bucket_storage.dart'; import 'protocol.dart'; diff --git a/packages/powersync_core/lib/src/sync/options.dart b/packages/powersync_core/lib/src/sync/options.dart index 130161f9..d9b0833b 100644 --- a/packages/powersync_core/lib/src/sync/options.dart +++ b/packages/powersync_core/lib/src/sync/options.dart @@ -36,6 +36,7 @@ final class SyncOptions { SyncOptions _copyWith({ Duration? crudThrottleTime, + Duration? retryDelay, Map? params, }) { return SyncOptions( @@ -78,10 +79,12 @@ extension type ResolvedSyncOptions(SyncOptions source) { factory ResolvedSyncOptions.resolve( SyncOptions? source, { Duration? crudThrottleTime, + Duration? retryDelay, Map? params, }) { return ResolvedSyncOptions((source ?? SyncOptions())._copyWith( crudThrottleTime: crudThrottleTime, + retryDelay: retryDelay, params: params, )); } diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index c81f9929..b2a4a935 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -245,7 +245,6 @@ class StreamingSyncImplementation implements StreamingSync { } assert(identical(_activeCrudUpload, completer)); - _nonLineSyncEvents.add(const UploadCompleted()); _activeCrudUpload = null; completer.complete(); }); From b1dbad156d20e8c20c21530578e4868b55c6bb38 Mon Sep 17 00:00:00 2001 From: Simon Binder Date: Wed, 4 Jun 2025 20:38:52 +0200 Subject: [PATCH 06/14] Fix start invocation --- packages/powersync_core/lib/src/sync/streaming_sync.dart | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index b2a4a935..b2e2f5bc 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -588,7 +588,13 @@ final class _ActiveRustStreamingIteration { Future syncIteration() async { try { - await _control('start', convert.json.encode(sync.options.params)); + await _control( + 'start', + convert.json.encode({ + 'parameters': sync.options.params, + 'schema': 'TODO: Pass-through schema (probably in serialized form)', + }), + ); assert(_completedStream.isCompleted, 'Should have started streaming'); await _completedStream.future; } finally { From c635e6bfc5c60dc9c1323421f919341e1fce24b9 Mon Sep 17 00:00:00 2001 From: Jorge Sardina Date: Fri, 6 Jun 2025 14:19:43 +0200 Subject: [PATCH 07/14] raw tables in schema --- .../native/native_powersync_database.dart | 4 ++ .../database/web/web_powersync_database.dart | 1 + packages/powersync_core/lib/src/schema.dart | 59 ++++++++++++++++++- .../lib/src/sync/streaming_sync.dart | 5 +- .../lib/src/web/sync_worker.dart | 1 + .../test/utils/abstract_test_utils.dart | 1 + 6 files changed, 68 insertions(+), 3 deletions(-) diff --git a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart index 2f846fd5..af1ac201 100644 --- a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart @@ -247,6 +247,7 @@ class PowerSyncDatabaseImpl options, crudMutex.shared, syncMutex.shared, + schema, ), debugName: 'Sync ${database.openFactory.path}', onError: receiveUnhandledErrors.sendPort, @@ -290,6 +291,7 @@ class _PowerSyncDatabaseIsolateArgs { final ResolvedSyncOptions options; final SerializedMutex crudMutex; final SerializedMutex syncMutex; + final Schema schema; _PowerSyncDatabaseIsolateArgs( this.sPort, @@ -297,6 +299,7 @@ class _PowerSyncDatabaseIsolateArgs { this.options, this.crudMutex, this.syncMutex, + this.schema, ); } @@ -392,6 +395,7 @@ Future _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async { final storage = BucketStorage(connection); final sync = StreamingSyncImplementation( adapter: storage, + schema: args.schema, connector: InternalConnector( getCredentialsCached: getCredentialsCached, prefetchCredentials: prefetchCredentials, diff --git a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart index 6b40a6a2..fb410caa 100644 --- a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart @@ -141,6 +141,7 @@ class PowerSyncDatabaseImpl sync = StreamingSyncImplementation( adapter: storage, + schema: schema, connector: InternalConnector.wrap(connector, this), crudUpdateTriggerStream: crudStream, options: options, diff --git a/packages/powersync_core/lib/src/schema.dart b/packages/powersync_core/lib/src/schema.dart index 4892ee6c..3f8722ef 100644 --- a/packages/powersync_core/lib/src/schema.dart +++ b/packages/powersync_core/lib/src/schema.dart @@ -8,10 +8,11 @@ import 'schema_logic.dart'; class Schema { /// List of tables in the schema. final List tables; + final List rawTables; - const Schema(this.tables); + const Schema(this.tables, {this.rawTables = const []}); - Map toJson() => {'tables': tables}; + Map toJson() => {'raw_tables': rawTables, 'tables': tables}; void validate() { Set tableNames = {}; @@ -315,6 +316,60 @@ class Column { Map toJson() => {'name': name, 'type': type.sqlite}; } +class RawTable { + final String + name; // TODO: it does not need to be the same name as the raw table + final PendingStatement put; + final PendingStatement delete; + + const RawTable( + this.name, + this.put, + this.delete, + ); + + Map toJson() => { + 'name': name, + 'put': put, + 'delete': delete, + }; +} + +class PendingStatement { + final String sql; + final List params; + + PendingStatement({required this.sql, required this.params}); + + Map toJson() => { + 'sql': sql, + 'params': params, + }; +} + +sealed class PendingStatementValue { + dynamic toJson(); +} + +class PendingStmtValueColumn extends PendingStatementValue { + final String column; + PendingStmtValueColumn(this.column); + + @override + dynamic toJson() { + return { + 'Column': column, + }; + } +} + +class PendingStmtValueId extends PendingStatementValue { + @override + dynamic toJson() { + return 'Id'; + } +} + /// Type of column. enum ColumnType { /// TEXT column. diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index b2e2f5bc..82527fd8 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -5,6 +5,7 @@ import 'dart:typed_data'; import 'package:http/http.dart' as http; import 'package:logging/logging.dart'; import 'package:meta/meta.dart'; +import 'package:powersync_core/powersync_core.dart'; import 'package:powersync_core/src/abort_controller.dart'; import 'package:powersync_core/src/exceptions.dart'; import 'package:powersync_core/src/log_internal.dart'; @@ -32,6 +33,7 @@ abstract interface class StreamingSync { @internal class StreamingSyncImplementation implements StreamingSync { + final Schema? schema; //TODO(SkillDevs): pass in all implementations final BucketStorage adapter; final InternalConnector connector; final ResolvedSyncOptions options; @@ -62,6 +64,7 @@ class StreamingSyncImplementation implements StreamingSync { String? clientId; StreamingSyncImplementation({ + required this.schema, required this.adapter, required this.connector, required this.crudUpdateTriggerStream, @@ -592,7 +595,7 @@ final class _ActiveRustStreamingIteration { 'start', convert.json.encode({ 'parameters': sync.options.params, - 'schema': 'TODO: Pass-through schema (probably in serialized form)', + 'schema': sync.schema, }), ); assert(_completedStream.isCompleted, 'Should have started streaming'); diff --git a/packages/powersync_core/lib/src/web/sync_worker.dart b/packages/powersync_core/lib/src/web/sync_worker.dart index b5e8ed63..5ee3c4ca 100644 --- a/packages/powersync_core/lib/src/web/sync_worker.dart +++ b/packages/powersync_core/lib/src/web/sync_worker.dart @@ -264,6 +264,7 @@ class _SyncRunner { sync = StreamingSyncImplementation( adapter: WebBucketStorage(database), + schema: null, connector: InternalConnector( getCredentialsCached: client.channel.credentialsCallback, prefetchCredentials: ({required bool invalidate}) async { diff --git a/packages/powersync_core/test/utils/abstract_test_utils.dart b/packages/powersync_core/test/utils/abstract_test_utils.dart index 3ea4a319..b2dac843 100644 --- a/packages/powersync_core/test/utils/abstract_test_utils.dart +++ b/packages/powersync_core/test/utils/abstract_test_utils.dart @@ -153,6 +153,7 @@ extension MockSync on PowerSyncDatabase { }) { final impl = StreamingSyncImplementation( adapter: BucketStorage(this), + schema: null, client: client, options: ResolvedSyncOptions(options), connector: InternalConnector.wrap(connector, this), From af052b07d2a2696296bc2ed51bb53a39f473a480 Mon Sep 17 00:00:00 2001 From: David Martos Date: Sat, 7 Jun 2025 01:00:40 +0200 Subject: [PATCH 08/14] fix tests --- .../powersync_core/test/in_memory_sync_test.dart | 13 +++++++------ .../test/utils/abstract_test_utils.dart | 2 +- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/packages/powersync_core/test/in_memory_sync_test.dart b/packages/powersync_core/test/in_memory_sync_test.dart index 9455bc94..c735d817 100644 --- a/packages/powersync_core/test/in_memory_sync_test.dart +++ b/packages/powersync_core/test/in_memory_sync_test.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'dart:convert'; import 'package:async/async.dart'; import 'package:logging/logging.dart'; @@ -172,7 +173,7 @@ void _declareTests(String name, SyncOptions options) { 'object_type': 'a', 'object_id': '1', 'checksum': 0, - 'data': {}, + 'data': '{}', } ], } @@ -187,7 +188,7 @@ void _declareTests(String name, SyncOptions options) { 'object_type': 'b', 'object_id': '1', 'checksum': 0, - 'data': {}, + 'data': '{}', } ], } @@ -229,7 +230,7 @@ void _declareTests(String name, SyncOptions options) { 'data': [ { 'checksum': priority + 10, - 'data': {'name': 'test', 'email': 'email'}, + 'data': json.encode({'name': 'test', 'email': 'email'}), 'op': 'PUT', 'op_id': '${operationId++}', 'object_id': 'prio$priority', @@ -411,7 +412,7 @@ void _declareTests(String name, SyncOptions options) { 'data': [ { 'checksum': 0, - 'data': {'name': 'from local', 'email': 'local@example.org'}, + 'data': json.encode({'name': 'from local', 'email': 'local@example.org'}), 'op': 'PUT', 'op_id': '1', 'object_id': '1', @@ -419,7 +420,7 @@ void _declareTests(String name, SyncOptions options) { }, { 'checksum': 0, - 'data': {'name': 'additional', 'email': ''}, + 'data': json.encode({'name': 'additional', 'email': ''}), 'op': 'PUT', 'op_id': '2', 'object_id': '2', @@ -477,7 +478,7 @@ void _declareTests(String name, SyncOptions options) { 'object_type': bucket, 'object_id': '$lastOpId', 'checksum': 0, - 'data': {}, + 'data': '{}', } ], } diff --git a/packages/powersync_core/test/utils/abstract_test_utils.dart b/packages/powersync_core/test/utils/abstract_test_utils.dart index b2dac843..b0ebd7ba 100644 --- a/packages/powersync_core/test/utils/abstract_test_utils.dart +++ b/packages/powersync_core/test/utils/abstract_test_utils.dart @@ -153,7 +153,7 @@ extension MockSync on PowerSyncDatabase { }) { final impl = StreamingSyncImplementation( adapter: BucketStorage(this), - schema: null, + schema: schema, client: client, options: ResolvedSyncOptions(options), connector: InternalConnector.wrap(connector, this), From bf9f1e9e18bb097f3cdec454d8ecf0fce390e4a8 Mon Sep 17 00:00:00 2001 From: David Martos Date: Sun, 8 Jun 2025 01:21:29 +0200 Subject: [PATCH 09/14] manual schema management --- .../native/native_powersync_database.dart | 37 ++++++++--- .../lib/src/database/powersync_database.dart | 64 ++++++++++++------- .../powersync_database_impl_stub.dart | 15 +++-- .../lib/src/database/powersync_db_mixin.dart | 38 ++++++++++- .../database/web/web_powersync_database.dart | 28 ++++++-- 5 files changed, 140 insertions(+), 42 deletions(-) diff --git a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart index af1ac201..f4be54e3 100644 --- a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart @@ -44,6 +44,9 @@ class PowerSyncDatabaseImpl @override SqliteDatabase database; + @override + bool manualSchemaManagement; + @override @protected late Future isInitialized; @@ -76,6 +79,7 @@ class PowerSyncDatabaseImpl required String path, int maxReaders = SqliteDatabase.defaultMaxReaders, Logger? logger, + bool manualSchemaManagement = false, @Deprecated("Use [PowerSyncDatabase.withFactory] instead.") // ignore: deprecated_member_use_from_same_package SqliteConnectionSetup? sqliteSetup}) { @@ -83,8 +87,13 @@ class PowerSyncDatabaseImpl DefaultSqliteOpenFactory factory = // ignore: deprecated_member_use_from_same_package PowerSyncOpenFactory(path: path, sqliteSetup: sqliteSetup); - return PowerSyncDatabaseImpl.withFactory(factory, - schema: schema, maxReaders: maxReaders, logger: logger); + return PowerSyncDatabaseImpl.withFactory( + factory, + schema: schema, + maxReaders: maxReaders, + logger: logger, + manualSchemaManagement: manualSchemaManagement, + ); } /// Open a [PowerSyncDatabase] with a [PowerSyncOpenFactory]. @@ -96,13 +105,19 @@ class PowerSyncDatabaseImpl /// /// [logger] defaults to [autoLogger], which logs to the console in debug builds. factory PowerSyncDatabaseImpl.withFactory( - DefaultSqliteOpenFactory openFactory, - {required Schema schema, - int maxReaders = SqliteDatabase.defaultMaxReaders, - Logger? logger}) { + DefaultSqliteOpenFactory openFactory, { + required Schema schema, + int maxReaders = SqliteDatabase.defaultMaxReaders, + Logger? logger, + bool manualSchemaManagement = false, + }) { final db = SqliteDatabase.withFactory(openFactory, maxReaders: maxReaders); return PowerSyncDatabaseImpl.withDatabase( - schema: schema, database: db, logger: logger); + schema: schema, + database: db, + logger: logger, + manualSchemaManagement: manualSchemaManagement, + ); } /// Open a PowerSyncDatabase on an existing [SqliteDatabase]. @@ -110,8 +125,12 @@ class PowerSyncDatabaseImpl /// Migrations are run on the database when this constructor is called. /// /// [logger] defaults to [autoLogger], which logs to the console in debug builds.s - PowerSyncDatabaseImpl.withDatabase( - {required this.schema, required this.database, Logger? logger}) { + PowerSyncDatabaseImpl.withDatabase({ + required this.schema, + required this.database, + Logger? logger, + this.manualSchemaManagement = false, + }) { this.logger = logger ?? autoLogger; isInitialized = baseInit(); } diff --git a/packages/powersync_core/lib/src/database/powersync_database.dart b/packages/powersync_core/lib/src/database/powersync_database.dart index 95543ce8..eb220758 100644 --- a/packages/powersync_core/lib/src/database/powersync_database.dart +++ b/packages/powersync_core/lib/src/database/powersync_database.dart @@ -32,19 +32,23 @@ abstract class PowerSyncDatabase /// A maximum of [maxReaders] concurrent read transactions are allowed. /// /// [logger] defaults to [autoLogger], which logs to the console in debug builds. - factory PowerSyncDatabase( - {required Schema schema, - required String path, - Logger? logger, - @Deprecated("Use [PowerSyncDatabase.withFactory] instead.") - // ignore: deprecated_member_use_from_same_package - SqliteConnectionSetup? sqliteSetup}) { + factory PowerSyncDatabase({ + required Schema schema, + required String path, + Logger? logger, + bool manualSchemaManagement = false, + @Deprecated("Use [PowerSyncDatabase.withFactory] instead.") + // ignore: deprecated_member_use_from_same_package + SqliteConnectionSetup? sqliteSetup, + }) { return PowerSyncDatabaseImpl( - schema: schema, - path: path, - logger: logger, - // ignore: deprecated_member_use_from_same_package - sqliteSetup: sqliteSetup); + schema: schema, + path: path, + manualSchemaManagement: manualSchemaManagement, + logger: logger, + // ignore: deprecated_member_use_from_same_package + sqliteSetup: sqliteSetup, + ); } /// Open a [PowerSyncDatabase] with a [PowerSyncOpenFactory]. @@ -55,12 +59,20 @@ abstract class PowerSyncDatabase /// Subclass [PowerSyncOpenFactory] to add custom logic to this process. /// /// [logger] defaults to [autoLogger], which logs to the console in debug builds. - factory PowerSyncDatabase.withFactory(DefaultSqliteOpenFactory openFactory, - {required Schema schema, - int maxReaders = SqliteDatabase.defaultMaxReaders, - Logger? logger}) { - return PowerSyncDatabaseImpl.withFactory(openFactory, - schema: schema, maxReaders: maxReaders, logger: logger); + factory PowerSyncDatabase.withFactory( + DefaultSqliteOpenFactory openFactory, { + required Schema schema, + int maxReaders = SqliteDatabase.defaultMaxReaders, + bool manualSchemaManagement = false, + Logger? logger, + }) { + return PowerSyncDatabaseImpl.withFactory( + openFactory, + schema: schema, + maxReaders: maxReaders, + manualSchemaManagement: manualSchemaManagement, + logger: logger, + ); } /// Open a PowerSyncDatabase on an existing [SqliteDatabase]. @@ -68,11 +80,17 @@ abstract class PowerSyncDatabase /// Migrations are run on the database when this constructor is called. /// /// [logger] defaults to [autoLogger], which logs to the console in debug builds. - factory PowerSyncDatabase.withDatabase( - {required Schema schema, - required SqliteDatabase database, - Logger? loggers}) { + factory PowerSyncDatabase.withDatabase({ + required Schema schema, + required SqliteDatabase database, + bool manualSchemaManagement = false, + Logger? logger, + }) { return PowerSyncDatabaseImpl.withDatabase( - schema: schema, database: database, logger: loggers); + schema: schema, + database: database, + manualSchemaManagement: manualSchemaManagement, + logger: logger, + ); } } diff --git a/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart b/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart index 2a795497..ee3ab2af 100644 --- a/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart +++ b/packages/powersync_core/lib/src/database/powersync_database_impl_stub.dart @@ -32,6 +32,9 @@ class PowerSyncDatabaseImpl @override SqliteDatabase get database => throw UnimplementedError(); + @override + bool get manualSchemaManagement => throw UnimplementedError(); + @override Future get isInitialized => throw UnimplementedError(); @@ -53,6 +56,7 @@ class PowerSyncDatabaseImpl {required Schema schema, required String path, int maxReaders = SqliteDatabase.defaultMaxReaders, + bool manualSchemaManagement = false, Logger? logger, @Deprecated("Use [PowerSyncDatabase.withFactory] instead.") // ignore: deprecated_member_use_from_same_package @@ -72,6 +76,7 @@ class PowerSyncDatabaseImpl DefaultSqliteOpenFactory openFactory, { required Schema schema, int maxReaders = SqliteDatabase.defaultMaxReaders, + bool manualSchemaManagement = false, Logger? logger, }) { throw UnimplementedError(); @@ -82,10 +87,12 @@ class PowerSyncDatabaseImpl /// Migrations are run on the database when this constructor is called. /// /// [logger] defaults to [autoLogger], which logs to the console in debug builds.s - factory PowerSyncDatabaseImpl.withDatabase( - {required Schema schema, - required SqliteDatabase database, - Logger? logger}) { + factory PowerSyncDatabaseImpl.withDatabase({ + required Schema schema, + required SqliteDatabase database, + bool manualSchemaManagement = false, + Logger? logger, + }) { throw UnimplementedError(); } diff --git a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart index 808efc71..ae63823e 100644 --- a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart +++ b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart @@ -38,6 +38,10 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { /// Use [attachedLogger] to propagate logs to [Logger.root] for custom logging. Logger get logger; + bool get manualSchemaManagement; + + bool _manualSchemaManagementCompleted = false; + @Deprecated("This field is unused, pass params to connect() instead") Map? clientParams; @@ -110,10 +114,36 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { statusStream = statusStreamController.stream; updates = powerSyncUpdateNotifications(database.updates); + _manualSchemaManagementCompleted = false; + await database.initialize(); await _checkVersion(); await database.execute('SELECT powersync_init()'); - await updateSchema(schema); + + if (!manualSchemaManagement) { + // Create the internal db schema + await updateSchema(schema); + await _afterSchemaReady(); + } + } + + Future markSchemaAsReady() async { + await isInitialized; + _manualSchemaManagementCompleted = true; + + await _afterSchemaReady(); + } + + void _assertSchemaIsReady() { + if (!manualSchemaManagement || _manualSchemaManagementCompleted) { + return; + } + + throw AssertionError( + 'In manual schema management mode, you need to mark the powersync database as ready'); + } + + Future _afterSchemaReady() async { await _updateHasSynced(); } @@ -289,6 +319,8 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { // the lock for the connection. await initialize(); + _assertSchemaIsReady(); + final resolvedOptions = ResolvedSyncOptions.resolve( options, crudThrottleTime: crudThrottleTime, @@ -452,6 +484,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { /// Get an unique id for this client. /// This id is only reset when the database is deleted. Future getClientId() async { + _assertSchemaIsReady(); // TODO(skilldevs): Needed? final row = await get('SELECT powersync_client_id() as client_id'); return row['client_id'] as String; } @@ -459,6 +492,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { /// Get upload queue size estimate and count. Future getUploadQueueStats( {bool includeSize = false}) async { + _assertSchemaIsReady(); if (includeSize) { final row = await getOptional( 'SELECT SUM(cast(data as blob) + 20) as size, count(*) as count FROM ps_crud'); @@ -486,6 +520,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { /// data by transaction. One batch may contain data from multiple transactions, /// and a single transaction may be split over multiple batches. Future getCrudBatch({int limit = 100}) async { + _assertSchemaIsReady(); final rows = await getAll( 'SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT ?', [limit + 1]); @@ -532,6 +567,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { /// Unlike [getCrudBatch], this only returns data from a single transaction at a time. /// All data for the transaction is loaded into memory. Future getNextCrudTransaction() async { + _assertSchemaIsReady(); return await readTransaction((tx) async { final first = await tx.getOptional( 'SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT 1'); diff --git a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart index fb410caa..8132d8ce 100644 --- a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart @@ -38,6 +38,9 @@ class PowerSyncDatabaseImpl @override SqliteDatabase database; + @override + bool manualSchemaManagement; + @override @protected late Future isInitialized; @@ -69,14 +72,20 @@ class PowerSyncDatabaseImpl {required Schema schema, required String path, int maxReaders = SqliteDatabase.defaultMaxReaders, + bool manualSchemaManagement = false, Logger? logger, @Deprecated("Use [PowerSyncDatabase.withFactory] instead.") // ignore: deprecated_member_use_from_same_package SqliteConnectionSetup? sqliteSetup}) { // ignore: deprecated_member_use_from_same_package DefaultSqliteOpenFactory factory = PowerSyncOpenFactory(path: path); - return PowerSyncDatabaseImpl.withFactory(factory, - maxReaders: maxReaders, logger: logger, schema: schema); + return PowerSyncDatabaseImpl.withFactory( + factory, + maxReaders: maxReaders, + logger: logger, + schema: schema, + manualSchemaManagement: manualSchemaManagement, + ); } /// Open a [PowerSyncDatabase] with a [PowerSyncOpenFactory]. @@ -91,10 +100,15 @@ class PowerSyncDatabaseImpl DefaultSqliteOpenFactory openFactory, {required Schema schema, int maxReaders = SqliteDatabase.defaultMaxReaders, + bool manualSchemaManagement = false, Logger? logger}) { final db = SqliteDatabase.withFactory(openFactory, maxReaders: 1); return PowerSyncDatabaseImpl.withDatabase( - schema: schema, logger: logger, database: db); + schema: schema, + manualSchemaManagement: manualSchemaManagement, + logger: logger, + database: db, + ); } /// Open a PowerSyncDatabase on an existing [SqliteDatabase]. @@ -102,8 +116,12 @@ class PowerSyncDatabaseImpl /// Migrations are run on the database when this constructor is called. /// /// [logger] defaults to [autoLogger], which logs to the console in debug builds. - PowerSyncDatabaseImpl.withDatabase( - {required this.schema, required this.database, Logger? logger}) { + PowerSyncDatabaseImpl.withDatabase({ + required this.schema, + required this.database, + this.manualSchemaManagement = false, + Logger? logger, + }) { if (logger != null) { this.logger = logger; } else { From 54cf1b9bc1865ea764c7acfc876f629491f122d6 Mon Sep 17 00:00:00 2001 From: David Martos Date: Sun, 8 Jun 2025 12:12:20 +0200 Subject: [PATCH 10/14] name params --- packages/powersync_core/lib/src/schema.dart | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/powersync_core/lib/src/schema.dart b/packages/powersync_core/lib/src/schema.dart index 3f8722ef..575aacc6 100644 --- a/packages/powersync_core/lib/src/schema.dart +++ b/packages/powersync_core/lib/src/schema.dart @@ -322,11 +322,11 @@ class RawTable { final PendingStatement put; final PendingStatement delete; - const RawTable( - this.name, - this.put, - this.delete, - ); + const RawTable({ + required this.name, + required this.put, + required this.delete, + }); Map toJson() => { 'name': name, From d287e3d5fa16d98e5db7173dfb29bc69dff62891 Mon Sep 17 00:00:00 2001 From: Jorge Sardina Date: Wed, 11 Jun 2025 12:22:20 +0200 Subject: [PATCH 11/14] pass schema to sync worker --- .../native/native_powersync_database.dart | 9 ++++--- .../database/web/web_powersync_database.dart | 3 ++- .../lib/src/sync/streaming_sync.dart | 7 +++-- .../lib/src/web/sync_controller.dart | 5 +++- .../lib/src/web/sync_worker.dart | 27 ++++++++++++------- .../lib/src/web/sync_worker_protocol.dart | 6 ++++- .../test/utils/abstract_test_utils.dart | 6 +++-- 7 files changed, 41 insertions(+), 22 deletions(-) diff --git a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart index f4be54e3..94e3669b 100644 --- a/packages/powersync_core/lib/src/database/native/native_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/native/native_powersync_database.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'dart:convert'; import 'dart:isolate'; import 'package:meta/meta.dart'; @@ -266,7 +267,7 @@ class PowerSyncDatabaseImpl options, crudMutex.shared, syncMutex.shared, - schema, + jsonEncode(schema), ), debugName: 'Sync ${database.openFactory.path}', onError: receiveUnhandledErrors.sendPort, @@ -310,7 +311,7 @@ class _PowerSyncDatabaseIsolateArgs { final ResolvedSyncOptions options; final SerializedMutex crudMutex; final SerializedMutex syncMutex; - final Schema schema; + final String schemaJson; _PowerSyncDatabaseIsolateArgs( this.sPort, @@ -318,7 +319,7 @@ class _PowerSyncDatabaseIsolateArgs { this.options, this.crudMutex, this.syncMutex, - this.schema, + this.schemaJson, ); } @@ -414,7 +415,7 @@ Future _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async { final storage = BucketStorage(connection); final sync = StreamingSyncImplementation( adapter: storage, - schema: args.schema, + schemaJson: args.schemaJson, connector: InternalConnector( getCredentialsCached: getCredentialsCached, prefetchCredentials: prefetchCredentials, diff --git a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart index 8132d8ce..a879d9e0 100644 --- a/packages/powersync_core/lib/src/database/web/web_powersync_database.dart +++ b/packages/powersync_core/lib/src/database/web/web_powersync_database.dart @@ -1,4 +1,5 @@ import 'dart:async'; +import 'dart:convert'; import 'package:meta/meta.dart'; import 'package:http/browser_client.dart'; import 'package:logging/logging.dart'; @@ -159,7 +160,7 @@ class PowerSyncDatabaseImpl sync = StreamingSyncImplementation( adapter: storage, - schema: schema, + schemaJson: jsonEncode(schema), connector: InternalConnector.wrap(connector, this), crudUpdateTriggerStream: crudStream, options: options, diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index 82527fd8..b4dbe068 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -5,7 +5,6 @@ import 'dart:typed_data'; import 'package:http/http.dart' as http; import 'package:logging/logging.dart'; import 'package:meta/meta.dart'; -import 'package:powersync_core/powersync_core.dart'; import 'package:powersync_core/src/abort_controller.dart'; import 'package:powersync_core/src/exceptions.dart'; import 'package:powersync_core/src/log_internal.dart'; @@ -33,7 +32,7 @@ abstract interface class StreamingSync { @internal class StreamingSyncImplementation implements StreamingSync { - final Schema? schema; //TODO(SkillDevs): pass in all implementations + final String schemaJson; final BucketStorage adapter; final InternalConnector connector; final ResolvedSyncOptions options; @@ -64,7 +63,7 @@ class StreamingSyncImplementation implements StreamingSync { String? clientId; StreamingSyncImplementation({ - required this.schema, + required this.schemaJson, required this.adapter, required this.connector, required this.crudUpdateTriggerStream, @@ -595,7 +594,7 @@ final class _ActiveRustStreamingIteration { 'start', convert.json.encode({ 'parameters': sync.options.params, - 'schema': sync.schema, + 'schema': convert.json.decode(sync.schemaJson), }), ); assert(_completedStream.isCompleted, 'Should have started streaming'); diff --git a/packages/powersync_core/lib/src/web/sync_controller.dart b/packages/powersync_core/lib/src/web/sync_controller.dart index 0c26252e..7f05cff3 100644 --- a/packages/powersync_core/lib/src/web/sync_controller.dart +++ b/packages/powersync_core/lib/src/web/sync_controller.dart @@ -113,6 +113,9 @@ class SyncWorkerHandle implements StreamingSync { @override Future streamingSync() async { await _channel.startSynchronization( - database.database.openFactory.path, ResolvedSyncOptions(options)); + database.database.openFactory.path, + ResolvedSyncOptions(options), + database.schema, + ); } } diff --git a/packages/powersync_core/lib/src/web/sync_worker.dart b/packages/powersync_core/lib/src/web/sync_worker.dart index 5ee3c4ca..ddc4eaf0 100644 --- a/packages/powersync_core/lib/src/web/sync_worker.dart +++ b/packages/powersync_core/lib/src/web/sync_worker.dart @@ -45,12 +45,16 @@ class _SyncWorker { }); } - _SyncRunner referenceSyncTask( - String databaseIdentifier, SyncOptions options, _ConnectedClient client) { + _SyncRunner referenceSyncTask(String databaseIdentifier, SyncOptions options, + String schemaJson, _ConnectedClient client) { return _requestedSyncTasks.putIfAbsent(databaseIdentifier, () { return _SyncRunner(databaseIdentifier); }) - ..registerClient(client, options); + ..registerClient( + client, + options, + schemaJson, + ); } } @@ -86,8 +90,8 @@ class _ConnectedClient { }, ); - _runner = _worker.referenceSyncTask( - request.databaseName, recoveredOptions, this); + _runner = _worker.referenceSyncTask(request.databaseName, + recoveredOptions, request.schemaJson, this); return (JSObject(), null); case SyncWorkerMessageType.abortSynchronization: _runner?.disconnectClient(this); @@ -128,6 +132,7 @@ class _ConnectedClient { class _SyncRunner { final String identifier; ResolvedSyncOptions options = ResolvedSyncOptions(SyncOptions()); + String schemaJson = '{}'; final StreamGroup<_RunnerEvent> _group = StreamGroup(); final StreamController<_RunnerEvent> _mainEvents = StreamController(); @@ -146,10 +151,12 @@ class _SyncRunner { case _AddConnection( :final client, :final options, + :final schemaJson, ): connections.add(client); final (newOptions, reconnect) = this.options.applyFrom(options); this.options = newOptions; + this.schemaJson = schemaJson; if (sync == null) { await _requestDatabase(client); @@ -264,7 +271,7 @@ class _SyncRunner { sync = StreamingSyncImplementation( adapter: WebBucketStorage(database), - schema: null, + schemaJson: client._runner!.schemaJson, connector: InternalConnector( getCredentialsCached: client.channel.credentialsCallback, prefetchCredentials: ({required bool invalidate}) async { @@ -287,8 +294,9 @@ class _SyncRunner { sync!.streamingSync(); } - void registerClient(_ConnectedClient client, SyncOptions options) { - _mainEvents.add(_AddConnection(client, options)); + void registerClient( + _ConnectedClient client, SyncOptions options, String schemaJson) { + _mainEvents.add(_AddConnection(client, options, schemaJson)); } /// Remove a client, disconnecting if no clients remain.. @@ -307,8 +315,9 @@ sealed class _RunnerEvent {} final class _AddConnection implements _RunnerEvent { final _ConnectedClient client; final SyncOptions options; + final String schemaJson; - _AddConnection(this.client, this.options); + _AddConnection(this.client, this.options, this.schemaJson); } final class _RemoveConnection implements _RunnerEvent { diff --git a/packages/powersync_core/lib/src/web/sync_worker_protocol.dart b/packages/powersync_core/lib/src/web/sync_worker_protocol.dart index 2b859e53..3c64d90f 100644 --- a/packages/powersync_core/lib/src/web/sync_worker_protocol.dart +++ b/packages/powersync_core/lib/src/web/sync_worker_protocol.dart @@ -3,6 +3,7 @@ import 'dart:convert'; import 'dart:js_interop'; import 'package:logging/logging.dart'; +import 'package:powersync_core/src/schema.dart'; import 'package:powersync_core/src/sync/options.dart'; import 'package:web/web.dart'; @@ -71,6 +72,7 @@ extension type StartSynchronization._(JSObject _) implements JSObject { required int requestId, required int retryDelayMs, required String implementationName, + required String schemaJson, String? syncParamsEncoded, }); @@ -79,6 +81,7 @@ extension type StartSynchronization._(JSObject _) implements JSObject { external int get crudThrottleTimeMs; external int? get retryDelayMs; external String? get implementationName; + external String get schemaJson; external String? get syncParamsEncoded; } @@ -410,7 +413,7 @@ final class WorkerCommunicationChannel { } Future startSynchronization( - String databaseName, ResolvedSyncOptions options) async { + String databaseName, ResolvedSyncOptions options, Schema schema) async { final (id, completion) = _newRequest(); port.postMessage(SyncWorkerMessage( type: SyncWorkerMessageType.startSynchronization.name, @@ -420,6 +423,7 @@ final class WorkerCommunicationChannel { retryDelayMs: options.retryDelay.inMilliseconds, requestId: id, implementationName: options.source.syncImplementation.name, + schemaJson: jsonEncode(schema), syncParamsEncoded: switch (options.source.params) { null => null, final params => jsonEncode(params), diff --git a/packages/powersync_core/test/utils/abstract_test_utils.dart b/packages/powersync_core/test/utils/abstract_test_utils.dart index b0ebd7ba..f402bb5f 100644 --- a/packages/powersync_core/test/utils/abstract_test_utils.dart +++ b/packages/powersync_core/test/utils/abstract_test_utils.dart @@ -1,3 +1,5 @@ +import 'dart:convert'; + import 'package:http/http.dart'; import 'package:logging/logging.dart'; import 'package:powersync_core/powersync_core.dart'; @@ -74,7 +76,7 @@ abstract mixin class TestPowerSyncFactory implements PowerSyncOpenFactory { schema: schema, database: SqliteDatabase.singleConnection( SqliteConnection.synchronousWrapper(raw)), - loggers: logger, + logger: logger, ); } } @@ -153,7 +155,7 @@ extension MockSync on PowerSyncDatabase { }) { final impl = StreamingSyncImplementation( adapter: BucketStorage(this), - schema: schema, + schemaJson: jsonEncode(schema), client: client, options: ResolvedSyncOptions(options), connector: InternalConnector.wrap(connector, this), From 58de160ae028bd669ea7ca99993ca70349c8ac4e Mon Sep 17 00:00:00 2001 From: Jorge Sardina Date: Wed, 11 Jun 2025 12:37:25 +0200 Subject: [PATCH 12/14] review comments --- .../lib/src/database/powersync_db_mixin.dart | 14 +++++++------- packages/powersync_core/lib/src/schema.dart | 18 +++++++++++------- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart index ae63823e..64744797 100644 --- a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart +++ b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart @@ -134,12 +134,12 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { await _afterSchemaReady(); } - void _assertSchemaIsReady() { + void _checkSchemaIsReady() { if (!manualSchemaManagement || _manualSchemaManagementCompleted) { return; } - throw AssertionError( + throw StateError( 'In manual schema management mode, you need to mark the powersync database as ready'); } @@ -319,7 +319,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { // the lock for the connection. await initialize(); - _assertSchemaIsReady(); + _checkSchemaIsReady(); final resolvedOptions = ResolvedSyncOptions.resolve( options, @@ -484,7 +484,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { /// Get an unique id for this client. /// This id is only reset when the database is deleted. Future getClientId() async { - _assertSchemaIsReady(); // TODO(skilldevs): Needed? + _checkSchemaIsReady(); // TODO(skilldevs): Needed? final row = await get('SELECT powersync_client_id() as client_id'); return row['client_id'] as String; } @@ -492,7 +492,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { /// Get upload queue size estimate and count. Future getUploadQueueStats( {bool includeSize = false}) async { - _assertSchemaIsReady(); + _checkSchemaIsReady(); if (includeSize) { final row = await getOptional( 'SELECT SUM(cast(data as blob) + 20) as size, count(*) as count FROM ps_crud'); @@ -520,7 +520,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { /// data by transaction. One batch may contain data from multiple transactions, /// and a single transaction may be split over multiple batches. Future getCrudBatch({int limit = 100}) async { - _assertSchemaIsReady(); + _checkSchemaIsReady(); final rows = await getAll( 'SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT ?', [limit + 1]); @@ -567,7 +567,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { /// Unlike [getCrudBatch], this only returns data from a single transaction at a time. /// All data for the transaction is loaded into memory. Future getNextCrudTransaction() async { - _assertSchemaIsReady(); + _checkSchemaIsReady(); return await readTransaction((tx) async { final first = await tx.getOptional( 'SELECT id, tx_id, data FROM ps_crud ORDER BY id ASC LIMIT 1'); diff --git a/packages/powersync_core/lib/src/schema.dart b/packages/powersync_core/lib/src/schema.dart index 575aacc6..d5273c38 100644 --- a/packages/powersync_core/lib/src/schema.dart +++ b/packages/powersync_core/lib/src/schema.dart @@ -316,9 +316,8 @@ class Column { Map toJson() => {'name': name, 'type': type.sqlite}; } -class RawTable { - final String - name; // TODO: it does not need to be the same name as the raw table +final class RawTable { + final String name; final PendingStatement put; final PendingStatement delete; @@ -335,7 +334,7 @@ class RawTable { }; } -class PendingStatement { +final class PendingStatement { final String sql; final List params; @@ -348,12 +347,15 @@ class PendingStatement { } sealed class PendingStatementValue { + factory PendingStatementValue.id() = _PendingStmtValueId; + factory PendingStatementValue.column(String column) = _PendingStmtValueColumn; + dynamic toJson(); } -class PendingStmtValueColumn extends PendingStatementValue { +class _PendingStmtValueColumn implements PendingStatementValue { final String column; - PendingStmtValueColumn(this.column); + const _PendingStmtValueColumn(this.column); @override dynamic toJson() { @@ -363,7 +365,9 @@ class PendingStmtValueColumn extends PendingStatementValue { } } -class PendingStmtValueId extends PendingStatementValue { +class _PendingStmtValueId implements PendingStatementValue { + const _PendingStmtValueId(); + @override dynamic toJson() { return 'Id'; From 23d6ffad7f62b6bb8b4ba5f980a34fc627a08078 Mon Sep 17 00:00:00 2001 From: David Martos Date: Mon, 7 Jul 2025 11:02:52 +0200 Subject: [PATCH 13/14] remove todo --- .../powersync_core/lib/src/database/powersync_db_mixin.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart index 64744797..ad14dd89 100644 --- a/packages/powersync_core/lib/src/database/powersync_db_mixin.dart +++ b/packages/powersync_core/lib/src/database/powersync_db_mixin.dart @@ -484,7 +484,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection { /// Get an unique id for this client. /// This id is only reset when the database is deleted. Future getClientId() async { - _checkSchemaIsReady(); // TODO(skilldevs): Needed? + _checkSchemaIsReady(); final row = await get('SELECT powersync_client_id() as client_id'); return row['client_id'] as String; } From 4abeb7b5e28eaced0475f93d0c1cef16ebcb7158 Mon Sep 17 00:00:00 2001 From: David Martos Date: Mon, 7 Jul 2025 11:04:13 +0200 Subject: [PATCH 14/14] format --- packages/powersync_core/lib/src/sync/streaming_sync.dart | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/powersync_core/lib/src/sync/streaming_sync.dart b/packages/powersync_core/lib/src/sync/streaming_sync.dart index 35ffad1d..8646d9a7 100644 --- a/packages/powersync_core/lib/src/sync/streaming_sync.dart +++ b/packages/powersync_core/lib/src/sync/streaming_sync.dart @@ -598,7 +598,11 @@ final class _ActiveRustStreamingIteration { Future syncIteration() async { try { await _control( - 'start', convert.json.encode({'parameters': sync.options.params, 'schema': convert.json.decode(sync.schemaJson),})); + 'start', + convert.json.encode({ + 'parameters': sync.options.params, + 'schema': convert.json.decode(sync.schemaJson), + })); assert(_completedStream.isCompleted, 'Should have started streaming'); await _completedStream.future; } finally {