diff --git a/crates/papyrus_node/src/main.rs b/crates/papyrus_node/src/main.rs index fab7ed4d80..98a1f5c443 100644 --- a/crates/papyrus_node/src/main.rs +++ b/crates/papyrus_node/src/main.rs @@ -189,7 +189,7 @@ async fn run_sync( } async fn build_sync_client( - maybe_sync_client_channels: Option, + maybe_network_manager: Option<&mut NetworkManager>, storage_reader: StorageReader, storage_writer: StorageWriter, config: &NodeConfig, @@ -214,12 +214,27 @@ async fn build_sync_client( )) } (None, Some(p2p_sync_client_config)) => { + let network_manager = maybe_network_manager + .expect("If p2p sync is enabled, network needs to be enabled too"); + let header_client_sender = network_manager + .register_sqmr_protocol_client(Protocol::SignedBlockHeader.into(), BUFFER_SIZE); + let state_diff_client_sender = network_manager + .register_sqmr_protocol_client(Protocol::StateDiff.into(), BUFFER_SIZE); + let transaction_client_sender = network_manager + .register_sqmr_protocol_client(Protocol::Transaction.into(), BUFFER_SIZE); + let class_client_sender = + network_manager.register_sqmr_protocol_client(Protocol::Class.into(), BUFFER_SIZE); + let p2p_sync_client_channels = P2PSyncClientChannels::new( + header_client_sender, + state_diff_client_sender, + transaction_client_sender, + class_client_sender, + ); let p2p_sync = P2PSyncClient::new( p2p_sync_client_config, storage_reader, storage_writer, - maybe_sync_client_channels - .expect("If p2p sync is enabled, network needs to be enabled too"), + p2p_sync_client_channels, ); tokio::spawn(async move { Ok(p2p_sync.run().await?) }) } @@ -227,12 +242,33 @@ async fn build_sync_client( } fn build_p2p_sync_server( - maybe_sync_server_channels: Option, + network_manager: Option<&mut NetworkManager>, storage_reader: StorageReader, ) -> JoinHandle> { - let Some(p2p_sync_server_channels) = maybe_sync_server_channels else { + let Some(network_manager) = network_manager else { + info!("P2P Sync is disabled."); return tokio::spawn(pending()); }; + + let header_server_receiver = network_manager + .register_sqmr_protocol_server(Protocol::SignedBlockHeader.into(), BUFFER_SIZE); + let state_diff_server_receiver = + network_manager.register_sqmr_protocol_server(Protocol::StateDiff.into(), BUFFER_SIZE); + let transaction_server_receiver = + network_manager.register_sqmr_protocol_server(Protocol::Transaction.into(), BUFFER_SIZE); + let class_server_receiver = + network_manager.register_sqmr_protocol_server(Protocol::Class.into(), BUFFER_SIZE); + let event_server_receiver = + network_manager.register_sqmr_protocol_server(Protocol::Event.into(), BUFFER_SIZE); + + let p2p_sync_server_channels = P2PSyncServerChannels::new( + header_server_receiver, + state_diff_server_receiver, + transaction_server_receiver, + class_server_receiver, + event_server_receiver, + ); + let p2p_sync_server = P2PSyncServer::new(storage_reader.clone(), p2p_sync_server_channels); tokio::spawn(async move { Ok(p2p_sync_server.run().await) }) } @@ -253,24 +289,13 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> { })); let pending_classes = Arc::new(RwLock::new(PendingClasses::default())); - // P2P network. - let ( - mut maybe_network_manager, - maybe_sync_client_channels, - maybe_sync_server_channels, - local_peer_id, - ) = register_to_network(config.network.clone())?; + let (mut maybe_network_manager, local_peer_id) = build_network_manager(config.network.clone())?; + let consensus_handle = run_consensus( config.consensus.as_ref(), storage_reader.clone(), maybe_network_manager.as_mut(), )?; - let network_handle = tokio::spawn(async move { - match maybe_network_manager { - Some(manager) => manager.run().boxed().await, - None => pending().boxed().await, - } - }); let storage_metrics_handle = spawn_storage_metrics_collector( config.monitoring_gateway.collect_metrics, @@ -301,11 +326,11 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> { // P2P Sync Server task. let p2p_sync_server_handle = - build_p2p_sync_server(maybe_sync_server_channels, storage_reader.clone()); + build_p2p_sync_server(maybe_network_manager.as_mut(), storage_reader.clone()); // Sync task. let sync_client_handle = build_sync_client( - maybe_sync_client_channels, + maybe_network_manager.as_mut(), storage_reader, storage_writer, &config, @@ -315,6 +340,13 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> { ) .await; + // Created last since it consumes the network manager. + let network_handle = tokio::spawn(async move { + match maybe_network_manager { + Some(manager) => manager.run().boxed().await, + None => pending().boxed().await, + } + }); tokio::select! { res = storage_metrics_handle => { error!("collecting storage metrics stopped."); @@ -349,58 +381,19 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> { return Ok(()); } -type NetworkRunReturn = - (Option, Option, Option, String); - -fn register_to_network(network_config: Option) -> anyhow::Result { +fn build_network_manager( + network_config: Option, +) -> anyhow::Result<(Option, String)> { let Some(network_config) = network_config else { - return Ok((None, None, None, "".to_string())); + return Ok((None, "".to_string())); }; - let mut network_manager = network_manager::NetworkManager::new( + let network_manager = network_manager::NetworkManager::new( network_config.clone(), Some(VERSION_FULL.to_string()), ); let local_peer_id = network_manager.get_local_peer_id(); - let header_client_sender = network_manager - .register_sqmr_protocol_client(Protocol::SignedBlockHeader.into(), BUFFER_SIZE); - let state_diff_client_sender = - network_manager.register_sqmr_protocol_client(Protocol::StateDiff.into(), BUFFER_SIZE); - let transaction_client_sender = - network_manager.register_sqmr_protocol_client(Protocol::Transaction.into(), BUFFER_SIZE); - let class_client_sender = - network_manager.register_sqmr_protocol_client(Protocol::Class.into(), BUFFER_SIZE); - let p2p_sync_client_channels = P2PSyncClientChannels::new( - header_client_sender, - state_diff_client_sender, - transaction_client_sender, - class_client_sender, - ); - - let header_server_receiver = network_manager - .register_sqmr_protocol_server(Protocol::SignedBlockHeader.into(), BUFFER_SIZE); - let state_diff_server_receiver = - network_manager.register_sqmr_protocol_server(Protocol::StateDiff.into(), BUFFER_SIZE); - let transaction_server_receiver = - network_manager.register_sqmr_protocol_server(Protocol::Transaction.into(), BUFFER_SIZE); - let class_server_receiver = - network_manager.register_sqmr_protocol_server(Protocol::Class.into(), BUFFER_SIZE); - let event_server_receiver = - network_manager.register_sqmr_protocol_server(Protocol::Event.into(), BUFFER_SIZE); - let p2p_sync_server_channels = P2PSyncServerChannels::new( - header_server_receiver, - state_diff_server_receiver, - transaction_server_receiver, - class_server_receiver, - event_server_receiver, - ); - - Ok(( - Some(network_manager), - Some(p2p_sync_client_channels), - Some(p2p_sync_server_channels), - local_peer_id, - )) + Ok((Some(network_manager), local_peer_id)) } // TODO(yair): add dynamic level filtering.