Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(node): network channel creation is done by users #863

Merged
merged 1 commit into from
Sep 19, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 58 additions & 65 deletions crates/papyrus_node/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ async fn run_sync(
}

async fn spawn_sync_client(
maybe_sync_client_channels: Option<P2PSyncClientChannels>,
maybe_network_manager: Option<&mut NetworkManager>,
storage_reader: StorageReader,
storage_writer: StorageWriter,
config: &NodeConfig,
Expand All @@ -214,25 +214,61 @@ async fn spawn_sync_client(
))
}
(None, Some(p2p_sync_client_config)) => {
let network_manager = maybe_network_manager
.expect("If p2p sync is enabled, network needs to be enabled too");
let header_client_sender = network_manager
.register_sqmr_protocol_client(Protocol::SignedBlockHeader.into(), BUFFER_SIZE);
let state_diff_client_sender = network_manager
.register_sqmr_protocol_client(Protocol::StateDiff.into(), BUFFER_SIZE);
let transaction_client_sender = network_manager
.register_sqmr_protocol_client(Protocol::Transaction.into(), BUFFER_SIZE);
let class_client_sender =
network_manager.register_sqmr_protocol_client(Protocol::Class.into(), BUFFER_SIZE);
let p2p_sync_client_channels = P2PSyncClientChannels::new(
header_client_sender,
state_diff_client_sender,
transaction_client_sender,
class_client_sender,
);
let p2p_sync = P2PSyncClient::new(
p2p_sync_client_config,
storage_reader,
storage_writer,
maybe_sync_client_channels
.expect("If p2p sync is enabled, network needs to be enabled too"),
p2p_sync_client_channels,
);
tokio::spawn(async move { Ok(p2p_sync.run().await?) })
}
}
}

fn spawn_p2p_sync_server(
maybe_sync_server_channels: Option<P2PSyncServerChannels>,
network_manager: Option<&mut NetworkManager>,
storage_reader: StorageReader,
) -> JoinHandle<anyhow::Result<()>> {
let Some(p2p_sync_server_channels) = maybe_sync_server_channels else {
let Some(network_manager) = network_manager else {
info!("P2P Sync is disabled.");
return tokio::spawn(pending());
};

let header_server_receiver = network_manager
.register_sqmr_protocol_server(Protocol::SignedBlockHeader.into(), BUFFER_SIZE);
let state_diff_server_receiver =
network_manager.register_sqmr_protocol_server(Protocol::StateDiff.into(), BUFFER_SIZE);
let transaction_server_receiver =
network_manager.register_sqmr_protocol_server(Protocol::Transaction.into(), BUFFER_SIZE);
let class_server_receiver =
network_manager.register_sqmr_protocol_server(Protocol::Class.into(), BUFFER_SIZE);
let event_server_receiver =
network_manager.register_sqmr_protocol_server(Protocol::Event.into(), BUFFER_SIZE);

let p2p_sync_server_channels = P2PSyncServerChannels::new(
header_server_receiver,
state_diff_server_receiver,
transaction_server_receiver,
class_server_receiver,
event_server_receiver,
);

let p2p_sync_server = P2PSyncServer::new(storage_reader.clone(), p2p_sync_server_channels);
tokio::spawn(async move { Ok(p2p_sync_server.run().await) })
}
Expand All @@ -253,24 +289,13 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
}));
let pending_classes = Arc::new(RwLock::new(PendingClasses::default()));

// P2P network.
let (
mut maybe_network_manager,
maybe_sync_client_channels,
maybe_sync_server_channels,
local_peer_id,
) = register_to_network(config.network.clone())?;
let (mut maybe_network_manager, local_peer_id) = build_network_manager(config.network.clone())?;

let consensus_handle = run_consensus(
config.consensus.as_ref(),
storage_reader.clone(),
maybe_network_manager.as_mut(),
)?;
let network_handle = tokio::spawn(async move {
match maybe_network_manager {
Some(manager) => manager.run().boxed().await,
None => pending().boxed().await,
}
});

