Skip to content

Commit af99d66

Browse files
Refactor attachment queue initialization and implemented default base path for local storage in the powersync_attachments_stream package. Simplified the attachment watching logic. Updated README for clarity on schema and attachment states.
1 parent e51836a commit af99d66

File tree

4 files changed

+103
-99
lines changed

4 files changed

+103
-99
lines changed

demos/supabase-todolist/lib/attachments/queue.dart

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,29 +14,27 @@ final log = Logger('AttachmentQueue');
1414
Future<void> initializeAttachmentQueue(PowerSyncDatabase db) async {
1515
// Use the app's document directory for local storage
1616
final Directory appDocDir = await getApplicationDocumentsDirectory();
17-
final localStorage = IOLocalStorage(appDocDir);
1817

1918
attachmentQueue = AttachmentQueue(
2019
db: db,
2120
remoteStorage: remoteStorage,
2221
attachmentsDirectory: '${appDocDir.path}/attachments',
2322
watchAttachments: () => db.watch('''
2423
SELECT photo_id as id FROM todos WHERE photo_id IS NOT NULL
25-
''').map((results) {
26-
final items = results.map((row) => WatchedAttachmentItem(id: row['id'] as String, fileExtension: 'jpg')).toList();
27-
log.info('Watched attachment IDs: ${items.map((e) => e.id).toList()}');
28-
return items;
29-
}),
30-
localStorage: localStorage,
31-
errorHandler: null,
24+
''').map((results) => results
25+
.map((row) => WatchedAttachmentItem(
26+
id: row['id'] as String,
27+
fileExtension: 'jpg',
28+
))
29+
.toList()),
3230
);
3331

3432
await attachmentQueue.startSync();
3533
}
3634

37-
Future<Attachment> savePhotoAttachment(Stream<Uint8List> photoData, String todoId,
35+
Future<Attachment> savePhotoAttachment(
36+
Stream<Uint8List> photoData, String todoId,
3837
{String mediaType = 'image/jpeg'}) async {
39-
4038
// Save the file using the AttachmentQueue API
4139
return await attachmentQueue.saveFile(
4240
data: photoData,

packages/powersync_attachments_stream/README.md

Lines changed: 85 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -29,46 +29,39 @@ In the example below, the user captures photos when checklist items are complete
2929
1. First, define your schema including the `checklist` table and the local-only attachments table:
3030

3131
```dart
32-
final checklists = Table(
33-
name: 'checklists',
34-
columns: [
32+
Schema schema = Schema(([
33+
const Table('checklists', [
3534
Column.text('description'),
3635
Column.integer('completed'),
3736
Column.text('photo_id'),
38-
],
39-
);
40-
41-
final schema = Schema([
42-
UserRow.table,
43-
// Add the local-only table which stores attachment states
44-
// Learn more about this function below
45-
createAttachmentsTable('attachments'),
46-
]);
37+
]),
38+
AttachmentsQueueTable(
39+
attachmentsQueueTableName: defaultAttachmentsQueueTableName)
40+
]));
4741
```
4842

4943
2. Create an `AttachmentQueue` instance. This class provides default syncing utilities and implements a default sync strategy. This class is open and can be overridden for custom functionality:
5044

5145
```dart
46+
final Directory appDocDir = await getApplicationDocumentsDirectory();
47+
5248
final queue = AttachmentQueue(
53-
db: db,
54-
attachmentsDirectory: attachmentsDirectory,
55-
remoteStorage: SupabaseRemoteStorage(supabase),
56-
watchAttachments: () => db.watch(
57-
'''
58-
SELECT photo_id
59-
FROM checklists
60-
WHERE photo_id IS NOT NULL
61-
''',
62-
mapper: (row) => WatchedAttachmentItem(
63-
id: row['photo_id'] as String,
64-
fileExtension: 'jpg',
65-
),
66-
),
67-
);
49+
db: db,
50+
remoteStorage: remoteStorage,
51+
attachmentsDirectory: '${appDocDir.path}/attachments',
52+
watchAttachments: () => db.watch('''
53+
SELECT photo_id as id FROM todos WHERE photo_id IS NOT NULL
54+
''').map((results) => results
55+
.map((row) => WatchedAttachmentItem(
56+
id: row['id'] as String,
57+
fileExtension: 'jpg',
58+
))
59+
.toList()),
60+
);
6861
```
6962

7063
* The `attachmentsDirectory` specifies where local attachment files should be stored. This directory needs to be provided to the constructor. In Flutter, `path_provider`'s `getApplicationDocumentsDirectory()` with a subdirectory like `/attachments` is a good choice.
71-
* The `remoteStorage` is responsible for connecting to the attachments backend. See the `RemoteStorageAdapter` interface definition [here](https://github.com/powersync-ja/powersync-dart/blob/main/core/lib/src/attachments/remote_storage_adapter.dart).
64+
* The `remoteStorage` is responsible for connecting to the attachments backend. See the `RemoteStorageAdapter` interface definition [here](https://github.com/powersync-ja/powersync.dart/blob/main/packages/powersync_attachments_stream/lib/src/abstractions/remote_storage.dart).
7265
* `watchAttachments` is a `Stream` of `WatchedAttachmentItem`. The `WatchedAttachmentItem`s represent the attachments which should be present in the application. We recommend using `PowerSync`'s `watch` query as shown above. In this example, we provide the `fileExtension` for all photos. This information could also be obtained from the query if necessary.
7366

7467
3. Implement a `RemoteStorageAdapter` which interfaces with a remote storage provider. This will be used for downloading, uploading, and deleting attachments:
@@ -78,13 +71,13 @@ final remote = _RemoteStorageAdapter();
7871
7972
class _RemoteStorageAdapter implements RemoteStorageAdapter {
8073
@override
81-
Stream<List<int>> downloadFile(Attachment attachment) async* {
82-
// TODO: Implement download from your backend
74+
Future<void> uploadFile(Stream<List<int>> fileData, Attachment attachment) async {
75+
// TODO: Implement upload to your backend
8376
}
8477
8578
@override
86-
Future<void> uploadFile(Stream<List<int>> fileData, Attachment attachment) async {
87-
// TODO: Implement upload to your backend
79+
Future<Stream<List<int>>> downloadFile(Attachment attachment) async {
80+
// TODO: Implement download from your backend
8881
}
8982
9083
@override
@@ -104,68 +97,76 @@ await queue.startSync();
10497

10598
```dart
10699
await queue.saveFile(
107-
data: Stream.value([0]), // Your attachment data
108-
mediaType: 'image/jpg',
109-
fileExtension: 'jpg',
110-
onSave: (tx, attachment) async {
111-
/**
112-
* This callback is invoked in the same transaction which creates the attachment record.
113-
* Assignments of the newly created photo_id should be done in the same transaction for maximum efficiency.
114-
*/
115-
await tx.execute(
116-
'''
117-
UPDATE checklists
118-
SET photo_id = ?
119-
WHERE id = ?
120-
''',
121-
[attachment.id, checklistId],
122-
);
123-
},
124-
);
100+
data: photoData,
101+
mediaType: 'image/jpg',
102+
fileExtension: 'jpg',
103+
metaData: 'Test meta data',
104+
updateHook: (context, attachment) async {
105+
// Update the todo item to reference this attachment
106+
await context.execute(
107+
'UPDATE checklists SET photo_id = ? WHERE id = ?',
108+
[attachment.id, checklistId],
109+
);
110+
},
111+
);
125112
```
126113

127114
## Implementation Details
128115

129116
### Attachment Table Structure
130117

131-
The `createAttachmentsTable` function creates a local-only table for tracking attachment states.
118+
The `AttachmentsQueueTable` class creates a **local-only table** for tracking the states and metadata of file attachments. It allows customization of the table name, additional columns, indexes, and optionally a view name.
132119

133120
An attachments table definition can be created with the following options:
134121

135-
| Option | Description | Default |
136-
|--------|-----------------------|---------------|
137-
| `name` | The name of the table | `attachments` |
122+
| Option | Description | Default |
123+
| ---------------------- | -------------------------------| ----------------------------|
124+
| `attachmentsQueueTableName` | The name of the table | `defaultAttachmentsQueueTableName` |
125+
| `additionalColumns` | Extra columns to add to the table | `[]` (empty list) |
126+
| `indexes` | Indexes to optimize queries | `[]` (empty list) |
127+
| `viewName` | Optional associated view name | `null` |
128+
129+
The default columns included in the table are:
138130

139-
The default columns are:
131+
| Column Name | Type | Description |
132+
| ------------ | --------- | -------------------------------------------------------------------------------- |
133+
| `filename` | `TEXT` | The filename of the attachment |
134+
| `local_uri` | `TEXT` | Local file URI or path |
135+
| `timestamp` | `INTEGER` | The timestamp of the last update to the attachment |
136+
| `size` | `INTEGER` | File size in bytes |
137+
| `media_type` | `TEXT` | The media (MIME) type of the attachment |
138+
| `state` | `INTEGER` | Current state of the attachment (e.g., queued, syncing, synced) |
139+
| `has_synced` | `INTEGER` | Internal flag indicating if the attachment has ever been synced (for caching) |
140+
| `meta_data` | `TEXT` | Additional metadata stored as JSON |
140141

141-
| Column Name | Type | Description |
142-
|--------------|-----------|--------------------------------------------------------------------------------------------------------------------|
143-
| `id` | `TEXT` | Unique identifier for the attachment |
144-
| `filename` | `TEXT` | The filename of the attachment |
145-
| `media_type` | `TEXT` | The media type of the attachment |
146-
| `state` | `INTEGER` | Current state of the attachment (see `AttachmentState` enum) |
147-
| `timestamp` | `INTEGER` | The timestamp of last update to the attachment |
148-
| `size` | `INTEGER` | File size in bytes |
149-
| `has_synced` | `INTEGER` | Internal flag tracking if the attachment has ever been synced (used for caching) |
150-
| `meta_data` | `TEXT` | Additional metadata in JSON format |
142+
The class extends a base `Table` class using a `localOnly` constructor, so this table exists **only locally** on the device and is not synchronized with a remote database.
143+
144+
This design allows flexible tracking and management of attachment syncing state and metadata within the local database. |
151145

152146
### Attachment States
153147

154-
Attachments are managed through the following states:
148+
Attachments are managed through the following states, which represent their current synchronization status with remote storage:
149+
150+
| State | Description |
151+
| ----------------- | ---------------------------------------------------------------------- |
152+
| `queuedUpload` | Attachment is queued for upload to remote/cloud storage |
153+
| `queuedDelete` | Attachment is queued for deletion from both remote and local storage |
154+
| `queuedDownload` | Attachment is queued for download from remote/cloud storage |
155+
| `synced` | Attachment is fully synchronized with remote storage |
156+
| `archived` | Attachment is archived — no longer actively synchronized or referenced |
157+
158+
---
159+
160+
The `AttachmentState` enum also provides helper methods for converting between the enum and its integer representation:
155161

156-
| State | Description |
157-
|-------------------|-------------------------------------------------------------------------------|
158-
| `QUEUED_UPLOAD` | Attachment is queued for upload to cloud storage |
159-
| `QUEUED_DELETE` | Attachment is queued for deletion from cloud storage and local storage |
160-
| `QUEUED_DOWNLOAD` | Attachment is queued for download from cloud storage |
161-
| `SYNCED` | Attachment is fully synced |
162-
| `ARCHIVED` | Attachment is orphaned - i.e., no longer referenced by any data |
162+
- `AttachmentState.fromInt(int value)` — Constructs an `AttachmentState` from its corresponding integer index. Throws an `ArgumentError` if the value is out of range.
163+
- `toInt()` — Returns the integer index of the current `AttachmentState` instance.
163164

164165
### Sync Process
165166

166167
The `AttachmentQueue` implements a sync process with these components:
167168

168-
1. **State Monitoring**: The queue watches the attachments table for records in `QUEUED_UPLOAD`, `QUEUED_DELETE`, and `QUEUED_DOWNLOAD` states. An event loop triggers calls to the remote storage for these operations.
169+
1. **State Monitoring**: The queue watches the attachments table for records in `queuedUpload`, `queuedDelete`, and `queuedDownload` states. An event loop triggers calls to the remote storage for these operations.
169170

170171
2. **Periodic Sync**: By default, the queue triggers a sync every 30 seconds to retry failed uploads/downloads, in particular after the app was offline. This interval can be configured by setting `syncInterval` in the `AttachmentQueue` constructor options, or disabled by setting the interval to `0`.
171172

@@ -178,27 +179,27 @@ The `AttachmentQueue` implements a sync process with these components:
178179
The `saveFile` method handles attachment creation and upload:
179180

180181
1. The attachment is saved to local storage
181-
2. An `AttachmentRecord` is created with `QUEUED_UPLOAD` state, linked to the local file using `localUri`
182+
2. An `AttachmentRecord` is created with `queuedUpload` state, linked to the local file using `localUri`
182183
3. The attachment must be assigned to relational data in the same transaction, since this data is constantly watched and should always represent the attachment queue state
183184
4. The `RemoteStorageAdapter` `uploadFile` function is called
184-
5. On successful upload, the state changes to `SYNCED`
185-
6. If upload fails, the record stays in `QUEUED_UPLOAD` state for retry
185+
5. On successful upload, the state changes to `synced`
186+
6. If upload fails, the record stays in `queuedUpload` state for retry
186187

187188
### Download Process
188189

189190
Attachments are scheduled for download when the stream from `watchAttachments` emits a new item that is not present locally:
190191

191-
1. An `AttachmentRecord` is created with `QUEUED_DOWNLOAD` state
192+
1. An `AttachmentRecord` is created with `queuedDownload` state
192193
2. The `RemoteStorageAdapter` `downloadFile` function is called
193194
3. The received data is saved to local storage
194-
4. On successful download, the state changes to `SYNCED`
195+
4. On successful download, the state changes to `synced`
195196
5. If download fails, the operation is retried in the next sync cycle
196197

197198
### Delete Process
198199

199200
The `deleteFile` method deletes attachments from both local and remote storage:
200201

201-
1. The attachment record moves to `QUEUED_DELETE` state
202+
1. The attachment record moves to `queuedDelete` state
202203
2. The attachment must be unassigned from relational data in the same transaction, since this data is constantly watched and should always represent the attachment queue state
203204
3. On successful deletion, the record is removed
204205
4. If deletion fails, the operation is retried in the next sync cycle
@@ -207,7 +208,7 @@ The `deleteFile` method deletes attachments from both local and remote storage:
207208

208209
The `AttachmentQueue` implements a caching system for archived attachments:
209210

210-
1. Local attachments are marked as `ARCHIVED` if the stream from `watchAttachments` no longer references them
211+
1. Local attachments are marked as `archived` if the stream from `watchAttachments` no longer references them
211212
2. Archived attachments are kept in the cache for potential future restoration
212213
3. The cache size is controlled by the `archivedCacheLimit` parameter in the `AttachmentQueue` constructor
213214
4. By default, the queue keeps the last 100 archived attachment records
@@ -234,19 +235,19 @@ final errorHandler = _SyncErrorHandler();
234235
235236
class _SyncErrorHandler implements SyncErrorHandler {
236237
@override
237-
Future<bool> onDownloadError(Attachment attachment, Exception exception) async {
238+
Future<bool> onDownloadError(Attachment attachment, Object exception) async {
238239
// TODO: Return if the attachment sync should be retried
239240
return false;
240241
}
241242
242243
@override
243-
Future<bool> onUploadError(Attachment attachment, Exception exception) async {
244+
Future<bool> onUploadError(Attachment attachment, Object exception) async {
244245
// TODO: Return if the attachment sync should be retried
245246
return false;
246247
}
247248
248249
@override
249-
Future<bool> onDeleteError(Attachment attachment, Exception exception) async {
250+
Future<bool> onDeleteError(Attachment attachment, Object exception) async {
250251
// TODO: Return if the attachment sync should be retried
251252
return false;
252253
}

packages/powersync_attachments_stream/lib/src/attachment_queue_service.dart

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import 'dart:async';
88
import 'dart:typed_data';
99
import 'package:logging/logging.dart';
10+
import './storage/io_local_storage.dart';
1011
import 'package:powersync_core/powersync_core.dart';
1112
import 'package:sqlite_async/mutex.dart';
1213
import 'attachment.dart';
@@ -29,10 +30,13 @@ import 'sync/syncing_service.dart';
2930
class WatchedAttachmentItem {
3031
/// Id for the attachment record.
3132
final String id;
33+
3234
/// File extension used to determine an internal filename for storage if no [filename] is provided.
3335
final String? fileExtension;
36+
3437
/// Filename to store the attachment with.
3538
final String? filename;
39+
3640
/// Optional metadata for the attachment record.
3741
final String? metaData;
3842

@@ -95,15 +99,16 @@ class AttachmentQueue {
9599
required this.remoteStorage,
96100
required this.attachmentsDirectory,
97101
required this.watchAttachments,
98-
required this.localStorage,
102+
IOLocalStorage? localStorage,
99103
this.attachmentsQueueTableName = defaultTableName,
100104
this.errorHandler,
101105
this.syncInterval = const Duration(seconds: 30),
102106
this.archivedCacheLimit = 100,
103107
this.syncThrottleDuration = const Duration(seconds: 1),
104108
this.downloadAttachments = true,
105109
Logger? logger,
106-
}) : logger = logger ?? Logger('AttachmentQueue') {
110+
}) : localStorage = localStorage ?? IOLocalStorage(),
111+
logger = logger ?? Logger('AttachmentQueue') {
107112
attachmentsService = AttachmentServiceImpl(
108113
db: db,
109114
logger: logger ?? Logger('AttachmentQueue'),
@@ -113,7 +118,7 @@ class AttachmentQueue {
113118

114119
syncingService = SyncingService(
115120
remoteStorage: remoteStorage,
116-
localStorage: localStorage,
121+
localStorage: this.localStorage,
117122
attachmentsService: attachmentsService,
118123
getLocalUri: (filename) async => getLocalUri(filename),
119124
errorHandler: errorHandler,
@@ -229,7 +234,6 @@ class AttachmentQueue {
229234
// hasSynced: existingQueueItem.hasSynced;
230235
),
231236
);
232-
233237
} else if (existingQueueItem.state == AttachmentState.archived) {
234238
// The attachment is present again. Need to queue it for sync.
235239
if (existingQueueItem.hasSynced) {

packages/powersync_attachments_stream/lib/src/storage/io_local_storage.dart

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@ import '../abstractions/local_storage.dart';
2525
class IOLocalStorage implements AbstractLocalStorageAdapter {
2626
final Directory baseDir;
2727

28-
IOLocalStorage(this.baseDir);
28+
IOLocalStorage([Directory? baseDir])
29+
: baseDir = baseDir ?? Directory.systemTemp;
2930

3031
File _fileFor(String filePath) => File(p.join(baseDir.path, filePath));
3132
File _metaFileFor(String filePath) =>

0 commit comments

Comments
 (0)