Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

zcash_client_backend: Add cancellation logic to sync::run #1534

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions zcash_client_backend/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,17 @@ and this library adheres to Rust's notion of

## [Unreleased]

### Added
- `zcash_client_backend::sync`:
- `SyncHandle`
- `ShutdownListener`

### Changed
- The `Account` trait now uses an associated type for its `AccountId`
type instead of a type parameter. This change allows for the simplification
of some type signatures.
- `zcash_client_backend::sync::run`:
- It now takes a `ShutdownListener` argument.
- Transparent outputs are now refreshed in addition to shielded notes.

### Fixed
Expand Down
91 changes: 80 additions & 11 deletions zcash_client_backend/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,51 @@
zcash_protocol::{consensus::NetworkUpgrade, value::Zatoshis},
};

/// Scans the chain until the wallet is up-to-date.
/// A handle for managing a sync process.
pub struct SyncHandle {
shutdown_tx: tokio::sync::oneshot::Sender<()>,

Check failure on line 56 in zcash_client_backend/src/sync.rs

View workflow job for this annotation

GitHub Actions / Clippy (MSRV)

failed to resolve: use of undeclared crate or module `tokio`

error[E0433]: failed to resolve: use of undeclared crate or module `tokio` --> zcash_client_backend/src/sync.rs:56:18 | 56 | shutdown_tx: tokio::sync::oneshot::Sender<()>, | ^^^^^ use of undeclared crate or module `tokio`
}

impl SyncHandle {
/// Constructs a new sync handle, including a [`ShutdownListener`].
pub fn new() -> (Self, ShutdownListener) {
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();

Check failure on line 62 in zcash_client_backend/src/sync.rs

View workflow job for this annotation

GitHub Actions / Clippy (MSRV)

failed to resolve: use of undeclared crate or module `tokio`

error[E0433]: failed to resolve: use of undeclared crate or module `tokio` --> zcash_client_backend/src/sync.rs:62:42 | 62 | let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel(); | ^^^^^ use of undeclared crate or module `tokio`

(Self { shutdown_tx }, ShutdownListener { shutdown_rx })
}

/// Requests that [`run`] halt its progress and return at the next opportunity.
pub fn request_shutdown(self) {
let _ = self.shutdown_tx.send(());
}
}

/// A listener for [`SyncHandle::request_shutdown`].
pub struct ShutdownListener {
shutdown_rx: tokio::sync::oneshot::Receiver<()>,

Check failure on line 75 in zcash_client_backend/src/sync.rs

View workflow job for this annotation

GitHub Actions / Clippy (MSRV)

failed to resolve: use of undeclared crate or module `tokio`

error[E0433]: failed to resolve: use of undeclared crate or module `tokio` --> zcash_client_backend/src/sync.rs:75:18 | 75 | shutdown_rx: tokio::sync::oneshot::Receiver<()>, | ^^^^^ use of undeclared crate or module `tokio`
}

impl ShutdownListener {
fn requested(&mut self) -> bool {
const NOT_TRIGGERED: Result<(), tokio::sync::oneshot::error::TryRecvError> =

Check failure on line 80 in zcash_client_backend/src/sync.rs

View workflow job for this annotation

GitHub Actions / Clippy (MSRV)

failed to resolve: use of undeclared crate or module `tokio`

error[E0433]: failed to resolve: use of undeclared crate or module `tokio` --> zcash_client_backend/src/sync.rs:80:41 | 80 | const NOT_TRIGGERED: Result<(), tokio::sync::oneshot::error::TryRecvError> = | ^^^^^ use of undeclared crate or module `tokio`
Err(tokio::sync::oneshot::error::TryRecvError::Empty);

Check failure on line 81 in zcash_client_backend/src/sync.rs

View workflow job for this annotation

GitHub Actions / Clippy (MSRV)

failed to resolve: use of undeclared crate or module `tokio`

error[E0433]: failed to resolve: use of undeclared crate or module `tokio` --> zcash_client_backend/src/sync.rs:81:17 | 81 | Err(tokio::sync::oneshot::error::TryRecvError::Empty); | ^^^^^ use of undeclared crate or module `tokio`

let signal = self.shutdown_rx.try_recv();

match signal {
NOT_TRIGGERED => false,
// A shutdown has been requested.
_ => true,
}
}
}

