Skip to content

Commit

Permalink
refactor(node): create papyrus resources for generating top level tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
matan-starkware committed Sep 18, 2024
1 parent aa1c819 commit d4b6124
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 39 deletions.
98 changes: 61 additions & 37 deletions crates/papyrus_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,43 @@ const GENESIS_HASH: &str = "0x0";
// Duration between updates to the storage metrics (those in the collect_storage_metrics function).
const STORAGE_METRICS_UPDATE_INTERVAL: Duration = Duration::from_secs(10);

pub struct PapyrusResources {
pub storage_reader: StorageReader,
pub storage_writer: StorageWriter,
pub maybe_network_manager: Option<NetworkManager>,
pub local_peer_id: String,
pub shared_highest_block: Arc<RwLock<Option<BlockHashAndNumber>>>,
pub pending_data: Arc<RwLock<PendingData>>,
pub pending_classes: Arc<RwLock<PendingClasses>>,
}

impl PapyrusResources {
pub fn new(config: &NodeConfig) -> anyhow::Result<Self> {
let (storage_reader, storage_writer) = open_storage(config.storage.clone())?;
let (maybe_network_manager, local_peer_id) = build_network_manager(config.network.clone())?;
let shared_highest_block = Arc::new(RwLock::new(None));
let pending_data = Arc::new(RwLock::new(PendingData {
// The pending data might change later to DeprecatedPendingBlock, depending on the
// response from the feeder gateway.
block: PendingBlockOrDeprecated::Current(PendingBlock {
parent_block_hash: BlockHash(felt!(GENESIS_HASH)),
..Default::default()
}),
..Default::default()
}));
let pending_classes = Arc::new(RwLock::new(PendingClasses::default()));
Ok(Self {
storage_reader,
storage_writer,
maybe_network_manager,
local_peer_id,
shared_highest_block,
pending_data,
pending_classes,
})
}
}

#[cfg(feature = "rpc")]
async fn create_rpc_server_future(
config: &NodeConfig,
Expand Down Expand Up @@ -273,33 +310,16 @@ fn spawn_p2p_sync_server(
tokio::spawn(async move { Ok(p2p_sync_server.run().await) })
}

async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
let (storage_reader, storage_writer) = open_storage(config.storage.clone())?;

// The sync is the only writer of the syncing state.
let shared_highest_block = Arc::new(RwLock::new(None));
let pending_data = Arc::new(RwLock::new(PendingData {
// The pending data might change later to DeprecatedPendingBlock, depending on the response
// from the feeder gateway.
block: PendingBlockOrDeprecated::Current(PendingBlock {
parent_block_hash: BlockHash(felt!(GENESIS_HASH)),
..Default::default()
}),
..Default::default()
}));
let pending_classes = Arc::new(RwLock::new(PendingClasses::default()));

let (mut maybe_network_manager, local_peer_id) = build_network_manager(config.network.clone())?;

async fn run_threads(config: NodeConfig, mut resources: PapyrusResources) -> anyhow::Result<()> {
let consensus_handle = run_consensus(
config.consensus.as_ref(),
storage_reader.clone(),
maybe_network_manager.as_mut(),
resources.storage_reader.clone(),
resources.maybe_network_manager.as_mut(),
)?;

let storage_metrics_handle = spawn_storage_metrics_collector(
config.monitoring_gateway.collect_metrics,
storage_reader.clone(),
resources.storage_reader.clone(),
STORAGE_METRICS_UPDATE_INTERVAL,
);

Expand All @@ -308,41 +328,43 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
config.monitoring_gateway.clone(),
get_config_presentation(&config, true)?,
get_config_presentation(&config, false)?,
storage_reader.clone(),
resources.storage_reader.clone(),
VERSION_FULL,
local_peer_id,
resources.local_peer_id,
)?;
let monitoring_server_handle = monitoring_server.spawn_server().await;

// JSON-RPC server.
let server_handle_future = create_rpc_server_future(
&config,
shared_highest_block.clone(),
pending_data.clone(),
pending_classes.clone(),
storage_reader.clone(),
resources.shared_highest_block.clone(),
resources.pending_data.clone(),
resources.pending_classes.clone(),
resources.storage_reader.clone(),
)
.await?;

// P2P Sync Server task.
let p2p_sync_server_handle =
spawn_p2p_sync_server(maybe_network_manager.as_mut(), storage_reader.clone());
let p2p_sync_server_handle = spawn_p2p_sync_server(
resources.maybe_network_manager.as_mut(),
resources.storage_reader.clone(),
);

// Sync task.
let sync_client_handle = spawn_sync_client(
maybe_network_manager.as_mut(),
storage_reader,
storage_writer,
resources.maybe_network_manager.as_mut(),
resources.storage_reader,
resources.storage_writer,
&config,
shared_highest_block,
pending_data,
pending_classes,
resources.shared_highest_block,
resources.pending_data,
resources.pending_classes,
)
.await;

// Created last since it consumes the network manager.
let network_handle = tokio::spawn(async move {
match maybe_network_manager {
match resources.maybe_network_manager {
Some(manager) => manager.run().boxed().await,
None => pending().boxed().await,
}
Expand Down Expand Up @@ -451,5 +473,7 @@ async fn main() -> anyhow::Result<()> {
.expect("This should be the first and only time we set this value.");

info!("Booting up.");
run_threads(config).await

let resources = PapyrusResources::new(&config)?;
run_threads(config, resources).await
}
5 changes: 3 additions & 2 deletions crates/papyrus_node/src/main_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use papyrus_storage::{open_storage, StorageConfig};
use papyrus_test_utils::prometheus_is_contained;
use tempfile::TempDir;

use crate::{run_threads, spawn_storage_metrics_collector};
use crate::{run_threads, spawn_storage_metrics_collector, PapyrusResources};

// The mission of this test is to ensure that if an error is returned from one of the spawned tasks,
// the node will stop, and this error will be returned. This is done by checking the case of an
Expand All @@ -19,7 +19,8 @@ async fn run_threads_stop() {

// Error when not supplying legal central URL.
config.central.url = "_not_legal_url".to_string();
let error = run_threads(config).await.expect_err("Should be an error.");
let resources = PapyrusResources::new(&config).unwrap();
let error = run_threads(config, resources).await.expect_err("Should be an error.");
assert_eq!("relative URL without a base", error.to_string());
}

Expand Down

0 comments on commit d4b6124

Please sign in to comment.