diff --git a/crates/papyrus_p2p_sync/Cargo.toml b/crates/papyrus_p2p_sync/Cargo.toml index f39a8bc76b..0121d5cf7e 100644 --- a/crates/papyrus_p2p_sync/Cargo.toml +++ b/crates/papyrus_p2p_sync/Cargo.toml @@ -19,7 +19,9 @@ papyrus_network.workspace = true papyrus_proc_macros.workspace = true papyrus_protobuf.workspace = true papyrus_storage.workspace = true +papyrus_test_utils.workspace = true rand.workspace = true +rand_chacha.workspace = true serde.workspace = true starknet_api.workspace = true starknet_state_sync_types.workspace = true @@ -35,8 +37,6 @@ lazy_static.workspace = true papyrus_network = { workspace = true, features = ["testing"] } papyrus_protobuf = { workspace = true, features = ["testing"] } papyrus_storage = { workspace = true, features = ["testing"] } -papyrus_test_utils.workspace = true -rand_chacha.workspace = true static_assertions.workspace = true tokio = { workspace = true, features = ["test-util"] } diff --git a/crates/papyrus_p2p_sync/src/client/class.rs b/crates/papyrus_p2p_sync/src/client/class.rs index 9cb5dfaa39..cf93e49487 100644 --- a/crates/papyrus_p2p_sync/src/client/class.rs +++ b/crates/papyrus_p2p_sync/src/client/class.rs @@ -11,6 +11,7 @@ use papyrus_storage::{StorageError, StorageReader, StorageWriter}; use starknet_api::block::BlockNumber; use starknet_api::core::ClassHash; use starknet_api::state::{DeclaredClasses, DeprecatedDeclaredClasses}; +use starknet_state_sync_types::state_sync_types::SyncBlock; use super::stream_builder::{ BadPeerError, @@ -129,4 +130,12 @@ impl DataStreamBuilder<(ApiContractClass, ClassHash)> for ClassStreamBuilder { fn get_start_block_number(storage_reader: &StorageReader) -> Result { storage_reader.begin_ro_txn()?.get_class_marker() } + + // TODO(Eitan): Implement this function once we have a class manager component. + fn convert_sync_block_to_block_data( + _block_number: BlockNumber, + _sync_block: SyncBlock, + ) -> Option<(DeclaredClasses, DeprecatedDeclaredClasses, BlockNumber)> { + None + } } diff --git a/crates/papyrus_p2p_sync/src/client/header.rs b/crates/papyrus_p2p_sync/src/client/header.rs index 947f8ede25..37f5d101de 100644 --- a/crates/papyrus_p2p_sync/src/client/header.rs +++ b/crates/papyrus_p2p_sync/src/client/header.rs @@ -7,7 +7,15 @@ use papyrus_network::network_manager::ClientResponsesManager; use papyrus_protobuf::sync::{DataOrFin, SignedBlockHeader}; use papyrus_storage::header::{HeaderStorageReader, HeaderStorageWriter}; use papyrus_storage::{StorageError, StorageReader, StorageWriter}; -use starknet_api::block::BlockNumber; +use starknet_api::block::{ + BlockHash, + BlockHeader, + BlockHeaderWithoutHash, + BlockNumber, + BlockSignature, +}; +use starknet_api::hash::StarkHash; +use starknet_state_sync_types::state_sync_types::SyncBlock; use tracing::debug; use super::stream_builder::{ @@ -112,4 +120,25 @@ impl DataStreamBuilder for HeaderStreamBuilder { fn get_start_block_number(storage_reader: &StorageReader) -> Result { storage_reader.begin_ro_txn()?.get_header_marker() } + + // TODO(Eitan): Use real header once SyncBlock contains data required by full nodes + // TODO(Eitan): Fill this with real header once SyncBlock has it. + fn convert_sync_block_to_block_data( + block_number: BlockNumber, + sync_block: SyncBlock, + ) -> Option { + Some(SignedBlockHeader { + block_header: BlockHeader { + block_hash: BlockHash(StarkHash::from(block_number.0)), + block_header_without_hash: BlockHeaderWithoutHash { + block_number, + ..Default::default() + }, + state_diff_length: Some(sync_block.state_diff.len()), + n_transactions: sync_block.transaction_hashes.len(), + ..Default::default() + }, + signatures: vec![BlockSignature::default()], + }) + } } diff --git a/crates/papyrus_p2p_sync/src/client/mod.rs b/crates/papyrus_p2p_sync/src/client/mod.rs index 3dbdc9a24f..51eceb5d50 100644 --- a/crates/papyrus_p2p_sync/src/client/mod.rs +++ b/crates/papyrus_p2p_sync/src/client/mod.rs @@ -20,7 +20,7 @@ use std::time::Duration; use class::ClassStreamBuilder; use futures::channel::mpsc::{Receiver, SendError, Sender}; use futures::stream::BoxStream; -use futures::Stream; +use futures::{SinkExt as _, Stream}; use header::HeaderStreamBuilder; use papyrus_common::pending_classes::ApiContractClass; use papyrus_config::converters::deserialize_milliseconds_to_duration; @@ -38,9 +38,8 @@ use papyrus_protobuf::sync::{ }; use papyrus_storage::{StorageError, StorageReader, StorageWriter}; use serde::{Deserialize, Serialize}; -use starknet_api::block::{BlockBody, BlockNumber}; +use starknet_api::block::BlockNumber; use starknet_api::core::ClassHash; -use starknet_api::state::{DeclaredClasses, DeprecatedDeclaredClasses, ThinStateDiff}; use starknet_api::transaction::FullTransaction; use starknet_state_sync_types::state_sync_types::SyncBlock; use state_diff::StateDiffStreamBuilder; @@ -173,12 +172,12 @@ impl P2PSyncClientChannels { self, storage_reader: StorageReader, config: P2PSyncClientConfig, - _internal_blocks_receivers: InternalBlocksReceivers, + internal_blocks_receivers: InternalBlocksReceivers, ) -> impl Stream + Send + 'static { let header_stream = HeaderStreamBuilder::create_stream( self.header_sender, storage_reader.clone(), - None, + Some(internal_blocks_receivers.header_receiver), config.wait_period_for_new_data, config.num_headers_per_query, ); @@ -186,7 +185,7 @@ impl P2PSyncClientChannels { let state_diff_stream = StateDiffStreamBuilder::create_stream( self.state_diff_sender, storage_reader.clone(), - None, + Some(internal_blocks_receivers.state_diff_receiver), config.wait_period_for_new_data, config.num_block_state_diffs_per_query, ); @@ -194,7 +193,7 @@ impl P2PSyncClientChannels { let transaction_stream = TransactionStreamFactory::create_stream( self.transaction_sender, storage_reader.clone(), - None, + Some(internal_blocks_receivers.transaction_receiver), config.wait_period_for_new_data, config.num_block_transactions_per_query, ); @@ -216,7 +215,6 @@ pub struct P2PSyncClient { storage_reader: StorageReader, storage_writer: StorageWriter, p2p_sync_channels: P2PSyncClientChannels, - #[allow(dead_code)] internal_blocks_receiver: BoxStream<'static, (BlockNumber, SyncBlock)>, } @@ -232,50 +230,77 @@ impl P2PSyncClient { } #[instrument(skip(self), level = "debug", err)] - pub async fn run(mut self) -> Result<(), P2PSyncClientError> { + pub async fn run(self) -> Result<(), P2PSyncClientError> { info!("Starting P2P sync client"); - let internal_blocks_channels = InternalBlocksChannels::new(); - self.create_internal_blocks_sender_task(internal_blocks_channels.senders); - let mut data_stream = self.p2p_sync_channels.create_stream( - self.storage_reader.clone(), - self.config, - internal_blocks_channels.receivers, - ); + let InternalBlocksChannels { + receivers: internal_blocks_receivers, + senders: mut internal_blocks_senders, + } = InternalBlocksChannels::new(); + let P2PSyncClient { + config, + storage_reader, + mut storage_writer, + p2p_sync_channels, + mut internal_blocks_receiver, + } = self; + let mut data_stream = + p2p_sync_channels.create_stream(storage_reader, config, internal_blocks_receivers); loop { + tokio::select! { + maybe_internal_block = internal_blocks_receiver.next() => { + let (block_number, sync_block) = maybe_internal_block.expect("Internal blocks stream should never end"); + internal_blocks_senders.send(block_number, sync_block).await?; + } + data = data_stream.next() => { + let data = data.expect("Sync data stream should never end")?; + data.write_to_storage(&mut storage_writer)?; + } + } let data = data_stream.next().await.expect("Sync data stream should never end")?; - data.write_to_storage(&mut self.storage_writer)?; + data.write_to_storage(&mut storage_writer)?; } } - - fn create_internal_blocks_sender_task( - &self, - #[allow(unused_variables)] internal_blocks_senders: InternalBlocksSenders, - ) -> tokio::task::JoinHandle<()> { - tokio::spawn(async move {}) - } } -#[allow(dead_code)] pub(crate) struct InternalBlocksReceivers { - header_receiver: Receiver<(BlockNumber, SignedBlockHeader)>, - state_diff_receiver: Receiver<(BlockNumber, (ThinStateDiff, BlockNumber))>, - transaction_receiver: Receiver<(BlockNumber, (BlockBody, BlockNumber))>, + header_receiver: Receiver<(BlockNumber, SyncBlock)>, + state_diff_receiver: Receiver<(BlockNumber, SyncBlock)>, + transaction_receiver: Receiver<(BlockNumber, SyncBlock)>, #[allow(dead_code)] - class_receiver: - Receiver<(BlockNumber, (DeclaredClasses, DeprecatedDeclaredClasses, BlockNumber))>, + class_receiver: Receiver<(BlockNumber, SyncBlock)>, } -#[allow(dead_code)] -struct InternalBlocksSenders { - header_sender: Sender<(BlockNumber, SignedBlockHeader)>, - state_diff_sender: Sender<(BlockNumber, (ThinStateDiff, BlockNumber))>, - transaction_sender: Sender<(BlockNumber, (BlockBody, BlockNumber))>, +pub struct InternalBlocksSenders { + header_sender: Sender<(BlockNumber, SyncBlock)>, + state_diff_sender: Sender<(BlockNumber, SyncBlock)>, + transaction_sender: Sender<(BlockNumber, SyncBlock)>, #[allow(dead_code)] - class_sender: Sender<(BlockNumber, (DeclaredClasses, DeprecatedDeclaredClasses, BlockNumber))>, + class_sender: Sender<(BlockNumber, SyncBlock)>, +} +impl InternalBlocksSenders { + pub async fn send( + &mut self, + block_number: BlockNumber, + sync_block: SyncBlock, + ) -> Result<(), SendError> { + let header_send = self.header_sender.send((block_number, sync_block.clone())); + let state_diff_send = self.state_diff_sender.send((block_number, sync_block.clone())); + let transaction_send = self.transaction_sender.send((block_number, sync_block.clone())); + let class_send = self.class_sender.send((block_number, sync_block)); + let res = + futures::future::join4(header_send, state_diff_send, transaction_send, class_send) + .await; + match res { + (Ok(()), Ok(()), Ok(()), Ok(())) => Ok(()), + (Err(e), _, _, _) => Err(e), + (_, Err(e), _, _) => Err(e), + (_, _, Err(e), _) => Err(e), + (_, _, _, Err(e)) => Err(e), + } + } } - struct InternalBlocksChannels { receivers: InternalBlocksReceivers, senders: InternalBlocksSenders, diff --git a/crates/papyrus_p2p_sync/src/client/state_diff.rs b/crates/papyrus_p2p_sync/src/client/state_diff.rs index 4eeebd36c4..362daca121 100644 --- a/crates/papyrus_p2p_sync/src/client/state_diff.rs +++ b/crates/papyrus_p2p_sync/src/client/state_diff.rs @@ -12,6 +12,7 @@ use papyrus_storage::state::{StateStorageReader, StateStorageWriter}; use papyrus_storage::{StorageError, StorageReader, StorageWriter}; use starknet_api::block::BlockNumber; use starknet_api::state::ThinStateDiff; +use starknet_state_sync_types::state_sync_types::SyncBlock; use super::stream_builder::BadPeerError; use crate::client::stream_builder::{ @@ -110,6 +111,13 @@ impl DataStreamBuilder for StateDiffStreamBuilder { fn get_start_block_number(storage_reader: &StorageReader) -> Result { storage_reader.begin_ro_txn()?.get_state_marker() } + + fn convert_sync_block_to_block_data( + block_number: BlockNumber, + sync_block: SyncBlock, + ) -> Option<(ThinStateDiff, BlockNumber)> { + Some((sync_block.state_diff, block_number)) + } } // For performance reasons, this function does not check if a deprecated class was declared twice. diff --git a/crates/papyrus_p2p_sync/src/client/stream_builder.rs b/crates/papyrus_p2p_sync/src/client/stream_builder.rs index 66f9c98879..0869fe5b92 100644 --- a/crates/papyrus_p2p_sync/src/client/stream_builder.rs +++ b/crates/papyrus_p2p_sync/src/client/stream_builder.rs @@ -15,6 +15,7 @@ use papyrus_storage::state::StateStorageReader; use papyrus_storage::{StorageError, StorageReader, StorageWriter}; use starknet_api::block::{BlockNumber, BlockSignature}; use starknet_api::core::ClassHash; +use starknet_state_sync_types::state_sync_types::SyncBlock; use tracing::{debug, info, warn}; use super::{P2PSyncClientError, STEP}; @@ -54,21 +55,35 @@ where fn get_start_block_number(storage_reader: &StorageReader) -> Result; + // TODO(Eitan): Remove option on return once we have a class manager component. + /// Returning None happens when internal blocks are disabled for this stream. + fn convert_sync_block_to_block_data( + block_number: BlockNumber, + sync_block: SyncBlock, + ) -> Option; + fn get_internal_block_at( internal_blocks_received: &mut HashMap, - internal_block_receiver: &mut Option>, + internal_block_receiver: &mut Option>, current_block_number: BlockNumber, ) -> Option { if let Some(block) = internal_blocks_received.remove(¤t_block_number) { return Some(block); } let Some(internal_block_receiver) = internal_block_receiver else { return None }; - while let Some((block_number, block_data)) = internal_block_receiver + while let Some((block_number, sync_block)) = internal_block_receiver .next() .now_or_never() .map(|now_or_never_res| now_or_never_res.expect("Internal block receiver closed")) { if block_number >= current_block_number { + let block_data = + match Self::convert_sync_block_to_block_data(block_number, sync_block) { + Some(block_data) => block_data, + // If None is received then we don't use internal blocks for this stream + // TODO(Eitan): Remove this once we have a class manager component. + None => return None, + }; if block_number == current_block_number { return Some(block_data); } @@ -81,7 +96,7 @@ where fn create_stream( mut sqmr_sender: SqmrClientSender>, storage_reader: StorageReader, - mut internal_block_receiver: Option>, + mut internal_block_receiver: Option>, wait_period_for_new_data: Duration, num_blocks_per_query: u64, ) -> BoxStream<'static, DataStreamResult> diff --git a/crates/papyrus_p2p_sync/src/client/transaction.rs b/crates/papyrus_p2p_sync/src/client/transaction.rs index e3c11a8df8..deea071ae3 100644 --- a/crates/papyrus_p2p_sync/src/client/transaction.rs +++ b/crates/papyrus_p2p_sync/src/client/transaction.rs @@ -5,8 +5,10 @@ use papyrus_protobuf::sync::DataOrFin; use papyrus_storage::body::{BodyStorageReader, BodyStorageWriter}; use papyrus_storage::header::HeaderStorageReader; use papyrus_storage::{StorageError, StorageReader, StorageWriter}; +use papyrus_test_utils::{get_rng, GetTestInstance}; use starknet_api::block::{BlockBody, BlockNumber}; -use starknet_api::transaction::FullTransaction; +use starknet_api::transaction::{FullTransaction, Transaction, TransactionOutput}; +use starknet_state_sync_types::state_sync_types::SyncBlock; use super::stream_builder::{ BadPeerError, @@ -84,4 +86,25 @@ impl DataStreamBuilder for TransactionStreamFactory { fn get_start_block_number(storage_reader: &StorageReader) -> Result { storage_reader.begin_ro_txn()?.get_body_marker() } + + // TODO(Eitan): Use real transactions once SyncBlock contains data required by full nodes + fn convert_sync_block_to_block_data( + block_number: BlockNumber, + sync_block: SyncBlock, + ) -> Option<(BlockBody, BlockNumber)> { + let num_transactions = sync_block.transaction_hashes.len(); + let mut rng = get_rng(); + let block_body = BlockBody { + transaction_hashes: sync_block.transaction_hashes, + transaction_outputs: std::iter::repeat_with(|| { + TransactionOutput::get_test_instance(&mut rng) + }) + .take(num_transactions) + .collect::>(), + transactions: std::iter::repeat_with(|| Transaction::get_test_instance(&mut rng)) + .take(num_transactions) + .collect::>(), + }; + Some((block_body, block_number)) + } }