/// Scans the chain until the wallet is up-to-date, or `shutdown` is triggered.
///
/// `shutdown` can be obtained with [`SyncHandle::new`], and is triggered with
/// [`SyncHandle::request_shutdown`].
pub async fn run<P, ChT, CaT, DbT>(
mut shutdown: ShutdownListener,
client: &mut CompactTxStreamerClient<ChT>,
params: &P,
db_cache: &CaT,
Expand Down Expand Up @@ -83,6 +126,7 @@
update_subtree_roots(client, db_data).await?;

while running(
&mut shutdown,
client,
params,
db_cache,
Expand All @@ -98,6 +142,7 @@
}

async fn running<P, ChT, CaT, DbT, TrErr>(
shutdown: &mut ShutdownListener,
client: &mut CompactTxStreamerClient<ChT>,
params: &P,
db_cache: &CaT,
Expand Down Expand Up @@ -140,6 +185,9 @@

// 5) Get the suggested scan ranges from the wallet database
let mut scan_ranges = db_data.suggest_scan_ranges().map_err(Error::Wallet)?;
if shutdown.requested() {
return Ok(false);
}

// Store the handles to cached block deletions (which we spawn into separate
// tasks to allow us to continue downloading and scanning other ranges).
Expand All @@ -154,7 +202,11 @@
Some(scan_range) if scan_range.priority() == ScanPriority::Verify => {
// Download the blocks in `scan_range` into the block source,
// overwriting any existing blocks in this range.
download_blocks(client, db_cache, scan_range).await?;
download_blocks(client, db_cache, scan_range, shutdown).await?;

if shutdown.requested() {
return Ok(false);
}

let chain_state =
download_chain_state(client, scan_range.block_range().start - 1).await?;
Expand All @@ -172,6 +224,9 @@
if scan_ranges_updated {
// The suggested scan ranges have been updated, so we re-request.
scan_ranges = db_data.suggest_scan_ranges().map_err(Error::Wallet)?;
if shutdown.requested() {
return Ok(false);
}
} else {
// At this point, the cache and scanned data are locally
// consistent (though not necessarily consistent with the
Expand All @@ -192,6 +247,9 @@
// and calling `scan_cached_blocks` on each range.
let scan_ranges = db_data.suggest_scan_ranges().map_err(Error::Wallet)?;
debug!("Suggested ranges: {:?}", scan_ranges);
if shutdown.requested() {
return Ok(false);
}
for scan_range in scan_ranges.into_iter().flat_map(|r| {
// Limit the number of blocks we download and scan at any one time.
(0..).scan(r, |acc, _| {
Expand All @@ -209,7 +267,11 @@
})
}) {
// Download the blocks in `scan_range` into the block source.
download_blocks(client, db_cache, &scan_range).await?;
download_blocks(client, db_cache, &scan_range, shutdown).await?;

if shutdown.requested() {
return Ok(false);
}

let chain_state = download_chain_state(client, scan_range.block_range().start - 1).await?;

Expand All @@ -220,14 +282,14 @@
// Delete the now-scanned blocks.
block_deletions.push(db_cache.delete(scan_range));

if scan_ranges_updated {
if scan_ranges_updated || shutdown.requested() {
// The suggested scan ranges have been updated (either due to a continuity
// error or because a higher priority range has been added).
info!("Waiting for cached blocks to be deleted...");
for deletion in block_deletions {
deletion.await.map_err(Error::Cache)?;
}
return Ok(true);
return Ok(!shutdown.requested());
}
}

Expand Down Expand Up @@ -335,6 +397,7 @@
client: &mut CompactTxStreamerClient<ChT>,
db_cache: &CaT,
scan_range: &ScanRange,
shutdown: &mut ShutdownListener,
) -> Result<(), Error<CaT::Error, DbErr, TrErr>>
where
ChT: GrpcService<BoxBody>,
Expand All @@ -353,12 +416,18 @@
start: Some(start),
end: Some(end),
};
let compact_blocks = client
.get_block_range(range)
.await?
.into_inner()
.try_collect::<Vec<_>>()
.await?;
let compact_block_stream = client.get_block_range(range).await?.into_inner();
tokio::pin!(compact_block_stream);

Check failure on line 420 in zcash_client_backend/src/sync.rs

View workflow job for this annotation

GitHub Actions / Clippy (MSRV)

failed to resolve: use of undeclared crate or module `tokio`

error[E0433]: failed to resolve: use of undeclared crate or module `tokio` --> zcash_client_backend/src/sync.rs:420:5 | 420 | tokio::pin!(compact_block_stream); | ^^^^^ use of undeclared crate or module `tokio`

let mut compact_blocks = vec![];
while let Some(block) = compact_block_stream.try_next().await? {
compact_blocks.push(block);

if shutdown.requested() {
// Stop fetching blocks; we will exit once we return.
break;
}
}

db_cache
.insert(compact_blocks)
Expand Down
Loading