Skip to content

Commit d4f676b

Browse files
committed
Update subscription state while offline
1 parent bd3b435 commit d4f676b

File tree

3 files changed

+32
-21
lines changed

3 files changed

+32
-21
lines changed

packages/powersync_core/lib/src/database/powersync_db_mixin.dart

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import 'dart:async';
2-
import 'dart:convert';
32

43
import 'package:logging/logging.dart';
54
import 'package:meta/meta.dart';
@@ -15,8 +14,6 @@ import 'package:powersync_core/src/schema.dart';
1514
import 'package:powersync_core/src/schema_logic.dart';
1615
import 'package:powersync_core/src/schema_logic.dart' as schema_logic;
1716
import 'package:powersync_core/src/sync/connection_manager.dart';
18-
import 'package:powersync_core/src/sync/instruction.dart';
19-
import 'package:powersync_core/src/sync/mutable_sync_status.dart';
2017
import 'package:powersync_core/src/sync/options.dart';
2118
import 'package:powersync_core/src/sync/sync_status.dart';
2219

@@ -108,7 +105,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
108105
await _checkVersion();
109106
await database.execute('SELECT powersync_init()');
110107
await updateSchema(schema);
111-
await _updateHasSynced();
108+
await _connections.resolveOfflineSyncStatus();
112109
}
113110

114111
/// Check that a supported version of the powersync extension is loaded.
@@ -134,23 +131,6 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
134131
return isInitialized;
135132
}
136133

137-
Future<List<SyncStream>> get subscribedStreams {
138-
throw UnimplementedError();
139-
}
140-
141-
Future<void> _updateHasSynced() async {
142-
// Query the database to see if any data has been synced.
143-
final row = await database.get(
144-
'SELECT powersync_offline_sync_status() AS r;',
145-
);
146-
147-
final status = CoreSyncStatus.fromJson(
148-
json.decode(row['r'] as String) as Map<String, Object?>);
149-
150-
setStatus((MutableSyncStatus()..applyFromCore(status))
151-
.immutableSnapshot(setLastSynced: true));
152-
}
153-
154134
/// Returns a [Future] which will resolve once at least one full sync cycle
155135
/// has completed (meaninng that the first consistent checkpoint has been
156136
/// reached across all buckets).

packages/powersync_core/lib/src/sync/connection_manager.dart

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import 'package:powersync_core/src/sync/options.dart';
1010
import 'package:powersync_core/src/sync/stream.dart';
1111
import 'package:powersync_core/src/sync/sync_status.dart';
1212

13+
import 'instruction.dart';
14+
import 'mutable_sync_status.dart';
1315
import 'streaming_sync.dart';
1416

1517
/// A (stream name, JSON parameters) pair that uniquely identifies a stream
@@ -252,6 +254,15 @@ final class ConnectionManager {
252254
'priority': priority,
253255
},
254256
});
257+
258+
await _activeGroup.syncMutex.lock(() async {
259+
if (_abortActiveSync == null) {
260+
// Since we're not connected, update the offline sync status to reflect
261+
// the new subscription.
262+
// With a connection, the sync client would include it in its state.
263+
await resolveOfflineSyncStatus();
264+
}
265+
});
255266
}
256267

257268
Future<void> unsubscribeAll({
@@ -266,6 +277,18 @@ final class ConnectionManager {
266277
});
267278
}
268279

280+
Future<void> resolveOfflineSyncStatus() async {
281+
final row = await db.database.get(
282+
'SELECT powersync_offline_sync_status() AS r;',
283+
);
284+
285+
final status = CoreSyncStatus.fromJson(
286+
json.decode(row['r'] as String) as Map<String, Object?>);
287+
288+
manuallyChangeSyncStatus((MutableSyncStatus()..applyFromCore(status))
289+
.immutableSnapshot(setLastSynced: true));
290+
}
291+
269292
SyncStream syncStream(String name, Map<String, Object?>? parameters) {
270293
return _SyncStreamImplementation(this, name, parameters);
271294
}

packages/powersync_core/test/sync/stream_test.dart

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -211,4 +211,12 @@ void main() {
211211
await pumpEventQueue();
212212
expect(syncService.controller.hasListener, isTrue);
213213
});
214+
215+
test('subscriptions update while offline', () async {
216+
final stream = StreamQueue(database.statusStream);
217+
218+
final subscription = await database.syncStream('foo').subscribe();
219+
var status = await stream.next;
220+
expect(status.statusFor(subscription), isNotNull);
221+
});
214222
}

0 commit comments

Comments
 (0)