From acc669791f1c4bc39cfa1035d5bc3fa5a9d70545 Mon Sep 17 00:00:00 2001 From: Noam Spiegelstein Date: Wed, 30 Oct 2024 17:36:40 +0200 Subject: [PATCH 1/3] feat: allow DataStreamBuilder to retry queries if non fatal err Create err types BadPeerError,ParseDataError. later on the errors from P2PSyncClientError will be sorted to fatal and non-fatal(BadPeer). this will allow to retry on non fatal errors during parsing of data blocks. --- crates/papyrus_p2p_sync/src/client/header.rs | 12 ++-- .../papyrus_p2p_sync/src/client/state_diff.rs | 25 +++++--- .../src/client/stream_builder.rs | 59 ++++++++++++++++--- .../src/client/transaction.rs | 16 ++--- 4 files changed, 81 insertions(+), 31 deletions(-) diff --git a/crates/papyrus_p2p_sync/src/client/header.rs b/crates/papyrus_p2p_sync/src/client/header.rs index c8ebea3bac..39c452e2bb 100644 --- a/crates/papyrus_p2p_sync/src/client/header.rs +++ b/crates/papyrus_p2p_sync/src/client/header.rs @@ -10,7 +10,7 @@ use papyrus_storage::{StorageError, StorageReader, StorageWriter}; use starknet_api::block::BlockNumber; use tracing::debug; -use super::stream_builder::{BlockData, BlockNumberLimit, DataStreamBuilder}; +use super::stream_builder::{BlockData, BlockNumberLimit, DataStreamBuilder, ParseDataError}; use super::{P2PSyncClientError, ALLOWED_SIGNATURES_LENGTH, NETWORK_DATA_TIMEOUT}; impl BlockData for SignedBlockHeader { @@ -69,7 +69,7 @@ impl DataStreamBuilder for HeaderStreamBuilder { >, block_number: BlockNumber, _storage_reader: &'a StorageReader, - ) -> BoxFuture<'a, Result, P2PSyncClientError>> { + ) -> BoxFuture<'a, Result, ParseDataError>> { async move { let maybe_signed_header = tokio::time::timeout(NETWORK_DATA_TIMEOUT, signed_headers_response_manager.next()) @@ -85,18 +85,18 @@ impl DataStreamBuilder for HeaderStreamBuilder { if block_number != signed_block_header.block_header.block_header_without_hash.block_number { - return Err(P2PSyncClientError::HeadersUnordered { + return Err(ParseDataError::Fatal(P2PSyncClientError::HeadersUnordered { expected_block_number: block_number, actual_block_number: signed_block_header .block_header .block_header_without_hash .block_number, - }); + })); } if signed_block_header.signatures.len() != ALLOWED_SIGNATURES_LENGTH { - return Err(P2PSyncClientError::WrongSignaturesLength { + return Err(ParseDataError::Fatal(P2PSyncClientError::WrongSignaturesLength { signatures: signed_block_header.signatures, - }); + })); } Ok(Some(signed_block_header)) } diff --git a/crates/papyrus_p2p_sync/src/client/state_diff.rs b/crates/papyrus_p2p_sync/src/client/state_diff.rs index c722dd0866..f50e739db6 100644 --- a/crates/papyrus_p2p_sync/src/client/state_diff.rs +++ b/crates/papyrus_p2p_sync/src/client/state_diff.rs @@ -13,7 +13,12 @@ use papyrus_storage::{StorageError, StorageReader, StorageWriter}; use starknet_api::block::BlockNumber; use starknet_api::state::ThinStateDiff; -use crate::client::stream_builder::{BlockData, BlockNumberLimit, DataStreamBuilder}; +use crate::client::stream_builder::{ + BlockData, + BlockNumberLimit, + DataStreamBuilder, + ParseDataError, +}; use crate::client::{P2PSyncClientError, NETWORK_DATA_TIMEOUT}; impl BlockData for (ThinStateDiff, BlockNumber) { @@ -44,7 +49,7 @@ impl DataStreamBuilder for StateDiffStreamBuilder { >, block_number: BlockNumber, storage_reader: &'a StorageReader, - ) -> BoxFuture<'a, Result, P2PSyncClientError>> { + ) -> BoxFuture<'a, Result, ParseDataError>> { async move { let mut result = ThinStateDiff::default(); let mut prev_result_len = 0; @@ -72,15 +77,17 @@ impl DataStreamBuilder for StateDiffStreamBuilder { if current_state_diff_len == 0 { return Ok(None); } else { - return Err(P2PSyncClientError::WrongStateDiffLength { - expected_length: target_state_diff_len, - possible_lengths: vec![current_state_diff_len], - }); + return Err(ParseDataError::Fatal( + P2PSyncClientError::WrongStateDiffLength { + expected_length: target_state_diff_len, + possible_lengths: vec![current_state_diff_len], + }, + )); } }; prev_result_len = current_state_diff_len; if state_diff_chunk.is_empty() { - return Err(P2PSyncClientError::EmptyStateDiffPart); + return Err(ParseDataError::Fatal(P2PSyncClientError::EmptyStateDiffPart)); } // It's cheaper to calculate the length of `state_diff_part` than the length of // `result`. @@ -89,10 +96,10 @@ impl DataStreamBuilder for StateDiffStreamBuilder { } if current_state_diff_len != target_state_diff_len { - return Err(P2PSyncClientError::WrongStateDiffLength { + return Err(ParseDataError::Fatal(P2PSyncClientError::WrongStateDiffLength { expected_length: target_state_diff_len, possible_lengths: vec![prev_result_len, current_state_diff_len], - }); + })); } validate_deprecated_declared_classes_non_conflicting(&result)?; diff --git a/crates/papyrus_p2p_sync/src/client/stream_builder.rs b/crates/papyrus_p2p_sync/src/client/stream_builder.rs index a7cd2cfa31..956ea976a3 100644 --- a/crates/papyrus_p2p_sync/src/client/stream_builder.rs +++ b/crates/papyrus_p2p_sync/src/client/stream_builder.rs @@ -11,7 +11,7 @@ use papyrus_protobuf::sync::{BlockHashOrNumber, DataOrFin, Direction, Query}; use papyrus_storage::header::HeaderStorageReader; use papyrus_storage::{StorageError, StorageReader, StorageWriter}; use starknet_api::block::BlockNumber; -use tracing::{debug, info}; +use tracing::{debug, info, warn}; use super::{P2PSyncClientError, STEP}; @@ -46,7 +46,7 @@ where client_response_manager: &'a mut ClientResponsesManager>, block_number: BlockNumber, storage_reader: &'a StorageReader, - ) -> BoxFuture<'a, Result, P2PSyncClientError>>; + ) -> BoxFuture<'a, Result, ParseDataError>>; fn get_start_block_number(storage_reader: &StorageReader) -> Result; @@ -102,18 +102,30 @@ where while current_block_number.0 < end_block_number { match Self::parse_data_for_block( &mut client_response_manager, current_block_number, &storage_reader - ).await? { - Some(output) => yield Ok(Box::::from(Box::new(output))), - None => { + ).await { + Ok(Some(output)) => yield Ok(Box::::from(Box::new(output))), + Ok(None) => { debug!( - "Query for {:?} returned with partial data. Waiting {:?} before \ + "Query for {:?} on {:?} returned with partial data. Waiting {:?} before \ sending another query.", - Self::TYPE_DESCRIPTION, - wait_period_for_new_data + Self::TYPE_DESCRIPTION, current_block_number, wait_period_for_new_data ); tokio::time::sleep(wait_period_for_new_data).await; continue 'send_query_and_parse_responses; - } + }, + Err(ParseDataError::BadPeer(err)) => { + warn!( + "Query for {:?} on {:?} returned with bad peer error: {:?}. reporting \ + peer and retrying query.", + Self::TYPE_DESCRIPTION, current_block_number, err + ); + client_response_manager.report_peer(); + continue 'send_query_and_parse_responses; + }, + Err(ParseDataError::Fatal(err)) => { + yield Err(err); + return; + }, } info!("Added {:?} for block {}.", Self::TYPE_DESCRIPTION, current_block_number); current_block_number = current_block_number.unchecked_next(); @@ -140,3 +152,32 @@ where .boxed() } } + +#[derive(thiserror::Error, Debug)] +pub(crate) enum BadPeerError {} + +#[derive(thiserror::Error, Debug)] +pub(crate) enum ParseDataError { + #[error(transparent)] + Fatal(#[from] P2PSyncClientError), + #[error(transparent)] + BadPeer(#[from] BadPeerError), +} + +impl From for ParseDataError { + fn from(err: StorageError) -> Self { + ParseDataError::Fatal(P2PSyncClientError::StorageError(err)) + } +} + +impl From for ParseDataError { + fn from(err: tokio::time::error::Elapsed) -> Self { + ParseDataError::Fatal(P2PSyncClientError::NetworkTimeout(err)) + } +} + +impl From for ParseDataError { + fn from(err: ProtobufConversionError) -> Self { + ParseDataError::Fatal(P2PSyncClientError::ProtobufConversionError(err)) + } +} diff --git a/crates/papyrus_p2p_sync/src/client/transaction.rs b/crates/papyrus_p2p_sync/src/client/transaction.rs index 69589c2009..89f74646e1 100644 --- a/crates/papyrus_p2p_sync/src/client/transaction.rs +++ b/crates/papyrus_p2p_sync/src/client/transaction.rs @@ -8,7 +8,7 @@ use papyrus_storage::{StorageError, StorageReader, StorageWriter}; use starknet_api::block::{BlockBody, BlockNumber}; use starknet_api::transaction::FullTransaction; -use super::stream_builder::{BlockData, BlockNumberLimit, DataStreamBuilder}; +use super::stream_builder::{BlockData, BlockNumberLimit, DataStreamBuilder, ParseDataError}; use super::{P2PSyncClientError, NETWORK_DATA_TIMEOUT}; impl BlockData for (BlockBody, BlockNumber) { @@ -33,7 +33,7 @@ impl DataStreamBuilder for TransactionStreamFactory { transactions_response_manager: &'a mut ClientResponsesManager>, block_number: BlockNumber, storage_reader: &'a StorageReader, - ) -> BoxFuture<'a, Result, P2PSyncClientError>> { + ) -> BoxFuture<'a, Result, ParseDataError>> { async move { let mut block_body = BlockBody::default(); let mut current_transaction_len = 0; @@ -57,11 +57,13 @@ impl DataStreamBuilder for TransactionStreamFactory { if current_transaction_len == 0 { return Ok(None); } else { - return Err(P2PSyncClientError::NotEnoughTransactions { - expected: target_transaction_len, - actual: current_transaction_len, - block_number: block_number.0, - }); + return Err(ParseDataError::Fatal( + P2PSyncClientError::NotEnoughTransactions { + expected: target_transaction_len, + actual: current_transaction_len, + block_number: block_number.0, + }, + )); } }; block_body.transactions.push(transaction); From 1500b59ebe689155a23d6e72d8806af933f4aca9 Mon Sep 17 00:00:00 2001 From: Noam Spiegelstein Date: Wed, 30 Oct 2024 17:53:04 +0200 Subject: [PATCH 2/3] refactor(sync): move non fatal errors from P2PSyncClientError to BadPeerError --- crates/papyrus_p2p_sync/src/client/header.rs | 12 +++++++--- crates/papyrus_p2p_sync/src/client/mod.rs | 23 +------------------ .../src/client/stream_builder.rs | 21 ++++++++++++++--- .../src/client/transaction.rs | 20 +++++++++------- 4 files changed, 40 insertions(+), 36 deletions(-) diff --git a/crates/papyrus_p2p_sync/src/client/header.rs b/crates/papyrus_p2p_sync/src/client/header.rs index 39c452e2bb..947f8ede25 100644 --- a/crates/papyrus_p2p_sync/src/client/header.rs +++ b/crates/papyrus_p2p_sync/src/client/header.rs @@ -10,7 +10,13 @@ use papyrus_storage::{StorageError, StorageReader, StorageWriter}; use starknet_api::block::BlockNumber; use tracing::debug; -use super::stream_builder::{BlockData, BlockNumberLimit, DataStreamBuilder, ParseDataError}; +use super::stream_builder::{ + BadPeerError, + BlockData, + BlockNumberLimit, + DataStreamBuilder, + ParseDataError, +}; use super::{P2PSyncClientError, ALLOWED_SIGNATURES_LENGTH, NETWORK_DATA_TIMEOUT}; impl BlockData for SignedBlockHeader { @@ -85,7 +91,7 @@ impl DataStreamBuilder for HeaderStreamBuilder { if block_number != signed_block_header.block_header.block_header_without_hash.block_number { - return Err(ParseDataError::Fatal(P2PSyncClientError::HeadersUnordered { + return Err(ParseDataError::BadPeer(BadPeerError::HeadersUnordered { expected_block_number: block_number, actual_block_number: signed_block_header .block_header @@ -94,7 +100,7 @@ impl DataStreamBuilder for HeaderStreamBuilder { })); } if signed_block_header.signatures.len() != ALLOWED_SIGNATURES_LENGTH { - return Err(ParseDataError::Fatal(P2PSyncClientError::WrongSignaturesLength { + return Err(ParseDataError::BadPeer(BadPeerError::WrongSignaturesLength { signatures: signed_block_header.signatures, })); } diff --git a/crates/papyrus_p2p_sync/src/client/mod.rs b/crates/papyrus_p2p_sync/src/client/mod.rs index 8cab726cd2..ea06367172 100644 --- a/crates/papyrus_p2p_sync/src/client/mod.rs +++ b/crates/papyrus_p2p_sync/src/client/mod.rs @@ -20,7 +20,6 @@ use papyrus_config::converters::deserialize_seconds_to_duration; use papyrus_config::dumping::{ser_optional_param, ser_param, SerializeConfig}; use papyrus_config::{ParamPath, ParamPrivacyInput, SerializedParam}; use papyrus_network::network_manager::SqmrClientSender; -use papyrus_protobuf::converters::ProtobufConversionError; use papyrus_protobuf::sync::{ ClassQuery, DataOrFin, @@ -32,7 +31,7 @@ use papyrus_protobuf::sync::{ }; use papyrus_storage::{StorageError, StorageReader, StorageWriter}; use serde::{Deserialize, Serialize}; -use starknet_api::block::{BlockNumber, BlockSignature}; +use starknet_api::block::BlockNumber; use starknet_api::core::ClassHash; use starknet_api::transaction::FullTransaction; use state_diff::StateDiffStreamBuilder; @@ -122,23 +121,6 @@ impl Default for P2PSyncClientConfig { #[derive(thiserror::Error, Debug)] pub enum P2PSyncClientError { - // TODO(shahak): Remove this and report to network on invalid data once that's possible. - // TODO(shahak): Consider removing this error and handling unordered headers without failing. - #[error( - "Blocks returned unordered from the network. Expected header with \ - {expected_block_number}, got {actual_block_number}." - )] - HeadersUnordered { expected_block_number: BlockNumber, actual_block_number: BlockNumber }, - #[error( - "Expected to receive {expected} transactions for {block_number} from the network. Got \ - {actual} instead." - )] - // TODO(eitan): Remove this and report to network on invalid data once that's possible. - NotEnoughTransactions { expected: usize, actual: usize, block_number: u64 }, - #[error("Expected to receive one signature from the network. got {signatures:?} instead.")] - // TODO(shahak): Remove this and report to network on invalid data once that's possible. - // Right now we support only one signature. In the future we will support many signatures. - WrongSignaturesLength { signatures: Vec }, // TODO(shahak): Remove this and report to network on invalid data once that's possible. #[error( "The header says that the block's state diff should be of length {expected_length}. Can \ @@ -156,9 +138,6 @@ pub enum P2PSyncClientError { // TODO(shahak): Remove this and report to network on invalid data once that's possible. #[error("Network returned more responses than expected for a query.")] TooManyResponses, - // TODO(shahak): Remove this and report to network on invalid data once that's possible. - #[error(transparent)] - ProtobufConversionError(#[from] ProtobufConversionError), #[error( "Encountered an old header in the storage at {block_number:?} that's missing the field \ {missing_field}. Re-sync the node from {block_number:?} from a node that provides this \ diff --git a/crates/papyrus_p2p_sync/src/client/stream_builder.rs b/crates/papyrus_p2p_sync/src/client/stream_builder.rs index 956ea976a3..d1b3f1e188 100644 --- a/crates/papyrus_p2p_sync/src/client/stream_builder.rs +++ b/crates/papyrus_p2p_sync/src/client/stream_builder.rs @@ -10,7 +10,7 @@ use papyrus_protobuf::converters::ProtobufConversionError; use papyrus_protobuf::sync::{BlockHashOrNumber, DataOrFin, Direction, Query}; use papyrus_storage::header::HeaderStorageReader; use papyrus_storage::{StorageError, StorageReader, StorageWriter}; -use starknet_api::block::BlockNumber; +use starknet_api::block::{BlockNumber, BlockSignature}; use tracing::{debug, info, warn}; use super::{P2PSyncClientError, STEP}; @@ -154,7 +154,22 @@ where } #[derive(thiserror::Error, Debug)] -pub(crate) enum BadPeerError {} +pub(crate) enum BadPeerError { + #[error( + "Blocks returned unordered from the network. Expected header with \ + {expected_block_number}, got {actual_block_number}." + )] + HeadersUnordered { expected_block_number: BlockNumber, actual_block_number: BlockNumber }, + #[error( + "Expected to receive {expected} transactions for {block_number} from the network. Got \ + {actual} instead." + )] + NotEnoughTransactions { expected: usize, actual: usize, block_number: u64 }, + #[error("Expected to receive one signature from the network. got {signatures:?} instead.")] + WrongSignaturesLength { signatures: Vec }, + #[error(transparent)] + ProtobufConversionError(#[from] ProtobufConversionError), +} #[derive(thiserror::Error, Debug)] pub(crate) enum ParseDataError { @@ -178,6 +193,6 @@ impl From for ParseDataError { impl From for ParseDataError { fn from(err: ProtobufConversionError) -> Self { - ParseDataError::Fatal(P2PSyncClientError::ProtobufConversionError(err)) + ParseDataError::BadPeer(BadPeerError::ProtobufConversionError(err)) } } diff --git a/crates/papyrus_p2p_sync/src/client/transaction.rs b/crates/papyrus_p2p_sync/src/client/transaction.rs index 89f74646e1..e3c11a8df8 100644 --- a/crates/papyrus_p2p_sync/src/client/transaction.rs +++ b/crates/papyrus_p2p_sync/src/client/transaction.rs @@ -8,7 +8,13 @@ use papyrus_storage::{StorageError, StorageReader, StorageWriter}; use starknet_api::block::{BlockBody, BlockNumber}; use starknet_api::transaction::FullTransaction; -use super::stream_builder::{BlockData, BlockNumberLimit, DataStreamBuilder, ParseDataError}; +use super::stream_builder::{ + BadPeerError, + BlockData, + BlockNumberLimit, + DataStreamBuilder, + ParseDataError, +}; use super::{P2PSyncClientError, NETWORK_DATA_TIMEOUT}; impl BlockData for (BlockBody, BlockNumber) { @@ -57,13 +63,11 @@ impl DataStreamBuilder for TransactionStreamFactory { if current_transaction_len == 0 { return Ok(None); } else { - return Err(ParseDataError::Fatal( - P2PSyncClientError::NotEnoughTransactions { - expected: target_transaction_len, - actual: current_transaction_len, - block_number: block_number.0, - }, - )); + return Err(ParseDataError::BadPeer(BadPeerError::NotEnoughTransactions { + expected: target_transaction_len, + actual: current_transaction_len, + block_number: block_number.0, + })); } }; block_body.transactions.push(transaction); From 780199e8c51c6bab7c0293ef75e86006a211d0f3 Mon Sep 17 00:00:00 2001 From: Noam Spiegelstein Date: Mon, 4 Nov 2024 11:28:09 +0200 Subject: [PATCH 3/3] refactor(sync): move state diff related errors to BadPeerError moves non-fatal state diff related errors to BadPeerError to allow retrying the query with a different peer. previously state diff related tests checked these non-fatal errors as assertions to certain scenarios. we now just check that these scenarios cause peer reporting. --- crates/papyrus_p2p_sync/src/client/mod.rs | 14 ------- .../papyrus_p2p_sync/src/client/state_diff.rs | 29 +++++++-------- .../src/client/state_diff_test.rs | 37 +++++++++---------- .../src/client/stream_builder.rs | 11 ++++++ 4 files changed, 42 insertions(+), 49 deletions(-) diff --git a/crates/papyrus_p2p_sync/src/client/mod.rs b/crates/papyrus_p2p_sync/src/client/mod.rs index ea06367172..34b0efb9cd 100644 --- a/crates/papyrus_p2p_sync/src/client/mod.rs +++ b/crates/papyrus_p2p_sync/src/client/mod.rs @@ -121,20 +121,6 @@ impl Default for P2PSyncClientConfig { #[derive(thiserror::Error, Debug)] pub enum P2PSyncClientError { - // TODO(shahak): Remove this and report to network on invalid data once that's possible. - #[error( - "The header says that the block's state diff should be of length {expected_length}. Can \ - only divide the state diff parts into the following lengths: {possible_lengths:?}." - )] - WrongStateDiffLength { expected_length: usize, possible_lengths: Vec }, - // TODO(shahak): Remove this and report to network on invalid data once that's possible. - #[error("Two state diff parts for the same state diff are conflicting.")] - ConflictingStateDiffParts, - // TODO(shahak): Remove this and report to network on invalid data once that's possible. - #[error( - "Received an empty state diff part from the network (this is a potential DDoS vector)." - )] - EmptyStateDiffPart, // TODO(shahak): Remove this and report to network on invalid data once that's possible. #[error("Network returned more responses than expected for a query.")] TooManyResponses, diff --git a/crates/papyrus_p2p_sync/src/client/state_diff.rs b/crates/papyrus_p2p_sync/src/client/state_diff.rs index f50e739db6..4eeebd36c4 100644 --- a/crates/papyrus_p2p_sync/src/client/state_diff.rs +++ b/crates/papyrus_p2p_sync/src/client/state_diff.rs @@ -13,6 +13,7 @@ use papyrus_storage::{StorageError, StorageReader, StorageWriter}; use starknet_api::block::BlockNumber; use starknet_api::state::ThinStateDiff; +use super::stream_builder::BadPeerError; use crate::client::stream_builder::{ BlockData, BlockNumberLimit, @@ -77,17 +78,15 @@ impl DataStreamBuilder for StateDiffStreamBuilder { if current_state_diff_len == 0 { return Ok(None); } else { - return Err(ParseDataError::Fatal( - P2PSyncClientError::WrongStateDiffLength { - expected_length: target_state_diff_len, - possible_lengths: vec![current_state_diff_len], - }, - )); + return Err(ParseDataError::BadPeer(BadPeerError::WrongStateDiffLength { + expected_length: target_state_diff_len, + possible_lengths: vec![current_state_diff_len], + })); } }; prev_result_len = current_state_diff_len; if state_diff_chunk.is_empty() { - return Err(ParseDataError::Fatal(P2PSyncClientError::EmptyStateDiffPart)); + return Err(ParseDataError::BadPeer(BadPeerError::EmptyStateDiffPart)); } // It's cheaper to calculate the length of `state_diff_part` than the length of // `result`. @@ -96,7 +95,7 @@ impl DataStreamBuilder for StateDiffStreamBuilder { } if current_state_diff_len != target_state_diff_len { - return Err(ParseDataError::Fatal(P2PSyncClientError::WrongStateDiffLength { + return Err(ParseDataError::BadPeer(BadPeerError::WrongStateDiffLength { expected_length: target_state_diff_len, possible_lengths: vec![prev_result_len, current_state_diff_len], })); @@ -119,7 +118,7 @@ impl DataStreamBuilder for StateDiffStreamBuilder { fn unite_state_diffs( state_diff: &mut ThinStateDiff, state_diff_chunk: StateDiffChunk, -) -> Result<(), P2PSyncClientError> { +) -> Result<(), BadPeerError> { match state_diff_chunk { StateDiffChunk::ContractDiff(contract_diff) => { if let Some(class_hash) = contract_diff.class_hash { @@ -128,12 +127,12 @@ fn unite_state_diffs( .insert(contract_diff.contract_address, class_hash) .is_some() { - return Err(P2PSyncClientError::ConflictingStateDiffParts); + return Err(BadPeerError::ConflictingStateDiffParts); } } if let Some(nonce) = contract_diff.nonce { if state_diff.nonces.insert(contract_diff.contract_address, nonce).is_some() { - return Err(P2PSyncClientError::ConflictingStateDiffParts); + return Err(BadPeerError::ConflictingStateDiffParts); } } if !contract_diff.storage_diffs.is_empty() { @@ -141,7 +140,7 @@ fn unite_state_diffs( Some(storage_diffs) => { for (k, v) in contract_diff.storage_diffs { if storage_diffs.insert(k, v).is_some() { - return Err(P2PSyncClientError::ConflictingStateDiffParts); + return Err(BadPeerError::ConflictingStateDiffParts); } } } @@ -159,7 +158,7 @@ fn unite_state_diffs( .insert(declared_class.class_hash, declared_class.compiled_class_hash) .is_some() { - return Err(P2PSyncClientError::ConflictingStateDiffParts); + return Err(BadPeerError::ConflictingStateDiffParts); } } StateDiffChunk::DeprecatedDeclaredClass(deprecated_declared_class) => { @@ -175,13 +174,13 @@ fn unite_state_diffs( )] fn validate_deprecated_declared_classes_non_conflicting( state_diff: &ThinStateDiff, -) -> Result<(), P2PSyncClientError> { +) -> Result<(), BadPeerError> { // TODO(shahak): Check if sorting is more efficient. if state_diff.deprecated_declared_classes.len() == state_diff.deprecated_declared_classes.iter().cloned().collect::>().len() { Ok(()) } else { - Err(P2PSyncClientError::ConflictingStateDiffParts) + Err(BadPeerError::ConflictingStateDiffParts) } } diff --git a/crates/papyrus_p2p_sync/src/client/state_diff_test.rs b/crates/papyrus_p2p_sync/src/client/state_diff_test.rs index 81f61f4148..2a321570e7 100644 --- a/crates/papyrus_p2p_sync/src/client/state_diff_test.rs +++ b/crates/papyrus_p2p_sync/src/client/state_diff_test.rs @@ -1,6 +1,5 @@ use std::time::Duration; -use assert_matches::assert_matches; use futures::{FutureExt, StreamExt}; use indexmap::indexmap; use papyrus_protobuf::sync::{ @@ -33,7 +32,7 @@ use super::test_utils::{ STATE_DIFF_QUERY_LENGTH, WAIT_PERIOD_FOR_NEW_DATA, }; -use super::{P2PSyncClientError, StateDiffQuery}; +use super::StateDiffQuery; const TIMEOUT_FOR_TEST: Duration = Duration::from_secs(5); @@ -193,14 +192,15 @@ async fn state_diff_basic_flow() { } } +// TODO(noamsp): Consider verifying that ParseDataError::BadPeerError(EmptyStateDiffPart) was +// returned from parse_data_for_block. We currently dont have a way to check this. #[tokio::test] async fn state_diff_empty_state_diff() { - validate_state_diff_fails(1, vec![Some(StateDiffChunk::default())], |error| { - assert_matches!(error, P2PSyncClientError::EmptyStateDiffPart) - }) - .await; + validate_state_diff_fails(1, vec![Some(StateDiffChunk::default())]).await; } +// TODO(noamsp): Consider verifying that ParseDataError::BadPeerError(WrongStateDiffLength) was +// returned from parse_data_for_block. We currently dont have a way to check this. #[tokio::test] async fn state_diff_stopped_in_middle() { validate_state_diff_fails( @@ -209,29 +209,31 @@ async fn state_diff_stopped_in_middle() { Some(StateDiffChunk::DeprecatedDeclaredClass(DeprecatedDeclaredClass::default())), None, ], - |error| assert_matches!(error, P2PSyncClientError::WrongStateDiffLength { expected_length, possible_lengths } if expected_length == 2 && possible_lengths == vec![1]), ) .await; } +// TODO(noamsp): Consider verifying that ParseDataError::BadPeerError(WrongStateDiffLength) was +// returned from parse_data_for_block. We currently dont have a way to check this. #[tokio::test] async fn state_diff_not_split_correctly() { validate_state_diff_fails( 2, vec![ Some(StateDiffChunk::DeprecatedDeclaredClass(DeprecatedDeclaredClass::default())), - Some(StateDiffChunk::ContractDiff(ContractDiff{ + Some(StateDiffChunk::ContractDiff(ContractDiff { contract_address: ContractAddress::default(), class_hash: Some(ClassHash::default()), nonce: Some(Nonce::default()), ..Default::default() - }),) + })), ], - |error| assert_matches!(error, P2PSyncClientError::WrongStateDiffLength { expected_length, possible_lengths } if expected_length == 2 && possible_lengths == vec![1, 3]), ) .await; } +// TODO(noamsp): Consider verifying that ParseDataError::BadPeerError(ConflictingStateDiffParts) +// was returned from parse_data_for_block. We currently dont have a way to check this. #[tokio::test] async fn state_diff_conflicting() { validate_state_diff_fails( @@ -248,7 +250,6 @@ async fn state_diff_conflicting() { ..Default::default() })), ], - |error| assert_matches!(error, P2PSyncClientError::ConflictingStateDiffParts), ) .await; validate_state_diff_fails( @@ -265,7 +266,6 @@ async fn state_diff_conflicting() { ..Default::default() })), ], - |error| assert_matches!(error, P2PSyncClientError::ConflictingStateDiffParts), ) .await; validate_state_diff_fails( @@ -280,7 +280,6 @@ async fn state_diff_conflicting() { compiled_class_hash: CompiledClassHash::default(), })), ], - |error| assert_matches!(error, P2PSyncClientError::ConflictingStateDiffParts), ) .await; validate_state_diff_fails( @@ -293,7 +292,6 @@ async fn state_diff_conflicting() { class_hash: ClassHash::default(), })), ], - |error| assert_matches!(error, P2PSyncClientError::ConflictingStateDiffParts), ) .await; validate_state_diff_fails( @@ -310,7 +308,6 @@ async fn state_diff_conflicting() { ..Default::default() })), ], - |error| assert_matches!(error, P2PSyncClientError::ConflictingStateDiffParts), ) .await; } @@ -318,7 +315,6 @@ async fn state_diff_conflicting() { async fn validate_state_diff_fails( state_diff_length_in_header: usize, state_diff_chunks: Vec>, - error_validator: impl Fn(P2PSyncClientError), ) { let TestArgs { p2p_sync, @@ -386,14 +382,15 @@ async fn validate_state_diff_fails( .await .unwrap(); } - tokio::time::sleep(TIMEOUT_FOR_TEST).await; - panic!("P2P sync did not receive error"); + + // Asserts that a peer was reported due to a non-fatal error. + mock_state_diff_responses_manager.assert_reported(TIMEOUT_FOR_TEST).await; }; tokio::select! { sync_result = p2p_sync.run() => { - let sync_err = sync_result.unwrap_err(); - error_validator(sync_err); + sync_result.unwrap(); + panic!("P2P sync aborted with no failure."); } _ = parse_queries_future => {} } diff --git a/crates/papyrus_p2p_sync/src/client/stream_builder.rs b/crates/papyrus_p2p_sync/src/client/stream_builder.rs index d1b3f1e188..7a1a258d35 100644 --- a/crates/papyrus_p2p_sync/src/client/stream_builder.rs +++ b/crates/papyrus_p2p_sync/src/client/stream_builder.rs @@ -167,6 +167,17 @@ pub(crate) enum BadPeerError { NotEnoughTransactions { expected: usize, actual: usize, block_number: u64 }, #[error("Expected to receive one signature from the network. got {signatures:?} instead.")] WrongSignaturesLength { signatures: Vec }, + #[error( + "The header says that the block's state diff should be of length {expected_length}. Can \ + only divide the state diff parts into the following lengths: {possible_lengths:?}." + )] + WrongStateDiffLength { expected_length: usize, possible_lengths: Vec }, + #[error("Two state diff parts for the same state diff are conflicting.")] + ConflictingStateDiffParts, + #[error( + "Received an empty state diff part from the network (this is a potential DDoS vector)." + )] + EmptyStateDiffPart, #[error(transparent)] ProtobufConversionError(#[from] ProtobufConversionError), }