Skip to content

Commit

Permalink
Improve syncevent
Browse files Browse the repository at this point in the history
  • Loading branch information
Rigidity committed Nov 23, 2024
1 parent 9c004af commit df7f3d1
Show file tree
Hide file tree
Showing 14 changed files with 112 additions and 65 deletions.
20 changes: 10 additions & 10 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 15 additions & 2 deletions crates/sage-wallet/src/queues/transaction_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ impl TransactionQueue {
}

if resolved {
self.sync_sender
.send(SyncEvent::TransactionEnded {
transaction_id,
success: true,
})
.await
.ok();
continue;
}

Expand All @@ -141,11 +148,17 @@ impl TransactionQueue {
let mut tx = self.db.tx().await?;
safely_remove_transaction(&mut tx, transaction_id).await?;
tx.commit().await?;

self.sync_sender
.send(SyncEvent::TransactionEnded {
transaction_id,
success: false,
})
.await
.ok();
}
}

self.sync_sender.send(SyncEvent::Transaction).await.ok();

Ok(())
}
}
Expand Down
17 changes: 13 additions & 4 deletions crates/sage-wallet/src/sync_manager/sync_event.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
use std::net::IpAddr;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
use chia::protocol::{Bytes32, CoinState};

#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SyncEvent {
Start(IpAddr),
Stop,
Subscribed,
Derivation,
CoinState,
Transaction,
DerivationIndex {
next_index: u32,
},
CoinsUpdated {
coin_states: Vec<CoinState>,
},
TransactionEnded {
transaction_id: Bytes32,
success: bool,
},
PuzzleBatchSynced,
CatInfo,
DidInfo,
Expand Down
24 changes: 18 additions & 6 deletions crates/sage-wallet/src/sync_manager/wallet_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,12 @@ pub async fn sync_wallet(
}
tx.commit().await?;

sync_sender.send(SyncEvent::Derivation).await.ok();
sync_sender
.send(SyncEvent::DerivationIndex {
next_index: start_index,
})
.await
.ok();

