Skip to content

Commit 91cd811

Browse files
committed
Implement new protocol format
1 parent 20952d1 commit 91cd811

File tree

7 files changed

+269
-103
lines changed

7 files changed

+269
-103
lines changed

crates/core/src/sync/interface.rs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use alloc::rc::Rc;
1414
use alloc::sync::Arc;
1515
use alloc::{string::String, vec::Vec};
1616
use serde::{Deserialize, Serialize};
17-
use serde_with::{DisplayFromStr, serde_as};
1817
use sqlite::{ResultCode, Value};
1918
use sqlite_nostd::bindings::SQLITE_RESULT_SUBTYPE;
2019
use sqlite_nostd::{self as sqlite, ColumnType};
@@ -138,16 +137,13 @@ pub struct StreamSubscriptionRequest {
138137
pub subscriptions: Vec<RequestedStreamSubscription>,
139138
}
140139

141-
#[serde_as]
142140
#[derive(Serialize)]
143141
pub struct RequestedStreamSubscription {
144142
/// The name of the sync stream to subscribe to.
145143
pub stream: String,
146144
/// Parameters to make available in the stream's definition.
147145
pub parameters: Option<Box<JsonString>>,
148146
pub override_priority: Option<BucketPriority>,
149-
#[serde_as(as = "DisplayFromStr")]
150-
pub client_id: i64,
151147
}
152148

153149
#[derive(Serialize)]

crates/core/src/sync/line.rs

Lines changed: 77 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use alloc::borrow::Cow;
22
use alloc::rc::Rc;
3-
use alloc::string::{String, ToString};
3+
use alloc::string::String;
44
use alloc::vec::Vec;
55
use serde::Deserialize;
66
use serde::de::{Error, IgnoredAny, VariantAccess, Visitor};
@@ -85,13 +85,76 @@ pub struct Checkpoint<'a> {
8585
#[serde(borrow)]
8686
pub buckets: Vec<BucketChecksum<'a>>,
8787
#[serde(default, borrow)]
88-
pub streams: Vec<StreamDefinition<'a>>,
88+
pub streams: Vec<StreamDescription<'a>>,
8989
}
9090

9191
#[derive(Deserialize, Debug)]
92-
pub struct StreamDefinition<'a> {
92+
pub struct StreamDescription<'a> {
9393
pub name: SyncLineStr<'a>,
9494
pub is_default: bool,
95+
pub errors: Rc<Vec<StreamSubscriptionError>>,
96+
}
97+
98+
#[derive(Deserialize, Debug)]
99+
pub struct StreamSubscriptionError {
100+
pub subscription: StreamSubscriptionErrorCause,
101+
pub message: String,
102+
}
103+
104+
/// The concrete stream subscription that has caused an error.
105+
#[derive(Debug)]
106+
pub enum StreamSubscriptionErrorCause {
107+
/// The error is caused by the stream being subscribed to by default (i.e., no parameters).
108+
Default,
109+
/// The error is caused by an explicit subscription (e.g. due to invalid parameters).
110+
///
111+
/// The inner value is the index into [StreamSubscriptionRequest::subscriptions] of the
112+
/// faulty subscription.
113+
ExplicitSubscription(usize),
114+
}
115+
116+
impl<'de> Deserialize<'de> for StreamSubscriptionErrorCause {
117+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
118+
where
119+
D: serde::Deserializer<'de>,
120+
{
121+
struct CauseVisitor;
122+
123+
impl<'de> Visitor<'de> for CauseVisitor {
124+
type Value = StreamSubscriptionErrorCause;
125+
126+
fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result {
127+
write!(formatter, "default or index")
128+
}
129+
130+
fn visit_str<E>(self, _v: &str) -> Result<Self::Value, E>
131+
where
132+
E: Error,
133+
{
134+
return Ok(StreamSubscriptionErrorCause::Default);
135+
}
136+
137+
fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
138+
where
139+
E: Error,
140+
{
141+
return Ok(StreamSubscriptionErrorCause::ExplicitSubscription(
142+
v as usize,
143+
));
144+
}
145+
146+
fn visit_i32<E>(self, v: i32) -> Result<Self::Value, E>
147+
where
148+
E: Error,
149+
{
150+
return Ok(StreamSubscriptionErrorCause::ExplicitSubscription(
151+
v as usize,
152+
));
153+
}
154+
}
155+
156+
deserializer.deserialize_any(CauseVisitor)
157+
}
95158
}
96159

97160
#[serde_as]
@@ -141,9 +204,13 @@ pub struct BucketChecksum<'a> {
141204
#[derive(Debug)]
142205
pub enum BucketSubscriptionReason {
143206
/// A bucket was created from a default stream.
144-
DerivedFromDefaultStream(String),
207+
///
208+
/// The inner value is the index of the stream in [Checkpoint::streams].
209+
DerivedFromDefaultStream(usize),
145210
/// A bucket was created for a subscription id we've explicitly requested in the sync request.
146-
DerivedFromExplicitSubscription(i64),
211+
///
212+
/// The inner value is the index of the stream in [StreamSubscriptionRequest::subscriptions].
213+
DerivedFromExplicitSubscription(usize),
147214
}
148215

