Skip to content

Commit

Permalink
refactor(node): network channel creation is done by users (#863)
Browse files Browse the repository at this point in the history
This replaces the register function with creating the network manager resource that
any task can user.
  • Loading branch information
matan-starkware committed Sep 19, 2024
1 parent c7f9b7a commit 4e3a355
Showing 1 changed file with 58 additions and 65 deletions.
123 changes: 58 additions & 65 deletions crates/papyrus_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ async fn run_sync(
}

async fn spawn_sync_client(
maybe_sync_client_channels: Option<P2PSyncClientChannels>,
maybe_network_manager: Option<&mut NetworkManager>,
storage_reader: StorageReader,
storage_writer: StorageWriter,
config: &NodeConfig,
Expand All @@ -214,25 +214,61 @@ async fn spawn_sync_client(
))
}
(None, Some(p2p_sync_client_config)) => {
let network_manager = maybe_network_manager
.expect("If p2p sync is enabled, network needs to be enabled too");
let header_client_sender = network_manager
.register_sqmr_protocol_client(Protocol::SignedBlockHeader.into(), BUFFER_SIZE);
let state_diff_client_sender = network_manager
.register_sqmr_protocol_client(Protocol::StateDiff.into(), BUFFER_SIZE);
let transaction_client_sender = network_manager
.register_sqmr_protocol_client(Protocol::Transaction.into(), BUFFER_SIZE);
let class_client_sender =
network_manager.register_sqmr_protocol_client(Protocol::Class.into(), BUFFER_SIZE);
let p2p_sync_client_channels = P2PSyncClientChannels::new(
header_client_sender,
state_diff_client_sender,
transaction_client_sender,
class_client_sender,
);
let p2p_sync = P2PSyncClient::new(
p2p_sync_client_config,
storage_reader,
storage_writer,
maybe_sync_client_channels
.expect("If p2p sync is enabled, network needs to be enabled too"),
p2p_sync_client_channels,
);
tokio::spawn(async move { Ok(p2p_sync.run().await?) })
}
}
}

fn spawn_p2p_sync_server(
maybe_sync_server_channels: Option<P2PSyncServerChannels>,
network_manager: Option<&mut NetworkManager>,
storage_reader: StorageReader,
) -> JoinHandle<anyhow::Result<()>> {
let Some(p2p_sync_server_channels) = maybe_sync_server_channels else {
let Some(network_manager) = network_manager else {
info!("P2P Sync is disabled.");
return tokio::spawn(pending());
};

let header_server_receiver = network_manager
.register_sqmr_protocol_server(Protocol::SignedBlockHeader.into(), BUFFER_SIZE);
let state_diff_server_receiver =
network_manager.register_sqmr_protocol_server(Protocol::StateDiff.into(), BUFFER_SIZE);
let transaction_server_receiver =
network_manager.register_sqmr_protocol_server(Protocol::Transaction.into(), BUFFER_SIZE);
let class_server_receiver =
network_manager.register_sqmr_protocol_server(Protocol::Class.into(), BUFFER_SIZE);
let event_server_receiver =
network_manager.register_sqmr_protocol_server(Protocol::Event.into(), BUFFER_SIZE);

let p2p_sync_server_channels = P2PSyncServerChannels::new(
header_server_receiver,
state_diff_server_receiver,
transaction_server_receiver,
class_server_receiver,
event_server_receiver,
);

let p2p_sync_server = P2PSyncServer::new(storage_reader.clone(), p2p_sync_server_channels);
tokio::spawn(async move { Ok(p2p_sync_server.run().await) })
}
Expand All @@ -253,24 +289,13 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
}));
let pending_classes = Arc::new(RwLock::new(PendingClasses::default()));

// P2P network.
let (
mut maybe_network_manager,
maybe_sync_client_channels,
maybe_sync_server_channels,
local_peer_id,
) = register_to_network(config.network.clone())?;
let (mut maybe_network_manager, local_peer_id) = build_network_manager(config.network.clone())?;

