Skip to content

Commit a6c1856

Browse files
committed
Add tests
1 parent 5467e0d commit a6c1856

File tree

3 files changed

+91
-4
lines changed

3 files changed

+91
-4
lines changed

packages/powersync_core/test/in_memory_sync_test.dart

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import 'package:powersync_core/powersync_core.dart';
77
import 'package:powersync_core/sqlite3_common.dart';
88
import 'package:powersync_core/src/sync/streaming_sync.dart';
99
import 'package:powersync_core/src/sync/protocol.dart';
10+
import 'package:shelf/shelf.dart';
11+
import 'package:shelf_router/shelf_router.dart';
1012
import 'package:test/test.dart';
1113

1214
import 'bucket_storage_test.dart';
@@ -63,7 +65,7 @@ void _declareTests(String name, SyncOptions options, bool bson) {
6365

6466
void createSyncClient({Schema? schema}) {
6567
final (client, server) = inMemoryServer();
66-
server.mount(syncService.router.call);
68+
server.mount((req) => syncService.router(req));
6769

6870
final thisSyncClient = syncClient = database.connectWithMockService(
6971
client,
@@ -937,6 +939,46 @@ void _declareTests(String name, SyncOptions options, bool bson) {
937939

938940
expect(await query.next, 'from server');
939941
});
942+
943+
group('abort', () {
944+
test('during connect', () async {
945+
final requestStarted = Completer<void>();
946+
947+
syncService.router = Router()
948+
..post('/sync/stream', expectAsync1((Request request) async {
949+
requestStarted.complete();
950+
951+
// emulate a network that never connects
952+
await Completer<void>().future;
953+
}));
954+
955+
syncClient.streamingSync();
956+
await requestStarted.future;
957+
expect(database.currentStatus, isSyncStatus(connecting: true));
958+
959+
await syncClient.abort();
960+
expect(database.currentStatus.anyError, isNull);
961+
});
962+
963+
test('during stream', () async {
964+
final status = await waitForConnection();
965+
syncService.addLine({
966+
'checkpoint': {
967+
'last_op_id': '0',
968+
'buckets': [
969+
{
970+
'bucket': 'bkt',
971+
'checksum': 0,
972+
}
973+
],
974+
},
975+
});
976+
await expectLater(status, emits(isSyncStatus(downloading: true)));
977+
978+
await syncClient.abort();
979+
expect(database.currentStatus.anyError, isNull);
980+
});
981+
});
940982
});
941983
}
942984

packages/powersync_core/test/server/sync_server/in_memory_sync_server.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ final class MockSyncService {
1414
StreamController();
1515
Completer<Request> _listener = Completer();
1616

17-
final router = Router();
17+
var router = Router();
1818
Object? Function() writeCheckpoint = () {
1919
return {
2020
'data': {'write_checkpoint': '10'}

packages/powersync_core/test/utils/in_memory_http.dart

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,17 +35,29 @@ final class _MockServer implements shelf.Server {
3535

3636
Future<StreamedResponse> handleRequest(
3737
BaseRequest request, ByteStream body) async {
38+
final cancellationFuture = switch (request) {
39+
Abortable(:final abortTrigger) => abortTrigger,
40+
_ => null,
41+
};
42+
3843
if (_handler case final endpoint?) {
3944
final shelfRequest = shelf.Request(
4045
request.method,
4146
request.url,
4247
headers: request.headers,
4348
body: body,
4449
);
45-
final shelfResponse = await endpoint(shelfRequest);
50+
51+
final shelfResponse = await Future.any<shelf.Response>([
52+
Future.sync(() => endpoint(shelfRequest)),
53+
if (cancellationFuture != null)
54+
cancellationFuture.then((_) {
55+
throw RequestAbortedException();
56+
}),
57+
]);
4658

4759
return StreamedResponse(
48-
shelfResponse.read(),
60+
shelfResponse.read().injectCancellation(cancellationFuture),
4961
shelfResponse.statusCode,
5062
headers: shelfResponse.headers,
5163
);
@@ -54,3 +66,36 @@ final class _MockServer implements shelf.Server {
5466
}
5567
}
5668
}
69+
70+
extension<T> on Stream<T> {
71+
Stream<T> injectCancellation(Future<void>? token) {
72+
if (token == null) {
73+
return this;
74+
}
75+
76+
return Stream.multi(
77+
(listener) {
78+
final subscription = listen(
79+
listener.addSync,
80+
onError: listener.addErrorSync,
81+
onDone: listener.closeSync,
82+
);
83+
84+
listener
85+
..onPause = subscription.pause
86+
..onResume = subscription.resume
87+
..onCancel = subscription.cancel;
88+
89+
token.whenComplete(() {
90+
if (!listener.isClosed) {
91+
listener
92+
..addErrorSync(RequestAbortedException())
93+
..closeSync();
94+
subscription.cancel();
95+
}
96+
});
97+
},
98+
isBroadcast: isBroadcast,
99+
);
100+
}
101+
}

0 commit comments

Comments
 (0)