From 70ed8c2c08c61044aea1b37d047ac5064c942d5a Mon Sep 17 00:00:00 2001 From: ShahakShama <70578257+ShahakShama@users.noreply.github.com> Date: Mon, 30 Dec 2024 15:56:58 +0200 Subject: [PATCH] feat(papyrus_p2p_sync): p2p sync client and server return Never on ok (#2984) --- crates/papyrus_node/src/run.rs | 2 +- crates/papyrus_p2p_sync/src/client/mod.rs | 3 ++- crates/papyrus_p2p_sync/src/server/mod.rs | 11 ++++++----- crates/starknet_state_sync/src/runner/mod.rs | 15 +++++++++------ 4 files changed, 18 insertions(+), 13 deletions(-) diff --git a/crates/papyrus_node/src/run.rs b/crates/papyrus_node/src/run.rs index ed9d81c948..d78b1e3ad2 100644 --- a/crates/papyrus_node/src/run.rs +++ b/crates/papyrus_node/src/run.rs @@ -306,7 +306,7 @@ async fn spawn_sync_client( p2p_sync_client_channels, futures::stream::pending().boxed(), ); - tokio::spawn(async move { Ok(p2p_sync.run().await?) }) + tokio::spawn(async move { Ok(p2p_sync.run().await.map(|_never| ())?) }) } } } diff --git a/crates/papyrus_p2p_sync/src/client/mod.rs b/crates/papyrus_p2p_sync/src/client/mod.rs index 8a1a4e3d80..53646f748d 100644 --- a/crates/papyrus_p2p_sync/src/client/mod.rs +++ b/crates/papyrus_p2p_sync/src/client/mod.rs @@ -19,6 +19,7 @@ use std::time::Duration; use class::ClassStreamBuilder; use futures::channel::mpsc::{Receiver, SendError, Sender}; +use futures::never::Never; use futures::stream::BoxStream; use futures::{SinkExt as _, Stream}; use header::HeaderStreamBuilder; @@ -229,7 +230,7 @@ impl P2PSyncClient { } #[instrument(skip(self), level = "debug", err)] - pub async fn run(self) -> Result<(), P2PSyncClientError> { + pub async fn run(self) -> Result { info!("Starting P2P sync client"); let InternalBlocksChannels { diff --git a/crates/papyrus_p2p_sync/src/server/mod.rs b/crates/papyrus_p2p_sync/src/server/mod.rs index 205a10239a..a350466a4a 100644 --- a/crates/papyrus_p2p_sync/src/server/mod.rs +++ b/crates/papyrus_p2p_sync/src/server/mod.rs @@ -1,5 +1,6 @@ use std::fmt::Debug; +use futures::never::Never; use futures::StreamExt; use papyrus_common::pending_classes::ApiContractClass; use papyrus_network::network_manager::{ServerQueryManager, SqmrServerReceiver}; @@ -107,7 +108,7 @@ pub struct P2PSyncServer { } impl P2PSyncServer { - pub async fn run(self) { + pub async fn run(self) -> Never { let P2PSyncServerChannels { mut header_receiver, mut state_diff_receiver, @@ -135,14 +136,14 @@ impl P2PSyncServer { ); register_query(self.storage_reader.clone(), server_query_manager); } - mayber_server_query_manager = class_receiver.next() => { - let server_query_manager = mayber_server_query_manager.expect( + maybe_server_query_manager = class_receiver.next() => { + let server_query_manager = maybe_server_query_manager.expect( "Class queries sender was unexpectedly dropped." ); register_query(self.storage_reader.clone(), server_query_manager); } - mayber_server_query_manager = event_receiver.next() => { - let server_query_manager = mayber_server_query_manager.expect( + maybe_server_query_manager = event_receiver.next() => { + let server_query_manager = maybe_server_query_manager.expect( "Event queries sender was unexpectedly dropped." ); register_query(self.storage_reader.clone(), server_query_manager); diff --git a/crates/starknet_state_sync/src/runner/mod.rs b/crates/starknet_state_sync/src/runner/mod.rs index edb6371fa8..ab2b080967 100644 --- a/crates/starknet_state_sync/src/runner/mod.rs +++ b/crates/starknet_state_sync/src/runner/mod.rs @@ -4,6 +4,7 @@ mod test; use async_trait::async_trait; use futures::channel::mpsc::Receiver; use futures::future::BoxFuture; +use futures::never::Never; use futures::{FutureExt, StreamExt}; use papyrus_network::network_manager::{self, NetworkError}; use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientChannels, P2PSyncClientError}; @@ -21,8 +22,8 @@ use crate::config::StateSyncConfig; pub struct StateSyncRunner { network_future: BoxFuture<'static, Result<(), NetworkError>>, // TODO: change client and server to requester and responder respectively - p2p_sync_client_future: BoxFuture<'static, Result<(), P2PSyncClientError>>, - p2p_sync_server_future: BoxFuture<'static, ()>, + p2p_sync_client_future: BoxFuture<'static, Result>, + p2p_sync_server_future: BoxFuture<'static, Never>, } #[async_trait] @@ -30,11 +31,13 @@ impl ComponentStarter for StateSyncRunner { async fn start(&mut self) -> Result<(), ComponentError> { tokio::select! { result = &mut self.network_future => { - return result.map_err(|_| ComponentError::InternalComponentError); + result.map_err(|_| ComponentError::InternalComponentError) } - result = &mut self.p2p_sync_client_future => return result.map_err(|_| ComponentError::InternalComponentError), - () = &mut self.p2p_sync_server_future => { - return Err(ComponentError::InternalComponentError); + result = &mut self.p2p_sync_client_future => { + result.map_err(|_| ComponentError::InternalComponentError).map(|_never| ()) + } + _never = &mut self.p2p_sync_server_future => { + Err(ComponentError::InternalComponentError) } } }