diff --git a/crates/papyrus_node/src/main.rs b/crates/papyrus_node/src/main.rs index bf2be682ec..3dc3f8162a 100644 --- a/crates/papyrus_node/src/main.rs +++ b/crates/papyrus_node/src/main.rs @@ -59,6 +59,43 @@ const GENESIS_HASH: &str = "0x0"; // Duration between updates to the storage metrics (those in the collect_storage_metrics function). const STORAGE_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10); +pub struct PapyrusResources { + pub storage_reader: StorageReader, + pub storage_writer: StorageWriter, + pub maybe_network_manager: Option, + pub local_peer_id: String, + pub shared_highest_block: Arc>>, + pub pending_data: Arc>, + pub pending_classes: Arc>, +} + +impl PapyrusResources { + pub fn new(config: &NodeConfig) -> anyhow::Result { + let (storage_reader, storage_writer) = open_storage(config.storage.clone())?; + let (maybe_network_manager, local_peer_id) = build_network_manager(config.network.clone())?; + let shared_highest_block = Arc::new(RwLock::new(None)); + let pending_data = Arc::new(RwLock::new(PendingData { + // The pending data might change later to DeprecatedPendingBlock, depending on the + // response from the feeder gateway. + block: PendingBlockOrDeprecated::Current(PendingBlock { + parent_block_hash: BlockHash(felt!(GENESIS_HASH)), + ..Default::default() + }), + ..Default::default() + })); + let pending_classes = Arc::new(RwLock::new(PendingClasses::default())); + Ok(Self { + storage_reader, + storage_writer, + maybe_network_manager, + local_peer_id, + shared_highest_block, + pending_data, + pending_classes, + }) + } +} + #[cfg(feature = "rpc")] async fn create_rpc_server_future( config: &NodeConfig, @@ -273,33 +310,16 @@ fn spawn_p2p_sync_server( tokio::spawn(async move { Ok(p2p_sync_server.run().await) }) } -async fn run_threads(config: NodeConfig) -> anyhow::Result<()> { - let (storage_reader, storage_writer) = open_storage(config.storage.clone())?; - - // The sync is the only writer of the syncing state. - let shared_highest_block = Arc::new(RwLock::new(None)); - let pending_data = Arc::new(RwLock::new(PendingData { - // The pending data might change later to DeprecatedPendingBlock, depending on the response - // from the feeder gateway. - block: PendingBlockOrDeprecated::Current(PendingBlock { - parent_block_hash: BlockHash(felt!(GENESIS_HASH)), - ..Default::default() - }), - ..Default::default() - })); - let pending_classes = Arc::new(RwLock::new(PendingClasses::default())); - - let (mut maybe_network_manager, local_peer_id) = build_network_manager(config.network.clone())?; - +async fn run_threads(config: NodeConfig, mut resources: PapyrusResources) -> anyhow::Result<()> { let consensus_handle = run_consensus( config.consensus.as_ref(), - storage_reader.clone(), - maybe_network_manager.as_mut(), + resources.storage_reader.clone(), + resources.maybe_network_manager.as_mut(), )?; let storage_metrics_handle = spawn_storage_metrics_collector( config.monitoring_gateway.collect_metrics, - storage_reader.clone(), + resources.storage_reader.clone(), STORAGE_METRICS_UPDATE_INTERVAL, ); @@ -308,41 +328,43 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> { config.monitoring_gateway.clone(), get_config_presentation(&config, true)?, get_config_presentation(&config, false)?, - storage_reader.clone(), + resources.storage_reader.clone(), VERSION_FULL, - local_peer_id, + resources.local_peer_id, )?; let monitoring_server_handle = monitoring_server.spawn_server().await; // JSON-RPC server. let server_handle_future = create_rpc_server_future( &config, - shared_highest_block.clone(), - pending_data.clone(), - pending_classes.clone(), - storage_reader.clone(), + resources.shared_highest_block.clone(), + resources.pending_data.clone(), + resources.pending_classes.clone(), + resources.storage_reader.clone(), ) .await?; // P2P Sync Server task. - let p2p_sync_server_handle = - spawn_p2p_sync_server(maybe_network_manager.as_mut(), storage_reader.clone()); + let p2p_sync_server_handle = spawn_p2p_sync_server( + resources.maybe_network_manager.as_mut(), + resources.storage_reader.clone(), + ); // Sync task. let sync_client_handle = spawn_sync_client( - maybe_network_manager.as_mut(), - storage_reader, - storage_writer, + resources.maybe_network_manager.as_mut(), + resources.storage_reader, + resources.storage_writer, &config, - shared_highest_block, - pending_data, - pending_classes, + resources.shared_highest_block, + resources.pending_data, + resources.pending_classes, ) .await; // Created last since it consumes the network manager. let network_handle = tokio::spawn(async move { - match maybe_network_manager { + match resources.maybe_network_manager { Some(manager) => manager.run().boxed().await, None => pending().boxed().await, } @@ -451,5 +473,7 @@ async fn main() -> anyhow::Result<()> { .expect("This should be the first and only time we set this value."); info!("Booting up."); - run_threads(config).await + + let resources = PapyrusResources::new(&config)?; + run_threads(config, resources).await } diff --git a/crates/papyrus_node/src/main_test.rs b/crates/papyrus_node/src/main_test.rs index 8d7f4ce97d..a615835f3d 100644 --- a/crates/papyrus_node/src/main_test.rs +++ b/crates/papyrus_node/src/main_test.rs @@ -6,7 +6,7 @@ use papyrus_storage::{open_storage, StorageConfig}; use papyrus_test_utils::prometheus_is_contained; use tempfile::TempDir; -use crate::{run_threads, spawn_storage_metrics_collector}; +use crate::{run_threads, spawn_storage_metrics_collector, PapyrusResources}; // The mission of this test is to ensure that if an error is returned from one of the spawned tasks, // the node will stop, and this error will be returned. This is done by checking the case of an @@ -19,7 +19,8 @@ async fn run_threads_stop() { // Error when not supplying legal central URL. config.central.url = "_not_legal_url".to_string(); - let error = run_threads(config).await.expect_err("Should be an error."); + let resources = PapyrusResources::new(&config).unwrap(); + let error = run_threads(config, resources).await.expect_err("Should be an error."); assert_eq!("relative URL without a base", error.to_string()); }