Skip to content

Commit

Permalink
refactor(node): convert storage metrics handle to the desired task type
Browse files Browse the repository at this point in the history
  • Loading branch information
matan-starkware committed Sep 18, 2024
1 parent 50e6493 commit 6495646
Showing 1 changed file with 25 additions and 24 deletions.
49 changes: 25 additions & 24 deletions crates/papyrus_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use papyrus_consensus::config::ConsensusConfig;
use papyrus_consensus::simulation_network_receiver::NetworkReceiver;
use papyrus_consensus::types::ConsensusError;
use papyrus_consensus_orchestrator::papyrus_consensus_context::PapyrusConsensusContext;
use papyrus_monitoring_gateway::MonitoringServer;
use papyrus_monitoring_gateway::{MonitoringGatewayConfig, MonitoringServer};
use papyrus_network::gossipsub_impl::Topic;
use papyrus_network::network_manager::{BroadcastTopicChannels, NetworkManager};
use papyrus_network::{network_manager, NetworkConfig};
Expand Down Expand Up @@ -166,11 +166,18 @@ fn run_consensus(
async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
let (storage_reader, storage_writer) = open_storage(config.storage.clone())?;

let storage_metrics_handle = if config.monitoring_gateway.collect_metrics {
spawn_storage_metrics_collector(storage_reader.clone(), STORAGE_METRICS_UPDATE_INTERVAL)
} else {
tokio::spawn(pending())
};
// The sync is the only writer of the syncing state.
let shared_highest_block = Arc::new(RwLock::new(None));
let pending_data = Arc::new(RwLock::new(PendingData {
// The pending data might change later to DeprecatedPendingBlock, depending on the response
// from the feeder gateway.
block: PendingBlockOrDeprecated::Current(PendingBlock {
parent_block_hash: BlockHash(felt!(GENESIS_HASH)),
..Default::default()
}),
..Default::default()
}));
let pending_classes = Arc::new(RwLock::new(PendingClasses::default()));

// P2P network.
let (
Expand All @@ -191,6 +198,9 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
}
});

let storage_metrics_handle =
build_metrics_collector(&config.monitoring_gateway, storage_reader.clone());

// Monitoring server.
let monitoring_server = MonitoringServer::new(
config.monitoring_gateway.clone(),
Expand All @@ -202,19 +212,6 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
)?;
let monitoring_server_handle = monitoring_server.spawn_server().await;

// The sync is the only writer of the syncing state.
let shared_highest_block = Arc::new(RwLock::new(None));
let pending_data = Arc::new(RwLock::new(PendingData {
// The pending data might change later to DeprecatedPendingBlock, depending on the response
// from the feeder gateway.
block: PendingBlockOrDeprecated::Current(PendingBlock {
parent_block_hash: BlockHash(felt!(GENESIS_HASH)),
..Default::default()
}),
..Default::default()
}));
let pending_classes = Arc::new(RwLock::new(PendingClasses::default()));

// JSON-RPC server.
let server_handle_future = create_rpc_server_future(
&config,
Expand Down Expand Up @@ -270,7 +267,7 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
tokio::select! {
res = storage_metrics_handle => {
error!("collecting storage metrics stopped.");
res?
res??
}
res = server_handle_future => {
error!("RPC server stopped.");
Expand Down Expand Up @@ -417,17 +414,21 @@ fn configure_tracing() {
tracing_subscriber::registry().with(fmt_layer).with(level_filter_layer).init();
}

fn spawn_storage_metrics_collector(
fn build_metrics_collector(
config: &MonitoringGatewayConfig,
storage_reader: StorageReader,
update_interval: Duration,
) -> JoinHandle<()> {
) -> JoinHandle<anyhow::Result<()>> {
if !config.collect_metrics {
return tokio::spawn(pending());
}

tokio::spawn(
async move {
loop {
if let Err(error) = update_storage_metrics(&storage_reader) {
warn!("Failed to update storage metrics: {error}");
}
tokio::time::sleep(update_interval).await;
tokio::time::sleep(STORAGE_METRICS_UPDATE_INTERVAL).await;
}
}
.instrument(debug_span!("collect_storage_metrics")),
Expand Down

0 comments on commit 6495646

Please sign in to comment.