diff --git a/Cargo.lock b/Cargo.lock index 801d60deb3..6708a46367 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10611,6 +10611,7 @@ dependencies = [ "async-trait", "futures", "libp2p", + "mockall", "papyrus_config", "papyrus_network", "papyrus_network_types", diff --git a/crates/starknet_mempool_p2p/Cargo.toml b/crates/starknet_mempool_p2p/Cargo.toml index d934a4241f..e6f8e892f4 100644 --- a/crates/starknet_mempool_p2p/Cargo.toml +++ b/crates/starknet_mempool_p2p/Cargo.toml @@ -27,6 +27,7 @@ validator.workspace = true [dev-dependencies] futures.workspace = true libp2p.workspace = true +mockall.workspace = true papyrus_network = { workspace = true, features = ["testing"] } papyrus_network_types = { workspace = true, features = ["testing"] } papyrus_protobuf.workspace = true diff --git a/crates/starknet_mempool_p2p/src/runner/test.rs b/crates/starknet_mempool_p2p/src/runner/test.rs index 5ad655c6fd..44f2c6f9e8 100644 --- a/crates/starknet_mempool_p2p/src/runner/test.rs +++ b/crates/starknet_mempool_p2p/src/runner/test.rs @@ -1,8 +1,6 @@ use std::sync::Arc; use std::time::Duration; -use async_trait::async_trait; -use futures::channel::mpsc::Sender; use futures::future::{pending, ready, BoxFuture}; use futures::stream::StreamExt; use futures::{FutureExt, SinkExt}; @@ -17,10 +15,10 @@ use papyrus_protobuf::mempool::RpcTransactionWrapper; use papyrus_test_utils::{get_rng, GetTestInstance}; use starknet_api::rpc_transaction::RpcTransaction; use starknet_api::transaction::TransactionHash; -use starknet_gateway_types::communication::{GatewayClient, GatewayClientResult}; +use starknet_gateway_types::communication::{GatewayClient, GatewayClientError, MockGatewayClient}; +use starknet_gateway_types::errors::{GatewayError, GatewaySpecError}; use starknet_gateway_types::gateway_types::GatewayInput; use starknet_sequencer_infra::component_definitions::ComponentStarter; -use tokio::time::sleep; use super::MempoolP2pRunner; @@ -44,8 +42,7 @@ fn setup( #[test] fn run_returns_when_network_future_returns() { let network_future = ready(Ok(())).boxed(); - let gateway_client = - Arc::new(MockGatewayClient { add_tx_sender: futures::channel::mpsc::channel(1).0 }); + let gateway_client = Arc::new(MockGatewayClient::new()); let (mut mempool_p2p_runner, _) = setup(network_future, gateway_client); mempool_p2p_runner.start().now_or_never().unwrap().unwrap(); } @@ -54,53 +51,110 @@ fn run_returns_when_network_future_returns() { fn run_returns_error_when_network_future_returns_error() { let network_future = ready(Err(NetworkError::DialError(libp2p::swarm::DialError::Aborted))).boxed(); - let gateway_client = - Arc::new(MockGatewayClient { add_tx_sender: futures::channel::mpsc::channel(1).0 }); + let gateway_client = Arc::new(MockGatewayClient::new()); let (mut mempool_p2p_runner, _) = setup(network_future, gateway_client); mempool_p2p_runner.start().now_or_never().unwrap().unwrap_err(); } -// TODO(eitan): Make it an automock -#[derive(Clone)] -struct MockGatewayClient { - add_tx_sender: Sender, -} +#[tokio::test] +async fn incoming_p2p_tx_reaches_gateway_client() { + let network_future = pending().boxed(); + + // Create channels for sending an empty message to indicate that the tx reached the gateway + // client. + let (add_tx_indicator_sender, add_tx_indicator_receiver) = futures::channel::oneshot::channel(); + + let message_metadata = BroadcastedMessageMetadata::get_test_instance(&mut get_rng()); + let expected_rpc_transaction = + RpcTransactionWrapper(RpcTransaction::get_test_instance(&mut get_rng())); + let gateway_input = GatewayInput { + rpc_tx: expected_rpc_transaction.0.clone(), + message_metadata: Some(message_metadata.clone()), + }; + + let mut mock_gateway_client = MockGatewayClient::new(); + mock_gateway_client.expect_add_tx().with(mockall::predicate::eq(gateway_input)).return_once( + move |_| { + add_tx_indicator_sender.send(()).unwrap(); + Ok(TransactionHash::default()) + }, + ); + let (mut mempool_p2p_runner, mock_network) = + setup(network_future, Arc::new(mock_gateway_client)); + + let BroadcastNetworkMock { + broadcasted_messages_sender: mut mock_broadcasted_messages_sender, + .. + } = mock_network; + + let res = + mock_broadcasted_messages_sender.send((expected_rpc_transaction.clone(), message_metadata)); -#[async_trait] -impl GatewayClient for MockGatewayClient { - async fn add_tx(&self, gateway_input: GatewayInput) -> GatewayClientResult { - let _ = self.clone().add_tx_sender.send(gateway_input.rpc_tx).await; - Ok(TransactionHash::default()) + res.await.expect("Failed to send message"); + + tokio::select! { + // if the runner fails, there was a network issue => panic. + // if the runner returns successfully, we panic because the runner should never terminate. + res = tokio::time::timeout(Duration::from_secs(5), mempool_p2p_runner.start()) => { + res.expect("Test timed out").expect("MempoolP2pRunner failed - network stopped unexpectedly"); + panic!("MempoolP2pRunner terminated"); + }, + // if a message was received on this oneshot channel, the gateway client received the tx and the test succeeded. + res = add_tx_indicator_receiver => {res.unwrap()} } } +// The p2p runner receives a tx from network, and the gateway declines it, triggering report_peer. #[tokio::test] -async fn start_component_receive_tx_happy_flow() { +async fn incoming_p2p_tx_fails_on_gateway_client() { let network_future = pending().boxed(); - let (add_tx_sender, mut add_tx_receiver) = futures::channel::mpsc::channel(1); - let mock_gateway_client = Arc::new(MockGatewayClient { add_tx_sender }); - let (mut mempool_p2p_runner, mock_network) = setup(network_future, mock_gateway_client); + // Create channels for sending an empty message to indicate that the tx reached the gateway + // client. + let (add_tx_indicator_sender, add_tx_indicator_receiver) = futures::channel::oneshot::channel(); + + let message_metadata = BroadcastedMessageMetadata::get_test_instance(&mut get_rng()); + let message_metadata_clone = message_metadata.clone(); + let expected_rpc_transaction = + RpcTransactionWrapper(RpcTransaction::get_test_instance(&mut get_rng())); + + let mut mock_gateway_client = MockGatewayClient::new(); + mock_gateway_client.expect_add_tx().return_once(move |_| { + add_tx_indicator_sender.send(()).unwrap(); + Err(GatewayClientError::GatewayError(GatewayError::GatewaySpecError { + source: GatewaySpecError::DuplicateTx, + p2p_message_metadata: Some(message_metadata_clone), + })) + }); + + let (mut mempool_p2p_runner, mock_network) = + setup(network_future, Arc::new(mock_gateway_client)); + let BroadcastNetworkMock { broadcasted_messages_sender: mut mock_broadcasted_messages_sender, + reported_messages_receiver: mut mock_reported_messages_receiver, .. } = mock_network; - let message_metadata = BroadcastedMessageMetadata::get_test_instance(&mut get_rng()); - let expected_rpc_transaction = - RpcTransactionWrapper(RpcTransaction::get_test_instance(&mut get_rng())); - // Sending the expected transaction to the mempool receiver - let res = - mock_broadcasted_messages_sender.send((expected_rpc_transaction.clone(), message_metadata)); + let res = mock_broadcasted_messages_sender + .send((expected_rpc_transaction.clone(), message_metadata.clone())); res.await.expect("Failed to send message"); + tokio::select! { - _ = mempool_p2p_runner.start() => {panic!("Mempool receiver failed to start");} - actual_rpc_transaction = add_tx_receiver.next() => { - assert_eq!(actual_rpc_transaction, Some(expected_rpc_transaction.0)); - } - _ = sleep(Duration::from_secs(5)) => { - panic!("Test timed out"); + // if the runner fails, there was a network issue => panic. + // if the runner returns successfully, we panic because the runner should never terminate. + res = tokio::time::timeout(Duration::from_secs(5), mempool_p2p_runner.start()) => { + res.expect("Test timed out (MempoolP2pRunner took too long to start)").expect("MempoolP2pRunner failed - network stopped unexpectedly"); + panic!("MempoolP2pRunner terminated"); + }, + // if a message was received on this oneshot channel, the gateway client received the tx. + res = add_tx_indicator_receiver => { + // if unwrap fails, the tx wasn't forwarded to the gateway client. + res.unwrap(); + // After gateway client fails to add the tx, the p2p runner should have reported the peer. + let peer_reported = mock_reported_messages_receiver.next().await.expect("Failed to receive report"); + // TODO: add this functionality to network manager test utils + assert_eq!(peer_reported, message_metadata.originator_id.private_get_peer_id()) } } } -// TODO(eitan): Add test for when the gateway client fails to add the transaction