Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

flaky p2p sync test #2503

Merged
merged 3 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions crates/common/src/state_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -644,6 +644,14 @@ impl DeclaredClasses {
}
}

/// Error returned when applying a state update to the Merkle tries.
///
/// Splits failures into two classes so callers can react differently:
/// a missing class hash (bad state-update data, attributable to the input)
/// versus any other error (database/storage access), which is treated as
/// opaque and fatal. NOTE(review): in the sync pipeline the first variant is
/// mapped to a peer-attributed, retryable error while `StorageError` is
/// wrapped as fatal — confirm against `batch_update_starknet_state`.
#[derive(Debug, thiserror::Error)]
pub enum StateUpdateError {
/// A contract referenced by the update has no class hash recorded at the
/// queried block, i.e. the state-update data is internally inconsistent.
#[error("Contract class hash missing for contract {0}")]
ContractClassHashMissing(ContractAddress),
/// Any other failure (storage/database etc.); the source error's message
/// is surfaced unchanged via `#[error(transparent)]`.
#[error(transparent)]
StorageError(#[from] anyhow::Error),
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
11 changes: 3 additions & 8 deletions crates/merkle-tree/src/contract_state.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::Context;
use pathfinder_common::state_update::{ReverseContractUpdate, StorageRef};
use pathfinder_common::state_update::{ReverseContractUpdate, StateUpdateError, StorageRef};
use pathfinder_common::{
BlockNumber,
ClassHash,
Expand Down Expand Up @@ -55,7 +55,7 @@ pub fn update_contract_state(
transaction: &Transaction<'_>,
verify_hashes: bool,
block: BlockNumber,
) -> anyhow::Result<ContractStateUpdateResult> {
) -> Result<ContractStateUpdateResult, StateUpdateError> {
// Load the contract tree and insert the updates.
let (new_root, trie_update) = if !updates.is_empty() {
let mut contract_tree = match block.parent() {
Expand Down Expand Up @@ -95,12 +95,7 @@ pub fn update_contract_state(
transaction
.contract_class_hash(block.into(), contract_address)
.context("Querying contract's class hash")?
.with_context(|| {
format!(
"Contract's class hash is missing, block: {block}, contract_address: \
{contract_address}"
)
})?
.ok_or(StateUpdateError::ContractClassHashMissing(contract_address))?
};

let nonce = if let Some(nonce) = new_nonce {
Expand Down
15 changes: 9 additions & 6 deletions crates/merkle-tree/src/starknet_state.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::Context;
use pathfinder_common::state_update::StateUpdateRef;
use pathfinder_common::state_update::{StateUpdateError, StateUpdateRef};
use pathfinder_common::{BlockNumber, ClassCommitment, StorageCommitment};
use pathfinder_storage::{Storage, Transaction};

Expand All @@ -14,7 +14,7 @@ pub fn update_starknet_state(
// we need this so that we can create extra read-only transactions for
// parallel contract state updates
storage: Storage,
) -> anyhow::Result<(StorageCommitment, ClassCommitment)> {
) -> Result<(StorageCommitment, ClassCommitment), StateUpdateError> {
use rayon::prelude::*;

let mut storage_commitment_tree = match block.parent() {
Expand All @@ -36,10 +36,13 @@ pub fn update_starknet_state(
|connection, (contract_address, update)| {
let connection = match connection {
Ok(connection) => connection,
Err(e) => anyhow::bail!(
"Failed to create database connection in rayon thread: {}",
e
),
Err(e) => {
return Err(anyhow::anyhow!(
"Failed to create database connection in rayon thread: {}",
e
)
.into())
}
};
let transaction = connection.transaction()?;
update_contract_state(
Expand Down
115 changes: 90 additions & 25 deletions crates/pathfinder/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ where
continue_from
}
Err(SyncError::Fatal(mut error)) => {
tracing::error!(%error, "Stopping checkpoint sync");
tracing::error!(?error, "Stopping checkpoint sync");
return Err(error.take_or_deep_clone());
}
Err(error) => {
Expand Down Expand Up @@ -204,7 +204,7 @@ where
match result {
Ok(_) => tracing::debug!("Restarting track sync: unexpected end of Block stream"),
Err(SyncError::Fatal(mut error)) => {
tracing::error!(%error, "Stopping track sync");
tracing::error!(?error, "Stopping track sync");
return Err(error.take_or_deep_clone());
}
Err(error) => {
Expand Down Expand Up @@ -315,7 +315,7 @@ mod tests {
use p2p::libp2p::PeerId;
use pathfinder_common::event::Event;
use pathfinder_common::receipt::Receipt;
use pathfinder_common::state_update::StateUpdateData;
use pathfinder_common::state_update::{self, StateUpdateData};
use pathfinder_common::transaction::Transaction;
use pathfinder_common::{
BlockHeader,
Expand Down Expand Up @@ -376,27 +376,49 @@ mod tests {
(public_key, blocks)
}

async fn sync_done_watch(storage: Storage, expected_last: BlockNumber) {
async fn sync_done_watch(
mut last_event_rx: tokio::sync::mpsc::Receiver<()>,
storage: Storage,
expected_last: BlockNumber,
) {
// Don't poll the DB until the last event is emitted from the fake P2P client
last_event_rx.recv().await.unwrap();

let mut interval = tokio::time::interval_at(
// Give sync some slack to process the last event and commit the last block
tokio::time::Instant::now() + Duration::from_millis(500),
Duration::from_millis(200),
);

let mut start = std::time::Instant::now();
tokio::task::spawn_blocking(move || loop {
std::thread::sleep(Duration::from_millis(200));
let mut db = storage.connection().unwrap();
let db = db.transaction().unwrap();
let header = db.block_header(expected_last.into()).unwrap();
if let Some(header) = header {
let after = start.elapsed();
if after > TIMEOUT {
break;
}

if header.number == expected_last {
tracing::info!(?after, "Sync done");
break;
loop {
interval.tick().await;
let storage = storage.clone();

let done = tokio::task::spawn_blocking(move || {
let mut db = storage.connection().unwrap();
let db = db.transaction().unwrap();
// We don't have to query the entire block, as tracking sync commits entire
// blocks to the DB, so if the header is there, the block is there
let header = db.block_header(expected_last.into()).unwrap();
if let Some(header) = header {
if header.number == expected_last {
let after = start.elapsed();
tracing::info!(?after, "Sync done");
return true;
}
}

false
})
.await
.unwrap();

if done {
break;
}
})
.await
.unwrap();
}
}

#[derive(Copy, Clone, Debug)]
Expand Down Expand Up @@ -449,6 +471,8 @@ mod tests {
})]
#[test_log::test(tokio::test)]
async fn sync(#[case] error_setup: ErrorSetup) {
use futures::FutureExt;

let (public_key, blocks) = generate_fake_blocks(ALL_BLOCKS as usize);
let last_header = &blocks.last().unwrap().header.header;
let last_checkpoint_header = &blocks[LAST_IN_CHECKPOINT.get() as usize].header.header;
Expand All @@ -458,13 +482,15 @@ mod tests {
let expect_fully_synced_blocks = error_setup.expected_last_synced.is_full();

let error_trigger = ErrorTrigger::new(error_setup.fatal_at);
let (last_event_tx, mut last_event_rx) = tokio::sync::mpsc::channel(1);

let sync = Sync {
storage: storage.clone(),
p2p: FakeP2PClient {
blocks: blocks.clone(),
error_trigger: error_trigger.clone(),
storage: storage.clone(),
last_event_tx,
},
// We use `l1_checkpoint_override` instead
eth_client: EthereumClient::new("https://unused.com").unwrap(),
Expand All @@ -483,13 +509,42 @@ mod tests {
block_hash_db: None,
};

tokio::select! {
let sync_done = if error_setup.fatal_at.is_some() {
// Sync will either bail on fatal error or time out
std::future::pending().boxed()
} else {
// Successful sync never ends
sync_done_watch(last_event_rx, storage.clone(), expected_last_synced_block).boxed()
};

let bail_early = tokio::select! {
result = tokio::time::timeout(TIMEOUT, sync.run()) => match result {
Ok(Ok(())) => unreachable!("Sync does not exit upon success, sync_done_watch should have been triggered"),
Ok(Err(e)) => tracing::debug!(%e, "Sync failed with a fatal error"),
Err(_) => tracing::debug!("Test timed out"),
Ok(Err(error)) => {
let unexpected_fatal = error_setup.fatal_at.is_none();
if unexpected_fatal {
tracing::debug!(?error, "Sync failed with an unexpected fatal error");
} else {
tracing::debug!(?error, "Sync failed with a fatal error");
}
unexpected_fatal
},
Err(_) => {
tracing::debug!("Test timed out");
true
},
},
_ = sync_done_watch(storage.clone(), expected_last_synced_block) => tracing::debug!("Sync completion detected"),
_ = sync_done => {
tracing::debug!("Sync completion detected");
false
},
};

if bail_early {
blocks.iter().for_each(|b| {
tracing::error!(block=%b.header.header.number, state_update=?b.state_update.as_ref().unwrap());
});
return;
}

assert!(error_trigger.all_errors_triggered());
Expand Down Expand Up @@ -590,6 +645,7 @@ mod tests {
pub blocks: Vec<Block>,
pub error_trigger: ErrorTrigger,
pub storage: Storage,
pub last_event_tx: tokio::sync::mpsc::Sender<()>,
}

#[derive(Clone)]
Expand All @@ -606,7 +662,11 @@ mod tests {
(0..=4)
.map(|_| AtomicU64::new((0..CHECKPOINT_BLOCKS).fake()))
.chain(
(5..=9).map(|_| AtomicU64::new((CHECKPOINT_BLOCKS..ALL_BLOCKS).fake())),
// The last block is always error free to ease checking for sync
// completion
(5..=9).map(|_| {
AtomicU64::new((CHECKPOINT_BLOCKS..ALL_BLOCKS - 1).fake())
}),
)
.collect(),
)),
Expand Down Expand Up @@ -1002,6 +1062,11 @@ mod tests {
e.push(Ok(Faker.fake()));
}

if block == LAST_IN_TRACK {
let last_event_tx = self.last_event_tx.clone();
last_event_tx.send(()).await.unwrap();
}

Some((PeerId::random(), stream::iter(e)))
}
}
Expand Down
2 changes: 2 additions & 0 deletions crates/pathfinder/src/sync/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub(super) enum SyncError {
ClassDefinitionsDeclarationsMismatch(PeerId),
#[error("Class hash computation failed")]
ClassHashComputationError(PeerId),
#[error("Contract's class is missing")]
ContractClassMissing(PeerId),
#[error("Discontinuity in header chain")]
Discontinuity(PeerId),
#[error("Event commitment mismatch")]
Expand Down
11 changes: 10 additions & 1 deletion crates/pathfinder/src/sync/state_updates.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use pathfinder_common::state_update::{
ContractClassUpdate,
ContractUpdate,
StateUpdateData,
StateUpdateError,
StateUpdateRef,
SystemContractUpdate,
};
Expand Down Expand Up @@ -285,7 +286,15 @@ pub async fn batch_update_starknet_state(
tail,
storage.clone(),
)
.context("Updating Starknet state")?;
.map_err(|error| match error {
StateUpdateError::ContractClassHashMissing(for_contract) => {
tracing::debug!(%for_contract, "Contract class hash is missing");
SyncError::ContractClassMissing(peer)
}
StateUpdateError::StorageError(error) => SyncError::Fatal(Arc::new(
error.context(format!("Updating Starknet state, tail {tail}")),
)),
})?;
let state_commitment = StateCommitment::calculate(storage_commitment, class_commitment);
let expected_state_commitment = db
.state_commitment(tail.into())
Expand Down
2 changes: 1 addition & 1 deletion crates/pathfinder/src/sync/track.rs
Original file line number Diff line number Diff line change
Expand Up @@ -827,7 +827,7 @@ impl ProcessStage for StoreBlock {
block_number,
self.storage.clone(),
)
.context("Updating Starknet state")?;
.with_context(|| format!("Updating Starknet state, block_number {block_number}"))?;

// Ensure that roots match.
let state_commitment = StateCommitment::calculate(storage_commitment, class_commitment);
Expand Down
3 changes: 2 additions & 1 deletion crates/storage/src/fake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use pathfinder_common::receipt::Receipt;
use pathfinder_common::state_update::{
ContractClassUpdate,
ContractUpdate,
StateUpdateError,
StateUpdateRef,
SystemContractUpdate,
};
Expand Down Expand Up @@ -70,7 +71,7 @@ pub type UpdateTriesFn = Box<
bool,
BlockNumber,
Storage,
) -> anyhow::Result<(StorageCommitment, ClassCommitment)>,
) -> Result<(StorageCommitment, ClassCommitment), StateUpdateError>,
>;

pub struct Config {
Expand Down
Loading