let consensus_handle = run_consensus(
config.consensus.as_ref(),
storage_reader.clone(),
maybe_network_manager.as_mut(),
)?;
let network_handle = tokio::spawn(async move {
match maybe_network_manager {
Some(manager) => manager.run().boxed().await,
None => pending().boxed().await,
}
});

let storage_metrics_handle = spawn_storage_metrics_collector(
config.monitoring_gateway.collect_metrics,
Expand Down Expand Up @@ -301,11 +326,11 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {

// P2P Sync Server task.
let p2p_sync_server_handle =
spawn_p2p_sync_server(maybe_sync_server_channels, storage_reader.clone());
spawn_p2p_sync_server(maybe_network_manager.as_mut(), storage_reader.clone());

// Sync task.
let sync_client_handle = spawn_sync_client(
maybe_sync_client_channels,
maybe_network_manager.as_mut(),
storage_reader,
storage_writer,
&config,
Expand All @@ -315,6 +340,13 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
)
.await;

// Created last since it consumes the network manager.
let network_handle = tokio::spawn(async move {
match maybe_network_manager {
Some(manager) => manager.run().boxed().await,
None => pending().boxed().await,
}
});
tokio::select! {
res = storage_metrics_handle => {
error!("collecting storage metrics stopped.");
Expand Down Expand Up @@ -349,58 +381,19 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
return Ok(());
}

type NetworkRunReturn =
(Option<NetworkManager>, Option<P2PSyncClientChannels>, Option<P2PSyncServerChannels>, String);

fn register_to_network(network_config: Option<NetworkConfig>) -> anyhow::Result<NetworkRunReturn> {
fn build_network_manager(
network_config: Option<NetworkConfig>,
) -> anyhow::Result<(Option<NetworkManager>, String)> {
let Some(network_config) = network_config else {
return Ok((None, None, None, "".to_string()));
return Ok((None, "".to_string()));
};
let mut network_manager = network_manager::NetworkManager::new(
let network_manager = network_manager::NetworkManager::new(
network_config.clone(),
Some(VERSION_FULL.to_string()),
);
let local_peer_id = network_manager.get_local_peer_id();

let header_client_sender = network_manager
.register_sqmr_protocol_client(Protocol::SignedBlockHeader.into(), BUFFER_SIZE);
let state_diff_client_sender =
network_manager.register_sqmr_protocol_client(Protocol::StateDiff.into(), BUFFER_SIZE);
let transaction_client_sender =
network_manager.register_sqmr_protocol_client(Protocol::Transaction.into(), BUFFER_SIZE);
let class_client_sender =
network_manager.register_sqmr_protocol_client(Protocol::Class.into(), BUFFER_SIZE);
let p2p_sync_client_channels = P2PSyncClientChannels::new(
header_client_sender,
state_diff_client_sender,
transaction_client_sender,
class_client_sender,
);

let header_server_receiver = network_manager
.register_sqmr_protocol_server(Protocol::SignedBlockHeader.into(), BUFFER_SIZE);
let state_diff_server_receiver =
network_manager.register_sqmr_protocol_server(Protocol::StateDiff.into(), BUFFER_SIZE);
let transaction_server_receiver =
network_manager.register_sqmr_protocol_server(Protocol::Transaction.into(), BUFFER_SIZE);
let class_server_receiver =
network_manager.register_sqmr_protocol_server(Protocol::Class.into(), BUFFER_SIZE);
let event_server_receiver =
network_manager.register_sqmr_protocol_server(Protocol::Event.into(), BUFFER_SIZE);
let p2p_sync_server_channels = P2PSyncServerChannels::new(
header_server_receiver,
state_diff_server_receiver,
transaction_server_receiver,
class_server_receiver,
event_server_receiver,
);

Ok((
Some(network_manager),
Some(p2p_sync_client_channels),
Some(p2p_sync_server_channels),
local_peer_id,
))
Ok((Some(network_manager), local_peer_id))
}

// TODO(yair): add dynamic level filtering.
Expand Down

0 comments on commit 4e3a355

Please sign in to comment.