diff --git a/crates/papyrus_p2p_sync/src/client/mod.rs b/crates/papyrus_p2p_sync/src/client/mod.rs index 15098c8a42..3dbdc9a24f 100644 --- a/crates/papyrus_p2p_sync/src/client/mod.rs +++ b/crates/papyrus_p2p_sync/src/client/mod.rs @@ -18,7 +18,7 @@ use std::collections::BTreeMap; use std::time::Duration; use class::ClassStreamBuilder; -use futures::channel::mpsc::SendError; +use futures::channel::mpsc::{Receiver, SendError, Sender}; use futures::stream::BoxStream; use futures::Stream; use header::HeaderStreamBuilder; @@ -38,8 +38,9 @@ use papyrus_protobuf::sync::{ }; use papyrus_storage::{StorageError, StorageReader, StorageWriter}; use serde::{Deserialize, Serialize}; -use starknet_api::block::BlockNumber; +use starknet_api::block::{BlockBody, BlockNumber}; use starknet_api::core::ClassHash; +use starknet_api::state::{DeclaredClasses, DeprecatedDeclaredClasses, ThinStateDiff}; use starknet_api::transaction::FullTransaction; use starknet_state_sync_types::state_sync_types::SyncBlock; use state_diff::StateDiffStreamBuilder; @@ -172,6 +173,7 @@ impl P2PSyncClientChannels { self, storage_reader: StorageReader, config: P2PSyncClientConfig, + _internal_blocks_receivers: InternalBlocksReceivers, ) -> impl Stream + Send + 'static { let header_stream = HeaderStreamBuilder::create_stream( self.header_sender, @@ -232,12 +234,73 @@ impl P2PSyncClient { #[instrument(skip(self), level = "debug", err)] pub async fn run(mut self) -> Result<(), P2PSyncClientError> { info!("Starting P2P sync client"); - let mut data_stream = - self.p2p_sync_channels.create_stream(self.storage_reader.clone(), self.config); + + let internal_blocks_channels = InternalBlocksChannels::new(); + self.create_internal_blocks_sender_task(internal_blocks_channels.senders); + let mut data_stream = self.p2p_sync_channels.create_stream( + self.storage_reader.clone(), + self.config, + internal_blocks_channels.receivers, + ); loop { let data = data_stream.next().await.expect("Sync data stream should never end")?; data.write_to_storage(&mut self.storage_writer)?; } } + + fn create_internal_blocks_sender_task( + &self, + #[allow(unused_variables)] internal_blocks_senders: InternalBlocksSenders, + ) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move {}) + } +} + +#[allow(dead_code)] +pub(crate) struct InternalBlocksReceivers { + header_receiver: Receiver<(BlockNumber, SignedBlockHeader)>, + state_diff_receiver: Receiver<(BlockNumber, (ThinStateDiff, BlockNumber))>, + transaction_receiver: Receiver<(BlockNumber, (BlockBody, BlockNumber))>, + #[allow(dead_code)] + class_receiver: + Receiver<(BlockNumber, (DeclaredClasses, DeprecatedDeclaredClasses, BlockNumber))>, +} + +#[allow(dead_code)] +struct InternalBlocksSenders { + header_sender: Sender<(BlockNumber, SignedBlockHeader)>, + state_diff_sender: Sender<(BlockNumber, (ThinStateDiff, BlockNumber))>, + transaction_sender: Sender<(BlockNumber, (BlockBody, BlockNumber))>, + #[allow(dead_code)] + class_sender: Sender<(BlockNumber, (DeclaredClasses, DeprecatedDeclaredClasses, BlockNumber))>, +} + +struct InternalBlocksChannels { + receivers: InternalBlocksReceivers, + senders: InternalBlocksSenders, +} + +impl InternalBlocksChannels { + pub fn new() -> Self { + let (header_sender, header_receiver) = futures::channel::mpsc::channel(100); + let (state_diff_sender, state_diff_receiver) = futures::channel::mpsc::channel(100); + let (transaction_sender, transaction_receiver) = futures::channel::mpsc::channel(100); + let (class_sender, class_receiver) = futures::channel::mpsc::channel(100); + + Self { + receivers: InternalBlocksReceivers { + header_receiver, + state_diff_receiver, + transaction_receiver, + class_receiver, + }, + senders: InternalBlocksSenders { + header_sender, + state_diff_sender, + transaction_sender, + class_sender, + }, + } + } }