Skip to content

Commit

Permalink
refactor(node): network channel creation is done by users
Browse files Browse the repository at this point in the history
This replaces the register function with creating the network manager resource that
any task can user.
  • Loading branch information
matan-starkware committed Sep 19, 2024
1 parent ac56ada commit 96f38ba
Showing 1 changed file with 58 additions and 65 deletions.
123 changes: 58 additions & 65 deletions crates/papyrus_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ async fn run_sync(
}

async fn spawn_sync_client(
maybe_sync_client_channels: Option<P2PSyncClientChannels>,
maybe_network_manager: Option<&mut NetworkManager>,
storage_reader: StorageReader,
storage_writer: StorageWriter,
config: &NodeConfig,
Expand All @@ -214,25 +214,61 @@ async fn spawn_sync_client(
))
}
(None, Some(p2p_sync_client_config)) => {
let network_manager = maybe_network_manager
.expect("If p2p sync is enabled, network needs to be enabled too");
let header_client_sender = network_manager
.register_sqmr_protocol_client(Protocol::SignedBlockHeader.into(), BUFFER_SIZE);
let state_diff_client_sender = network_manager
.register_sqmr_protocol_client(Protocol::StateDiff.into(), BUFFER_SIZE);
let transaction_client_sender = network_manager
.register_sqmr_protocol_client(Protocol::Transaction.into(), BUFFER_SIZE);
let class_client_sender =
network_manager.register_sqmr_protocol_client(Protocol::Class.into(), BUFFER_SIZE);
let p2p_sync_client_channels = P2PSyncClientChannels::new(
header_client_sender,
state_diff_client_sender,
transaction_client_sender,
class_client_sender,
);
let p2p_sync = P2PSyncClient::new(
p2p_sync_client_config,
storage_reader,
storage_writer,
maybe_sync_client_channels
.expect("If p2p sync is enabled, network needs to be enabled too"),
p2p_sync_client_channels,
);
tokio::spawn(async move { Ok(p2p_sync.run().await?) })
}
}
}

fn spawn_p2p_sync_server(
maybe_sync_server_channels: Option<P2PSyncServerChannels>,
network_manager: Option<&mut NetworkManager>,
storage_reader: StorageReader,
) -> JoinHandle<anyhow::Result<()>> {
let Some(p2p_sync_server_channels) = maybe_sync_server_channels else {
let Some(network_manager) = network_manager else {
info!("P2P Sync is disabled.");
return tokio::spawn(pending());
};

let header_server_receiver = network_manager
.register_sqmr_protocol_server(Protocol::SignedBlockHeader.into(), BUFFER_SIZE);
let state_diff_server_receiver =
network_manager.register_sqmr_protocol_server(Protocol::StateDiff.into(), BUFFER_SIZE);
let transaction_server_receiver =
network_manager.register_sqmr_protocol_server(Protocol::Transaction.into(), BUFFER_SIZE);
let class_server_receiver =
network_manager.register_sqmr_protocol_server(Protocol::Class.into(), BUFFER_SIZE);
let event_server_receiver =
network_manager.register_sqmr_protocol_server(Protocol::Event.into(), BUFFER_SIZE);

let p2p_sync_server_channels = P2PSyncServerChannels::new(
header_server_receiver,
state_diff_server_receiver,
transaction_server_receiver,
class_server_receiver,
event_server_receiver,
);

let p2p_sync_server = P2PSyncServer::new(storage_reader.clone(), p2p_sync_server_channels);
tokio::spawn(async move { Ok(p2p_sync_server.run().await) })
}
Expand All @@ -253,24 +289,13 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
}));
let pending_classes = Arc::new(RwLock::new(PendingClasses::default()));

// P2P network.
let (
mut maybe_network_manager,
maybe_sync_client_channels,
maybe_sync_server_channels,
local_peer_id,
) = register_to_network(config.network.clone())?;
let (mut maybe_network_manager, local_peer_id) = build_network_manager(config.network.clone())?;

