From 3374dd0b1e310c5be3a353cfe541fb933e0ba85d Mon Sep 17 00:00:00 2001 From: Krzysztof Lis Date: Tue, 5 Mar 2024 11:26:18 +0100 Subject: [PATCH] feat(sync/p2p): drive stream to completion --- crates/pathfinder/src/sync/p2p.rs | 21 +++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/crates/pathfinder/src/sync/p2p.rs b/crates/pathfinder/src/sync/p2p.rs index dc27b47ada..6e7ade27e6 100644 --- a/crates/pathfinder/src/sync/p2p.rs +++ b/crates/pathfinder/src/sync/p2p.rs @@ -28,6 +28,8 @@ use crate::state::block_hash::{ calculate_transaction_commitment, TransactionCommitmentFinalHashType, }; +use self::state_updates::ContractDiffSyncError; + /// Provides P2P sync capability for blocks secured by L1. #[derive(Clone)] pub struct Sync { @@ -308,7 +310,7 @@ impl Sync { .context("Finding next missing state update")? { let getter = getter.clone(); - let _stream = self + let result = self .p2p .clone() .contract_updates_stream(start, stop, getter) @@ -320,7 +322,22 @@ impl Sync { }) // Persist state updates (without: state commitments and declared classes) .and_then(|x| state_updates::persist(self.storage.clone(), x)) - .inspect_ok(|x| tracing::info!(tail=%x, "State update chunk synced")); + .inspect_ok(|x| tracing::info!(tail=%x, "State update chunk synced")) + // Drive stream to completion. + .try_fold((), |_, _| std::future::ready(Ok(()))) + .await; + + match result { + Ok(()) => { + tracing::info!("Syncing contract updates complete"); + } + Err(ContractDiffSyncError::StorageCommitmentMismatch(peer_data)) => { + tracing::debug!(peer=%peer_data.peer, block=%peer_data.data, "Error while streaming contract updates: storage commitment mismatch"); + } + Err(ContractDiffSyncError::DatabaseOrComputeError(error)) => { + tracing::debug!(%error, "Error while streaming contract updates"); + } + } } Ok(())