for batch in p2_puzzle_hashes.chunks(500) {
derive_more |= sync_puzzle_hashes(
Expand Down Expand Up @@ -216,22 +221,28 @@ pub async fn incremental_sync(
) -> Result<(), WalletError> {
let mut tx = wallet.db.tx().await?;

for coin_state in coin_states {
for &coin_state in &coin_states {
upsert_coin(&mut tx, coin_state, None).await?;

if coin_state.spent_height.is_some() {
handle_spent_coin(&mut tx, coin_state.coin.coin_id()).await?;
}
}

sync_sender
.send(SyncEvent::CoinsUpdated { coin_states })
.await
.ok();

let mut derived = false;

let mut next_index = tx.derivation_index(false).await?;

if derive_automatically {
let max_index = tx
.max_used_derivation_index(false)
.await?
.map_or(0, |index| index + 1);
let mut next_index = tx.derivation_index(false).await?;

while next_index < max_index + 500 {
wallet
Expand All @@ -245,10 +256,11 @@ pub async fn incremental_sync(

tx.commit().await?;

sync_sender.send(SyncEvent::CoinState).await.ok();

if derived {
sync_sender.send(SyncEvent::Derivation).await.ok();
sync_sender
.send(SyncEvent::DerivationIndex { next_index })
.await
.ok();
}

Ok(())
Expand Down
21 changes: 16 additions & 5 deletions crates/sage-wallet/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ impl TestWallet {
index: key_index,
};

test.consume_until(SyncEvent::Subscribed).await;
test.consume_until(|event| matches!(event, SyncEvent::Subscribed))
.await;
assert_eq!(test.wallet.db.balance().await?, balance as u128);

Ok(test)
Expand Down Expand Up @@ -170,18 +171,28 @@ impl TestWallet {
Ok(())
}

pub async fn consume_until(&mut self, event: SyncEvent) {
pub async fn consume_until(&mut self, f: impl Fn(SyncEvent) -> bool) {
loop {
let next = timeout(Duration::from_secs(10), self.events.recv())
.await
.unwrap_or_else(|_| panic!("timed out listening for {event:?}"))
.unwrap_or_else(|| panic!("missing {event:?}"));
.unwrap_or_else(|_| panic!("timed out listening for event"))
.unwrap_or_else(|| panic!("missing next event"));

debug!("Consuming event: {next:?}");

if event == next {
if f(next) {
return;
}
}
}

pub async fn wait_for_coins(&mut self) {
self.consume_until(|event| matches!(event, SyncEvent::CoinsUpdated { .. }))
.await;
}

pub async fn wait_for_puzzles(&mut self) {
self.consume_until(|event| matches!(event, SyncEvent::PuzzleBatchSynced))
.await;
}
}
8 changes: 4 additions & 4 deletions crates/sage-wallet/src/wallet/cats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl Wallet {
mod tests {
use test_log::test;

use crate::{SyncEvent, TestWallet};
use crate::TestWallet;

#[test(tokio::test)]
async fn test_send_cat() -> anyhow::Result<()> {
Expand All @@ -150,7 +150,7 @@ mod tests {
assert_eq!(coin_spends.len(), 2);

test.transact(coin_spends).await?;
test.consume_until(SyncEvent::CoinState).await;
test.wait_for_coins().await;

assert_eq!(test.wallet.db.balance().await?, 500);
assert_eq!(test.wallet.db.spendable_coins().await?.len(), 1);
Expand All @@ -164,7 +164,7 @@ mod tests {
assert_eq!(coin_spends.len(), 1);

test.transact(coin_spends).await?;
test.consume_until(SyncEvent::CoinState).await;
test.wait_for_coins().await;

assert_eq!(test.wallet.db.cat_balance(asset_id).await?, 1000);
assert_eq!(test.wallet.db.spendable_cat_coins(asset_id).await?.len(), 2);
Expand All @@ -176,7 +176,7 @@ mod tests {
assert_eq!(coin_spends.len(), 3);

test.transact(coin_spends).await?;
test.consume_until(SyncEvent::CoinState).await;
test.wait_for_coins().await;

assert_eq!(test.wallet.db.balance().await?, 0);
assert_eq!(test.wallet.db.spendable_coins().await?.len(), 0);
Expand Down
10 changes: 5 additions & 5 deletions crates/sage-wallet/src/wallet/did_assign.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ mod tests {
use chia::puzzles::nft::NftMetadata;
use test_log::test;

use crate::{SyncEvent, TestWallet, WalletNftMint};
use crate::{TestWallet, WalletNftMint};

use super::*;

Expand All @@ -142,7 +142,7 @@ mod tests {

let (coin_spends, did) = test.wallet.create_did(0, false, true).await?;
test.transact(coin_spends).await?;
test.consume_until(SyncEvent::CoinState).await;
test.wait_for_coins().await;

let (coin_spends, mut nfts, _did) = test
.wallet
Expand All @@ -159,7 +159,7 @@ mod tests {
)
.await?;
test.transact(coin_spends).await?;
test.consume_until(SyncEvent::PuzzleBatchSynced).await;
test.wait_for_puzzles().await;

let nft = nfts.remove(0);

Expand All @@ -168,7 +168,7 @@ mod tests {
.assign_nfts(vec![nft.info.launcher_id], None, 0, false, true)
.await?;
test.transact(coin_spends).await?;
test.consume_until(SyncEvent::CoinState).await;
test.wait_for_coins().await;

let coin_spends = test
.wallet
Expand All @@ -181,7 +181,7 @@ mod tests {
)
.await?;
test.transact(coin_spends).await?;
test.consume_until(SyncEvent::CoinState).await;
test.wait_for_coins().await;

Ok(())
}
Expand Down
6 changes: 3 additions & 3 deletions crates/sage-wallet/src/wallet/dids.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ impl Wallet {

#[cfg(test)]
mod tests {
use crate::{SyncEvent, TestWallet};
use crate::TestWallet;

use test_log::test;

Expand All @@ -134,15 +134,15 @@ mod tests {

let (coin_spends, did) = test.wallet.create_did(0, false, true).await?;
test.transact(coin_spends).await?;
test.consume_until(SyncEvent::CoinState).await;
test.wait_for_coins().await;

for _ in 0..2 {
let coin_spends = test
.wallet
.transfer_dids(vec![did.info.launcher_id], test.puzzle_hash, 0, false, true)
.await?;
test.transact(coin_spends).await?;
test.consume_until(SyncEvent::CoinState).await;
test.wait_for_coins().await;
}

assert_ne!(
Expand Down
10 changes: 5 additions & 5 deletions crates/sage-wallet/src/wallet/nfts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ impl Wallet {
mod tests {
use test_log::test;

use crate::{SyncEvent, TestWallet};
use crate::TestWallet;

use super::*;

Expand All @@ -242,7 +242,7 @@ mod tests {

let (coin_spends, did) = test.wallet.create_did(0, false, true).await?;
test.transact(coin_spends).await?;
test.consume_until(SyncEvent::CoinState).await;
test.wait_for_coins().await;

let (coin_spends, mut nfts, _did) = test
.wallet
Expand All @@ -259,7 +259,7 @@ mod tests {
)
.await?;
test.transact(coin_spends).await?;
test.consume_until(SyncEvent::PuzzleBatchSynced).await;
test.wait_for_puzzles().await;

let puzzle_hash = test.wallet.p2_puzzle_hash(false, true).await?;

Expand All @@ -275,7 +275,7 @@ mod tests {
.add_nft_uri(nft.info.launcher_id, 0, item, false, true)
.await?;
test.transact(coin_spends).await?;
test.consume_until(SyncEvent::CoinState).await;
test.wait_for_coins().await;
}

for _ in 0..2 {
Expand All @@ -284,7 +284,7 @@ mod tests {
.transfer_nfts(vec![nft.info.launcher_id], puzzle_hash, 0, false, true)
.await?;
test.transact(coin_spends).await?;
test.consume_until(SyncEvent::CoinState).await;
test.wait_for_coins().await;
}

Ok(())
Expand Down
Loading

0 comments on commit df7f3d1

Please sign in to comment.