diff --git a/Cargo.lock b/Cargo.lock index edfee04d6..dbd61ab1a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7746,6 +7746,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", + "tokio-util", "topos-core", "topos-crypto", "topos-metrics", diff --git a/crates/topos-p2p/src/config.rs b/crates/topos-p2p/src/config.rs index a223e8676..4e32456e5 100644 --- a/crates/topos-p2p/src/config.rs +++ b/crates/topos-p2p/src/config.rs @@ -7,6 +7,7 @@ pub struct NetworkConfig { pub discovery: DiscoveryConfig, pub yamux_max_buffer_size: usize, pub yamux_window_size: Option, + pub is_bootnode: bool, } impl Default for NetworkConfig { @@ -18,6 +19,7 @@ impl Default for NetworkConfig { discovery: Default::default(), yamux_max_buffer_size: usize::MAX, yamux_window_size: None, + is_bootnode: false, } } } diff --git a/crates/topos-p2p/src/network.rs b/crates/topos-p2p/src/network.rs index f6f2ebbde..ccf3dfe19 100644 --- a/crates/topos-p2p/src/network.rs +++ b/crates/topos-p2p/src/network.rs @@ -79,14 +79,14 @@ impl<'a> NetworkBuilder<'a> { self } - pub fn public_addresses(mut self, addresses: Vec) -> Self { - self.public_addresses = Some(addresses); + pub fn public_addresses>>(mut self, addresses: M) -> Self { + self.public_addresses = Some(addresses.into()); self } - pub fn listen_addresses(mut self, addresses: Vec) -> Self { - self.listen_addresses = Some(addresses); + pub fn listen_addresses>>(mut self, addresses: M) -> Self { + self.listen_addresses = Some(addresses.into()); self } @@ -200,7 +200,6 @@ impl<'a> NetworkBuilder<'a> { swarm, config: self.config, peer_set: self.known_peers.iter().map(|(p, _)| *p).collect(), - is_boot_node: self.known_peers.is_empty(), command_receiver, event_sender, local_peer_id: peer_id, @@ -213,4 +212,10 @@ impl<'a> NetworkBuilder<'a> { }, )) } + + pub fn is_bootnode(mut self, is_bootnode: bool) -> Self { + self.config.is_bootnode = is_bootnode; + + self + } } diff --git a/crates/topos-p2p/src/runtime/mod.rs b/crates/topos-p2p/src/runtime/mod.rs index 721e3d662..eee1622a2 100644 --- a/crates/topos-p2p/src/runtime/mod.rs +++ b/crates/topos-p2p/src/runtime/mod.rs @@ -29,7 +29,6 @@ pub struct Runtime { pub(crate) listening_on: Vec, pub(crate) public_addresses: Vec, pub(crate) bootstrapped: bool, - pub(crate) is_boot_node: bool, /// Contains current listenerId of the swarm pub active_listeners: HashSet, @@ -74,22 +73,16 @@ impl Runtime { } } - debug!("Starting a boot node ? {:?}", self.is_boot_node); - if !self.is_boot_node { + debug!("Starting a boot node ? {:?}", self.config.is_bootnode); + if !self.config.is_bootnode { // First we need to be known and known some peers before publishing our addresses to // the network. let mut publish_retry = self.config.publish_retry; // We were able to send the DHT query, starting the bootstrap // We may want to remove the bootstrap at some point - if self - .swarm - .behaviour_mut() - .discovery - .inner - .bootstrap() - .is_err() - { + if let Err(error) = self.swarm.behaviour_mut().discovery.inner.bootstrap() { + error!("Unable to start kademlia bootstrap: {error:?}"); return Err(Box::new(P2PError::BootstrapError( "Unable to start kademlia bootstrap", ))); diff --git a/crates/topos-p2p/src/tests/command/random_peer.rs b/crates/topos-p2p/src/tests/command/random_peer.rs index dee0f34d9..f8e1fb768 100644 --- a/crates/topos-p2p/src/tests/command/random_peer.rs +++ b/crates/topos-p2p/src/tests/command/random_peer.rs @@ -15,8 +15,11 @@ async fn no_random_peer() { let (client, _, runtime) = crate::network::builder() .peer_key(local.keypair.clone()) + .public_addresses(&[local.addr.clone()]) + .listen_addresses(&[local.addr.clone()]) .public_addresses(vec![local.addr.clone()]) .listen_addresses(vec![local.addr.clone()]) + .is_bootnode(true) .build() .await .expect("Unable to create p2p network"); @@ -48,6 +51,7 @@ async fn return_a_peer() { .peer_key(local.keypair.clone()) .public_addresses(vec![local.addr.clone()]) .listen_addresses(vec![local.addr.clone()]) + .is_bootnode(true) .build() .await .expect("Unable to create p2p network"); @@ -77,6 +81,7 @@ async fn return_a_random_peer_among_100() { .peer_key(local.keypair.clone()) .public_addresses(vec![local.addr.clone()]) .listen_addresses(vec![local.addr.clone()]) + .is_bootnode(true) .build() .await .expect("Unable to create p2p network"); diff --git a/crates/topos-tce-broadcast/Cargo.toml b/crates/topos-tce-broadcast/Cargo.toml index c0a880ccc..05d29aa2e 100644 --- a/crates/topos-tce-broadcast/Cargo.toml +++ b/crates/topos-tce-broadcast/Cargo.toml @@ -16,6 +16,7 @@ serde.workspace = true thiserror.workspace = true tokio = { workspace = true, features = ["full"] } tokio-stream = { workspace = true, features = ["sync"] } +tokio-util.workspace = true tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] } tracing.workspace = true tce_transport = { package = "topos-tce-transport", path = "../topos-tce-transport"} diff --git a/crates/topos-tce-broadcast/src/constant.rs b/crates/topos-tce-broadcast/src/constant.rs index 01dbfc5d6..30e46f24d 100644 --- a/crates/topos-tce-broadcast/src/constant.rs +++ b/crates/topos-tce-broadcast/src/constant.rs @@ -19,18 +19,6 @@ lazy_static! { .ok() .and_then(|s| s.parse().ok()) .unwrap_or(2048); - /// Size of the channel to send updated subscriptions views to the double echo - pub static ref SUBSCRIPTION_VIEW_CHANNEL_SIZE: usize = - std::env::var("TOPOS_SUBSCRIPTION_VIEW_CHANNEL_SIZE") - .ok() - .and_then(|s| s.parse().ok()) - .unwrap_or(2048); - /// Size of the channel to send updated subscriptions views to the double echo - pub static ref BROADCAST_TASK_COMPLETION_CHANNEL_SIZE: usize = - std::env::var("BROADCAST_TASK_COMPLETION_CHANNEL_SIZE") - .ok() - .and_then(|s| s.parse().ok()) - .unwrap_or(2048); /// Capacity alert threshold for the double echo command channel pub static ref COMMAND_CHANNEL_CAPACITY: usize = COMMAND_CHANNEL_SIZE .checked_mul(10) @@ -39,10 +27,10 @@ lazy_static! { r }) .unwrap_or(*COMMAND_CHANNEL_SIZE); - /// Size of the double echo buffer - pub static ref TOPOS_DOUBLE_ECHO_MAX_BUFFER_SIZE: usize = - std::env::var("TOPOS_BROADCAST_MAX_BUFFER_SIZE") - .ok() - .and_then(|s| s.parse().ok()) - .unwrap_or(crate::double_echo::DoubleEcho::MAX_BUFFER_SIZE); + /// + pub static ref PENDING_LIMIT_PER_REQUEST_TO_STORAGE: usize = + std::env::var("TOPOS_PENDING_LIMIT_PER_REQUEST_TO_STORAGE") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(1000); } diff --git a/crates/topos-tce-broadcast/src/double_echo/mod.rs b/crates/topos-tce-broadcast/src/double_echo/mod.rs index 458997aa1..85efed9fa 100644 --- a/crates/topos-tce-broadcast/src/double_echo/mod.rs +++ b/crates/topos-tce-broadcast/src/double_echo/mod.rs @@ -19,6 +19,7 @@ use std::collections::HashSet; use std::sync::Arc; use tce_transport::{ProtocolEvents, ReliableBroadcastParams}; use tokio::sync::{broadcast, mpsc, oneshot}; +use tokio_util::sync::CancellationToken; use topos_core::{ types::ValidatorId, uci::{Certificate, CertificateId}, @@ -52,6 +53,8 @@ pub struct DoubleEcho { pub validators: HashSet, pub validator_store: Arc, pub broadcast_sender: broadcast::Sender, + + pub task_manager_cancellation: CancellationToken, } impl DoubleEcho { @@ -86,6 +89,7 @@ impl DoubleEcho { shutdown, validator_store, broadcast_sender, + task_manager_cancellation: CancellationToken::new(), } } @@ -95,7 +99,7 @@ impl DoubleEcho { ) -> mpsc::Receiver<(CertificateId, TaskStatus)> { let (task_completion_sender, task_completion_receiver) = mpsc::channel(2048); - let (task_manager, shutdown_receiver) = crate::task_manager::TaskManager::new( + let task_manager = crate::task_manager::TaskManager::new( task_manager_message_receiver, task_completion_sender, self.subscriptions.clone(), @@ -107,7 +111,7 @@ impl DoubleEcho { self.broadcast_sender.clone(), ); - tokio::spawn(task_manager.run(shutdown_receiver)); + tokio::spawn(task_manager.run(self.task_manager_cancellation.child_token())); task_completion_receiver } @@ -133,14 +137,13 @@ impl DoubleEcho { shutdown = self.shutdown.recv() => { warn!("Double echo shutdown signal received {:?}", shutdown); + self.task_manager_cancellation.cancel(); break shutdown; }, Some(command) = self.command_receiver.recv() => { match command { - DoubleEchoCommand::Broadcast { need_gossip, cert } => { - _ = self.broadcast(cert, need_gossip).await; - }, + DoubleEchoCommand::Broadcast { need_gossip, cert } => self.broadcast(cert, need_gossip).await, command if self.subscriptions.is_some() => { match command { diff --git a/crates/topos-tce-broadcast/src/task_manager/mod.rs b/crates/topos-tce-broadcast/src/task_manager/mod.rs index 7e47cfe72..2df66f276 100644 --- a/crates/topos-tce-broadcast/src/task_manager/mod.rs +++ b/crates/topos-tce-broadcast/src/task_manager/mod.rs @@ -5,10 +5,13 @@ use std::collections::HashMap; use std::future::IntoFuture; use std::pin::Pin; use std::sync::Arc; +use std::time::Duration; use tce_transport::{ProtocolEvents, ReliableBroadcastParams}; use tokio::sync::broadcast; use tokio::{spawn, sync::mpsc}; +use tokio_util::sync::CancellationToken; use topos_core::types::ValidatorId; +use topos_core::uci::Certificate; use topos_core::uci::CertificateId; use topos_metrics::CERTIFICATE_PROCESSING_FROM_API_TOTAL; use topos_metrics::CERTIFICATE_PROCESSING_FROM_GOSSIP_TOTAL; @@ -17,11 +20,14 @@ use topos_metrics::DOUBLE_ECHO_ACTIVE_TASKS_COUNT; use topos_tce_storage::store::ReadStore; use topos_tce_storage::types::CertificateDeliveredWithPositions; use topos_tce_storage::validator::ValidatorStore; +use topos_tce_storage::PendingCertificateId; use tracing::debug; +use tracing::error; use tracing::warn; pub mod task; +use crate::constant::PENDING_LIMIT_PER_REQUEST_TO_STORAGE; use crate::double_echo::broadcast_state::BroadcastState; use crate::sampler::SubscriptionsView; use crate::DoubleEchoCommand; @@ -47,11 +53,10 @@ pub struct TaskManager { pub buffered_messages: HashMap>, pub thresholds: ReliableBroadcastParams, pub validator_id: ValidatorId, - pub shutdown_sender: mpsc::Sender<()>, pub validator_store: Arc, pub broadcast_sender: broadcast::Sender, - pub precedence: HashMap, + pub latest_pending_id: PendingCertificateId, } impl TaskManager { @@ -66,35 +71,47 @@ impl TaskManager { message_signer: Arc, validator_store: Arc, broadcast_sender: broadcast::Sender, - ) -> (Self, mpsc::Receiver<()>) { - let (shutdown_sender, shutdown_receiver) = mpsc::channel(1); - - ( - Self { - message_receiver, - task_completion_sender, - subscriptions, - event_sender, - tasks: HashMap::new(), - running_tasks: FuturesUnordered::new(), - buffered_messages: Default::default(), - validator_id, - message_signer, - thresholds, - shutdown_sender, - validator_store, - broadcast_sender, - precedence: HashMap::new(), - }, - shutdown_receiver, - ) + ) -> Self { + Self { + message_receiver, + task_completion_sender, + subscriptions, + event_sender, + tasks: HashMap::new(), + running_tasks: FuturesUnordered::new(), + buffered_messages: Default::default(), + validator_id, + message_signer, + thresholds, + validator_store, + broadcast_sender, + latest_pending_id: 0, + } } - pub async fn run(mut self, mut shutdown_receiver: mpsc::Receiver<()>) { + pub async fn run(mut self, mut shutdown_receiver: CancellationToken) { + let mut interval = tokio::time::interval(Duration::from_secs(1)); + loop { tokio::select! { biased; + _ = interval.tick() => { + debug!("Checking for next pending_certificates"); + match self.validator_store.get_next_pending_certificates(&self.latest_pending_id, *PENDING_LIMIT_PER_REQUEST_TO_STORAGE) { + Ok(pendings) => { + debug!("Received {} pending certificates", pendings.len()); + for (pending_id, certificate) in pendings { + debug!("Creating task for pending certificate {} at position {} if needed", certificate.id, pending_id); + self.create_task(&certificate, true); + self.latest_pending_id = pending_id; + } + } + Err(error) => { + error!("Error while fetching pending certificates: {:?}", error); + } + } + } Some(msg) = self.message_receiver.recv() => { match msg { DoubleEchoCommand::Echo { certificate_id, .. } | DoubleEchoCommand::Ready { certificate_id, .. } => { @@ -109,46 +126,8 @@ impl TaskManager { } DoubleEchoCommand::Broadcast { ref cert, need_gossip } => { debug!("Received broadcast message for certificate {} ", cert.id); - match self.tasks.entry(cert.id) { - std::collections::hash_map::Entry::Vacant(entry) => { - let broadcast_state = BroadcastState::new( - cert.clone(), - self.validator_id, - self.thresholds.echo_threshold, - self.thresholds.ready_threshold, - self.thresholds.delivery_threshold, - self.event_sender.clone(), - self.subscriptions.clone(), - need_gossip, - self.message_signer.clone(), - ); - - let (task, task_context) = Task::new( - cert.id, - broadcast_state, - self.validator_store.clone(), - self.broadcast_sender.clone() - ); - - let prev = self.validator_store.get_certificate(&cert.prev_id); - if matches!(prev, Ok(Some(_))) || cert.prev_id == topos_core::uci::INITIAL_CERTIFICATE_ID { - Self::start_task( - &self.running_tasks, - task, - task_context.sink.clone(), - self.buffered_messages.remove(&cert.id), - need_gossip - ); - } else { - debug!("Received broadcast message for certificate {} but the previous certificate {} is not available yet", cert.id, cert.prev_id); - self.precedence.insert(cert.prev_id, task); - } - entry.insert(task_context); - } - std::collections::hash_map::Entry::Occupied(_) => { - debug!("Received broadcast message for certificate {} but it is already being processed", cert.id); - }, - } + + self.create_task(cert, need_gossip) } } } @@ -156,30 +135,23 @@ impl TaskManager { Some((certificate_id, status)) = self.running_tasks.next() => { if let TaskStatus::Success = status { + debug!("Task for certificate {} finished successfully", certificate_id); self.tasks.remove(&certificate_id); DOUBLE_ECHO_ACTIVE_TASKS_COUNT.dec(); let _ = self.task_completion_sender.send((certificate_id, status)).await; - if let Some(task) = self.precedence.remove(&certificate_id) { - if let Some(context) = self.tasks.get(&task.certificate_id) { - - let certificate_id= task.certificate_id; - Self::start_task( - &self.running_tasks, - task, - context.sink.clone(), - self.buffered_messages.remove(&certificate_id), - false - ); - } - - - } + } else { + debug!("Task for certificate {} finished unsuccessfully", certificate_id); } } - _ = shutdown_receiver.recv() => { + _ = shutdown_receiver.cancelled() => { warn!("Task Manager shutting down"); + warn!("There are still {} active tasks", self.tasks.len()); + if !self.tasks.is_empty() { + debug!("Certificates still in broadcast: {:?}", self.tasks.keys()); + } + warn!("There are still {} buffered messages", self.buffered_messages.len()); for task in self.tasks.iter() { task.1.shutdown_sender.send(()).await.unwrap(); } @@ -216,10 +188,56 @@ impl TaskManager { CERTIFICATE_PROCESSING_FROM_GOSSIP_TOTAL.inc(); } } -} -impl Drop for TaskManager { - fn drop(&mut self) { - _ = self.shutdown_sender.try_send(()); + fn create_task(&mut self, cert: &Certificate, need_gossip: bool) { + match self.tasks.entry(cert.id) { + std::collections::hash_map::Entry::Vacant(entry) => { + let broadcast_state = BroadcastState::new( + cert.clone(), + self.validator_id, + self.thresholds.echo_threshold, + self.thresholds.ready_threshold, + self.thresholds.delivery_threshold, + self.event_sender.clone(), + self.subscriptions.clone(), + need_gossip, + self.message_signer.clone(), + ); + + let (task, task_context) = Task::new( + cert.id, + broadcast_state, + self.validator_store.clone(), + self.broadcast_sender.clone(), + ); + + let prev = self.validator_store.get_certificate(&cert.prev_id); + if matches!(prev, Ok(Some(_))) + || cert.prev_id == topos_core::uci::INITIAL_CERTIFICATE_ID + { + Self::start_task( + &self.running_tasks, + task, + task_context.sink.clone(), + self.buffered_messages.remove(&cert.id), + need_gossip, + ); + } else { + debug!( + "Received broadcast message for certificate {} but the previous \ + certificate {} is not available yet", + cert.id, cert.prev_id + ); + } + entry.insert(task_context); + } + std::collections::hash_map::Entry::Occupied(_) => { + debug!( + "Received broadcast message for certificate {} but it is already being \ + processed", + cert.id + ); + } + } } } diff --git a/crates/topos-tce-broadcast/src/tests/task_manager.rs b/crates/topos-tce-broadcast/src/tests/task_manager.rs index 6e83e0cae..dd50bac3d 100644 --- a/crates/topos-tce-broadcast/src/tests/task_manager.rs +++ b/crates/topos-tce-broadcast/src/tests/task_manager.rs @@ -5,6 +5,7 @@ use tokio::{ spawn, sync::{broadcast, mpsc}, }; +use tokio_util::sync::CancellationToken; use topos_crypto::{messages::MessageSigner, validator_id::ValidatorId}; use topos_tce_storage::validator::ValidatorStore; use topos_test_sdk::{ @@ -23,6 +24,7 @@ async fn can_start(#[future] create_validator_store: Arc) { let (task_completion_sender, _) = mpsc::channel(1); let (event_sender, _) = mpsc::channel(1); let (broadcast_sender, _) = broadcast::channel(1); + let shutdown = CancellationToken::new(); let validator_id = ValidatorId::default(); let thresholds = tce_transport::ReliableBroadcastParams { echo_threshold: 1, @@ -35,7 +37,7 @@ async fn can_start(#[future] create_validator_store: Arc) { .unwrap(), ); - let (mut manager, shutdown_receiver) = TaskManager::new( + let mut manager = TaskManager::new( message_receiver, task_completion_sender, SubscriptionsView::default(), @@ -47,7 +49,7 @@ async fn can_start(#[future] create_validator_store: Arc) { broadcast_sender, ); - spawn(manager.run(shutdown_receiver)); + spawn(manager.run(shutdown)); let certificates = create_certificate_chain(SOURCE_SUBNET_ID_1, &[TARGET_SUBNET_ID_1], 2); let parent = certificates diff --git a/crates/topos-tce-storage/src/errors.rs b/crates/topos-tce-storage/src/errors.rs index 1c9284fc5..bb9dee8e1 100644 --- a/crates/topos-tce-storage/src/errors.rs +++ b/crates/topos-tce-storage/src/errors.rs @@ -10,6 +10,9 @@ pub enum InternalStorageError { #[error("The certificate already exists")] CertificateAlreadyExists, + #[error("The certificate is already in pending")] + CertificateAlreadyPending, + #[error("Unable to find a certificate: {0:?}")] CertificateNotFound(CertificateId), diff --git a/crates/topos-tce-storage/src/rocks/db_column.rs b/crates/topos-tce-storage/src/rocks/db_column.rs index a011b30de..1130b4955 100644 --- a/crates/topos-tce-storage/src/rocks/db_column.rs +++ b/crates/topos-tce-storage/src/rocks/db_column.rs @@ -11,7 +11,7 @@ use rocksdb::{ }; use bincode::Options; -use serde::{de::DeserializeOwned, Serialize}; +use serde::{de::DeserializeOwned, Deserialize, Serialize}; use crate::errors::InternalStorageError; @@ -222,6 +222,13 @@ where Ok(ColumnIterator::new(raw_iterator)) } + fn iter_at(&'a self, index: &I) -> Result { + let mut raw_iterator = self.rocksdb.raw_iterator_cf(&self.cf()?); + + raw_iterator.seek(be_fix_int_ser(index)?); + Ok(ColumnIterator::new(raw_iterator)) + } + fn iter_with_mode( &'a self, mode: IteratorMode<'_>, @@ -271,7 +278,7 @@ where } /// Serialize a value using a fix length serialize and a big endian endianness -fn be_fix_int_ser(t: &S) -> Result, InternalStorageError> +pub(crate) fn be_fix_int_ser(t: &S) -> Result, InternalStorageError> where S: Serialize + ?Sized, { diff --git a/crates/topos-tce-storage/src/rocks/map.rs b/crates/topos-tce-storage/src/rocks/map.rs index 2597f599e..c781b8db8 100644 --- a/crates/topos-tce-storage/src/rocks/map.rs +++ b/crates/topos-tce-storage/src/rocks/map.rs @@ -13,6 +13,9 @@ where /// Returns an Iterator over the whole CF fn iter(&'a self) -> Result; + /// Returns an Iterator over the CF starting from index + fn iter_at(&'a self, index: &I) -> Result; + /// Returns an Iterator over the whole CF with mode configured fn iter_with_mode( &'a self, diff --git a/crates/topos-tce-storage/src/tests/mod.rs b/crates/topos-tce-storage/src/tests/mod.rs index c076ac3b3..939fbf568 100644 --- a/crates/topos-tce-storage/src/tests/mod.rs +++ b/crates/topos-tce-storage/src/tests/mod.rs @@ -10,6 +10,7 @@ use topos_core::{ }; use crate::{ + errors::StorageError, rocks::map::Map, store::{ReadStore, WriteStore}, validator::ValidatorStore, @@ -354,17 +355,22 @@ async fn pending_certificate_can_be_removed(store: Arc) { assert!(matches!(pending_column.get(&pending_id), Ok(None))); - let _ = store.insert_pending_certificate(&certificate).unwrap(); - let pending_id = store .insert_pending_certificate(&certificate) .unwrap() .unwrap(); + assert!(matches!( + store.insert_pending_certificate(&certificate), + Err(StorageError::InternalStorage( + crate::errors::InternalStorageError::CertificateAlreadyPending + )) + )); + assert!(pending_column.get(&pending_id).is_ok()); store.delete_pending_certificate(&pending_id).unwrap(); - assert!(pending_column.iter().unwrap().next().is_some()); + assert!(pending_column.iter().unwrap().next().is_none()); } #[rstest] @@ -494,7 +500,7 @@ async fn get_pending_certificates(store: Arc) { let mut expected_pending_certificates = certificates_for_source_subnet_1[10..] .iter() .enumerate() - .map(|(index, certificate)| (index as u64, certificate.certificate.clone())) + .map(|(index, certificate)| ((index as u64 + 1), certificate.certificate.clone())) .collect::>(); expected_pending_certificates.extend( @@ -503,7 +509,7 @@ async fn get_pending_certificates(store: Arc) { .enumerate() .map(|(index, certificate)| { ( - index as u64 + expected_pending_certificates.len() as u64, + (index as u64 + 1) + expected_pending_certificates.len() as u64, certificate.certificate.clone(), ) }) diff --git a/crates/topos-tce-storage/src/types.rs b/crates/topos-tce-storage/src/types.rs index 631f6db8f..d02a94d9a 100644 --- a/crates/topos-tce-storage/src/types.rs +++ b/crates/topos-tce-storage/src/types.rs @@ -35,6 +35,7 @@ pub(crate) type TargetSourceListColumn = DBColumn #[derive(Debug, Clone)] pub enum PendingResult { AlreadyDelivered, + AlreadyPending, AwaitPrecedence, InPending(PendingCertificateId), } diff --git a/crates/topos-tce-storage/src/validator/mod.rs b/crates/topos-tce-storage/src/validator/mod.rs index e1c04e379..4fd438613 100644 --- a/crates/topos-tce-storage/src/validator/mod.rs +++ b/crates/topos-tce-storage/src/validator/mod.rs @@ -118,6 +118,23 @@ impl ValidatorStore { Ok(self.pending_tables.pending_pool.iter()?.collect()) } + pub fn get_next_pending_certificates( + &self, + from: &PendingCertificateId, + number: usize, + ) -> Result, StorageError> { + debug!( + "Get next pending certificates from {} (max: {})", + from, number + ); + Ok(self + .pending_tables + .pending_pool + .iter_at(from)? + .take(number) + .collect()) + } + // TODO: Performance issue on this one as we iter over all the pending certificates // We need to improve how we request the pending certificates. pub fn get_pending_certificates_for_subnets( @@ -191,16 +208,29 @@ impl ValidatorStore { certificate: &Certificate, ) -> Result, StorageError> { if self.get_certificate(&certificate.id)?.is_some() { + debug!("Certificate {} is already delivered", certificate.id); return Err(StorageError::InternalStorage( InternalStorageError::CertificateAlreadyExists, )); } + if self + .pending_tables + .pending_pool_index + .get(&certificate.id)? + .is_some() + { + debug!( + "Certificate {} is already in the pending pool", + certificate.id + ); + return Err(StorageError::InternalStorage( + InternalStorageError::CertificateAlreadyPending, + )); + } + let prev_delivered = certificate.prev_id == INITIAL_CERTIFICATE_ID - || self - .fullnode_store - .get_certificate(&certificate.prev_id)? - .is_some(); + || self.get_certificate(&certificate.prev_id)?.is_some(); if prev_delivered { let id = self @@ -213,11 +243,20 @@ impl ValidatorStore { .pending_pool_index .insert(&certificate.id, &id)?; + debug!( + "Certificate {} is now in the pending pool at index: {}", + certificate.id, id + ); Ok(Some(id)) } else { self.pending_tables .precedence_pool .insert(&certificate.prev_id, certificate)?; + debug!( + "Certificate {} is now in the precedence pool, because the previous certificate \ + {} isn't delivered yet", + certificate.id, certificate.prev_id + ); Ok(None) } @@ -467,12 +506,19 @@ impl WriteStore for ValidatorStore { _ = self.pending_tables.pending_pool.delete(&pending_id); } - if let Ok(Some(certificate)) = self + if let Ok(Some(next_certificate)) = self .pending_tables .precedence_pool .get(&certificate.certificate.id) { - self.insert_pending_certificate(&certificate)?; + debug!( + "Delivered certificate {} unlocks {} for broadcast", + certificate.certificate.id, next_certificate.id + ); + self.insert_pending_certificate(&next_certificate)?; + self.pending_tables + .precedence_pool + .delete(&certificate.certificate.id)?; } Ok(position) diff --git a/crates/topos-tce-storage/src/validator/tables.rs b/crates/topos-tce-storage/src/validator/tables.rs index b310b3ab0..c6fa1193f 100644 --- a/crates/topos-tce-storage/src/validator/tables.rs +++ b/crates/topos-tce-storage/src/validator/tables.rs @@ -1,5 +1,10 @@ -use std::{fs::create_dir_all, path::PathBuf, sync::atomic::AtomicU64}; +use std::{ + fs::create_dir_all, + path::PathBuf, + sync::atomic::{AtomicU64, Ordering}, +}; +use bincode::Options; use rocksdb::ColumnFamilyDescriptor; use topos_core::{ types::ProofOfDelivery, @@ -71,11 +76,38 @@ impl ValidatorPendingTables { let db = init_with_cfs(&path, default_options(), cfs) .unwrap_or_else(|_| panic!("Cannot open DB at {:?}", path)); + let pending_pool = DBColumn::reopen(&db, cfs::PENDING_POOL); + let next_pending_id = { + let cf = pending_pool + .rocksdb + .cf_handle(cfs::PENDING_POOL) + .expect("Cannot get cf handle for pending pool"); + let mut pending_iterator = pending_pool.rocksdb.raw_iterator_cf(&cf); + + pending_iterator.seek_to_last(); + if pending_iterator.valid() { + AtomicU64::new( + pending_iterator + .key() + .map(|key| { + bincode::DefaultOptions::new() + .with_big_endian() + .with_fixint_encoding() + .deserialize(key) + .unwrap_or(0) + }) + .unwrap_or(0), + ) + } else { + AtomicU64::new(0) + } + }; + + next_pending_id.fetch_add(1, Ordering::Relaxed); Self { - // TODO: Fetch it from the storage - next_pending_id: AtomicU64::new(0), - pending_pool: DBColumn::reopen(&db, cfs::PENDING_POOL), + next_pending_id, + pending_pool, pending_pool_index: DBColumn::reopen(&db, cfs::PENDING_POOL_INDEX), precedence_pool: DBColumn::reopen(&db, cfs::PRECEDENCE_POOL), } diff --git a/crates/topos-tce/src/app_context.rs b/crates/topos-tce/src/app_context.rs index c781eb199..38127c82c 100644 --- a/crates/topos-tce/src/app_context.rs +++ b/crates/topos-tce/src/app_context.rs @@ -16,6 +16,7 @@ use topos_tce_api::RuntimeClient as ApiClient; use topos_tce_api::RuntimeEvent as ApiEvent; use topos_tce_broadcast::ReliableBroadcastClient; use topos_tce_gatekeeper::GatekeeperClient; +use topos_tce_storage::store::ReadStore; use topos_tce_storage::types::CertificateDeliveredWithPositions; use topos_tce_storage::validator::ValidatorStore; use topos_tce_storage::StorageClient; @@ -127,6 +128,7 @@ impl AppContext { // Shutdown signal _ = shutdown.0.cancelled() => { info!("Shutting down TCE app context..."); + if let Err(e) = self.shutdown().await { error!("Error shutting down TCE app context: {e}"); } @@ -141,11 +143,35 @@ impl AppContext { pub async fn shutdown(&mut self) -> Result<(), Box> { info!("Shutting down the TCE client..."); + self.api_client.shutdown().await?; self.tce_cli.shutdown().await?; self.gatekeeper.shutdown().await?; self.network_client.shutdown().await?; + let certificates_synced = self + .validator_store + .count_certificates_delivered() + .map_err(|error| format!("Unable to count certificates delivered: {error}")) + .unwrap(); + + let pending_certificates = self + .validator_store + .count_pending_certificates() + .map_err(|error| format!("Unable to count pending certificates: {error}")) + .unwrap(); + + let precedence_pool_certificates = self + .validator_store + .count_precedence_pool_certificates() + .map_err(|error| format!("Unable to count precedence pool certificates: {error}")) + .unwrap(); + + info!( + "Stopping with {} certificates delivered, {} pending certificates and {} certificates \ + in the precedence pool", + certificates_synced, pending_certificates, precedence_pool_certificates + ); Ok(()) } } diff --git a/crates/topos-tce/src/app_context/api.rs b/crates/topos-tce/src/app_context/api.rs index 10edcfc40..1d84ade6d 100644 --- a/crates/topos-tce/src/app_context/api.rs +++ b/crates/topos-tce/src/app_context/api.rs @@ -25,19 +25,29 @@ impl AppContext { { Ok(Some(pending_id)) => { debug!( - "Certificate {} has been inserted into pending pool", - certificate.id + "Certificate {} from subnet {} has been inserted into pending pool", + certificate.id, certificate.source_subnet_id ); sender.send(Ok(PendingResult::InPending(pending_id))) } Ok(None) => { debug!( - "Certificate {} has been inserted into precedence pool", - certificate.id + "Certificate {} from subnet {} has been inserted into precedence pool \ + waiting for {}", + certificate.id, certificate.source_subnet_id, certificate.prev_id ); sender.send(Ok(PendingResult::AwaitPrecedence)) } + Err(StorageError::InternalStorage( + InternalStorageError::CertificateAlreadyPending, + )) => { + debug!( + "Certificate {} has been already added to the pending pool, skipping", + certificate.id + ); + sender.send(Ok(PendingResult::AlreadyPending)) + } Err(StorageError::InternalStorage( InternalStorageError::CertificateAlreadyExists, )) => { @@ -56,11 +66,6 @@ impl AppContext { sender.send(Err(error.into())) } }; - - _ = self - .tce_cli - .broadcast_new_certificate(*certificate, true) - .await; } ApiEvent::GetSourceHead { subnet_id, sender } => { diff --git a/crates/topos-tce/src/app_context/network.rs b/crates/topos-tce/src/app_context/network.rs index 060243abc..276af70bc 100644 --- a/crates/topos-tce/src/app_context/network.rs +++ b/crates/topos-tce/src/app_context/network.rs @@ -55,6 +55,15 @@ impl AppContext { cert.id ); } + Err(StorageError::InternalStorage( + InternalStorageError::CertificateAlreadyPending, + )) => { + debug!( + "Certificate {} from subnet {} has been inserted into \ + precedence pool waiting for {}", + cert.id, cert.source_subnet_id, cert.prev_id + ); + } Err(StorageError::InternalStorage( InternalStorageError::CertificateAlreadyExists, )) => { @@ -70,23 +79,6 @@ impl AppContext { ); } } - - spawn(async move { - info!("Send certificate {} to be broadcast", cert.id); - if channel - .send(DoubleEchoCommand::Broadcast { - cert, - need_gossip: false, - }) - .await - .is_err() - { - error!( - "Unable to send broadcast_new_certificate command, Receiver \ - was dropped" - ); - } - }); } Err(e) => { error!("Error converting received certificate {e}"); diff --git a/crates/topos-tce/src/config.rs b/crates/topos-tce/src/config.rs index ddc9dfb53..d4c5d20f4 100644 --- a/crates/topos-tce/src/config.rs +++ b/crates/topos-tce/src/config.rs @@ -31,6 +31,7 @@ pub struct TceConfiguration { pub version: &'static str, pub listen_addresses: Vec, pub public_addresses: Vec, + pub is_bootnode: bool, } #[derive(Debug)] diff --git a/crates/topos-tce/src/lib.rs b/crates/topos-tce/src/lib.rs index 012c8547a..fe72f8897 100644 --- a/crates/topos-tce/src/lib.rs +++ b/crates/topos-tce/src/lib.rs @@ -144,6 +144,7 @@ pub async fn run( .public_addresses(config.public_addresses.clone()) .known_peers(&boot_peers) .grpc_context(grpc_context) + .is_bootnode(config.is_bootnode) .build() .await?; diff --git a/crates/topos-test-sdk/src/tce/p2p.rs b/crates/topos-test-sdk/src/tce/p2p.rs index 2dfacf798..faa0b62f8 100644 --- a/crates/topos-test-sdk/src/tce/p2p.rs +++ b/crates/topos-test-sdk/src/tce/p2p.rs @@ -54,6 +54,7 @@ pub async fn create_network_worker( .listen_addresses(addr) .minimum_cluster_size(minimum_cluster_size) .grpc_context(grpc_context) + .is_bootnode(seed == 1) .build() .await } diff --git a/crates/topos/src/components/node/services/process.rs b/crates/topos/src/components/node/services/process.rs index dd2a42246..fa12f98b5 100644 --- a/crates/topos/src/components/node/services/process.rs +++ b/crates/topos/src/components/node/services/process.rs @@ -1,10 +1,10 @@ use crate::config::sequencer::SequencerConfig; use crate::config::tce::TceConfig; use crate::edge::CommandConfig; -use std::collections::HashMap; use std::path::PathBuf; use std::process::ExitStatus; use std::time::Duration; +use std::{collections::HashMap, task::Wake}; use thiserror::Error; use tokio::{spawn, sync::mpsc, task::JoinHandle}; use tokio_util::sync::CancellationToken; @@ -79,6 +79,22 @@ pub(crate) fn spawn_tce_process( genesis: Genesis, shutdown: (CancellationToken, mpsc::Sender<()>), ) -> JoinHandle> { + let boot_peers = genesis + .boot_peers(Some(topos_p2p::constants::TCE_BOOTNODE_PORT)) + .into_iter() + .chain(config.parse_boot_peers()) + .collect::>(); + let auth_key = keys.network.map(AuthKey::PrivateKey); + config.p2p.is_bootnode = if let Some(AuthKey::PrivateKey(ref k)) = auth_key { + let peer_id = topos_p2p::utils::keypair_from_protobuf_encoding(&k[..]) + .public() + .to_peer_id(); + + Some(boot_peers.iter().any(|(p, _)| p == &peer_id)) + } else { + None + }; + let validators = genesis.validators().expect("Cannot parse validators"); let tce_params = ReliableBroadcastParams::new(validators.len()); @@ -98,13 +114,10 @@ pub(crate) fn spawn_tce_process( } let tce_config = TceConfiguration { - boot_peers: genesis - .boot_peers(Some(topos_p2p::constants::TCE_BOOTNODE_PORT)) - .into_iter() - .chain(config.parse_boot_peers()) - .collect::>(), + is_bootnode: config.p2p.is_bootnode.unwrap_or_default(), + boot_peers, validators, - auth_key: keys.network.map(AuthKey::PrivateKey), + auth_key, signing_key: keys.validator.map(AuthKey::PrivateKey), listen_addresses: config.p2p.listen_addresses, public_addresses: config.p2p.public_addresses, diff --git a/crates/topos/src/config/tce.rs b/crates/topos/src/config/tce.rs index a414c8405..1a6edc208 100644 --- a/crates/topos/src/config/tce.rs +++ b/crates/topos/src/config/tce.rs @@ -61,6 +61,8 @@ pub struct P2PConfig { /// List of multiaddresses to advertise to the network #[serde(default = "default_public_addresses")] pub public_addresses: Vec, + + pub is_bootnode: Option, } impl Default for P2PConfig { @@ -68,6 +70,7 @@ impl Default for P2PConfig { Self { listen_addresses: default_listen_addresses(), public_addresses: default_public_addresses(), + is_bootnode: None, } } }