From e81d338a93d6e0bc58078b75929b1aa4b0141ba0 Mon Sep 17 00:00:00 2001 From: Simon Paitrault Date: Mon, 25 Mar 2024 08:20:50 +0100 Subject: [PATCH] feat: switch tce-lib action to spawn tasks Signed-off-by: Simon Paitrault --- crates/topos-tce/src/app_context/api.rs | 140 ++++++------- crates/topos-tce/src/app_context/network.rs | 197 +------------------ crates/topos-tce/src/app_context/protocol.rs | 88 +++++---- 3 files changed, 121 insertions(+), 304 deletions(-) diff --git a/crates/topos-tce/src/app_context/api.rs b/crates/topos-tce/src/app_context/api.rs index 965087e90..46371bf25 100644 --- a/crates/topos-tce/src/app_context/api.rs +++ b/crates/topos-tce/src/app_context/api.rs @@ -1,5 +1,6 @@ use crate::AppContext; use std::collections::HashMap; +use tokio::spawn; use topos_core::uci::{Certificate, SubnetId}; use topos_metrics::CERTIFICATE_DELIVERY_LATENCY; use topos_tce_api::RuntimeError; @@ -20,79 +21,82 @@ impl AppContext { self.delivery_latency .insert(certificate.id, CERTIFICATE_DELIVERY_LATENCY.start_timer()); - _ = match self - .validator_store - .insert_pending_certificate(&certificate) - .await - { - Ok(Some(pending_id)) => { - let certificate_id = certificate.id; - debug!( - "Certificate {} from subnet {} has been inserted into pending pool", - certificate_id, certificate.source_subnet_id - ); + let validator_store = self.validator_store.clone(); + let double_echo = self.tce_cli.get_double_echo_channel(); - if self - .tce_cli - .get_double_echo_channel() - .send(DoubleEchoCommand::Broadcast { - need_gossip: true, - cert: *certificate, - pending_id, - }) - .await - .is_err() - { - error!( - "Unable to send DoubleEchoCommand::Broadcast command to double \ - echo for {}", - certificate_id + spawn(async move { + _ = match validator_store + .insert_pending_certificate(&certificate) + .await + { + Ok(Some(pending_id)) => { + let certificate_id = certificate.id; + debug!( + "Certificate {} from subnet {} has been inserted into pending pool", + certificate_id, certificate.source_subnet_id ); - sender.send(Err(RuntimeError::CommunicationError( - "Unable to send DoubleEchoCommand::Broadcast command to double \ - echo" - .to_string(), - ))) - } else { - sender.send(Ok(PendingResult::InPending(pending_id))) + if double_echo + .send(DoubleEchoCommand::Broadcast { + need_gossip: true, + cert: *certificate, + pending_id, + }) + .await + .is_err() + { + error!( + "Unable to send DoubleEchoCommand::Broadcast command to \ + double echo for {}", + certificate_id + ); + + sender.send(Err(RuntimeError::CommunicationError( + "Unable to send DoubleEchoCommand::Broadcast command to \ + double echo" + .to_string(), + ))) + } else { + sender.send(Ok(PendingResult::InPending(pending_id))) + } } - } - Ok(None) => { - debug!( - "Certificate {} from subnet {} has been inserted into precedence pool \ - waiting for {}", - certificate.id, certificate.source_subnet_id, certificate.prev_id - ); - sender.send(Ok(PendingResult::AwaitPrecedence)) - } - Err(StorageError::InternalStorage( - InternalStorageError::CertificateAlreadyPending, - )) => { - debug!( - "Certificate {} has already been added to the pending pool, skipping", - certificate.id - ); - sender.send(Ok(PendingResult::AlreadyPending)) - } - Err(StorageError::InternalStorage( - InternalStorageError::CertificateAlreadyExists, - )) => { - debug!( - "Certificate {} has already been delivered, skipping", - certificate.id - ); - sender.send(Ok(PendingResult::AlreadyDelivered)) - } - Err(error) => { - error!( - "Unable to insert pending certificate {}: {}", - certificate.id, error - ); + Ok(None) => { + debug!( + "Certificate {} from subnet {} has been inserted into precedence \ + pool waiting for {}", + certificate.id, certificate.source_subnet_id, certificate.prev_id + ); + sender.send(Ok(PendingResult::AwaitPrecedence)) + } + Err(StorageError::InternalStorage( + InternalStorageError::CertificateAlreadyPending, + )) => { + debug!( + "Certificate {} has already been added to the pending pool, \ + skipping", + certificate.id + ); + sender.send(Ok(PendingResult::AlreadyPending)) + } + Err(StorageError::InternalStorage( + InternalStorageError::CertificateAlreadyExists, + )) => { + debug!( + "Certificate {} has already been delivered, skipping", + certificate.id + ); + sender.send(Ok(PendingResult::AlreadyDelivered)) + } + Err(error) => { + error!( + "Unable to insert pending certificate {}: {}", + certificate.id, error + ); - sender.send(Err(error.into())) - } - }; + sender.send(Err(error.into())) + } + }; + }); } ApiEvent::GetSourceHead { subnet_id, sender } => { diff --git a/crates/topos-tce/src/app_context/network.rs b/crates/topos-tce/src/app_context/network.rs index acfdb569b..b451900f8 100644 --- a/crates/topos-tce/src/app_context/network.rs +++ b/crates/topos-tce/src/app_context/network.rs @@ -1,204 +1,13 @@ -use prost::Message; -use std::collections::hash_map; -use topos_tce_storage::errors::{InternalStorageError, StorageError}; - -use tokio::spawn; - -use topos_metrics::CERTIFICATE_DELIVERY_LATENCY; use topos_p2p::Event as NetEvent; -use topos_tce_broadcast::DoubleEchoCommand; -use tracing::{debug, error, info, trace}; - -use topos_core::api::grpc::tce::v1::{double_echo_request, DoubleEchoRequest, Echo, Gossip, Ready}; -use topos_core::uci; +use tracing::warn; use crate::AppContext; impl AppContext { pub async fn on_net_event(&mut self, evt: NetEvent) { - trace!( + warn!( "on_net_event: peer: {} event {:?}", - &self.network_client.local_peer_id, - &evt + &self.network_client.local_peer_id, &evt ); - - if let NetEvent::Gossip { data, from } = evt { - if let Ok(DoubleEchoRequest { - request: Some(double_echo_request), - }) = DoubleEchoRequest::decode(&data[..]) - { - match double_echo_request { - double_echo_request::Request::Gossip(Gossip { - certificate: Some(certificate), - }) => match uci::Certificate::try_from(certificate) { - Ok(cert) => { - if let hash_map::Entry::Vacant(entry) = - self.delivery_latency.entry(cert.id) - { - entry.insert(CERTIFICATE_DELIVERY_LATENCY.start_timer()); - } - info!( - "Received certificate {} from GossipSub from {}", - cert.id, from - ); - - match self.validator_store.insert_pending_certificate(&cert).await { - Ok(Some(pending_id)) => { - let certificate_id = cert.id; - debug!( - "Certificate {} has been inserted into pending pool", - certificate_id - ); - - if self - .tce_cli - .get_double_echo_channel() - .send(DoubleEchoCommand::Broadcast { - need_gossip: false, - cert, - pending_id, - }) - .await - .is_err() - { - error!( - "Unable to send DoubleEchoCommand::Broadcast command \ - to double echo for {}", - certificate_id - ); - } - } - - Ok(None) => { - debug!( - "Certificate {} from subnet {} has been inserted into \ - precedence pool waiting for {}", - cert.id, cert.source_subnet_id, cert.prev_id - ); - } - Err(StorageError::InternalStorage( - InternalStorageError::CertificateAlreadyPending, - )) => { - debug!( - "Certificate {} has been already added to the pending \ - pool, skipping", - cert.id - ); - } - Err(StorageError::InternalStorage( - InternalStorageError::CertificateAlreadyExists, - )) => { - debug!( - "Certificate {} has been already delivered, skipping", - cert.id - ); - } - Err(error) => { - error!( - "Unable to insert pending certificate {}: {}", - cert.id, error - ); - } - } - } - Err(e) => { - error!("Failed to parse the received Certificate: {e}"); - } - }, - double_echo_request::Request::Echo(Echo { - certificate_id: Some(certificate_id), - signature: Some(signature), - validator_id: Some(validator_id), - }) => { - let channel = self.tce_cli.get_double_echo_channel(); - spawn(async move { - let certificate_id = certificate_id.clone().try_into().map_err(|e| { - error!( - "Failed to parse the CertificateId {certificate_id} from \ - Echo: {e}" - ); - e - }); - let validator_id = validator_id.clone().try_into().map_err(|e| { - error!( - "Failed to parse the ValidatorId {validator_id} from Echo: {e}" - ); - e - }); - - if let (Ok(certificate_id), Ok(validator_id)) = - (certificate_id, validator_id) - { - trace!( - "Received Echo message, certificate_id: {certificate_id}, \ - validator_id: {validator_id} from: {from}", - certificate_id = certificate_id, - validator_id = validator_id - ); - - if let Err(e) = channel - .send(DoubleEchoCommand::Echo { - signature: signature.into(), - certificate_id, - validator_id, - }) - .await - { - error!("Unable to pass received Echo message: {:?}", e); - } - } else { - error!("Unable to process Echo message due to invalid data"); - } - }); - } - double_echo_request::Request::Ready(Ready { - certificate_id: Some(certificate_id), - signature: Some(signature), - validator_id: Some(validator_id), - }) => { - let channel = self.tce_cli.get_double_echo_channel(); - spawn(async move { - let certificate_id = certificate_id.clone().try_into().map_err(|e| { - error!( - "Failed to parse the CertificateId {certificate_id} from \ - Ready: {e}" - ); - e - }); - let validator_id = validator_id.clone().try_into().map_err(|e| { - error!( - "Failed to parse the ValidatorId {validator_id} from Ready: \ - {e}" - ); - e - }); - if let (Ok(certificate_id), Ok(validator_id)) = - (certificate_id, validator_id) - { - trace!( - "Received Ready message, certificate_id: {certificate_id}, \ - validator_id: {validator_id} from: {from}", - certificate_id = certificate_id, - validator_id = validator_id - ); - if let Err(e) = channel - .send(DoubleEchoCommand::Ready { - signature: signature.into(), - certificate_id, - validator_id, - }) - .await - { - error!("Unable to pass received Ready message: {:?}", e); - } - } else { - error!("Unable to process Ready message due to invalid data"); - } - }); - } - _ => {} - } - } - } } } diff --git a/crates/topos-tce/src/app_context/protocol.rs b/crates/topos-tce/src/app_context/protocol.rs index 278a13c0a..c516a7dc5 100644 --- a/crates/topos-tce/src/app_context/protocol.rs +++ b/crates/topos-tce/src/app_context/protocol.rs @@ -1,3 +1,4 @@ +use tokio::spawn; use topos_core::api::grpc::tce::v1::{double_echo_request, DoubleEchoRequest, Echo, Gossip, Ready}; use topos_tce_broadcast::event::ProtocolEvents; use tracing::{error, info, warn}; @@ -14,20 +15,22 @@ impl AppContext { ProtocolEvents::Gossip { cert } => { let cert_id = cert.id; - let request = DoubleEchoRequest { - request: Some(double_echo_request::Request::Gossip(Gossip { - certificate: Some(cert.into()), - })), - }; + let network_client = self.network_client.clone(); + spawn(async move { + let request = DoubleEchoRequest { + request: Some(double_echo_request::Request::Gossip(Gossip { + certificate: Some(cert.into()), + })), + }; - info!("Sending Gossip for certificate {}", cert_id); - if let Err(e) = self - .network_client - .publish(topos_p2p::TOPOS_GOSSIP, request) - .await - { - error!("Unable to send Gossip: {e}"); - } + info!("Sending Gossip for certificate {}", cert_id); + if let Err(e) = network_client + .publish(topos_p2p::TOPOS_GOSSIP, request) + .await + { + error!("Unable to send Gossip: {e}"); + } + }); } ProtocolEvents::Echo { @@ -35,22 +38,21 @@ impl AppContext { signature, validator_id, } if self.is_validator => { - // Send echo message - let request = DoubleEchoRequest { - request: Some(double_echo_request::Request::Echo(Echo { - certificate_id: Some(certificate_id.into()), - signature: Some(signature.into()), - validator_id: Some(validator_id.into()), - })), - }; + let network_client = self.network_client.clone(); + spawn(async move { + // Send echo message + let request = DoubleEchoRequest { + request: Some(double_echo_request::Request::Echo(Echo { + certificate_id: Some(certificate_id.into()), + signature: Some(signature.into()), + validator_id: Some(validator_id.into()), + })), + }; - if let Err(e) = self - .network_client - .publish(topos_p2p::TOPOS_ECHO, request) - .await - { - error!("Unable to send Echo: {e}"); - } + if let Err(e) = network_client.publish(topos_p2p::TOPOS_ECHO, request).await { + error!("Unable to send Echo: {e}"); + } + }); } ProtocolEvents::Ready { @@ -58,21 +60,23 @@ impl AppContext { signature, validator_id, } if self.is_validator => { - let request = DoubleEchoRequest { - request: Some(double_echo_request::Request::Ready(Ready { - certificate_id: Some(certificate_id.into()), - signature: Some(signature.into()), - validator_id: Some(validator_id.into()), - })), - }; + let network_client = self.network_client.clone(); + spawn(async move { + let request = DoubleEchoRequest { + request: Some(double_echo_request::Request::Ready(Ready { + certificate_id: Some(certificate_id.into()), + signature: Some(signature.into()), + validator_id: Some(validator_id.into()), + })), + }; - if let Err(e) = self - .network_client - .publish(topos_p2p::TOPOS_READY, request) - .await - { - error!("Unable to send Ready: {e}"); - } + if let Err(e) = network_client + .publish(topos_p2p::TOPOS_READY, request) + .await + { + error!("Unable to send Ready: {e}"); + } + }); } ProtocolEvents::BroadcastFailed { certificate_id } => { warn!("Broadcast failed for certificate {certificate_id}")