diff --git a/Cargo.lock b/Cargo.lock index ec72ce93fcb..eb3675eaaa3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -10557,6 +10557,7 @@ version = "0.0.0" dependencies = [ "async-trait", "futures", + "mockall", "papyrus_config", "papyrus_network", "papyrus_network_types", diff --git a/crates/starknet_mempool_p2p/Cargo.toml b/crates/starknet_mempool_p2p/Cargo.toml index 5e5b278e499..bf99e84ebe3 100644 --- a/crates/starknet_mempool_p2p/Cargo.toml +++ b/crates/starknet_mempool_p2p/Cargo.toml @@ -26,6 +26,7 @@ validator.workspace = true [dev-dependencies] futures.workspace = true +mockall.workspace = true papyrus_network = { workspace = true, features = ["testing"] } papyrus_network_types = { workspace = true, features = ["testing"] } papyrus_protobuf.workspace = true diff --git a/crates/starknet_mempool_p2p/src/runner/test.rs b/crates/starknet_mempool_p2p/src/runner/test.rs index 175adb74994..f3e3d8739db 100644 --- a/crates/starknet_mempool_p2p/src/runner/test.rs +++ b/crates/starknet_mempool_p2p/src/runner/test.rs @@ -16,79 +16,89 @@ use starknet_api::rpc_transaction::RpcTransaction; use starknet_api::transaction::TransactionHash; use starknet_gateway_types::communication::{GatewayClientError, MockGatewayClient}; use starknet_gateway_types::errors::{GatewayError, GatewaySpecError}; +use starknet_gateway_types::gateway_types::GatewayInput; use starknet_sequencer_infra::component_definitions::ComponentStarter; use super::MempoolP2pRunner; -// The p2p runner receives a tx from network, and forwards it to the gateway. +// The p2p runner receives a tx from network, and successfully forwards it to the gateway. #[tokio::test] async fn incoming_p2p_tx_reaches_gateway_client() { // Mock a network for the other node to send tx to our p2p runner using the subscriber channels. let TestSubscriberChannels { mock_network, subscriber_channels } = mock_register_broadcast_topic().expect("Failed to create mock network"); let BroadcastTopicChannels { broadcasted_messages_receiver, broadcast_topic_client } = - subscriber_channels; // used to created our node's p2p runner below, which will listen for incoming txs over broadcasted_messages_receiver. + subscriber_channels; let BroadcastNetworkMock { broadcasted_messages_sender: mut mock_broadcasted_messages_sender, .. - } = mock_network; // other node sending tx to our p2p runner + } = mock_network; // Creating a placeholder network manager with default config for init of a mempool receiver let placeholder_network_manager = NetworkManager::new(NetworkConfig::default(), None); - // send an empty message on this channel to indicate that the tx reached the gateway client. + // Create channels for sending an empty message to indicate that the tx reached the gateway + // client. let (add_tx_indicator_sender, add_tx_indicator_receiver) = futures::channel::oneshot::channel(); + let message_metadata = BroadcastedMessageMetadata::get_test_instance(&mut get_rng()); + let expected_rpc_transaction = + RpcTransactionWrapper(RpcTransaction::get_test_instance(&mut get_rng())); + let gateway_input = GatewayInput { + rpc_tx: expected_rpc_transaction.0.clone(), + message_metadata: Some(message_metadata.clone()), + }; + let mut mock_gateway_client = MockGatewayClient::new(); - mock_gateway_client.expect_add_tx().return_once(move |_| { - add_tx_indicator_sender.send(()).unwrap(); - Ok(TransactionHash::default()) - }); + mock_gateway_client.expect_add_tx().with(mockall::predicate::eq(gateway_input)).return_once( + move |_| { + add_tx_indicator_sender.send(()).unwrap(); + Ok(TransactionHash::default()) + }, + ); let mut mempool_p2p_runner = MempoolP2pRunner::new( Some(placeholder_network_manager), - broadcasted_messages_receiver, // listen to incoming tx - broadcast_topic_client, // broadcast tx or report peer + broadcasted_messages_receiver, + broadcast_topic_client, Arc::new(mock_gateway_client), ); - let message_metadata = BroadcastedMessageMetadata::get_test_instance(&mut get_rng()); - let expected_rpc_transaction = - RpcTransactionWrapper(RpcTransaction::get_test_instance(&mut get_rng())); - - // Sending the expected transaction to the mempool runner let res = mock_broadcasted_messages_sender.send((expected_rpc_transaction.clone(), message_metadata)); res.await.expect("Failed to send message"); tokio::select! { - // if the runner takes longer than 5 seconds to start, we panic. // if the runner fails, there was a network issue => panic. // if the runner returns successfully, we panic because the runner should never terminate. - res = tokio::time::timeout(Duration::from_secs(5), mempool_p2p_runner.start()) => {res.expect("Test timed out").expect("Runner failed - network stopped unexpectedly"); panic!("Runner terminated")}, - // if a message was received on this oneshot channel, the gateway client received the tx. + res = tokio::time::timeout(Duration::from_secs(5), mempool_p2p_runner.start()) => { + res.expect("Test timed out").expect("MempoolP2pRunner failed - network stopped unexpectedly"); + panic!("MempoolP2pRunner terminated"); + }, + // if a message was received on this oneshot channel, the gateway client received the tx and the test succeeded. res = add_tx_indicator_receiver => {res.unwrap()} } } -// The p2p runner receives a tx from network, and fails to forwards it to the gateway. +// The p2p runner receives a tx from network, and the gateway decalines it, triggering report_peer. #[tokio::test] async fn incoming_p2p_tx_fails_on_gateway_client() { // Mock a network for the other node to send tx to our p2p runner using the subscriber channels. let TestSubscriberChannels { mock_network, subscriber_channels } = mock_register_broadcast_topic().expect("Failed to create mock network"); let BroadcastTopicChannels { broadcasted_messages_receiver, broadcast_topic_client } = - subscriber_channels; // used to created our node's p2p runner below, which will listen for incoming txs over broadcasted_messages_receiver. + subscriber_channels; let BroadcastNetworkMock { broadcasted_messages_sender: mut mock_broadcasted_messages_sender, reported_messages_receiver: mut mock_reported_messages_receiver, .. - } = mock_network; // other node sending tx to our p2p runner + } = mock_network; // Creating a placeholder network manager with default config for init of a mempool receiver let placeholder_network_manager = NetworkManager::new(NetworkConfig::default(), None); - // send an empty message on this channel to indicate that the tx reached the gateway client. + // Create channels for sending an empty message to indicate that the tx reached the gateway + // client. let (add_tx_indicator_sender, add_tx_indicator_receiver) = futures::channel::oneshot::channel(); let message_metadata = BroadcastedMessageMetadata::get_test_instance(&mut get_rng()); @@ -111,24 +121,26 @@ async fn incoming_p2p_tx_fails_on_gateway_client() { Arc::new(mock_gateway_client), ); - // Sending the expected transaction to the mempool runner let res = mock_broadcasted_messages_sender .send((expected_rpc_transaction.clone(), message_metadata.clone())); res.await.expect("Failed to send message"); tokio::select! { - // if the runner takes longer than 5 seconds to start, we panic. - // if the runner fails, there was a network issue => panic. - // if the runner returns successfully, we panic because the runner should never terminate. - res = tokio::time::timeout(Duration::from_secs(5), mempool_p2p_runner.start()) => {res.expect("Test timed out").expect("Runner failed - network stopped unexpectedly"); panic!("Runner terminated")}, - // if a message was received on this oneshot channel, the gateway client received the tx. - res = add_tx_indicator_receiver => { - // if unwrap fails, the tx wasnt forwarded to the gateway client. - res.unwrap(); - // After gateway client fails to add the tx, the p2p runner should have reported the peer. - let peer_reported = mock_reported_messages_receiver.next().await.expect("Failed to receive message"); - assert_eq!(peer_reported, message_metadata.originator_id.private_get_peer_id()) - } + // if the runner fails, there was a network issue => panic. + // if the runner returns successfully, we panic because the runner should never terminate. + res = tokio::time::timeout(Duration::from_secs(5), mempool_p2p_runner.start()) => { + res.expect("Test timed out (MempoolP2pRunner took too long to start)").expect("MempoolP2pRunner failed - network stopped unexpectedly"); + panic!("MempoolP2pRunner terminated"); + }, + // if a message was received on this oneshot channel, the gateway client received the tx. + res = add_tx_indicator_receiver => { + // if unwrap fails, the tx wasn't forwarded to the gateway client. + res.unwrap(); + // After gateway client fails to add the tx, the p2p runner should have reported the peer. + let peer_reported = mock_reported_messages_receiver.next().await.expect("Failed to receive report"); + // TODO: add this functionality to network manager test utils + assert_eq!(peer_reported, message_metadata.originator_id.private_get_peer_id()) + } } }