149216
impl<'de> Deserialize<'de> for BucketSubscriptionReason {
@@ -153,7 +220,7 @@ impl<'de> Deserialize<'de> for BucketSubscriptionReason {
153220
{
154221
struct MyVisitor;
155222

156-
const VARIANTS: &'static [&'static str] = &["def", "sub"];
223+
const VARIANTS: &'static [&'static str] = &["default", "sub"];
157224

158225
impl<'de> Visitor<'de> for MyVisitor {
159226
type Value = BucketSubscriptionReason;
@@ -168,17 +235,12 @@ impl<'de> Deserialize<'de> for BucketSubscriptionReason {
168235
{
169236
let (key, variant) = data.variant::<&'de str>()?;
170237
Ok(match key {
171-
"def" => BucketSubscriptionReason::DerivedFromDefaultStream(
238+
"default" => BucketSubscriptionReason::DerivedFromDefaultStream(
239+
variant.newtype_variant()?,
240+
),
241+
"sub" => BucketSubscriptionReason::DerivedFromExplicitSubscription(
172242
variant.newtype_variant()?,
173243
),
174-
"sub" => {
175-
let textual_id = variant.newtype_variant::<&'de str>()?;
176-
let id = textual_id
177-
.parse()
178-
.map_err(|_| A::Error::custom("not an int"))?;
179-
180-
BucketSubscriptionReason::DerivedFromExplicitSubscription(id)
181-
}
182244
other => return Err(A::Error::unknown_variant(other, VARIANTS)),
183245
})
184246
}

crates/core/src/sync/storage_adapter.rs

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,18 @@ use core::{assert_matches::debug_assert_matches, fmt::Display};
22

33
use alloc::{string::ToString, vec::Vec};
44
use serde::Serialize;
5-
use serde_json::value::RawValue;
65
use sqlite_nostd::{self as sqlite, Connection, ManagedStmt, ResultCode};
76

87
use crate::{
98
error::{PSResult, PowerSyncError},
109
ext::SafeManagedStmt,
11-
kv::client_id,
1210
operations::delete_bucket,
1311
schema::Schema,
1412
state::DatabaseState,
1513
sync::{
1614
checkpoint::{ChecksumMismatch, validate_checkpoint},
1715
interface::{RequestedStreamSubscription, StreamSubscriptionRequest},
18-
streaming_sync::OwnedStreamDefinition,
16+
streaming_sync::OwnedStreamDescription,
1917
subscriptions::LocallyTrackedSubscription,
2018
sync_status::SyncPriorityStatus,
2119
},
@@ -289,10 +287,12 @@ impl StorageAdapter {
289287
pub fn collect_subscription_requests(
290288
&self,
291289
include_defaults: bool,
292-
) -> Result<StreamSubscriptionRequest, PowerSyncError> {
290+
) -> Result<(StreamSubscriptionRequest, Vec<i64>), PowerSyncError> {
293291
self.delete_outdated_subscriptions()?;
294292

295293
let mut subscriptions: Vec<RequestedStreamSubscription> = Vec::new();
294+
let mut index_to_local_id = Vec::<i64>::new();
295+
296296
// We have an explicit subscription iff ttl is not null. Checking is_default is not enough,
297297
// because a stream can both be a default stream and have an explicit subscription.
298298
let stmt = self
@@ -306,14 +306,17 @@ impl StorageAdapter {
306306
stream: subscription.stream_name,
307307
parameters: subscription.local_params,
308308
override_priority: subscription.local_priority,
309-
client_id: subscription.id,
310309
});
310+
index_to_local_id.push(subscription.id);
311311
}
312312

313-
Ok(StreamSubscriptionRequest {
314-
include_defaults,
315-
subscriptions,
316-
})
313+
Ok((
314+
StreamSubscriptionRequest {
315+
include_defaults,
316+
subscriptions,
317+
},
318+
index_to_local_id,
319+
))
317320
}
318321

319322
pub fn now(&self) -> Result<Timestamp, ResultCode> {
@@ -370,8 +373,9 @@ impl StorageAdapter {
370373

371374
pub fn create_default_subscription(
372375
&self,
373-
stream: &OwnedStreamDefinition,
376+
stream: &OwnedStreamDescription,
374377
) -> Result<LocallyTrackedSubscription, PowerSyncError> {
378+
debug_assert!(stream.is_default);
375379
let stmt = self.db.prepare_v2("INSERT INTO ps_stream_subscriptions (stream_name, active, is_default) VALUES (?, TRUE, TRUE) RETURNING *;")?;
376380
stmt.bind_text(1, &stream.name, sqlite_nostd::Destructor::STATIC)?;
377381

0 commit comments

Comments
 (0)