Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(node): introduce PapyrusTaskHandles to hold task overrides #872

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 89 additions & 45 deletions crates/papyrus_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use std::sync::Arc;
use std::time::Duration;

use futures::stream::StreamExt;
use futures::FutureExt;
use papyrus_base_layer::ethereum_base_layer_contract::EthereumBaseLayerConfig;
use papyrus_common::metrics::COLLECT_PROFILING_METRICS;
use papyrus_common::pending_classes::PendingClasses;
Expand Down Expand Up @@ -68,6 +67,21 @@ pub struct PapyrusResources {
pub pending_classes: Arc<RwLock<PendingClasses>>,
}

/// Struct which allows configuring how the node will run.
/// - If left `None`, the task will be spawn with its default (prod) configuration.
/// - If set to Some, that variant of the task will be run, and the default ignored.
/// - If you want to disable a task set it to `Some(tokio::spawn(pending()))`.
#[derive(Default)]
pub struct PapyrusTaskHandles {
pub storage_metrics_handle: Option<JoinHandle<anyhow::Result<()>>>,
pub rpc_server_handle: Option<JoinHandle<anyhow::Result<()>>>,
pub sync_client_handle: Option<JoinHandle<anyhow::Result<()>>>,
pub monitoring_server_handle: Option<JoinHandle<anyhow::Result<()>>>,
pub p2p_sync_server_handle: Option<JoinHandle<anyhow::Result<()>>>,
pub consensus_handle: Option<JoinHandle<anyhow::Result<()>>>,
pub network_handle: Option<JoinHandle<anyhow::Result<()>>>,
}

impl PapyrusResources {
pub fn new(config: &NodeConfig) -> anyhow::Result<Self> {
let (storage_reader, storage_writer) = open_storage(config.storage.clone())?;
Expand Down Expand Up @@ -338,61 +352,90 @@ fn spawn_p2p_sync_server(
})
}

async fn run_threads(config: NodeConfig, mut resources: PapyrusResources) -> anyhow::Result<()> {
let consensus_handle = spawn_consensus(
config.consensus.as_ref(),
resources.storage_reader.clone(),
resources.maybe_network_manager.as_mut(),
)?;

let storage_metrics_handle = spawn_storage_metrics_collector(
config.monitoring_gateway.collect_metrics,
resources.storage_reader.clone(),
STORAGE_METRICS_UPDATE_INTERVAL,
);
async fn run_threads(
config: NodeConfig,
mut resources: PapyrusResources,
tasks: PapyrusTaskHandles,
) -> anyhow::Result<()> {
let consensus_handle = if let Some(handle) = tasks.consensus_handle {
handle
} else {
spawn_consensus(
config.consensus.as_ref(),
resources.storage_reader.clone(),
resources.maybe_network_manager.as_mut(),
)?
};

let storage_metrics_handle = if let Some(handle) = tasks.storage_metrics_handle {
handle
} else {
spawn_storage_metrics_collector(
config.monitoring_gateway.collect_metrics,
resources.storage_reader.clone(),
STORAGE_METRICS_UPDATE_INTERVAL,
)
};
// Monitoring server.
let monitoring_server_handle = spawn_monitoring_server(
resources.storage_reader.clone(),
resources.local_peer_id.clone(),
&config,
)?;
let monitoring_server_handle = if let Some(handle) = tasks.monitoring_server_handle {
handle
} else {
spawn_monitoring_server(
resources.storage_reader.clone(),
resources.local_peer_id.clone(),
&config,
)?
};

// JSON-RPC server.
let rpc_server_handle = spawn_rpc_server(
&config,
resources.shared_highest_block.clone(),
resources.pending_data.clone(),
resources.pending_classes.clone(),
resources.storage_reader.clone(),
)
.await?;
let rpc_server_handle = if let Some(handle) = tasks.rpc_server_handle {
handle
} else {
spawn_rpc_server(
&config,
resources.shared_highest_block.clone(),
resources.pending_data.clone(),
resources.pending_classes.clone(),
resources.storage_reader.clone(),
)
.await?
};

// P2P Sync Server task.
let p2p_sync_server_handle = spawn_p2p_sync_server(
resources.maybe_network_manager.as_mut(),
resources.storage_reader.clone(),
);
let p2p_sync_server_handle = if let Some(handle) = tasks.p2p_sync_server_handle {
handle
} else {
spawn_p2p_sync_server(
resources.maybe_network_manager.as_mut(),
resources.storage_reader.clone(),
)
};

// Sync task.
let sync_client_handle = spawn_sync_client(
resources.maybe_network_manager.as_mut(),
resources.storage_reader,
resources.storage_writer,
&config,
resources.shared_highest_block,
resources.pending_data,
resources.pending_classes,
)
.await;
let sync_client_handle = if let Some(handle) = tasks.sync_client_handle {
handle
} else {
spawn_sync_client(
resources.maybe_network_manager.as_mut(),
resources.storage_reader,
resources.storage_writer,
&config,
resources.shared_highest_block,
resources.pending_data,
resources.pending_classes,
)
.await
};

// Created last since it consumes the network manager.
let network_handle = tokio::spawn(async move {
let network_handle = if let Some(handle) = tasks.network_handle {
handle
} else {
match resources.maybe_network_manager {
Some(manager) => manager.run().boxed().await,
None => pending().boxed().await,
Some(manager) => tokio::spawn(async move { Ok(manager.run().await?) }),
None => tokio::spawn(pending()),
}
});
};
tokio::select! {
res = storage_metrics_handle => {
error!("collecting storage metrics stopped.");
Expand Down Expand Up @@ -499,5 +542,6 @@ async fn main() -> anyhow::Result<()> {
info!("Booting up.");

let resources = PapyrusResources::new(&config)?;
run_threads(config, resources).await
let tasks = PapyrusTaskHandles::default();
run_threads(config, resources, tasks).await
}
5 changes: 3 additions & 2 deletions crates/papyrus_node/src/main_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use papyrus_storage::{open_storage, StorageConfig};
use papyrus_test_utils::prometheus_is_contained;
use tempfile::TempDir;

use crate::{run_threads, spawn_storage_metrics_collector, PapyrusResources};
use crate::{run_threads, spawn_storage_metrics_collector, PapyrusResources, PapyrusTaskHandles};

// The mission of this test is to ensure that if an error is returned from one of the spawned tasks,
// the node will stop, and this error will be returned. This is done by checking the case of an
Expand All @@ -20,7 +20,8 @@ async fn run_threads_stop() {
// Error when not supplying legal central URL.
config.central.url = "_not_legal_url".to_string();
let resources = PapyrusResources::new(&config).unwrap();
let error = run_threads(config, resources).await.expect_err("Should be an error.");
let tasks = PapyrusTaskHandles::default();
let error = run_threads(config, resources, tasks).await.expect_err("Should be an error.");
assert_eq!("relative URL without a base", error.to_string());
}

Expand Down
Loading