Skip to content

Commit 0c63d69

Browse files
committed
Update TTL behavior
1 parent 0e2b14d commit 0c63d69

File tree

5 files changed

+115
-31
lines changed

5 files changed

+115
-31
lines changed

crates/core/src/sync/interface.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use crate::constants::SUBTYPE_JSON;
77
use crate::error::PowerSyncError;
88
use crate::schema::Schema;
99
use crate::state::DatabaseState;
10-
use crate::sync::subscriptions::apply_subscriptions;
10+
use crate::sync::subscriptions::{StreamKey, apply_subscriptions};
1111
use alloc::borrow::Cow;
1212
use alloc::boxed::Box;
1313
use alloc::rc::Rc;
@@ -30,8 +30,15 @@ pub struct StartSyncStream {
3030
pub parameters: Option<serde_json::Map<String, serde_json::Value>>,
3131
#[serde(default)]
3232
pub schema: Schema,
33+
34+
/// Whether to request default streams in the generated sync request.
3335
#[serde(default = "StartSyncStream::include_defaults_by_default")]
3436
pub include_defaults: bool,
37+
/// Streams that are currently active in the app.
38+
///
39+
/// We will increase the expiry date for those streams at the time we connect and disconnect.
40+
#[serde(default)]
41+
pub active_streams: Vec<StreamKey>,
3542
}
3643

3744
impl StartSyncStream {
@@ -46,6 +53,7 @@ impl Default for StartSyncStream {
4653
parameters: Default::default(),
4754
schema: Default::default(),
4855
include_defaults: Self::include_defaults_by_default(),
56+
active_streams: Default::default(),
4957
}
5058
}
5159
}

crates/core/src/sync/storage_adapter.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use crate::{
1414
checkpoint::{ChecksumMismatch, validate_checkpoint},
1515
interface::{RequestedStreamSubscription, StreamSubscriptionRequest},
1616
streaming_sync::OwnedStreamDescription,
17-
subscriptions::LocallyTrackedSubscription,
17+
subscriptions::{LocallyTrackedSubscription, StreamKey},
1818
sync_status::SyncPriorityStatus,
1919
},
2020
sync_local::{PartialSyncOperation, SyncOperation},
@@ -357,6 +357,25 @@ impl StorageAdapter {
357357
Ok(())
358358
}
359359

