Skip to content

Commit

Permalink
[ENG-1776] Delete cloud ops after they've been ingested (#2512)
Browse files Browse the repository at this point in the history
* delete cloud ops after they've been ingested

* give wait_tx to p2p sync ingest

* remove Ingested event

* add sync docs for setting relation fields

* Update core/crates/sync/README.md

Co-authored-by: Oscar Beaumont <[email protected]>

---------

Co-authored-by: Oscar Beaumont <[email protected]>
  • Loading branch information
Brendonovich and oscartbeaumont authored May 31, 2024
1 parent ab97572 commit 735e80a
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 18 deletions.
33 changes: 31 additions & 2 deletions core/crates/sync/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

Spacedrive's sync system. Consumes types and helpers from `sd-sync`.

## Using Sync

### Creating Records

Prepare a sync id by creating or obtaining its value,
Expand Down Expand Up @@ -101,3 +99,34 @@ sync.write_ops(
)
)
```

### Setting Relation Fields

Setting relation fields requires providing the Sync ID of the relation.
Setting the relation field's scalar fields instead will not properly sync then relation,
usually because the scalar fields are local and disconnected from the Sync ID.

```rs
let (sync_params, db_params): (Vec<_>, Vec<_>) = [
sync_db_entry!(
prisma_sync::object::SyncId { pub_id: object_pub_id },
file_path::object
)
].into_iter().unzip();

sync.write_ops(
db,
(
sync.shared_update(
prisma_sync::file_path::SyncId {
pub_id: file_path_pub_id
},
sync_params
),
db.file_path().update(
file_path::id::equals(file_path_id),
db_params
)
)
)
```
19 changes: 11 additions & 8 deletions core/crates/sync/src/db_operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,17 @@ impl cloud_crdt_include::Data {
Uuid::from_slice(&self.instance.pub_id).unwrap()
}

pub fn into_operation(self) -> CRDTOperation {
CRDTOperation {
instance: self.instance(),
timestamp: self.timestamp(),
record_id: rmp_serde::from_slice(&self.record_id).unwrap(),
model: self.model as u16,
data: serde_json::from_slice(&self.data).unwrap(),
}
pub fn into_operation(self) -> (i32, CRDTOperation) {
(
self.id,
CRDTOperation {
instance: self.instance(),
timestamp: self.timestamp(),
record_id: rmp_serde::from_slice(&self.record_id).unwrap(),
model: self.model as u16,
data: serde_json::from_slice(&self.data).unwrap(),
},
)
}
}

Expand Down
9 changes: 7 additions & 2 deletions core/crates/sync/src/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub enum Request {
timestamps: Vec<(Uuid, NTP64)>,
tx: oneshot::Sender<()>,
},
Ingested,
// Ingested,
FinishedIngesting,
}

Expand Down Expand Up @@ -129,6 +129,10 @@ impl Actor {
}
}

if let Some(tx) = event.wait_tx {
tx.send(()).ok();
}

match event.has_more {
true => State::RetrievingMessages,
false => {
Expand Down Expand Up @@ -421,7 +425,7 @@ impl Actor {

self.timestamps.write().await.insert(instance, new_ts);

self.io.req_tx.send(Request::Ingested).await.ok();
// self.io.req_tx.send(Request::Ingested).await.ok();

Ok(())
}
Expand All @@ -445,6 +449,7 @@ pub struct MessagesEvent {
pub instance_id: Uuid,
pub messages: CompressedCRDTOperations,
pub has_more: bool,
pub wait_tx: Option<oneshot::Sender<()>>,
}

impl ActorTypes for Actor {
Expand Down
2 changes: 1 addition & 1 deletion core/crates/sync/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ impl Manager {
pub async fn get_cloud_ops(
&self,
args: GetOpsArgs,
) -> prisma_client_rust::Result<Vec<CRDTOperation>> {
) -> prisma_client_rust::Result<Vec<(i32, CRDTOperation)>> {
let db = &self.db;

macro_rules! db_args {
Expand Down
7 changes: 4 additions & 3 deletions core/crates/sync/tests/mock_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,14 @@ impl Instance {
messages: CompressedCRDTOperations::new(messages),
has_more: false,
instance_id: instance1.id,
wait_tx: None,
}))
.await
.unwrap();
}
ingest::Request::Ingested => {
instance2.sync.tx.send(SyncMessage::Ingested).ok();
}
// ingest::Request::Ingested => {
// instance2.sync.tx.send(SyncMessage::Ingested).ok();
// }
ingest::Request::FinishedIngesting => {}
}
}
Expand Down
20 changes: 18 additions & 2 deletions core/src/cloud/sync/ingest.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use sd_prisma::prisma::cloud_crdt_operation;
use sd_sync::CompressedCRDTOperations;
use std::sync::{
atomic::{AtomicBool, Ordering},
Expand Down Expand Up @@ -44,13 +45,15 @@ pub async fn run_actor(
_ => continue,
};

let ops = err_break!(
let (ops_ids, ops): (Vec<_>, Vec<_>) = err_break!(
sync.get_cloud_ops(GetOpsArgs {
clocks: timestamps,
count: OPS_PER_REQUEST,
})
.await
);
)
.into_iter()
.unzip();

if ops.is_empty() {
break;
Expand All @@ -63,16 +66,29 @@ pub async fn run_actor(
ops.last().map(|operation| operation.timestamp.as_u64()),
);

let (wait_tx, wait_rx) = tokio::sync::oneshot::channel::<()>();

err_break!(
sync.ingest
.event_tx
.send(sd_core_sync::Event::Messages(MessagesEvent {
instance_id: sync.instance,
has_more: ops.len() == OPS_PER_REQUEST as usize,
messages: CompressedCRDTOperations::new(ops),
wait_tx: Some(wait_tx)
}))
.await
);

err_break!(wait_rx.await);

err_break!(
sync.db
.cloud_crdt_operation()
.delete_many(vec![cloud_crdt_operation::id::in_vec(ops_ids)])
.exec()
.await
);
}
}
}
Expand Down
5 changes: 5 additions & 0 deletions core/src/p2p/sync/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,15 +244,20 @@ mod responder {

let rx::Operations(ops) = rx::Operations::from_stream(stream).await.unwrap();

let (wait_tx, wait_rx) = tokio::sync::oneshot::channel::<()>();

ingest
.event_tx
.send(Event::Messages(MessagesEvent {
instance_id: library.sync.instance,
has_more: ops.len() == OPS_PER_REQUEST as usize,
messages: ops,
wait_tx: Some(wait_tx),
}))
.await
.expect("TODO: Handle ingest channel closed, so we don't loose ops");

wait_rx.await.unwrap()
}

debug!("Sync responder done");
Expand Down

0 comments on commit 735e80a

Please sign in to comment.