Skip to content

Commit fd0175d

Browse files
Refactor local storage implementation in powersync_attachments_stream package. Updated attachment queue initialization to use localStorage, and enhanced README for clarity on storage handling. Added tests for edge cases and robustness in local storage operations.
1 parent 1f696a0 commit fd0175d

File tree

11 files changed

+191
-196
lines changed

11 files changed

+191
-196
lines changed

demos/supabase-todolist/lib/attachments/photo_widget.dart

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import 'dart:io';
2-
2+
import 'package:path_provider/path_provider.dart';
3+
import 'package:path/path.dart' as p;
34
import 'package:flutter/material.dart';
45
import 'package:powersync_attachments_stream/powersync_attachments_stream.dart';
56
import 'package:powersync_flutter_demo/attachments/camera_helpers.dart';
@@ -37,7 +38,8 @@ class _PhotoWidgetState extends State<PhotoWidget> {
3738
if (photoId == null) {
3839
return _ResolvedPhotoState(photoPath: null, fileExists: false);
3940
}
40-
photoPath = await attachmentQueue.getLocalUri('$photoId.jpg');
41+
final appDocDir = await getApplicationDocumentsDirectory();
42+
photoPath = p.join(appDocDir.path, '$photoId.jpg');
4143

4244
bool fileExists = await File(photoPath).exists();
4345

demos/supabase-todolist/lib/attachments/queue.dart

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ Future<void> initializeAttachmentQueue(PowerSyncDatabase db) async {
1919
db: db,
2020
remoteStorage: remoteStorage,
2121
logger: log,
22-
attachmentsDirectory: '${appDocDir.path}/attachments',
22+
localStorage: IOLocalStorage(appDocDir.path),
23+
// attachmentsDirectory: '${appDocDir.path}/attachments',
2324
watchAttachments: () => db.watch('''
2425
SELECT photo_id as id FROM todos WHERE photo_id IS NOT NULL
2526
''').map((results) => results

packages/powersync_attachments_stream/README.md

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -44,11 +44,12 @@ Schema schema = Schema(([
4444

4545
```dart
4646
final Directory appDocDir = await getApplicationDocumentsDirectory();
47+
final localStorage = IOLocalStorage('${appDocDir.path}/attachments');
4748
4849
final queue = AttachmentQueue(
4950
db: db,
5051
remoteStorage: remoteStorage,
51-
attachmentsDirectory: '${appDocDir.path}/attachments',
52+
localStorage: localStorage,
5253
watchAttachments: () => db.watch('''
5354
SELECT photo_id as id FROM todos WHERE photo_id IS NOT NULL
5455
''').map((results) => results
@@ -60,7 +61,7 @@ final queue = AttachmentQueue(
6061
);
6162
```
6263

63-
* The `attachmentsDirectory` specifies where local attachment files should be stored. This directory needs to be provided to the constructor. In Flutter, `path_provider`'s `getApplicationDocumentsDirectory()` with a subdirectory like `/attachments` is a good choice.
64+
* The `localStorage` is an implementation of `AbstractLocalStorageAdapter` that specifies where and how local attachment files should be stored. For mobile and desktop apps, `IOLocalStorage` can be used, which requires a directory path. In Flutter, `path_provider`'s `getApplicationDocumentsDirectory()` with a subdirectory like `/attachments` is a good choice.
6465
* The `remoteStorage` is responsible for connecting to the attachments backend. See the `RemoteStorageAdapter` interface definition [here](https://github.com/powersync-ja/powersync.dart/blob/main/packages/powersync_attachments_stream/lib/src/abstractions/remote_storage.dart).
6566
* `watchAttachments` is a `Stream` of `WatchedAttachmentItem`. The `WatchedAttachmentItem`s represent the attachments which should be present in the application. We recommend using `PowerSync`'s `watch` query as shown above. In this example, we provide the `fileExtension` for all photos. This information could also be obtained from the query if necessary.
6667

@@ -69,7 +70,7 @@ final queue = AttachmentQueue(
6970
```dart
7071
final remote = _RemoteStorageAdapter();
7172
72-
class _RemoteStorageAdapter implements RemoteStorageAdapter {
73+
class _RemoteStorageAdapter implements AbstractRemoteStorageAdapter {
7374
@override
7475
Future<void> uploadFile(Stream<List<int>> fileData, Attachment attachment) async {
7576
// TODO: Implement upload to your backend
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
11
include: package:flutter_lints/flutter.yaml
22

3+
analyzer:
4+
language:
5+
strict-casts: true
6+
strict-inference: true
7+
strict-raw-types: true
8+
39
# Additional information about this file can be found at
410
# https://dart.dev/guides/language/analysis-options

packages/powersync_attachments_stream/lib/src/abstractions/local_storage.dart

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,19 +26,9 @@ abstract class AbstractLocalStorageAdapter {
2626
/// Returns true if the file exists, false otherwise
2727
Future<bool> fileExists(String filePath);
2828

29-
/// Creates a directory at the specified path
30-
///
31-
/// [path] - Path of the directory to create
32-
Future<void> makeDir(String path);
33-
34-
/// Recursively removes a directory and its contents
35-
///
36-
/// [path] - Path of the directory to remove
37-
Future<void> rmDir(String path);
29+
/// Initializes the storage, performing any necessary setup.
30+
Future<void> initialize();
3831

39-
/// Copies a file from source to target path
40-
///
41-
/// [sourcePath] - Path of the source file
42-
/// [targetPath] - Path where the file will be copied
43-
Future<void> copyFile(String sourcePath, String targetPath);
32+
/// Clears all data from the storage.
33+
Future<void> clear();
4434
}

packages/powersync_attachments_stream/lib/src/attachment_queue_service.dart

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import 'dart:async';
88
import 'dart:typed_data';
99
import 'package:logging/logging.dart';
10-
import './storage/io_local_storage.dart';
1110
import 'package:powersync_core/powersync_core.dart';
1211
import 'package:sqlite_async/mutex.dart';
1312
import 'attachment.dart';
@@ -75,7 +74,6 @@ class WatchedAttachmentItem {
7574
class AttachmentQueue {
7675
final PowerSyncDatabase db;
7776
final AbstractRemoteStorageAdapter remoteStorage;
78-
final String attachmentsDirectory;
7977
final Stream<List<WatchedAttachmentItem>> Function() watchAttachments;
8078
final AbstractLocalStorageAdapter localStorage;
8179
final String attachmentsQueueTableName;
@@ -90,25 +88,23 @@ class AttachmentQueue {
9088

9189
final Mutex _mutex = Mutex();
9290
bool _closed = false;
93-
StreamSubscription? _syncStatusSubscription;
91+
StreamSubscription<List<WatchedAttachmentItem>>? _syncStatusSubscription;
9492
late final AbstractAttachmentService attachmentsService;
9593
late final SyncingService syncingService;
9694

9795
AttachmentQueue({
9896
required this.db,
9997
required this.remoteStorage,
100-
required this.attachmentsDirectory,
10198
required this.watchAttachments,
102-
IOLocalStorage? localStorage,
99+
required this.localStorage,
103100
this.attachmentsQueueTableName = defaultTableName,
104101
this.errorHandler,
105102
this.syncInterval = const Duration(seconds: 30),
106103
this.archivedCacheLimit = 100,
107104
this.syncThrottleDuration = const Duration(seconds: 1),
108105
this.downloadAttachments = true,
109106
Logger? logger,
110-
}) : localStorage = localStorage ?? IOLocalStorage(),
111-
logger = logger ?? Logger('AttachmentQueue') {
107+
}) : logger = logger ?? Logger('AttachmentQueue') {
112108
attachmentsService = AttachmentServiceImpl(
113109
db: db,
114110
logger: logger ?? Logger('AttachmentQueue'),
@@ -118,9 +114,8 @@ class AttachmentQueue {
118114

119115
syncingService = SyncingService(
120116
remoteStorage: remoteStorage,
121-
localStorage: this.localStorage,
117+
localStorage: localStorage,
122118
attachmentsService: attachmentsService,
123-
getLocalUri: (filename) async => getLocalUri(filename),
124119
errorHandler: errorHandler,
125120
syncThrottle: syncThrottleDuration,
126121
period: syncInterval,
@@ -139,7 +134,7 @@ class AttachmentQueue {
139134

140135
await _stopSyncingInternal();
141136

142-
await localStorage.makeDir(attachmentsDirectory);
137+
await localStorage.initialize();
143138

144139
await attachmentsService.withContext((context) async {
145140
await _verifyAttachments(context);
@@ -256,7 +251,7 @@ class AttachmentQueue {
256251
}
257252

258253
// Archive any items not specified in the watched items.
259-
// For QUEUED_DELETE or QUEUED_UPLOAD states, archive only if hasSynced is true.
254+
// For queuedDelete or queuedUpload states, archive only if hasSynced is true.
260255
// For other states, archive if the record is not found in the items.
261256
for (final attachment in currentAttachments) {
262257
final notInWatchedItems = items.every(
@@ -301,10 +296,9 @@ class AttachmentQueue {
301296
id,
302297
fileExtension,
303298
);
304-
final String localUri = getLocalUri(filename);
305299

306300
// Write the file to the filesystem.
307-
final fileSize = await localStorage.saveFile(localUri, data);
301+
final fileSize = await localStorage.saveFile(filename, data);
308302

309303
return await attachmentsService.withContext((attachmentContext) async {
310304
return await db.writeTransaction((tx) async {
@@ -314,7 +308,7 @@ class AttachmentQueue {
314308
size: fileSize,
315309
mediaType: mediaType,
316310
state: AttachmentState.queuedUpload,
317-
localUri: localUri,
311+
localUri: filename,
318312
metaData: metaData,
319313
);
320314

@@ -354,11 +348,6 @@ class AttachmentQueue {
354348
});
355349
}
356350

357-
/// Returns the user's storage directory with the attachment path used to load the file.
358-
String getLocalUri(String filename) {
359-
return '$attachmentsDirectory/$filename';
360-
}
361-
362351
/// Removes all archived items.
363352
Future<void> expireCache() async {
364353
await attachmentsService.withContext((context) async {
@@ -374,7 +363,7 @@ class AttachmentQueue {
374363
await attachmentsService.withContext((context) async {
375364
await context.clearQueue();
376365
});
377-
await localStorage.rmDir(attachmentsDirectory);
366+
await localStorage.clear();
378367
}
379368

380369
/// Cleans up stale attachments.

packages/powersync_attachments_stream/lib/src/implementations/attachment_context.dart

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,9 @@ class AttachmentContextImpl implements AbstractAttachmentContext {
124124
// Delete the archived attachments from the table
125125
final ids = archivedAttachments.map((a) => a.id).toList();
126126
if (ids.isNotEmpty) {
127-
final placeholders = List.filled(ids.length, '?').join(',');
128-
await db.execute('DELETE FROM $table WHERE id IN ($placeholders)', ids);
127+
await db.executeBatch('DELETE FROM $table WHERE id = ?', [
128+
for (final id in ids) [id],
129+
]);
129130
}
130131

131132
log.info('Deleted ${archivedAttachments.length} archived attachments.');

packages/powersync_attachments_stream/lib/src/implementations/attachment_service.dart

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import 'dart:async';
22
import 'package:logging/logging.dart';
33
import 'package:powersync_core/powersync_core.dart';
4+
import 'package:sqlite_async/sqlite_async.dart';
45

56
import '../abstractions/attachment_service.dart';
67
import '../abstractions/attachment_context.dart';
@@ -12,7 +13,7 @@ class AttachmentServiceImpl implements AbstractAttachmentService {
1213
final Logger logger;
1314
final int maxArchivedCount;
1415
final String attachmentsQueueTableName;
15-
Future<void> _mutex = Future.value();
16+
final Mutex _mutex = Mutex();
1617

1718
late final AbstractAttachmentContext _context;
1819

@@ -22,13 +23,18 @@ class AttachmentServiceImpl implements AbstractAttachmentService {
2223
required this.maxArchivedCount,
2324
required this.attachmentsQueueTableName,
2425
}) {
25-
_context = AttachmentContextImpl(db, logger, maxArchivedCount, attachmentsQueueTableName);
26+
_context = AttachmentContextImpl(
27+
db,
28+
logger,
29+
maxArchivedCount,
30+
attachmentsQueueTableName,
31+
);
2632
}
2733

2834
@override
2935
Stream<void> watchActiveAttachments() async* {
3036
logger.info('Watching attachments...');
31-
37+
3238
// Watch for attachments with active states (queued for upload, download, or delete)
3339
final stream = db.watch(
3440
'''
@@ -54,10 +60,16 @@ class AttachmentServiceImpl implements AbstractAttachmentService {
5460
}
5561

5662
@override
57-
Future<T> withContext<T>(Future<T> Function(AbstractAttachmentContext ctx) action) {
58-
// Simple mutex using chained futures
59-
final completer = Completer<T>();
60-
_mutex = _mutex.then((_) => action(_context)).then(completer.complete).catchError(completer.completeError);
61-
return completer.future;
63+
Future<T> withContext<T>(
64+
Future<T> Function(AbstractAttachmentContext ctx) action,
65+
) async {
66+
return await _mutex.lock(() async {
67+
try {
68+
return await action(_context);
69+
} catch (e, stackTrace) {
70+
// Re-throw the error to be handled by the caller
71+
Error.throwWithStackTrace(e, stackTrace);
72+
}
73+
});
6274
}
63-
}
75+
}

packages/powersync_attachments_stream/lib/src/storage/io_local_storage.dart

Lines changed: 11 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,12 @@ import '../abstractions/local_storage.dart';
2323
///
2424
/// Handles file and directory operations for attachments.
2525
class IOLocalStorage implements AbstractLocalStorageAdapter {
26-
final Directory baseDir;
26+
final String attachmentsDirectory;
27+
late final Directory baseDir;
2728

28-
IOLocalStorage([Directory? baseDir])
29-
: baseDir = baseDir ?? Directory.systemTemp;
29+
IOLocalStorage(this.attachmentsDirectory) {
30+
baseDir = Directory(attachmentsDirectory);
31+
}
3032

3133
File _fileFor(String filePath) => File(p.join(baseDir.path, filePath));
3234
File _metaFileFor(String filePath) =>
@@ -88,40 +90,15 @@ class IOLocalStorage implements AbstractLocalStorageAdapter {
8890

8991
/// Creates a directory and all necessary parent directories dynamically if they do not exist.
9092
@override
91-
Future<void> makeDir(String path) async {
92-
await Directory(p.join(baseDir.path, path)).create(recursive: true);
93+
Future<void> initialize() async {
94+
await baseDir.create(recursive: true);
9395
}
9496

95-
/// Recursively removes a directory and all its contents.
9697
@override
97-
Future<void> rmDir(String path) async {
98-
final dir = Directory(p.join(baseDir.path, path));
99-
if (await dir.exists()) {
100-
await for (final entity in dir.list(recursive: false)) {
101-
if (entity is Directory) {
102-
await rmDir(p.relative(entity.path, from: baseDir.path));
103-
} else if (entity is File) {
104-
await entity.delete();
105-
}
106-
}
107-
await dir.delete();
108-
}
109-
}
110-
111-
/// Copies a file and its metadata to a new location.
112-
@override
113-
Future<void> copyFile(String sourcePath, String targetPath) async {
114-
final sourceFile = _fileFor(sourcePath);
115-
final targetFile = _fileFor(targetPath);
116-
if (!await sourceFile.exists()) {
117-
throw FileSystemException('Source file does not exist', sourcePath);
118-
}
119-
await targetFile.parent.create(recursive: true);
120-
await sourceFile.copy(targetFile.path);
121-
final sourceMeta = _metaFileFor(sourcePath);
122-
final targetMeta = _metaFileFor(targetPath);
123-
if (await sourceMeta.exists()) {
124-
await sourceMeta.copy(targetMeta.path);
98+
Future<void> clear() async {
99+
if (await baseDir.exists()) {
100+
await baseDir.delete(recursive: true);
125101
}
102+
await baseDir.create(recursive: true);
126103
}
127104
}

0 commit comments

Comments
 (0)