refactor(sync): move non fatal errors from P2PSyncClientError to BadPeerError #1822

Merged
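
In brief: parse-time failures that can be blamed on a misbehaving peer (unordered headers, a wrong signature count, too few transactions for a block, malformed or conflicting state diff parts, protobuf conversion failures) move out of the fatal `P2PSyncClientError` enum into the new `BadPeerError` enum, and `ParseDataError` distinguishes the two. The sketch below only illustrates the relationship between the three types; field lists, the remaining fatal variants, and the exact derive attributes are simplified (for example, `u64` stands in for `BlockNumber`), so refer to the diffs that follow for the real definitions.

```rust
// Simplified sketch of the error split in this PR; see the diffs below for
// the exact definitions in crates/papyrus_p2p_sync/src/client/.
use thiserror::Error;

/// Fatal errors: the sync client cannot continue (storage problems, etc.).
#[derive(Error, Debug)]
pub enum P2PSyncClientError {
    // Placeholder variant; the real enum keeps its fatal variants.
    #[error("fatal sync client error: {0}")]
    Other(String),
}

/// Non-fatal errors caused by a misbehaving peer. The peer can be reported
/// and the query retried instead of aborting the whole sync.
#[derive(Error, Debug)]
pub enum BadPeerError {
    #[error("Blocks returned unordered. Expected {expected_block_number}, got {actual_block_number}.")]
    HeadersUnordered { expected_block_number: u64, actual_block_number: u64 },
    #[error("Two state diff parts for the same state diff are conflicting.")]
    ConflictingStateDiffParts,
    // ... the full variant list is in stream_builder.rs below ...
}

/// Error type returned while parsing data received from a peer.
#[derive(Error, Debug)]
pub enum ParseDataError {
    #[error(transparent)]
    Fatal(#[from] P2PSyncClientError),
    #[error(transparent)]
    BadPeer(#[from] BadPeerError),
}
```

With this split, the updated tests below no longer match on specific `P2PSyncClientError` variants; they assert that the offending peer was reported (`assert_reported`).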
12 changes: 9 additions & 3 deletions crates/papyrus_p2p_sync/src/client/header.rs
@@ -10,7 +10,13 @@ use papyrus_storage::{StorageError, StorageReader, StorageWriter};
use starknet_api::block::BlockNumber;
use tracing::debug;

use super::stream_builder::{BlockData, BlockNumberLimit, DataStreamBuilder, ParseDataError};
use super::stream_builder::{
BadPeerError,
BlockData,
BlockNumberLimit,
DataStreamBuilder,
ParseDataError,
};
use super::{P2PSyncClientError, ALLOWED_SIGNATURES_LENGTH, NETWORK_DATA_TIMEOUT};

impl BlockData for SignedBlockHeader {
@@ -84,7 +90,7 @@ impl DataStreamBuilder<SignedBlockHeader> for HeaderStreamBuilder {
if block_number
!= signed_block_header.block_header.block_header_without_hash.block_number
{
return Err(ParseDataError::Fatal(P2PSyncClientError::HeadersUnordered {
return Err(ParseDataError::BadPeer(BadPeerError::HeadersUnordered {
expected_block_number: block_number,
actual_block_number: signed_block_header
.block_header
@@ -93,7 +99,7 @@ impl DataStreamBuilder<SignedBlockHeader> for HeaderStreamBuilder {
}));
}
if signed_block_header.signatures.len() != ALLOWED_SIGNATURES_LENGTH {
return Err(ParseDataError::Fatal(P2PSyncClientError::WrongSignaturesLength {
return Err(ParseDataError::BadPeer(BadPeerError::WrongSignaturesLength {
signatures: signed_block_header.signatures,
}));
}
37 changes: 1 addition & 36 deletions crates/papyrus_p2p_sync/src/client/mod.rs
@@ -20,7 +20,6 @@ use papyrus_config::converters::deserialize_seconds_to_duration;
use papyrus_config::dumping::{ser_optional_param, ser_param, SerializeConfig};
use papyrus_config::{ParamPath, ParamPrivacyInput, SerializedParam};
use papyrus_network::network_manager::SqmrClientSender;
use papyrus_protobuf::converters::ProtobufConversionError;
use papyrus_protobuf::sync::{
ClassQuery,
DataOrFin,
@@ -32,7 +31,7 @@ use papyrus_protobuf::sync::{
};
use papyrus_storage::{StorageError, StorageReader, StorageWriter};
use serde::{Deserialize, Serialize};
use starknet_api::block::{BlockNumber, BlockSignature};
use starknet_api::block::BlockNumber;
use starknet_api::core::ClassHash;
use starknet_api::transaction::FullTransaction;
use state_diff::StateDiffStreamBuilder;
@@ -122,43 +121,9 @@ impl Default for P2PSyncClientConfig {

#[derive(thiserror::Error, Debug)]
pub enum P2PSyncClientError {
// TODO(shahak): Remove this and report to network on invalid data once that's possible.
// TODO(shahak): Consider removing this error and handling unordered headers without failing.
#[error(
"Blocks returned unordered from the network. Expected header with \
{expected_block_number}, got {actual_block_number}."
)]
HeadersUnordered { expected_block_number: BlockNumber, actual_block_number: BlockNumber },
#[error(
"Expected to receive {expected} transactions for {block_number} from the network. Got \
{actual} instead."
)]
// TODO(eitan): Remove this and report to network on invalid data once that's possible.
NotEnoughTransactions { expected: usize, actual: usize, block_number: u64 },
#[error("Expected to receive one signature from the network. got {signatures:?} instead.")]
// TODO(shahak): Remove this and report to network on invalid data once that's possible.
// Right now we support only one signature. In the future we will support many signatures.
WrongSignaturesLength { signatures: Vec<BlockSignature> },
// TODO(shahak): Remove this and report to network on invalid data once that's possible.
#[error(
"The header says that the block's state diff should be of length {expected_length}. Can \
only divide the state diff parts into the following lengths: {possible_lengths:?}."
)]
WrongStateDiffLength { expected_length: usize, possible_lengths: Vec<usize> },
// TODO(shahak): Remove this and report to network on invalid data once that's possible.
#[error("Two state diff parts for the same state diff are conflicting.")]
ConflictingStateDiffParts,
// TODO(shahak): Remove this and report to network on invalid data once that's possible.
#[error(
"Received an empty state diff part from the network (this is a potential DDoS vector)."
)]
EmptyStateDiffPart,
// TODO(shahak): Remove this and report to network on invalid data once that's possible.
#[error("Network returned more responses than expected for a query.")]
TooManyResponses,
// TODO(shahak): Remove this and report to network on invalid data once that's possible.
#[error(transparent)]
ProtobufConversionError(#[from] ProtobufConversionError),
#[error(
"Encountered an old header in the storage at {block_number:?} that's missing the field \
{missing_field}. Re-sync the node from {block_number:?} from a node that provides this \
29 changes: 14 additions & 15 deletions crates/papyrus_p2p_sync/src/client/state_diff.rs
@@ -13,6 +13,7 @@ use papyrus_storage::{StorageError, StorageReader, StorageWriter};
use starknet_api::block::BlockNumber;
use starknet_api::state::ThinStateDiff;

use super::stream_builder::BadPeerError;
use crate::client::stream_builder::{
BlockData,
BlockNumberLimit,
@@ -76,17 +77,15 @@ impl DataStreamBuilder<StateDiffChunk> for StateDiffStreamBuilder {
if current_state_diff_len == 0 {
return Ok(None);
} else {
return Err(ParseDataError::Fatal(
P2PSyncClientError::WrongStateDiffLength {
expected_length: target_state_diff_len,
possible_lengths: vec![current_state_diff_len],
},
));
return Err(ParseDataError::BadPeer(BadPeerError::WrongStateDiffLength {
expected_length: target_state_diff_len,
possible_lengths: vec![current_state_diff_len],
}));
}
};
prev_result_len = current_state_diff_len;
if state_diff_chunk.is_empty() {
return Err(ParseDataError::Fatal(P2PSyncClientError::EmptyStateDiffPart));
return Err(ParseDataError::BadPeer(BadPeerError::EmptyStateDiffPart));
}
// It's cheaper to calculate the length of `state_diff_part` than the length of
// `result`.
@@ -95,7 +94,7 @@ impl DataStreamBuilder<StateDiffChunk> for StateDiffStreamBuilder {
}

if current_state_diff_len != target_state_diff_len {
return Err(ParseDataError::Fatal(P2PSyncClientError::WrongStateDiffLength {
return Err(ParseDataError::BadPeer(BadPeerError::WrongStateDiffLength {
expected_length: target_state_diff_len,
possible_lengths: vec![prev_result_len, current_state_diff_len],
}));
@@ -118,7 +117,7 @@ impl DataStreamBuilder<StateDiffChunk> for StateDiffStreamBuilder {
fn unite_state_diffs(
state_diff: &mut ThinStateDiff,
state_diff_chunk: StateDiffChunk,
) -> Result<(), P2PSyncClientError> {
) -> Result<(), BadPeerError> {
match state_diff_chunk {
StateDiffChunk::ContractDiff(contract_diff) => {
if let Some(class_hash) = contract_diff.class_hash {
@@ -127,20 +126,20 @@ fn unite_state_diffs(
.insert(contract_diff.contract_address, class_hash)
.is_some()
{
return Err(P2PSyncClientError::ConflictingStateDiffParts);
return Err(BadPeerError::ConflictingStateDiffParts);
}
}
if let Some(nonce) = contract_diff.nonce {
if state_diff.nonces.insert(contract_diff.contract_address, nonce).is_some() {
return Err(P2PSyncClientError::ConflictingStateDiffParts);
return Err(BadPeerError::ConflictingStateDiffParts);
}
}
if !contract_diff.storage_diffs.is_empty() {
match state_diff.storage_diffs.get_mut(&contract_diff.contract_address) {
Some(storage_diffs) => {
for (k, v) in contract_diff.storage_diffs {
if storage_diffs.insert(k, v).is_some() {
return Err(P2PSyncClientError::ConflictingStateDiffParts);
return Err(BadPeerError::ConflictingStateDiffParts);
}
}
}
@@ -158,7 +157,7 @@ fn unite_state_diffs(
.insert(declared_class.class_hash, declared_class.compiled_class_hash)
.is_some()
{
return Err(P2PSyncClientError::ConflictingStateDiffParts);
return Err(BadPeerError::ConflictingStateDiffParts);
}
}
StateDiffChunk::DeprecatedDeclaredClass(deprecated_declared_class) => {
@@ -174,13 +173,13 @@ fn unite_state_diffs(
)]
fn validate_deprecated_declared_classes_non_conflicting(
state_diff: &ThinStateDiff,
) -> Result<(), P2PSyncClientError> {
) -> Result<(), BadPeerError> {
// TODO(shahak): Check if sorting is more efficient.
if state_diff.deprecated_declared_classes.len()
== state_diff.deprecated_declared_classes.iter().cloned().collect::<HashSet<_>>().len()
{
Ok(())
} else {
Err(P2PSyncClientError::ConflictingStateDiffParts)
Err(BadPeerError::ConflictingStateDiffParts)
}
}
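
Note that `unite_state_diffs` and `validate_deprecated_declared_classes_non_conflicting` now return `Result<(), BadPeerError>`, so their call sites inside `parse_data_for_block` (not shown in this hunk) must lift the helper error into `ParseDataError`. The fragment below is only a hedged sketch of such a call site: the surrounding types are minimal stand-ins, and whether the real code uses `map_err` or a `From` impl is not visible in this diff.

```rust
// Hedged sketch of a call site lifting BadPeerError into ParseDataError.
// ThinStateDiff, StateDiffChunk and unite_state_diffs are minimal stand-ins.
use thiserror::Error;

#[derive(Error, Debug)]
pub enum BadPeerError {
    #[error("Two state diff parts for the same state diff are conflicting.")]
    ConflictingStateDiffParts,
}

#[derive(Error, Debug)]
pub enum ParseDataError {
    #[error(transparent)]
    BadPeer(#[from] BadPeerError),
}

#[derive(Default)]
pub struct ThinStateDiff;
pub struct StateDiffChunk;

fn unite_state_diffs(_diff: &mut ThinStateDiff, _chunk: StateDiffChunk) -> Result<(), BadPeerError> {
    Ok(())
}

pub fn apply_chunk(diff: &mut ThinStateDiff, chunk: StateDiffChunk) -> Result<(), ParseDataError> {
    // A BadPeerError is non-fatal: convert it so the caller can report the
    // peer instead of aborting the sync.
    unite_state_diffs(diff, chunk).map_err(ParseDataError::BadPeer)
}
```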
37 changes: 17 additions & 20 deletions crates/papyrus_p2p_sync/src/client/state_diff_test.rs
@@ -1,6 +1,5 @@
use std::time::Duration;

use assert_matches::assert_matches;
use futures::{FutureExt, StreamExt};
use indexmap::indexmap;
use papyrus_protobuf::sync::{
@@ -33,7 +32,7 @@ use super::test_utils::{
STATE_DIFF_QUERY_LENGTH,
WAIT_PERIOD_FOR_NEW_DATA,
};
use super::{P2PSyncClientError, StateDiffQuery};
use super::StateDiffQuery;

const TIMEOUT_FOR_TEST: Duration = Duration::from_secs(5);

@@ -193,14 +192,15 @@ async fn state_diff_basic_flow() {
}
}

// TODO(noamsp): Consider verifying that ParseDataError::BadPeer(BadPeerError::EmptyStateDiffPart)
// was returned from parse_data_for_block. We currently don't have a way to check this.
#[tokio::test]
async fn state_diff_empty_state_diff() {
validate_state_diff_fails(1, vec![Some(StateDiffChunk::default())], |error| {
assert_matches!(error, P2PSyncClientError::EmptyStateDiffPart)
})
.await;
validate_state_diff_fails(1, vec![Some(StateDiffChunk::default())]).await;
}

// TODO(noamsp): Consider verifying that ParseDataError::BadPeer(BadPeerError::WrongStateDiffLength)
// was returned from parse_data_for_block. We currently don't have a way to check this.
#[tokio::test]
async fn state_diff_stopped_in_middle() {
validate_state_diff_fails(
@@ -209,29 +209,31 @@ async fn state_diff_stopped_in_middle() {
Some(StateDiffChunk::DeprecatedDeclaredClass(DeprecatedDeclaredClass::default())),
None,
],
|error| assert_matches!(error, P2PSyncClientError::WrongStateDiffLength { expected_length, possible_lengths } if expected_length == 2 && possible_lengths == vec![1]),
)
.await;
}

// TODO(noamsp): Consider verifying that ParseDataError::BadPeer(BadPeerError::WrongStateDiffLength)
// was returned from parse_data_for_block. We currently don't have a way to check this.
#[tokio::test]
async fn state_diff_not_split_correctly() {
validate_state_diff_fails(
2,
vec![
Some(StateDiffChunk::DeprecatedDeclaredClass(DeprecatedDeclaredClass::default())),
Some(StateDiffChunk::ContractDiff(ContractDiff{
Some(StateDiffChunk::ContractDiff(ContractDiff {
contract_address: ContractAddress::default(),
class_hash: Some(ClassHash::default()),
nonce: Some(Nonce::default()),
..Default::default()
}),)
})),
],
|error| assert_matches!(error, P2PSyncClientError::WrongStateDiffLength { expected_length, possible_lengths } if expected_length == 2 && possible_lengths == vec![1, 3]),
)
.await;
}

// TODO(noamsp): Consider verifying that ParseDataError::BadPeer(BadPeerError::ConflictingStateDiffParts)
// was returned from parse_data_for_block. We currently don't have a way to check this.
#[tokio::test]
async fn state_diff_conflicting() {
validate_state_diff_fails(
@@ -248,7 +250,6 @@ async fn state_diff_conflicting() {
..Default::default()
})),
],
|error| assert_matches!(error, P2PSyncClientError::ConflictingStateDiffParts),
)
.await;
validate_state_diff_fails(
@@ -265,7 +266,6 @@ async fn state_diff_conflicting() {
..Default::default()
})),
],
|error| assert_matches!(error, P2PSyncClientError::ConflictingStateDiffParts),
)
.await;
validate_state_diff_fails(
@@ -280,7 +280,6 @@ async fn state_diff_conflicting() {
compiled_class_hash: CompiledClassHash::default(),
})),
],
|error| assert_matches!(error, P2PSyncClientError::ConflictingStateDiffParts),
)
.await;
validate_state_diff_fails(
@@ -293,7 +292,6 @@ async fn state_diff_conflicting() {
class_hash: ClassHash::default(),
})),
],
|error| assert_matches!(error, P2PSyncClientError::ConflictingStateDiffParts),
)
.await;
validate_state_diff_fails(
@@ -310,15 +308,13 @@ async fn state_diff_conflicting() {
..Default::default()
})),
],
|error| assert_matches!(error, P2PSyncClientError::ConflictingStateDiffParts),
)
.await;
}

async fn validate_state_diff_fails(
state_diff_length_in_header: usize,
state_diff_chunks: Vec<Option<StateDiffChunk>>,
error_validator: impl Fn(P2PSyncClientError),
) {
let TestArgs {
p2p_sync,
@@ -386,14 +382,15 @@ async fn validate_state_diff_fails(
.await
.unwrap();
}
tokio::time::sleep(TIMEOUT_FOR_TEST).await;
panic!("P2P sync did not receive error");

// Asserts that a peer was reported due to a non-fatal error.
mock_state_diff_responses_manager.assert_reported(TIMEOUT_FOR_TEST).await;
};

tokio::select! {
sync_result = p2p_sync.run() => {
let sync_err = sync_result.unwrap_err();
error_validator(sync_err);
sync_result.unwrap();
panic!("P2P sync aborted with no failure.");
}
_ = parse_queries_future => {}
}
32 changes: 29 additions & 3 deletions crates/papyrus_p2p_sync/src/client/stream_builder.rs
@@ -10,7 +10,7 @@ use papyrus_protobuf::converters::ProtobufConversionError;
use papyrus_protobuf::sync::{BlockHashOrNumber, DataOrFin, Direction, Query};
use papyrus_storage::header::HeaderStorageReader;
use papyrus_storage::{StorageError, StorageReader, StorageWriter};
use starknet_api::block::BlockNumber;
use starknet_api::block::{BlockNumber, BlockSignature};
use tracing::{debug, info, warn};

use super::{P2PSyncClientError, STEP};
@@ -154,7 +154,33 @@ where
}

#[derive(thiserror::Error, Debug)]
pub(crate) enum BadPeerError {}
pub(crate) enum BadPeerError {
#[error(
"Blocks returned unordered from the network. Expected header with \
{expected_block_number}, got {actual_block_number}."
)]
HeadersUnordered { expected_block_number: BlockNumber, actual_block_number: BlockNumber },
#[error(
"Expected to receive {expected} transactions for {block_number} from the network. Got \
{actual} instead."
)]
NotEnoughTransactions { expected: usize, actual: usize, block_number: u64 },
#[error("Expected to receive one signature from the network. got {signatures:?} instead.")]
WrongSignaturesLength { signatures: Vec<BlockSignature> },
#[error(
"The header says that the block's state diff should be of length {expected_length}. Can \
only divide the state diff parts into the following lengths: {possible_lengths:?}."
)]
WrongStateDiffLength { expected_length: usize, possible_lengths: Vec<usize> },
#[error("Two state diff parts for the same state diff are conflicting.")]
ConflictingStateDiffParts,
#[error(
"Received an empty state diff part from the network (this is a potential DDoS vector)."
)]
EmptyStateDiffPart,
#[error(transparent)]
ProtobufConversionError(#[from] ProtobufConversionError),
}

#[derive(thiserror::Error, Debug)]
pub(crate) enum ParseDataError {
@@ -178,6 +204,6 @@ impl From<tokio::time::error::Elapsed> for ParseDataError {

impl From<ProtobufConversionError> for ParseDataError {
fn from(err: ProtobufConversionError) -> Self {
ParseDataError::Fatal(P2PSyncClientError::ProtobufConversionError(err))
ParseDataError::BadPeer(BadPeerError::ProtobufConversionError(err))
}
}
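
The payoff of the split happens in the consumer of `ParseDataError`, which is not part of this diff: a `Fatal` error should still abort the sync, while a `BadPeer` error should lead to reporting the peer (which is what the updated test asserts via `assert_reported`) and retrying the query with another peer. The loop below is a hypothetical illustration only; `fetch_and_parse_block` and `report_peer` are invented stand-ins, not APIs from papyrus, and the simplified enums mirror but do not reproduce the ones above.

```rust
// Hypothetical consumer of ParseDataError; NOT code from this PR.
// fetch_and_parse_block and report_peer are invented stand-ins.
use thiserror::Error;

#[derive(Error, Debug)]
pub enum P2PSyncClientError {
    #[error("fatal sync client error: {0}")]
    Other(String),
}

#[derive(Error, Debug)]
pub enum BadPeerError {
    #[error("peer sent malformed data: {0}")]
    Malformed(String),
}

#[derive(Error, Debug)]
pub enum ParseDataError {
    #[error(transparent)]
    Fatal(#[from] P2PSyncClientError),
    #[error(transparent)]
    BadPeer(#[from] BadPeerError),
}

/// Stand-in for one fetch-and-parse attempt against the current peer.
fn fetch_and_parse_block(attempt: u32) -> Result<String, ParseDataError> {
    if attempt == 0 {
        Err(BadPeerError::Malformed("empty state diff part".into()).into())
    } else {
        Ok("block data".to_string())
    }
}

/// Stand-in for reporting the misbehaving peer to the network layer.
fn report_peer() {
    println!("reported peer; retrying the query with another peer");
}

fn process_block() -> Result<String, P2PSyncClientError> {
    let mut attempt = 0;
    loop {
        match fetch_and_parse_block(attempt) {
            Ok(data) => return Ok(data),
            // Non-fatal: blame the peer, report it, and retry.
            Err(ParseDataError::BadPeer(err)) => {
                eprintln!("bad peer: {err}");
                report_peer();
                attempt += 1;
            }
            // Fatal: abort the sync.
            Err(ParseDataError::Fatal(err)) => return Err(err),
        }
    }
}

fn main() {
    match process_block() {
        Ok(data) => println!("stored {data}"),
        Err(err) => eprintln!("sync aborted: {err}"),
    }
}
```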