Skip to content

Commit

Permalink
refactor(node): convert storage metrics handle to the desired task ty…
Browse files Browse the repository at this point in the history
…pe (#860)
  • Loading branch information
matan-starkware committed Sep 18, 2024
1 parent 0a88987 commit fa8057c
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 23 deletions.
49 changes: 27 additions & 22 deletions crates/papyrus_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,11 +166,18 @@ fn run_consensus(
async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
let (storage_reader, storage_writer) = open_storage(config.storage.clone())?;

let storage_metrics_handle = if config.monitoring_gateway.collect_metrics {
spawn_storage_metrics_collector(storage_reader.clone(), STORAGE_METRICS_UPDATE_INTERVAL)
} else {
tokio::spawn(pending())
};
// The sync is the only writer of the syncing state.
let shared_highest_block = Arc::new(RwLock::new(None));
let pending_data = Arc::new(RwLock::new(PendingData {
// The pending data might change later to DeprecatedPendingBlock, depending on the response
// from the feeder gateway.
block: PendingBlockOrDeprecated::Current(PendingBlock {
parent_block_hash: BlockHash(felt!(GENESIS_HASH)),
..Default::default()
}),
..Default::default()
}));
let pending_classes = Arc::new(RwLock::new(PendingClasses::default()));

// P2P network.
let (
Expand All @@ -191,6 +198,12 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
}
});

let storage_metrics_handle = spawn_storage_metrics_collector(
config.monitoring_gateway.collect_metrics,
storage_reader.clone(),
STORAGE_METRICS_UPDATE_INTERVAL,
);

// Monitoring server.
let monitoring_server = MonitoringServer::new(
config.monitoring_gateway.clone(),
Expand All @@ -202,19 +215,6 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
)?;
let monitoring_server_handle = monitoring_server.spawn_server().await;

// The sync is the only writer of the syncing state.
let shared_highest_block = Arc::new(RwLock::new(None));
let pending_data = Arc::new(RwLock::new(PendingData {
// The pending data might change later to DeprecatedPendingBlock, depending on the response
// from the feeder gateway.
block: PendingBlockOrDeprecated::Current(PendingBlock {
parent_block_hash: BlockHash(felt!(GENESIS_HASH)),
..Default::default()
}),
..Default::default()
}));
let pending_classes = Arc::new(RwLock::new(PendingClasses::default()));

// JSON-RPC server.
let server_handle_future = create_rpc_server_future(
&config,
Expand Down Expand Up @@ -270,7 +270,7 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
tokio::select! {
res = storage_metrics_handle => {
error!("collecting storage metrics stopped.");
res?
res??
}
res = server_handle_future => {
error!("RPC server stopped.");
Expand Down Expand Up @@ -418,16 +418,21 @@ fn configure_tracing() {
}

fn spawn_storage_metrics_collector(
collect_metrics: bool,
storage_reader: StorageReader,
update_interval: Duration,
) -> JoinHandle<()> {
interval: Duration,
) -> JoinHandle<anyhow::Result<()>> {
if !collect_metrics {
return tokio::spawn(pending());
}

tokio::spawn(
async move {
loop {
if let Err(error) = update_storage_metrics(&storage_reader) {
warn!("Failed to update storage metrics: {error}");
}
tokio::time::sleep(update_interval).await;
tokio::time::sleep(interval).await;
}
}
.instrument(debug_span!("collect_storage_metrics")),
Expand Down
2 changes: 1 addition & 1 deletion crates/papyrus_node/src/main_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ async fn storage_metrics_collector() {

assert!(prometheus_is_contained(handle.render(), "storage_free_pages_number", &[]).is_none());

spawn_storage_metrics_collector(storage_reader, Duration::from_secs(1));
spawn_storage_metrics_collector(true, storage_reader, Duration::from_secs(1));
// To make sure the metrics in the spawned thread are updated.
tokio::time::sleep(Duration::from_millis(1)).await;

Expand Down

0 comments on commit fa8057c

Please sign in to comment.