From 10e5d20dced21aa7ccfdc8a8396e3e533850cf10 Mon Sep 17 00:00:00 2001 From: Matan Markind Date: Wed, 18 Sep 2024 15:29:05 +0300 Subject: [PATCH] refactor(node): convert storage metrics handle to the desired task type --- crates/papyrus_node/src/main.rs | 49 +++++++++++++++------------- crates/papyrus_node/src/main_test.rs | 2 +- 2 files changed, 28 insertions(+), 23 deletions(-) diff --git a/crates/papyrus_node/src/main.rs b/crates/papyrus_node/src/main.rs index 4d6cade6c5..f6bbbf8420 100644 --- a/crates/papyrus_node/src/main.rs +++ b/crates/papyrus_node/src/main.rs @@ -166,11 +166,18 @@ fn run_consensus( async fn run_threads(config: NodeConfig) -> anyhow::Result<()> { let (storage_reader, storage_writer) = open_storage(config.storage.clone())?; - let storage_metrics_handle = if config.monitoring_gateway.collect_metrics { - spawn_storage_metrics_collector(storage_reader.clone(), STORAGE_METRICS_UPDATE_INTERVAL) - } else { - tokio::spawn(pending()) - }; + // The sync is the only writer of the syncing state. + let shared_highest_block = Arc::new(RwLock::new(None)); + let pending_data = Arc::new(RwLock::new(PendingData { + // The pending data might change later to DeprecatedPendingBlock, depending on the response + // from the feeder gateway. + block: PendingBlockOrDeprecated::Current(PendingBlock { + parent_block_hash: BlockHash(felt!(GENESIS_HASH)), + ..Default::default() + }), + ..Default::default() + })); + let pending_classes = Arc::new(RwLock::new(PendingClasses::default())); // P2P network. let ( @@ -191,6 +198,12 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> { } }); + let storage_metrics_handle = spawn_storage_metrics_collector( + config.monitoring_gateway.collect_metrics, + storage_reader.clone(), + STORAGE_METRICS_UPDATE_INTERVAL, + ); + // Monitoring server. let monitoring_server = MonitoringServer::new( config.monitoring_gateway.clone(), @@ -202,19 +215,6 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> { )?; let monitoring_server_handle = monitoring_server.spawn_server().await; - // The sync is the only writer of the syncing state. - let shared_highest_block = Arc::new(RwLock::new(None)); - let pending_data = Arc::new(RwLock::new(PendingData { - // The pending data might change later to DeprecatedPendingBlock, depending on the response - // from the feeder gateway. - block: PendingBlockOrDeprecated::Current(PendingBlock { - parent_block_hash: BlockHash(felt!(GENESIS_HASH)), - ..Default::default() - }), - ..Default::default() - })); - let pending_classes = Arc::new(RwLock::new(PendingClasses::default())); - // JSON-RPC server. let server_handle_future = create_rpc_server_future( &config, @@ -270,7 +270,7 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> { tokio::select! { res = storage_metrics_handle => { error!("collecting storage metrics stopped."); - res? + res?? } res = server_handle_future => { error!("RPC server stopped."); @@ -418,16 +418,21 @@ fn configure_tracing() { } fn spawn_storage_metrics_collector( + collect_metrics: bool, storage_reader: StorageReader, - update_interval: Duration, -) -> JoinHandle<()> { + interval: Duration, +) -> JoinHandle> { + if !collect_metrics { + return tokio::spawn(pending()); + } + tokio::spawn( async move { loop { if let Err(error) = update_storage_metrics(&storage_reader) { warn!("Failed to update storage metrics: {error}"); } - tokio::time::sleep(update_interval).await; + tokio::time::sleep(interval).await; } } .instrument(debug_span!("collect_storage_metrics")), diff --git a/crates/papyrus_node/src/main_test.rs b/crates/papyrus_node/src/main_test.rs index 80a07421e9..8d7f4ce97d 100644 --- a/crates/papyrus_node/src/main_test.rs +++ b/crates/papyrus_node/src/main_test.rs @@ -35,7 +35,7 @@ async fn storage_metrics_collector() { assert!(prometheus_is_contained(handle.render(), "storage_free_pages_number", &[]).is_none()); - spawn_storage_metrics_collector(storage_reader, Duration::from_secs(1)); + spawn_storage_metrics_collector(true, storage_reader, Duration::from_secs(1)); // To make sure the metrics in the spawned thread are updated. tokio::time::sleep(Duration::from_millis(1)).await;