Skip to content

Commit

Permalink
refactor(node): introduce PapyrusTaskHandles to hold task overrides
Browse files Browse the repository at this point in the history
  • Loading branch information
matan-starkware committed Sep 18, 2024
1 parent 68b900e commit 651c1f9
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 47 deletions.
134 changes: 89 additions & 45 deletions crates/papyrus_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::sync::Arc;
use std::time::Duration;

use futures::stream::StreamExt;
use futures::FutureExt;
use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig;
use papyrus_common::metrics::COLLECT_PROFILING_METRICS;
use papyrus_common::pending_classes::PendingClasses;
Expand Down Expand Up @@ -68,6 +67,21 @@ pub struct PapyrusResources {
pub pending_classes: Arc<RwLock<PendingClasses>>,
}

/// Struct which allows configuring how the node will run.
/// - If left `None`, the task will be spawn with its default (prod) configuration.
/// - If set to Some, that variant of the task will be run, and the default ignored.
/// - If you want to disable a task set it to `Some(tokio::spawn(pending()))`.
#[derive(Default)]
pub struct PapyrusTaskHandles {
pub storage_metrics_handle: Option<JoinHandle<anyhow::Result<()>>>,
pub rpc_server_handle: Option<JoinHandle<anyhow::Result<()>>>,
pub sync_client_handle: Option<JoinHandle<anyhow::Result<()>>>,
pub monitoring_server_handle: Option<JoinHandle<anyhow::Result<()>>>,
pub p2p_sync_server_handle: Option<JoinHandle<anyhow::Result<()>>>,
pub consensus_handle: Option<JoinHandle<anyhow::Result<()>>>,
pub network_handle: Option<JoinHandle<anyhow::Result<()>>>,
}

impl PapyrusResources {
pub fn new(config: &NodeConfig) -> anyhow::Result<Self> {
let (storage_reader, storage_writer) = open_storage(config.storage.clone())?;
Expand Down Expand Up @@ -338,61 +352,90 @@ fn spawn_p2p_sync_server(
})
}

async fn run_threads(config: NodeConfig, mut resources: PapyrusResources) -> anyhow::Result<()> {
let consensus_handle = spawn_consensus(
config.consensus.as_ref(),
resources.storage_reader.clone(),
resources.maybe_network_manager.as_mut(),
)?;

let storage_metrics_handle = spawn_storage_metrics_collector(
config.monitoring_gateway.collect_metrics,
resources.storage_reader.clone(),
STORAGE_METRICS_UPDATE_INTERVAL,
);
async fn run_threads(
config: NodeConfig,
mut resources: PapyrusResources,
tasks: PapyrusTaskHandles,
) -> anyhow::Result<()> {
let consensus_handle = if let Some(handle) = tasks.consensus_handle {
handle
} else {
spawn_consensus(
config.consensus.as_ref(),
resources.storage_reader.clone(),
resources.maybe_network_manager.as_mut(),
)?
};

let storage_metrics_handle = if let Some(handle) = tasks.storage_metrics_handle {
handle
} else {
spawn_storage_metrics_collector(
config.monitoring_gateway.collect_metrics,
resources.storage_reader.clone(),
STORAGE_METRICS_UPDATE_INTERVAL,
)
};
// Monitoring server.
let monitoring_server_handle = spawn_monitoring_server(
resources.storage_reader.clone(),
resources.local_peer_id.clone(),
&config,
)?;
let monitoring_server_handle = if let Some(handle) = tasks.monitoring_server_handle {
handle
} else {
spawn_monitoring_server(
resources.storage_reader.clone(),
resources.local_peer_id.clone(),
&config,
)?
};

// JSON-RPC server.
let rpc_server_handle = spawn_rpc_server(
&config,
resources.shared_highest_block.clone(),
resources.pending_data.clone(),
resources.pending_classes.clone(),
resources.storage_reader.clone(),
)
.await?;
let rpc_server_handle = if let Some(handle) = tasks.rpc_server_handle {
handle
} else {
spawn_rpc_server(
&config,
resources.shared_highest_block.clone(),
resources.pending_data.clone(),
resources.pending_classes.clone(),
resources.storage_reader.clone(),
)
.await?
};

// P2P Sync Server task.
let p2p_sync_server_handle = spawn_p2p_sync_server(
resources.maybe_network_manager.as_mut(),
resources.storage_reader.clone(),
);
let p2p_sync_server_handle = if let Some(handle) = tasks.p2p_sync_server_handle {
handle
} else {
spawn_p2p_sync_server(
resources.maybe_network_manager.as_mut(),
resources.storage_reader.clone(),
)
};

// Sync task.
let sync_client_handle = spawn_sync_client(
resources.maybe_network_manager.as_mut(),
resources.storage_reader,
resources.storage_writer,
&config,
resources.shared_highest_block,
resources.pending_data,
resources.pending_classes,
)
.await;
let sync_client_handle = if let Some(handle) = tasks.sync_client_handle {
handle
} else {
spawn_sync_client(
resources.maybe_network_manager.as_mut(),
resources.storage_reader,
resources.storage_writer,
&config,
resources.shared_highest_block,
resources.pending_data,
resources.pending_classes,
)
.await
};

// Created last since it consumes the network manager.
let network_handle = tokio::spawn(async move {
let network_handle = if let Some(handle) = tasks.network_handle {
handle
} else {
match resources.maybe_network_manager {
Some(manager) => manager.run().boxed().await,
None => pending().boxed().await,
Some(manager) => tokio::spawn(async move { Ok(manager.run().await?) }),
None => tokio::spawn(pending()),
}
});
};
tokio::select! {
res = storage_metrics_handle => {
error!("collecting storage metrics stopped.");
Expand Down Expand Up @@ -499,5 +542,6 @@ async fn main() -> anyhow::Result<()> {
info!("Booting up.");

let resources = PapyrusResources::new(&config)?;
run_threads(config, resources).await
let tasks = PapyrusTaskHandles::default();
run_threads(config, resources, tasks).await
}
5 changes: 3 additions & 2 deletions crates/papyrus_node/src/main_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use papyrus_storage::{open_storage, StorageConfig};
use papyrus_test_utils::prometheus_is_contained;
use tempfile::TempDir;

use crate::{run_threads, spawn_storage_metrics_collector, PapyrusResources};
use crate::{run_threads, spawn_storage_metrics_collector, PapyrusResources, PapyrusTaskHandles};

// The mission of this test is to ensure that if an error is returned from one of the spawned tasks,
// the node will stop, and this error will be returned. This is done by checking the case of an
Expand All @@ -20,7 +20,8 @@ async fn run_threads_stop() {
// Error when not supplying legal central URL.
config.central.url = "_not_legal_url".to_string();
let resources = PapyrusResources::new(&config).unwrap();
let error = run_threads(config, resources).await.expect_err("Should be an error.");
let tasks = PapyrusTaskHandles::default();
let error = run_threads(config, resources, tasks).await.expect_err("Should be an error.");
assert_eq!("relative URL without a base", error.to_string());
}

Expand Down

0 comments on commit 651c1f9

Please sign in to comment.