Skip to content

Commit

Permalink
chore(papyrus_p2p_sync): create internal blocks senders and receivers…
Browse files Browse the repository at this point in the history
… structs
  • Loading branch information
eitanm-starkware committed Dec 24, 2024
1 parent 440c12c commit c00e9c0
Showing 1 changed file with 67 additions and 4 deletions.
71 changes: 67 additions & 4 deletions crates/papyrus_p2p_sync/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::collections::BTreeMap;
use std::time::Duration;

use class::ClassStreamBuilder;
use futures::channel::mpsc::SendError;
use futures::channel::mpsc::{Receiver, SendError, Sender};
use futures::stream::BoxStream;
use futures::Stream;
use header::HeaderStreamBuilder;
Expand All @@ -38,8 +38,9 @@ use papyrus_protobuf::sync::{
};
use papyrus_storage::{StorageError, StorageReader, StorageWriter};
use serde::{Deserialize, Serialize};
use starknet_api::block::BlockNumber;
use starknet_api::block::{BlockBody, BlockNumber};
use starknet_api::core::ClassHash;
use starknet_api::state::{DeclaredClasses, DeprecatedDeclaredClasses, ThinStateDiff};
use starknet_api::transaction::FullTransaction;
use starknet_state_sync_types::state_sync_types::SyncBlock;
use state_diff::StateDiffStreamBuilder;
Expand Down Expand Up @@ -172,6 +173,7 @@ impl P2PSyncClientChannels {
self,
storage_reader: StorageReader,
config: P2PSyncClientConfig,
_internal_blocks_receivers: InternalBlocksReceivers,
) -> impl Stream<Item = DataStreamResult> + Send + 'static {
let header_stream = HeaderStreamBuilder::create_stream(
self.header_sender,
Expand Down Expand Up @@ -232,12 +234,73 @@ impl P2PSyncClient {
#[instrument(skip(self), level = "debug", err)]
pub async fn run(mut self) -> Result<(), P2PSyncClientError> {
info!("Starting P2P sync client");
let mut data_stream =
self.p2p_sync_channels.create_stream(self.storage_reader.clone(), self.config);

let internal_blocks_channels = InternalBlocksChannels::new();
self.create_internal_blocks_sender_task(internal_blocks_channels.senders);
let mut data_stream = self.p2p_sync_channels.create_stream(
self.storage_reader.clone(),
self.config,
internal_blocks_channels.receivers,
);

loop {
let data = data_stream.next().await.expect("Sync data stream should never end")?;
data.write_to_storage(&mut self.storage_writer)?;
}
}

fn create_internal_blocks_sender_task(
&self,
#[allow(unused_variables)] internal_blocks_senders: InternalBlocksSenders,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {})
}
}

#[allow(dead_code)]
pub(crate) struct InternalBlocksReceivers {
header_receiver: Receiver<(BlockNumber, SignedBlockHeader)>,
state_diff_receiver: Receiver<(BlockNumber, (ThinStateDiff, BlockNumber))>,
transaction_receiver: Receiver<(BlockNumber, (BlockBody, BlockNumber))>,
#[allow(dead_code)]
class_receiver:
Receiver<(BlockNumber, (DeclaredClasses, DeprecatedDeclaredClasses, BlockNumber))>,
}

#[allow(dead_code)]
struct InternalBlocksSenders {
header_sender: Sender<(BlockNumber, SignedBlockHeader)>,
state_diff_sender: Sender<(BlockNumber, (ThinStateDiff, BlockNumber))>,
transaction_sender: Sender<(BlockNumber, (BlockBody, BlockNumber))>,
#[allow(dead_code)]
class_sender: Sender<(BlockNumber, (DeclaredClasses, DeprecatedDeclaredClasses, BlockNumber))>,
}

struct InternalBlocksChannels {
receivers: InternalBlocksReceivers,
senders: InternalBlocksSenders,
}

impl InternalBlocksChannels {
pub fn new() -> Self {
let (header_sender, header_receiver) = futures::channel::mpsc::channel(100);
let (state_diff_sender, state_diff_receiver) = futures::channel::mpsc::channel(100);
let (transaction_sender, transaction_receiver) = futures::channel::mpsc::channel(100);
let (class_sender, class_receiver) = futures::channel::mpsc::channel(100);

Self {
receivers: InternalBlocksReceivers {
header_receiver,
state_diff_receiver,
transaction_receiver,
class_receiver,
},
senders: InternalBlocksSenders {
header_sender,
state_diff_sender,
transaction_sender,
class_sender,
},
}
}
}

0 comments on commit c00e9c0

Please sign in to comment.