Skip to content

Commit

Permalink
fix(papyrus_p2p_sync): treat no fin as BadPeerError (#3976)
Browse files Browse the repository at this point in the history
  • Loading branch information
ShahakShama authored Feb 5, 2025
1 parent cb6e7a3 commit 81a1e77
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 14 deletions.
25 changes: 21 additions & 4 deletions crates/papyrus_p2p_sync/src/client/block_data_stream_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,25 @@ where
Some(Ok(DataOrFin(None))) => {
debug!("Network query ending at block {} for {:?} finished", end_block_number, Self::TYPE_DESCRIPTION);
},
Some(_) => Err(P2pSyncClientError::TooManyResponses)?,
None => Err(P2pSyncClientError::ReceiverChannelTerminated {
type_description: Self::TYPE_DESCRIPTION
})?,
Some(_) => {
warn!(
"Query for {:?} returned more messages after {:?} even though it \
should have returned Fin. reporting peer and retrying query.",
Self::TYPE_DESCRIPTION, current_block_number
);
client_response_manager.report_peer();
continue 'send_query_and_parse_responses;
}

None => {
warn!(
"Query for {:?} didn't send Fin after block {:?}. \
Reporting peer and retrying query.",
Self::TYPE_DESCRIPTION, current_block_number
);
client_response_manager.report_peer();
continue 'send_query_and_parse_responses;
}
}
}
}.boxed()
Expand All @@ -220,6 +235,8 @@ where

#[derive(thiserror::Error, Debug)]
pub(crate) enum BadPeerError {
#[error("The sender end of the response receivers for {type_description:?} was closed.")]
SessionEndedWithoutFin { type_description: &'static str },
#[error(
"Blocks returned unordered from the network. Expected header with \
{expected_block_number}, got {actual_block_number}."
Expand Down
4 changes: 2 additions & 2 deletions crates/papyrus_p2p_sync/src/client/class.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ impl BlockDataStreamBuilder<(ApiContractClass, ClassHash)> for ClassStreamBuilde

while current_class_len < target_class_len {
let maybe_contract_class = classes_response_manager.next().await.ok_or(
P2pSyncClientError::ReceiverChannelTerminated {
ParseDataError::BadPeer(BadPeerError::SessionEndedWithoutFin {
type_description: Self::TYPE_DESCRIPTION,
},
}),
)?;
let Some((api_contract_class, class_hash)) = maybe_contract_class?.0 else {
if current_class_len == 0 {
Expand Down
4 changes: 2 additions & 2 deletions crates/papyrus_p2p_sync/src/client/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ impl BlockDataStreamBuilder<SignedBlockHeader> for HeaderStreamBuilder {
) -> BoxFuture<'a, Result<Option<Self::Output>, ParseDataError>> {
async move {
let maybe_signed_header = signed_headers_response_manager.next().await.ok_or(
P2pSyncClientError::ReceiverChannelTerminated {
ParseDataError::BadPeer(BadPeerError::SessionEndedWithoutFin {
type_description: Self::TYPE_DESCRIPTION,
},
}),
)?;
let Some(signed_block_header) = maybe_signed_header?.0 else {
return Ok(None);
Expand Down
2 changes: 0 additions & 2 deletions crates/papyrus_p2p_sync/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,6 @@ pub enum P2pSyncClientError {
field."
)]
OldHeaderInStorage { block_number: BlockNumber, missing_field: &'static str },
#[error("The sender end of the response receivers for {type_description:?} was closed.")]
ReceiverChannelTerminated { type_description: &'static str },
#[error(transparent)]
StorageError(#[from] StorageError),
#[error(transparent)]
Expand Down
4 changes: 2 additions & 2 deletions crates/papyrus_p2p_sync/src/client/state_diff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ impl BlockDataStreamBuilder<StateDiffChunk> for StateDiffStreamBuilder {
let maybe_state_diff_chunk = state_diff_chunks_response_manager
.next()
.await
.ok_or(P2pSyncClientError::ReceiverChannelTerminated {
.ok_or(ParseDataError::BadPeer(BadPeerError::SessionEndedWithoutFin {
type_description: Self::TYPE_DESCRIPTION,
})?;
}))?;
let Some(state_diff_chunk) = maybe_state_diff_chunk?.0 else {
if current_state_diff_len == 0 {
return Ok(None);
Expand Down
4 changes: 2 additions & 2 deletions crates/papyrus_p2p_sync/src/client/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,9 @@ impl BlockDataStreamBuilder<FullTransaction> for TransactionStreamFactory {
.n_transactions;
while current_transaction_len < target_transaction_len {
let maybe_transaction = transactions_response_manager.next().await.ok_or(
P2pSyncClientError::ReceiverChannelTerminated {
ParseDataError::BadPeer(BadPeerError::SessionEndedWithoutFin {
type_description: Self::TYPE_DESCRIPTION,
},
}),
)?;
let Some(FullTransaction { transaction, transaction_output, transaction_hash }) =
maybe_transaction?.0
Expand Down

0 comments on commit 81a1e77

Please sign in to comment.