let consensus_handle = run_consensus(
config.consensus.as_ref(),
storage_reader.clone(),
maybe_network_manager.as_mut(),
)?;
let network_handle = tokio::spawn(async move {
match maybe_network_manager {
Some(manager) => manager.run().boxed().await,
None => pending().boxed().await,
}
});

let storage_metrics_handle = spawn_storage_metrics_collector(
config.monitoring_gateway.collect_metrics,
Expand Down Expand Up @@ -301,11 +326,11 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {

// P2P Sync Server task.
let p2p_sync_server_handle =
spawn_p2p_sync_server(maybe_sync_server_channels, storage_reader.clone());
spawn_p2p_sync_server(maybe_network_manager.as_mut(), storage_reader.clone());

// Sync task.
let sync_client_handle = spawn_sync_client(
maybe_sync_client_channels,
maybe_network_manager.as_mut(),
storage_reader,
storage_writer,
&config,
Expand All @@ -315,6 +340,13 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
)
.await;

// Created last since it consumes the network manager.
let network_handle = tokio::spawn(async move {
match maybe_network_manager {
Some(manager) => manager.run().boxed().await,
None => pending().boxed().await,
}
});
tokio::select! {
res = storage_metrics_handle => {
error!("collecting storage metrics stopped.");
Expand Down Expand Up @@ -349,58 +381,19 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
return Ok(());
}

type NetworkRunReturn =
(Option<NetworkManager>, Option<P2PSyncClientChannels>, Option<P2PSyncServerChannels>, String);

fn register_to_network(network_config: Option<NetworkConfig>) -> anyhow::Result<NetworkRunReturn> {
fn build_network_manager(
network_config: Option<NetworkConfig>,
) -> anyhow::Result<(Option<NetworkManager>, String)> {
let Some(network_config) = network_config else {
return Ok((None, None, None, "".to_string()));
return Ok((None, "".to_string()));
};
let mut network_manager = network_manager::NetworkManager::new(
let network_manager = network_manager::NetworkManager::new(
network_config.clone(),
Some(VERSION_FULL.to_string()),
);
let local_peer_id = network_manager.get_local_peer_id();

let header_client_sender = network_manager
.register_sqmr_protocol_client(Protocol::SignedBlockHeader.into(), BUFFER_SIZE);
let state_diff_client_sender =
network_manager.register_sqmr_protocol_client(Protocol::StateDiff.into(), BUFFER_SIZE);
let transaction_client_sender =
network_manager.register_sqmr_protocol_client(Protocol::Transaction.into(), BUFFER_SIZE);
let class_client_sender =
network_manager.register_sqmr_protocol_client(Protocol::Class.into(), BUFFER_SIZE);
let p2p_sync_client_channels = P2PSyncClientChannels::new(
header_client_sender,
state_diff_client_sender,
transaction_client_sender,
class_client_sender,
);

let header_server_receiver = network_manager
.register_sqmr_protocol_server(Protocol::SignedBlockHeader.into(), BUFFER_SIZE);
let state_diff_server_receiver =
network_manager.register_sqmr_protocol_server(Protocol::StateDiff.into(), BUFFER_SIZE);
let transaction_server_receiver =
network_manager.register_sqmr_protocol_server(Protocol::Transaction.into(), BUFFER_SIZE);
let class_server_receiver =
network_manager.register_sqmr_protocol_server(Protocol::Class.into(), BUFFER_SIZE);
let event_server_receiver =
network_manager.register_sqmr_protocol_server(Protocol::Event.into(), BUFFER_SIZE);
let p2p_sync_server_channels = P2PSyncServerChannels::new(
header_server_receiver,
state_diff_server_receiver,
transaction_server_receiver,
class_server_receiver,
event_server_receiver,
);

Ok((
Some(network_manager),
Some(p2p_sync_client_channels),
Some(p2p_sync_server_channels),
local_peer_id,
))
Ok((Some(network_manager), local_peer_id))
}

// TODO(yair): add dynamic level filtering.
Expand Down

0 comments on commit 96f38ba

Please sign in to comment.