refactor(node): spawned tasks should all return anyhow result
This fairly generic return type should allow for flexibility with test overrides.
matan-starkware committed Sep 19, 2024
1 parent 56b34c8 commit 5539129
Showing 1 changed file with 42 additions and 33 deletions.
crates/papyrus_node/src/main.rs (75 changes: 42 additions & 33 deletions)
@@ -2,7 +2,7 @@
mod main_test;

use std::env::args;
-use std::future::{pending, Future};
+use std::future::pending;
use std::process::exit;
use std::sync::Arc;
use std::time::Duration;
@@ -18,7 +18,6 @@ use papyrus_config::validators::config_validate;
use papyrus_config::ConfigError;
use papyrus_consensus::config::ConsensusConfig;
use papyrus_consensus::simulation_network_receiver::NetworkReceiver;
-use papyrus_consensus::types::ConsensusError;
use papyrus_consensus_orchestrator::papyrus_consensus_context::PapyrusConsensusContext;
use papyrus_monitoring_gateway::MonitoringServer;
use papyrus_network::gossipsub_impl::Topic;
@@ -41,7 +40,7 @@ use starknet_api::felt;
use starknet_client::reader::objects::pending_data::{PendingBlock, PendingBlockOrDeprecated};
use starknet_client::reader::PendingData;
use tokio::sync::RwLock;
-use tokio::task::{JoinError, JoinHandle};
+use tokio::task::JoinHandle;
use tracing::metadata::LevelFilter;
use tracing::{debug, debug_span, error, info, warn, Instrument};
use tracing_subscriber::prelude::*;
@@ -97,13 +96,13 @@ impl PapyrusResources {
}

#[cfg(feature = "rpc")]
-async fn create_rpc_server_future(
+async fn spawn_rpc_server(
config: &NodeConfig,
shared_highest_block: Arc<RwLock<Option<BlockHashAndNumber>>>,
pending_data: Arc<RwLock<PendingData>>,
pending_classes: Arc<RwLock<PendingClasses>>,
storage_reader: StorageReader,
-) -> anyhow::Result<impl Future<Output = Result<(), JoinError>>> {
+) -> anyhow::Result<JoinHandle<anyhow::Result<()>>> {
let (_, server_handle) = run_server(
&config.rpc,
shared_highest_block,
@@ -113,29 +112,33 @@ async fn create_rpc_server_future(
VERSION_FULL,
)
.await?;
-Ok(tokio::spawn(server_handle.stopped()))
+Ok(tokio::spawn(async move {
+server_handle.stopped().await;
+Ok(())
+}))
}

#[cfg(not(feature = "rpc"))]
-async fn create_rpc_server_future(
+async fn spawn_rpc_server(
_config: &NodeConfig,
_shared_highest_block: Arc<RwLock<Option<BlockHashAndNumber>>>,
_pending_data: Arc<RwLock<PendingData>>,
_pending_classes: Arc<RwLock<PendingClasses>>,
_storage_reader: StorageReader,
-) -> anyhow::Result<impl Future<Output = Result<(), JoinError>>> {
-Ok(pending())
+) -> anyhow::Result<JoinHandle<anyhow::Result<()>>> {
+Ok(tokio::spawn(pending()))
}

-fn run_consensus(
+fn spawn_consensus(
config: Option<&ConsensusConfig>,
storage_reader: StorageReader,
network_manager: Option<&mut NetworkManager>,
-) -> anyhow::Result<JoinHandle<Result<(), ConsensusError>>> {
+) -> anyhow::Result<JoinHandle<anyhow::Result<()>>> {
let (Some(config), Some(network_manager)) = (config, network_manager) else {
info!("Consensus is disabled.");
return Ok(tokio::spawn(pending()));
};
+let config = config.clone();
debug!("Consensus configuration: {config:?}");

let network_channels = network_manager
@@ -167,31 +170,37 @@ fn run_consensus(
sync_channels.broadcasted_messages_receiver.map(|(vote, _report_sender)| {
BlockNumber(vote.expect("Sync channel should never have errors").height)
});
-Ok(tokio::spawn(papyrus_consensus::run_consensus(
-context,
-config.start_height,
-config.validator_id,
-config.consensus_delay,
-config.timeouts.clone(),
-broadcast_channels,
-sync_receiver,
-)))
+Ok(tokio::spawn(async move {
+Ok(papyrus_consensus::run_consensus(
+context,
+config.start_height,
+config.validator_id,
+config.consensus_delay,
+config.timeouts.clone(),
+broadcast_channels,
+sync_receiver,
+)
+.await?)
+}))
} else {
let context = PapyrusConsensusContext::new(
storage_reader.clone(),
network_channels.messages_to_broadcast_sender.clone(),
config.num_validators,
None,
);
-Ok(tokio::spawn(papyrus_consensus::run_consensus(
-context,
-config.start_height,
-config.validator_id,
-config.consensus_delay,
-config.timeouts.clone(),
-network_channels,
-futures::stream::pending(),
-)))
+Ok(tokio::spawn(async move {
+Ok(papyrus_consensus::run_consensus(
+context,
+config.start_height,
+config.validator_id,
+config.consensus_delay,
+config.timeouts.clone(),
+network_channels,
+futures::stream::pending(),
+)
+.await?)
+}))
}
}

@@ -314,7 +323,7 @@ fn spawn_p2p_sync_server(
}

async fn run_threads(config: NodeConfig, mut resources: PapyrusResources) -> anyhow::Result<()> {
-let consensus_handle = run_consensus(
+let consensus_handle = spawn_consensus(
config.consensus.as_ref(),
resources.storage_reader.clone(),
resources.maybe_network_manager.as_mut(),
@@ -338,7 +347,7 @@ async fn run_threads(config: NodeConfig, mut resources: PapyrusResources) -> anyhow::Result<()> {
let monitoring_server_handle = monitoring_server.spawn_server().await;

// JSON-RPC server.
-let server_handle_future = create_rpc_server_future(
+let rpc_server_handle = spawn_rpc_server(
&config,
resources.shared_highest_block.clone(),
resources.pending_data.clone(),
@@ -377,9 +386,9 @@ async fn run_threads(config: NodeConfig, mut resources: PapyrusResources) -> anyhow::Result<()> {
error!("collecting storage metrics stopped.");
res??
}
-res = server_handle_future => {
+res = rpc_server_handle => {
error!("RPC server stopped.");
-res?
+res??
}
res = monitoring_server_handle => {
error!("Monitoring server stopped.");

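For context on the pattern this commit converges on, here is a minimal, self-contained sketch (not part of the commit or of the papyrus repository) of how a uniform JoinHandle<anyhow::Result<()>> return type lets a test swap a real task for a stub while the caller keeps joining every handle the same way with res??. The names spawn_worker and spawn_stub are hypothetical; the sketch assumes the tokio crate (e.g. with the full feature set) and anyhow.

use std::future::pending;
use std::time::Duration;

use tokio::task::JoinHandle;

// Hypothetical "real" component: runs briefly and reports failures through anyhow.
fn spawn_worker() -> JoinHandle<anyhow::Result<()>> {
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_millis(10)).await;
        Ok(())
    })
}

// Hypothetical test override with the same signature: it never completes,
// so it never wins the select! below and cannot mask errors from other tasks.
fn spawn_stub() -> JoinHandle<anyhow::Result<()>> {
    tokio::spawn(pending())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let worker = spawn_worker();
    let stub = spawn_stub();

    tokio::select! {
        res = worker => {
            // The outer `?` surfaces a JoinError (panic or cancellation);
            // the inner `?` surfaces the task's own anyhow::Error.
            res??
        }
        res = stub => {
            res??
        }
    }
    Ok(())
}

Because every spawn_* helper shares this signature, a test build could substitute a stub like spawn_stub for any of them without changing how run_threads awaits and unwraps the handles, which is the flexibility the commit message refers to.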