Skip to content

Commit

Permalink
feat(papyrus_p2p_sync): p2p sync client and server return Never on ok (
Browse files Browse the repository at this point in the history
  • Loading branch information
ShahakShama authored Dec 30, 2024
1 parent afd20c4 commit 70ed8c2
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 13 deletions.
2 changes: 1 addition & 1 deletion crates/papyrus_node/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ async fn spawn_sync_client(
p2p_sync_client_channels,
futures::stream::pending().boxed(),
);
tokio::spawn(async move { Ok(p2p_sync.run().await?) })
tokio::spawn(async move { Ok(p2p_sync.run().await.map(|_never| ())?) })
}
}
}
Expand Down
3 changes: 2 additions & 1 deletion crates/papyrus_p2p_sync/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::time::Duration;

use class::ClassStreamBuilder;
use futures::channel::mpsc::{Receiver, SendError, Sender};
use futures::never::Never;
use futures::stream::BoxStream;
use futures::{SinkExt as _, Stream};
use header::HeaderStreamBuilder;
Expand Down Expand Up @@ -229,7 +230,7 @@ impl P2PSyncClient {
}

#[instrument(skip(self), level = "debug", err)]
pub async fn run(self) -> Result<(), P2PSyncClientError> {
pub async fn run(self) -> Result<Never, P2PSyncClientError> {
info!("Starting P2P sync client");

let InternalBlocksChannels {
Expand Down
11 changes: 6 additions & 5 deletions crates/papyrus_p2p_sync/src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use std::fmt::Debug;

use futures::never::Never;
use futures::StreamExt;
use papyrus_common::pending_classes::ApiContractClass;
use papyrus_network::network_manager::{ServerQueryManager, SqmrServerReceiver};
Expand Down Expand Up @@ -107,7 +108,7 @@ pub struct P2PSyncServer {
}

impl P2PSyncServer {
pub async fn run(self) {
pub async fn run(self) -> Never {
let P2PSyncServerChannels {
mut header_receiver,
mut state_diff_receiver,
Expand Down Expand Up @@ -135,14 +136,14 @@ impl P2PSyncServer {
);
register_query(self.storage_reader.clone(), server_query_manager);
}
mayber_server_query_manager = class_receiver.next() => {
let server_query_manager = mayber_server_query_manager.expect(
maybe_server_query_manager = class_receiver.next() => {
let server_query_manager = maybe_server_query_manager.expect(
"Class queries sender was unexpectedly dropped."
);
register_query(self.storage_reader.clone(), server_query_manager);
}
mayber_server_query_manager = event_receiver.next() => {
let server_query_manager = mayber_server_query_manager.expect(
maybe_server_query_manager = event_receiver.next() => {
let server_query_manager = maybe_server_query_manager.expect(
"Event queries sender was unexpectedly dropped."
);
register_query(self.storage_reader.clone(), server_query_manager);
Expand Down
15 changes: 9 additions & 6 deletions crates/starknet_state_sync/src/runner/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ mod test;
use async_trait::async_trait;
use futures::channel::mpsc::Receiver;
use futures::future::BoxFuture;
use futures::never::Never;
use futures::{FutureExt, StreamExt};
use papyrus_network::network_manager::{self, NetworkError};
use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientChannels, P2PSyncClientError};
Expand All @@ -21,20 +22,22 @@ use crate::config::StateSyncConfig;
pub struct StateSyncRunner {
network_future: BoxFuture<'static, Result<(), NetworkError>>,
// TODO: change client and server to requester and responder respectively
p2p_sync_client_future: BoxFuture<'static, Result<(), P2PSyncClientError>>,
p2p_sync_server_future: BoxFuture<'static, ()>,
p2p_sync_client_future: BoxFuture<'static, Result<Never, P2PSyncClientError>>,
p2p_sync_server_future: BoxFuture<'static, Never>,
}

#[async_trait]
impl ComponentStarter for StateSyncRunner {
async fn start(&mut self) -> Result<(), ComponentError> {
tokio::select! {
result = &mut self.network_future => {
return result.map_err(|_| ComponentError::InternalComponentError);
result.map_err(|_| ComponentError::InternalComponentError)
}
result = &mut self.p2p_sync_client_future => return result.map_err(|_| ComponentError::InternalComponentError),
() = &mut self.p2p_sync_server_future => {
return Err(ComponentError::InternalComponentError);
result = &mut self.p2p_sync_client_future => {
result.map_err(|_| ComponentError::InternalComponentError).map(|_never| ())
}
_never = &mut self.p2p_sync_server_future => {
Err(ComponentError::InternalComponentError)
}
}
}
Expand Down

0 comments on commit 70ed8c2

Please sign in to comment.