diff --git a/crates/topos-tce/src/app_context/api.rs b/crates/topos-tce/src/app_context/api.rs index 965087e90..46371bf25 100644 --- a/crates/topos-tce/src/app_context/api.rs +++ b/crates/topos-tce/src/app_context/api.rs @@ -1,5 +1,6 @@ use crate::AppContext; use std::collections::HashMap; +use tokio::spawn; use topos_core::uci::{Certificate, SubnetId}; use topos_metrics::CERTIFICATE_DELIVERY_LATENCY; use topos_tce_api::RuntimeError; @@ -20,79 +21,82 @@ impl AppContext { self.delivery_latency .insert(certificate.id, CERTIFICATE_DELIVERY_LATENCY.start_timer()); - _ = match self - .validator_store - .insert_pending_certificate(&certificate) - .await - { - Ok(Some(pending_id)) => { - let certificate_id = certificate.id; - debug!( - "Certificate {} from subnet {} has been inserted into pending pool", - certificate_id, certificate.source_subnet_id - ); + let validator_store = self.validator_store.clone(); + let double_echo = self.tce_cli.get_double_echo_channel(); - if self - .tce_cli - .get_double_echo_channel() - .send(DoubleEchoCommand::Broadcast { - need_gossip: true, - cert: *certificate, - pending_id, - }) - .await - .is_err() - { - error!( - "Unable to send DoubleEchoCommand::Broadcast command to double \ - echo for {}", - certificate_id + spawn(async move { + _ = match validator_store + .insert_pending_certificate(&certificate) + .await + { + Ok(Some(pending_id)) => { + let certificate_id = certificate.id; + debug!( + "Certificate {} from subnet {} has been inserted into pending pool", + certificate_id, certificate.source_subnet_id ); - sender.send(Err(RuntimeError::CommunicationError( - "Unable to send DoubleEchoCommand::Broadcast command to double \ - echo" - .to_string(), - ))) - } else { - sender.send(Ok(PendingResult::InPending(pending_id))) + if double_echo + .send(DoubleEchoCommand::Broadcast { + need_gossip: true, + cert: *certificate, + pending_id, + }) + .await + .is_err() + { + error!( + "Unable to send DoubleEchoCommand::Broadcast command to \ + double echo for {}", + certificate_id + ); + + sender.send(Err(RuntimeError::CommunicationError( + "Unable to send DoubleEchoCommand::Broadcast command to \ + double echo" + .to_string(), + ))) + } else { + sender.send(Ok(PendingResult::InPending(pending_id))) + } } - } - Ok(None) => { - debug!( - "Certificate {} from subnet {} has been inserted into precedence pool \ - waiting for {}", - certificate.id, certificate.source_subnet_id, certificate.prev_id - ); - sender.send(Ok(PendingResult::AwaitPrecedence)) - } - Err(StorageError::InternalStorage( - InternalStorageError::CertificateAlreadyPending, - )) => { - debug!( - "Certificate {} has already been added to the pending pool, skipping", - certificate.id - ); - sender.send(Ok(PendingResult::AlreadyPending)) - } - Err(StorageError::InternalStorage( - InternalStorageError::CertificateAlreadyExists, - )) => { - debug!( - "Certificate {} has already been delivered, skipping", - certificate.id - ); - sender.send(Ok(PendingResult::AlreadyDelivered)) - } - Err(error) => { - error!( - "Unable to insert pending certificate {}: {}", - certificate.id, error - ); + Ok(None) => { + debug!( + "Certificate {} from subnet {} has been inserted into precedence \ + pool waiting for {}", + certificate.id, certificate.source_subnet_id, certificate.prev_id + ); + sender.send(Ok(PendingResult::AwaitPrecedence)) + } + Err(StorageError::InternalStorage( + InternalStorageError::CertificateAlreadyPending, + )) => { + debug!( + "Certificate {} has already been added to the pending pool, \ + skipping", + certificate.id + ); + sender.send(Ok(PendingResult::AlreadyPending)) + } + Err(StorageError::InternalStorage( + InternalStorageError::CertificateAlreadyExists, + )) => { + debug!( + "Certificate {} has already been delivered, skipping", + certificate.id + ); + sender.send(Ok(PendingResult::AlreadyDelivered)) + } + Err(error) => { + error!( + "Unable to insert pending certificate {}: {}", + certificate.id, error + ); - sender.send(Err(error.into())) - } - }; + sender.send(Err(error.into())) + } + }; + }); } ApiEvent::GetSourceHead { subnet_id, sender } => { diff --git a/crates/topos-tce/src/app_context/protocol.rs b/crates/topos-tce/src/app_context/protocol.rs index 278a13c0a..c516a7dc5 100644 --- a/crates/topos-tce/src/app_context/protocol.rs +++ b/crates/topos-tce/src/app_context/protocol.rs @@ -1,3 +1,4 @@ +use tokio::spawn; use topos_core::api::grpc::tce::v1::{double_echo_request, DoubleEchoRequest, Echo, Gossip, Ready}; use topos_tce_broadcast::event::ProtocolEvents; use tracing::{error, info, warn}; @@ -14,20 +15,22 @@ impl AppContext { ProtocolEvents::Gossip { cert } => { let cert_id = cert.id; - let request = DoubleEchoRequest { - request: Some(double_echo_request::Request::Gossip(Gossip { - certificate: Some(cert.into()), - })), - }; + let network_client = self.network_client.clone(); + spawn(async move { + let request = DoubleEchoRequest { + request: Some(double_echo_request::Request::Gossip(Gossip { + certificate: Some(cert.into()), + })), + }; - info!("Sending Gossip for certificate {}", cert_id); - if let Err(e) = self - .network_client - .publish(topos_p2p::TOPOS_GOSSIP, request) - .await - { - error!("Unable to send Gossip: {e}"); - } + info!("Sending Gossip for certificate {}", cert_id); + if let Err(e) = network_client + .publish(topos_p2p::TOPOS_GOSSIP, request) + .await + { + error!("Unable to send Gossip: {e}"); + } + }); } ProtocolEvents::Echo { @@ -35,22 +38,21 @@ impl AppContext { signature, validator_id, } if self.is_validator => { - // Send echo message - let request = DoubleEchoRequest { - request: Some(double_echo_request::Request::Echo(Echo { - certificate_id: Some(certificate_id.into()), - signature: Some(signature.into()), - validator_id: Some(validator_id.into()), - })), - }; + let network_client = self.network_client.clone(); + spawn(async move { + // Send echo message + let request = DoubleEchoRequest { + request: Some(double_echo_request::Request::Echo(Echo { + certificate_id: Some(certificate_id.into()), + signature: Some(signature.into()), + validator_id: Some(validator_id.into()), + })), + }; - if let Err(e) = self - .network_client - .publish(topos_p2p::TOPOS_ECHO, request) - .await - { - error!("Unable to send Echo: {e}"); - } + if let Err(e) = network_client.publish(topos_p2p::TOPOS_ECHO, request).await { + error!("Unable to send Echo: {e}"); + } + }); } ProtocolEvents::Ready { @@ -58,21 +60,23 @@ impl AppContext { signature, validator_id, } if self.is_validator => { - let request = DoubleEchoRequest { - request: Some(double_echo_request::Request::Ready(Ready { - certificate_id: Some(certificate_id.into()), - signature: Some(signature.into()), - validator_id: Some(validator_id.into()), - })), - }; + let network_client = self.network_client.clone(); + spawn(async move { + let request = DoubleEchoRequest { + request: Some(double_echo_request::Request::Ready(Ready { + certificate_id: Some(certificate_id.into()), + signature: Some(signature.into()), + validator_id: Some(validator_id.into()), + })), + }; - if let Err(e) = self - .network_client - .publish(topos_p2p::TOPOS_READY, request) - .await - { - error!("Unable to send Ready: {e}"); - } + if let Err(e) = network_client + .publish(topos_p2p::TOPOS_READY, request) + .await + { + error!("Unable to send Ready: {e}"); + } + }); } ProtocolEvents::BroadcastFailed { certificate_id } => { warn!("Broadcast failed for certificate {certificate_id}") diff --git a/crates/topos-tce/src/tests/mod.rs b/crates/topos-tce/src/tests/mod.rs index 39ea06d8b..bc680743a 100644 --- a/crates/topos-tce/src/tests/mod.rs +++ b/crates/topos-tce/src/tests/mod.rs @@ -1,11 +1,12 @@ use libp2p::PeerId; use rstest::{fixture, rstest}; -use std::{collections::HashSet, future::IntoFuture, sync::Arc}; +use std::{collections::HashSet, future::IntoFuture, sync::Arc, time::Duration}; use tokio_stream::Stream; use topos_tce_api::RuntimeEvent; use topos_tce_broadcast::event::ProtocolEvents; use topos_tce_gatekeeper::Gatekeeper; +use test_log::test; use tokio::sync::{broadcast, mpsc}; use topos_crypto::messages::MessageSigner; use topos_p2p::{utils::GrpcOverP2P, NetworkClient}; @@ -24,7 +25,8 @@ mod api; mod network; #[rstest] -#[tokio::test] +#[test(tokio::test)] +#[timeout(Duration::from_secs(1))] async fn non_validator_publish_gossip( #[future] setup_test: ( AppContext, @@ -33,21 +35,25 @@ async fn non_validator_publish_gossip( ), ) { let (mut context, mut p2p_receiver, _) = setup_test.await; + let certificates = create_certificate_chain(SOURCE_SUBNET_ID_1, &[TARGET_SUBNET_ID_1], 1); + context .on_protocol_event(ProtocolEvents::Gossip { cert: certificates[0].certificate.clone(), }) .await; + let x = p2p_receiver.recv().await; assert!(matches!( - p2p_receiver.try_recv(), - Ok(topos_p2p::Command::Gossip { topic, .. }) if topic == "topos_gossip" + x, + Some(topos_p2p::Command::Gossip { topic, .. }) if topic == "topos_gossip" )); } #[rstest] -#[tokio::test] +#[test(tokio::test)] +#[timeout(Duration::from_secs(1))] async fn non_validator_do_not_publish_echo( #[future] setup_test: ( AppContext, @@ -64,11 +70,16 @@ async fn non_validator_do_not_publish_echo( }) .await; - assert!(p2p_receiver.try_recv().is_err(),); + assert!( + tokio::time::timeout(Duration::from_millis(10), p2p_receiver.recv()) + .await + .is_err() + ); } #[rstest] #[tokio::test] +#[timeout(Duration::from_secs(1))] async fn non_validator_do_not_publish_ready( #[future] setup_test: ( AppContext, @@ -77,6 +88,7 @@ async fn non_validator_do_not_publish_ready( ), ) { let (mut context, mut p2p_receiver, message_signer) = setup_test.await; + context .on_protocol_event(ProtocolEvents::Ready { certificate_id: CERTIFICATE_ID_1, @@ -98,6 +110,7 @@ pub async fn setup_test( Arc, ) { let validator_store = create_validator_store.await; + let is_validator = false; let message_signer = Arc::new(MessageSigner::new(&[5u8; 32]).unwrap()); let validator_id = message_signer.public_address.into(); @@ -128,6 +141,7 @@ pub async fn setup_test( }; let (api_context, _api_stream) = create_public_api.await; + let api_client = api_context.client; let (gatekeeper_client, _) = Gatekeeper::builder().into_future().await.unwrap();