let storage_metrics_handle = spawn_storage_metrics_collector(
config.monitoring_gateway.collect_metrics,
Expand Down Expand Up @@ -301,11 +326,11 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {

// P2P Sync Server task.
let p2p_sync_server_handle =
spawn_p2p_sync_server(maybe_sync_server_channels, storage_reader.clone());
spawn_p2p_sync_server(maybe_network_manager.as_mut(), storage_reader.clone());

// Sync task.
let sync_client_handle = spawn_sync_client(
maybe_sync_client_channels,
maybe_network_manager.as_mut(),
storage_reader,
storage_writer,
&config,
Expand All @@ -315,6 +340,13 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
)
.await;

// Created last since it consumes the network manager.
let network_handle = tokio::spawn(async move {
match maybe_network_manager {
Some(manager) => manager.run().boxed().await,
None => pending().boxed().await,
}
});
tokio::select! {
res = storage_metrics_handle => {
error!("collecting storage metrics stopped.");
Expand Down Expand Up @@ -349,58 +381,19 @@ async fn run_threads(config: NodeConfig) -> anyhow::Result<()> {
return Ok(());
}

type NetworkRunReturn =
(Option<NetworkManager>, Option<P2PSyncClientChannels>, Option<P2PSyncServerChannels>, String);

fn register_to_network(network_config: Option<NetworkConfig>) -> anyhow::Result<NetworkRunReturn> {
fn build_network_manager(
network_config: Option<NetworkConfig>,
) -> anyhow::Result<(Option<NetworkManager>, String)> {
let Some(network_config) = network_config else {
return Ok((None, None, None, "".to_string()));
return Ok((None, "".to_string()));
};
let mut network_manager = network_manager::NetworkManager::new(
let network_manager = network_manager::NetworkManager::new(
network_config.clone(),
Some(VERSION_FULL.to_string()),
);
let local_peer_id = network_manager.get_local_peer_id();

let header_client_sender = network_manager
.register_sqmr_protocol_client(Protocol::SignedBlockHeader.into(), BUFFER_SIZE);
let state_diff_client_sender =
network_manager.register_sqmr_protocol_client(Protocol::StateDiff.into(), BUFFER_SIZE);
let transaction_client_sender =
network_manager.register_sqmr_protocol_client(Protocol::Transaction.into(), BUFFER_SIZE);
let class_client_sender =
network_manager.register_sqmr_protocol_client(Protocol::Class.into(), BUFFER_SIZE);
let p2p_sync_client_channels = P2PSyncClientChannels::new(
header_client_sender,
state_diff_client_sender,
transaction_client_sender,
class_client_sender,
);

let header_server_receiver = network_manager
.register_sqmr_protocol_server(Protocol::SignedBlockHeader.into(), BUFFER_SIZE);
let state_diff_server_receiver =
network_manager.register_sqmr_protocol_server(Protocol::StateDiff.into(), BUFFER_SIZE);
let transaction_server_receiver =
network_manager.register_sqmr_protocol_server(Protocol::Transaction.into(), BUFFER_SIZE);
let class_server_receiver =
network_manager.register_sqmr_protocol_server(Protocol::Class.into(), BUFFER_SIZE);
let event_server_receiver =
network_manager.register_sqmr_protocol_server(Protocol::Event.into(), BUFFER_SIZE);
let p2p_sync_server_channels = P2PSyncServerChannels::new(
header_server_receiver,
state_diff_server_receiver,
transaction_server_receiver,
class_server_receiver,
event_server_receiver,
);

Ok((
Some(network_manager),
Some(p2p_sync_client_channels),
Some(p2p_sync_server_channels),
local_peer_id,
))
Ok((Some(network_manager), local_peer_id))
}

// TODO(yair): add dynamic level filtering.
Expand Down
Loading