360+
/// Increases the TTL for explicit subscriptions that are currently marked as active.
361+
pub fn increase_ttl(&self, streams: &[StreamKey]) -> Result<(), PowerSyncError> {
362+
let stmt = self.db.prepare_v2(
363+
"UPDATE ps_stream_subscriptions SET expires_at = unixepoch() + ttl WHERE stream_name = ? AND local_params = ? AND ttl IS NOT NULL",
364+
)?;
365+
366+
for stream in streams {
367+
stmt.bind_text(1, &stream.name, sqlite_nostd::Destructor::STATIC)?;
368+
stmt.bind_text(
369+
2,
370+
&stream.serialized_params(),
371+
sqlite_nostd::Destructor::STATIC,
372+
)?;
373+
stmt.exec()?;
374+
}
375+
376+
Ok(())
377+
}
378+
360379
pub fn iterate_local_subscriptions<F: FnMut(LocallyTrackedSubscription) -> ()>(
361380
&self,
362381
mut action: F,

crates/core/src/sync/streaming_sync.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -797,6 +797,7 @@ impl StreamingSyncIteration {
797797

798798
let requests = self.adapter.collect_bucket_requests()?;
799799
let local_bucket_names: Vec<String> = requests.iter().map(|s| s.name.clone()).collect();
800+
self.adapter.increase_ttl(&self.options.active_streams)?;
800801
let (streams, index_to_id) = self
801802
.adapter
802803
.collect_subscription_requests(self.options.include_defaults)?;

crates/core/src/sync/subscriptions.rs

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -39,30 +39,45 @@ impl LocallyTrackedSubscription {
3939
pub enum SubscriptionChangeRequest {
4040
#[serde(rename = "subscribe")]
4141
Subscribe(SubscribeToStream),
42+
43+
/// Explicitly unsubscribes from a stream. This corresponds to the `unsubscribeAll()` API in the
44+
/// SDKs.
45+
///
46+
/// Unsubscribing a single stream subscription happens internally in the SDK by reducing its
47+
/// refcount. Once no references are remaining, it's no longer listed in
48+
/// [StartSyncStream.active_streams] which will cause it to get unsubscribed after its TTL.
4249
#[serde(rename = "unsubscribe")]
43-
Unsubscribe(UnsubscribeFromStream),
50+
Unsubscribe(StreamKey),
4451
}
4552

46-
#[serde_as]
53+
/// A key uniquely identifying a stream.
4754
#[derive(Deserialize)]
48-
pub struct SubscribeToStream {
49-
pub stream: String,
55+
pub struct StreamKey {
56+
pub name: String,
5057
#[serde(default)]
5158
pub params: Option<Box<serde_json::value::RawValue>>,
59+
}
60+
61+
impl StreamKey {
62+
pub fn serialized_params(&self) -> &str {
63+
match &self.params {
64+
Some(params) => params.get(),
65+
None => "null",
66+
}
67+
}
68+
}
69+
70+
#[serde_as]
71+
#[derive(Deserialize)]
72+
pub struct SubscribeToStream {
73+
pub stream: StreamKey,
5274
#[serde_as(as = "Option<DurationSeconds>")]
5375
#[serde(default)]
5476
pub ttl: Option<Duration>,
5577
#[serde(default)]
5678
pub priority: Option<BucketPriority>,
5779
}
5880

59-
#[derive(Deserialize)]
60-
pub struct UnsubscribeFromStream {
61-
pub stream: String,
62-
#[serde(default)]
63-
pub params: Option<Box<serde_json::value::RawValue>>,
64-
}
65-
6681
pub fn apply_subscriptions(
6782
db: *mut sqlite::sqlite3,
6883
subscription: SubscriptionChangeRequest,
@@ -83,17 +98,14 @@ INSERT INTO ps_stream_subscriptions (stream_name, local_priority, local_params,
8398
)
8499
.into_db_result(db)?;
85100

86-
stmt.bind_text(1, &subscription.stream, sqlite::Destructor::STATIC)?;
101+
stmt.bind_text(1, &subscription.stream.name, sqlite::Destructor::STATIC)?;
87102
match &subscription.priority {
88103
Some(priority) => stmt.bind_int(2, priority.number),
89104
None => stmt.bind_null(2),
90105
}?;
91106
stmt.bind_text(
92107
3,
93-
match &subscription.params {
94-
Some(params) => params.get(),
95-
None => "null",
96-
},
108+
subscription.stream.serialized_params(),
97109
sqlite::Destructor::STATIC,
98110
)?;
99111
stmt.bind_int64(
@@ -109,13 +121,10 @@ INSERT INTO ps_stream_subscriptions (stream_name, local_priority, local_params,
109121
let stmt = db
110122
.prepare_v2("UPDATE ps_stream_subscriptions SET ttl = NULL WHERE stream_name = ? AND local_params = ?")
111123
.into_db_result(db)?;
112-
stmt.bind_text(1, &subscription.stream, sqlite::Destructor::STATIC)?;
124+
stmt.bind_text(1, &subscription.name, sqlite::Destructor::STATIC)?;
113125
stmt.bind_text(
114126
2,
115-
match &subscription.params {
116-
Some(params) => params.get(),
117-
None => "null",
118-
},
127+
subscription.serialized_params(),
119128
sqlite::Destructor::STATIC,
120129
)?;
121130
stmt.exec()?;

dart/test/sync_stream_test.dart

Lines changed: 55 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,9 @@ void main() {
180180
control(
181181
'subscriptions',
182182
json.encode({
183-
'subscribe': {'stream': 'a'},
183+
'subscribe': {
184+
'stream': {'name': 'a'}
185+
},
184186
}),
185187
);
186188

@@ -278,7 +280,7 @@ void main() {
278280
'subscriptions',
279281
json.encode({
280282
'unsubscribe': {
281-
'stream': 'my_stream',
283+
'name': 'my_stream',
282284
'params': null,
283285
}
284286
}),
@@ -304,12 +306,12 @@ void main() {
304306
);
305307
});
306308

307-
syncTest('ttl', (controller) {
309+
syncTest('delete after ttl', (controller) {
308310
control(
309311
'subscriptions',
310312
json.encode({
311313
'subscribe': {
312-
'stream': 'my_stream',
314+
'stream': {'name': 'my_stream'},
313315
'ttl': 3600,
314316
}
315317
}),
@@ -369,11 +371,52 @@ void main() {
369371
);
370372
});
371373

374+
syncTest('increase ttl', (controller) {
375+
control(
376+
'subscriptions',
377+
json.encode({
378+
'subscribe': {
379+
'stream': {'name': 'my_stream'},
380+
'ttl': 3600,
381+
}
382+
}),
383+
);
384+
385+
var [row] = db.select('SELECT * FROM ps_stream_subscriptions');
386+
expect(row, containsPair('expires_at', 1740826800));
387+
388+
controller.elapse(const Duration(minutes: 30));
389+
390+
// Mark the stream as active in the connect procedure
391+
control(
392+
'start',
393+
json.encode({
394+
'active_streams': [
395+
{'name': 'my_stream'}
396+
]
397+
}),
398+
);
399+
400+
// Which should increase its expiry date.
401+
[row] = db.select('SELECT * FROM ps_stream_subscriptions');
402+
expect(row, containsPair('expires_at', 1740828600));
403+
404+
// Stopping should not increase the expiry date. Client SDKs will do that
405+
// periodically.
406+
controller.elapse(const Duration(minutes: 30));
407+
control('stop', null);
408+
409+
[row] = db.select('SELECT * FROM ps_stream_subscriptions');
410+
expect(row, containsPair('expires_at', 1740828600));
411+
});
412+
372413
syncTest('can be made implicit', (_) {
373414
control(
374415
'subscriptions',
375416
json.encode({
376-
'subscribe': {'stream': 'a'}
417+
'subscribe': {
418+
'stream': {'name': 'a'}
419+
}
377420
}));
378421
control('start', null);
379422
control(
@@ -394,7 +437,7 @@ void main() {
394437
control(
395438
'subscriptions',
396439
json.encode({
397-
'unsubscribe': {'stream': 'a'}
440+
'unsubscribe': {'name': 'a'}
398441
}),
399442
);
400443
control('stop', null);
@@ -425,13 +468,17 @@ void main() {
425468
control(
426469
'subscriptions',
427470
json.encode({
428-
'subscribe': {'stream': 'a', 'params': 'invalid'}
471+
'subscribe': {
472+
'stream': {'name': 'a', 'params': 'invalid'}
473+
}
429474
}),
430475
);
431476
control(
432477
'subscriptions',
433478
json.encode({
434-
'subscribe': {'stream': 'a', 'params': 'valid'}
479+
'subscribe': {
480+
'stream': {'name': 'a', 'params': 'valid'}
481+
}
435482
}),
436483
);
437484

0 commit comments

Comments
 (0)