From b6c3ec8886853bf259ccd47a3a2e85075b3865d8 Mon Sep 17 00:00:00 2001 From: Matan Markind Date: Wed, 18 Sep 2024 20:02:37 +0300 Subject: [PATCH] refactor(node): introduce PapyrusTaskHandles to hold task overrides --- crates/papyrus_node/src/main.rs | 134 ++++++++++++++++++--------- crates/papyrus_node/src/main_test.rs | 5 +- 2 files changed, 92 insertions(+), 47 deletions(-) diff --git a/crates/papyrus_node/src/main.rs b/crates/papyrus_node/src/main.rs index bbfe673ea4..75b1011f9c 100644 --- a/crates/papyrus_node/src/main.rs +++ b/crates/papyrus_node/src/main.rs @@ -8,7 +8,6 @@ use std::sync::Arc; use std::time::Duration; use futures::stream::StreamExt; -use futures::FutureExt; use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig; use papyrus_common::metrics::COLLECT_PROFILING_METRICS; use papyrus_common::pending_classes::PendingClasses; @@ -68,6 +67,21 @@ pub struct PapyrusResources { pub pending_classes: Arc>, } +/// Struct which allows configuring how the node will run. +/// - If left `None`, the task will be spawn with its default (prod) configuration. +/// - If set to Some, that variant of the task will be run, and the default ignored. +/// - If you want to disable a task set it to `Some(tokio::spawn(pending()))`. +#[derive(Default)] +pub struct PapyrusTaskHandles { + pub storage_metrics_handle: Option>>, + pub rpc_server_handle: Option>>, + pub sync_client_handle: Option>>, + pub monitoring_server_handle: Option>>, + pub p2p_sync_server_handle: Option>>, + pub consensus_handle: Option>>, + pub network_handle: Option>>, +} + impl PapyrusResources { pub fn new(config: &NodeConfig) -> anyhow::Result { let (storage_reader, storage_writer) = open_storage(config.storage.clone())?; @@ -338,61 +352,90 @@ fn spawn_p2p_sync_server( }) } -async fn run_threads(config: NodeConfig, mut resources: PapyrusResources) -> anyhow::Result<()> { - let consensus_handle = spawn_consensus( - config.consensus.as_ref(), - resources.storage_reader.clone(), - resources.maybe_network_manager.as_mut(), - )?; - - let storage_metrics_handle = spawn_storage_metrics_collector( - config.monitoring_gateway.collect_metrics, - resources.storage_reader.clone(), - STORAGE_METRICS_UPDATE_INTERVAL, - ); +async fn run_threads( + config: NodeConfig, + mut resources: PapyrusResources, + tasks: PapyrusTaskHandles, +) -> anyhow::Result<()> { + let consensus_handle = if let Some(handle) = tasks.consensus_handle { + handle + } else { + spawn_consensus( + config.consensus.as_ref(), + resources.storage_reader.clone(), + resources.maybe_network_manager.as_mut(), + )? + }; + let storage_metrics_handle = if let Some(handle) = tasks.storage_metrics_handle { + handle + } else { + spawn_storage_metrics_collector( + config.monitoring_gateway.collect_metrics, + resources.storage_reader.clone(), + STORAGE_METRICS_UPDATE_INTERVAL, + ) + }; // Monitoring server. - let monitoring_server_handle = spawn_monitoring_server( - resources.storage_reader.clone(), - resources.local_peer_id.clone(), - &config, - )?; + let monitoring_server_handle = if let Some(handle) = tasks.monitoring_server_handle { + handle + } else { + spawn_monitoring_server( + resources.storage_reader.clone(), + resources.local_peer_id.clone(), + &config, + )? + }; // JSON-RPC server. - let rpc_server_handle = spawn_rpc_server( - &config, - resources.shared_highest_block.clone(), - resources.pending_data.clone(), - resources.pending_classes.clone(), - resources.storage_reader.clone(), - ) - .await?; + let rpc_server_handle = if let Some(handle) = tasks.rpc_server_handle { + handle + } else { + spawn_rpc_server( + &config, + resources.shared_highest_block.clone(), + resources.pending_data.clone(), + resources.pending_classes.clone(), + resources.storage_reader.clone(), + ) + .await? + }; // P2P Sync Server task. - let p2p_sync_server_handle = spawn_p2p_sync_server( - resources.maybe_network_manager.as_mut(), - resources.storage_reader.clone(), - ); + let p2p_sync_server_handle = if let Some(handle) = tasks.p2p_sync_server_handle { + handle + } else { + spawn_p2p_sync_server( + resources.maybe_network_manager.as_mut(), + resources.storage_reader.clone(), + ) + }; // Sync task. - let sync_client_handle = spawn_sync_client( - resources.maybe_network_manager.as_mut(), - resources.storage_reader, - resources.storage_writer, - &config, - resources.shared_highest_block, - resources.pending_data, - resources.pending_classes, - ) - .await; + let sync_client_handle = if let Some(handle) = tasks.sync_client_handle { + handle + } else { + spawn_sync_client( + resources.maybe_network_manager.as_mut(), + resources.storage_reader, + resources.storage_writer, + &config, + resources.shared_highest_block, + resources.pending_data, + resources.pending_classes, + ) + .await + }; // Created last since it consumes the network manager. - let network_handle = tokio::spawn(async move { + let network_handle = if let Some(handle) = tasks.network_handle { + handle + } else { match resources.maybe_network_manager { - Some(manager) => manager.run().boxed().await, - None => pending().boxed().await, + Some(manager) => tokio::spawn(async move { Ok(manager.run().await?) }), + None => tokio::spawn(pending()), } - }); + }; tokio::select! { res = storage_metrics_handle => { error!("collecting storage metrics stopped."); @@ -499,5 +542,6 @@ async fn main() -> anyhow::Result<()> { info!("Booting up."); let resources = PapyrusResources::new(&config)?; - run_threads(config, resources).await + let tasks = PapyrusTaskHandles::default(); + run_threads(config, resources, tasks).await } diff --git a/crates/papyrus_node/src/main_test.rs b/crates/papyrus_node/src/main_test.rs index a615835f3d..366ec89421 100644 --- a/crates/papyrus_node/src/main_test.rs +++ b/crates/papyrus_node/src/main_test.rs @@ -6,7 +6,7 @@ use papyrus_storage::{open_storage, StorageConfig}; use papyrus_test_utils::prometheus_is_contained; use tempfile::TempDir; -use crate::{run_threads, spawn_storage_metrics_collector, PapyrusResources}; +use crate::{run_threads, spawn_storage_metrics_collector, PapyrusResources, PapyrusTaskHandles}; // The mission of this test is to ensure that if an error is returned from one of the spawned tasks, // the node will stop, and this error will be returned. This is done by checking the case of an @@ -20,7 +20,8 @@ async fn run_threads_stop() { // Error when not supplying legal central URL. config.central.url = "_not_legal_url".to_string(); let resources = PapyrusResources::new(&config).unwrap(); - let error = run_threads(config, resources).await.expect_err("Should be an error."); + let tasks = PapyrusTaskHandles::default(); + let error = run_threads(config, resources, tasks).await.expect_err("Should be an error."); assert_eq!("relative URL without a base", error.to_string()); }