diff --git a/zcash_client_backend/CHANGELOG.md b/zcash_client_backend/CHANGELOG.md index 99fed1c8b9..4ab8b0875f 100644 --- a/zcash_client_backend/CHANGELOG.md +++ b/zcash_client_backend/CHANGELOG.md @@ -7,11 +7,17 @@ and this library adheres to Rust's notion of ## [Unreleased] +### Added +- `zcash_client_backend::sync`: + - `SyncHandle` + - `ShutdownListener` + ### Changed - The `Account` trait now uses an associated type for its `AccountId` type instead of a type parameter. This change allows for the simplification of some type signatures. - `zcash_client_backend::sync::run`: + - It now takes a `ShutdownListener` argument. - Transparent outputs are now refreshed in addition to shielded notes. ### Fixed diff --git a/zcash_client_backend/src/sync.rs b/zcash_client_backend/src/sync.rs index 2ddce5fbec..611e74bb26 100644 --- a/zcash_client_backend/src/sync.rs +++ b/zcash_client_backend/src/sync.rs @@ -51,8 +51,51 @@ use { zcash_protocol::{consensus::NetworkUpgrade, value::Zatoshis}, }; -/// Scans the chain until the wallet is up-to-date. +/// A handle for managing a sync process. +pub struct SyncHandle { + shutdown_tx: tokio::sync::oneshot::Sender<()>, +} + +impl SyncHandle { + /// Constructs a new sync handle, including a [`ShutdownListener`]. + pub fn new() -> (Self, ShutdownListener) { + let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel(); + + (Self { shutdown_tx }, ShutdownListener { shutdown_rx }) + } + + /// Requests that [`run`] halt its progress and return at the next opportunity. + pub fn request_shutdown(self) { + let _ = self.shutdown_tx.send(()); + } +} + +/// A listener for [`SyncHandle::request_shutdown`]. +pub struct ShutdownListener { + shutdown_rx: tokio::sync::oneshot::Receiver<()>, +} + +impl ShutdownListener { + fn requested(&mut self) -> bool { + const NOT_TRIGGERED: Result<(), tokio::sync::oneshot::error::TryRecvError> = + Err(tokio::sync::oneshot::error::TryRecvError::Empty); + + let signal = self.shutdown_rx.try_recv(); + + match signal { + NOT_TRIGGERED => false, + // A shutdown has been requested. + _ => true, + } + } +} + +/// Scans the chain until the wallet is up-to-date, or `shutdown` is triggered. +/// +/// `shutdown` can be obtained with [`SyncHandle::new`], and is triggered with +/// [`SyncHandle::request_shutdown`]. pub async fn run( + mut shutdown: ShutdownListener, client: &mut CompactTxStreamerClient, params: &P, db_cache: &CaT, @@ -83,6 +126,7 @@ where update_subtree_roots(client, db_data).await?; while running( + &mut shutdown, client, params, db_cache, @@ -98,6 +142,7 @@ where } async fn running( + shutdown: &mut ShutdownListener, client: &mut CompactTxStreamerClient, params: &P, db_cache: &CaT, @@ -140,6 +185,9 @@ where // 5) Get the suggested scan ranges from the wallet database let mut scan_ranges = db_data.suggest_scan_ranges().map_err(Error::Wallet)?; + if shutdown.requested() { + return Ok(false); + } // Store the handles to cached block deletions (which we spawn into separate // tasks to allow us to continue downloading and scanning other ranges). @@ -154,7 +202,11 @@ where Some(scan_range) if scan_range.priority() == ScanPriority::Verify => { // Download the blocks in `scan_range` into the block source, // overwriting any existing blocks in this range. - download_blocks(client, db_cache, scan_range).await?; + download_blocks(client, db_cache, scan_range, shutdown).await?; + + if shutdown.requested() { + return Ok(false); + } let chain_state = download_chain_state(client, scan_range.block_range().start - 1).await?; @@ -172,6 +224,9 @@ where if scan_ranges_updated { // The suggested scan ranges have been updated, so we re-request. scan_ranges = db_data.suggest_scan_ranges().map_err(Error::Wallet)?; + if shutdown.requested() { + return Ok(false); + } } else { // At this point, the cache and scanned data are locally // consistent (though not necessarily consistent with the @@ -192,6 +247,9 @@ where // and calling `scan_cached_blocks` on each range. let scan_ranges = db_data.suggest_scan_ranges().map_err(Error::Wallet)?; debug!("Suggested ranges: {:?}", scan_ranges); + if shutdown.requested() { + return Ok(false); + } for scan_range in scan_ranges.into_iter().flat_map(|r| { // Limit the number of blocks we download and scan at any one time. (0..).scan(r, |acc, _| { @@ -209,7 +267,11 @@ where }) }) { // Download the blocks in `scan_range` into the block source. - download_blocks(client, db_cache, &scan_range).await?; + download_blocks(client, db_cache, &scan_range, shutdown).await?; + + if shutdown.requested() { + return Ok(false); + } let chain_state = download_chain_state(client, scan_range.block_range().start - 1).await?; @@ -220,14 +282,14 @@ where // Delete the now-scanned blocks. block_deletions.push(db_cache.delete(scan_range)); - if scan_ranges_updated { + if scan_ranges_updated || shutdown.requested() { // The suggested scan ranges have been updated (either due to a continuity // error or because a higher priority range has been added). info!("Waiting for cached blocks to be deleted..."); for deletion in block_deletions { deletion.await.map_err(Error::Cache)?; } - return Ok(true); + return Ok(!shutdown.requested()); } } @@ -335,6 +397,7 @@ async fn download_blocks( client: &mut CompactTxStreamerClient, db_cache: &CaT, scan_range: &ScanRange, + shutdown: &mut ShutdownListener, ) -> Result<(), Error> where ChT: GrpcService, @@ -353,12 +416,18 @@ where start: Some(start), end: Some(end), }; - let compact_blocks = client - .get_block_range(range) - .await? - .into_inner() - .try_collect::>() - .await?; + let compact_block_stream = client.get_block_range(range).await?.into_inner(); + tokio::pin!(compact_block_stream); + + let mut compact_blocks = vec![]; + while let Some(block) = compact_block_stream.try_next().await? { + compact_blocks.push(block); + + if shutdown.requested() { + // Stop fetching blocks; we will exit once we return. + break; + } + } db_cache .insert(compact_blocks)