diff --git a/crates/topos-tce-api/src/graphql/query.rs b/crates/topos-tce-api/src/graphql/query.rs index 11f52633f..596c64140 100644 --- a/crates/topos-tce-api/src/graphql/query.rs +++ b/crates/topos-tce-api/src/graphql/query.rs @@ -4,6 +4,7 @@ use async_graphql::{Context, EmptyMutation, Object, Schema, Subscription}; use async_trait::async_trait; use futures::{Stream, StreamExt}; use tokio::sync::{mpsc, oneshot}; +use topos_core::api::graphql::checkpoint::SourceStreamPosition; use topos_core::api::graphql::errors::GraphQLServerError; use topos_core::api::graphql::filter::SubnetFilter; use topos_core::api::graphql::{ @@ -112,6 +113,31 @@ impl QueryRoot { ) -> Result { Self::certificate_by_id(ctx, certificate_id).await } + + /// This endpoint is used to get the current checkpoint of the source streams. + /// The checkpoint is the position of the last certificate delivered for each source stream. + async fn get_checkpoint( + &self, + ctx: &Context<'_>, + ) -> Result, GraphQLServerError> { + let store = ctx.data::>().map_err(|_| { + tracing::error!("Failed to get store from context"); + + GraphQLServerError::ParseDataConnector + })?; + + let checkpoint = store + .get_checkpoint() + .map_err(|_| GraphQLServerError::StorageError)?; + + Ok(checkpoint + .iter() + .map(|(subnet_id, head)| SourceStreamPosition { + source_subnet_id: subnet_id.into(), + position: *head.position, + }) + .collect()) + } } pub struct SubscriptionRoot; diff --git a/crates/topos-tce-broadcast/src/double_echo/broadcast_state.rs b/crates/topos-tce-broadcast/src/double_echo/broadcast_state.rs index 807dbc226..ee82d7393 100644 --- a/crates/topos-tce-broadcast/src/double_echo/broadcast_state.rs +++ b/crates/topos-tce-broadcast/src/double_echo/broadcast_state.rs @@ -230,13 +230,20 @@ impl BroadcastState { fn reached_delivery_threshold(&self) -> bool { // If reached the delivery threshold, I can deliver - match self + let delivery_threshold = match self .subscriptions_view .network_size .checked_sub(self.subscriptions_view.ready.len()) { Some(consumed) => consumed >= self.delivery_threshold, None => false, - } + }; + + debug!( + "📝 Certificate {} reached Delivery threshold: {}", + &self.certificate.id, delivery_threshold + ); + + delivery_threshold } } diff --git a/crates/topos-tce-storage/src/rocks/map.rs b/crates/topos-tce-storage/src/rocks/map.rs index c781b8db8..fc6d5cb38 100644 --- a/crates/topos-tce-storage/src/rocks/map.rs +++ b/crates/topos-tce-storage/src/rocks/map.rs @@ -17,6 +17,7 @@ where fn iter_at(&'a self, index: &I) -> Result; /// Returns an Iterator over the whole CF with mode configured + #[allow(dead_code)] fn iter_with_mode( &'a self, mode: IteratorMode<'_>, @@ -29,6 +30,7 @@ where ) -> Result; /// Returns a prefixed Iterator over the CF starting from index + #[allow(dead_code)] fn prefix_iter_at( &'a self, prefix: &P, diff --git a/crates/topos-tce-storage/src/tests/checkpoints.rs b/crates/topos-tce-storage/src/tests/checkpoints.rs new file mode 100644 index 000000000..947eb381b --- /dev/null +++ b/crates/topos-tce-storage/src/tests/checkpoints.rs @@ -0,0 +1,123 @@ +use std::{collections::HashMap, sync::Arc}; + +use rstest::rstest; +use topos_core::uci::SubnetId; +use topos_test_sdk::{ + certificates::create_certificate_chain, + constants::{SOURCE_SUBNET_ID_1, SOURCE_SUBNET_ID_2, TARGET_SUBNET_ID_1}, +}; + +use super::support::store; +use crate::{ + store::{ReadStore, WriteStore}, + validator::ValidatorStore, +}; + +#[rstest] +#[tokio::test] +async fn get_checkpoint_for_two_subnets(store: Arc) { + let certificates_a = create_certificate_chain(SOURCE_SUBNET_ID_1, &[TARGET_SUBNET_ID_1], 32); + let certificates_b = create_certificate_chain(SOURCE_SUBNET_ID_2, &[TARGET_SUBNET_ID_1], 24); + + for cert in certificates_a { + _ = store.insert_certificate_delivered(&cert).await; + } + + for cert in certificates_b { + _ = store.insert_certificate_delivered(&cert).await; + } + + let checkpoint = store + .get_checkpoint() + .unwrap() + .into_iter() + .map(|(subnet, value)| (subnet, *value.position)) + .collect::>(); + + assert_eq!(checkpoint.len(), 2); + assert_eq!(*checkpoint.get(&SOURCE_SUBNET_ID_1).unwrap(), 31); + assert_eq!(*checkpoint.get(&SOURCE_SUBNET_ID_2).unwrap(), 23); +} + +#[rstest] +#[tokio::test] +async fn get_checkpoint_diff_with_no_input(store: Arc) { + let certificates_a = create_certificate_chain(SOURCE_SUBNET_ID_1, &[TARGET_SUBNET_ID_1], 32); + let certificates_b = create_certificate_chain(SOURCE_SUBNET_ID_2, &[TARGET_SUBNET_ID_1], 24); + + for cert in certificates_a { + _ = store.insert_certificate_delivered(&cert).await; + } + + for cert in certificates_b { + _ = store.insert_certificate_delivered(&cert).await; + } + + let checkpoint = store + .get_checkpoint_diff(&[]) + .unwrap() + .into_iter() + .map(|(subnet, proofs)| { + ( + subnet, + proofs + .iter() + .map(|proof| *proof.delivery_position.position) + .collect::>(), + ) + }) + .collect::>(); + + assert_eq!(checkpoint.len(), 2); + assert_eq!( + *checkpoint.get(&SOURCE_SUBNET_ID_1).unwrap(), + (0..=31).collect::>() + ); + assert_eq!( + *checkpoint.get(&SOURCE_SUBNET_ID_2).unwrap(), + (0..=23).collect::>() + ); +} + +#[rstest] +#[tokio::test] +async fn get_checkpoint_diff_with_input(store: Arc) { + let certificates_a = create_certificate_chain(SOURCE_SUBNET_ID_1, &[TARGET_SUBNET_ID_1], 32); + let certificates_b = create_certificate_chain(SOURCE_SUBNET_ID_2, &[TARGET_SUBNET_ID_1], 24); + + let checkpoint = certificates_a.get(20).unwrap().proof_of_delivery.clone(); + assert_eq!(*checkpoint.delivery_position.position, 20); + + for cert in certificates_a { + _ = store.insert_certificate_delivered(&cert).await; + } + + for cert in certificates_b { + _ = store.insert_certificate_delivered(&cert).await; + } + + let checkpoint = store + .get_checkpoint_diff(&[checkpoint]) + .unwrap() + .into_iter() + .map(|(subnet, proofs)| { + ( + subnet, + proofs + .iter() + .map(|proof| *proof.delivery_position.position) + .collect::>(), + ) + }) + .collect::>(); + + assert_eq!(checkpoint.len(), 2); + assert_eq!( + *checkpoint.get(&SOURCE_SUBNET_ID_1).unwrap(), + (21..=31).collect::>() + ); + assert_eq!( + *checkpoint.get(&SOURCE_SUBNET_ID_2).unwrap(), + (0..=23).collect::>() + ); +} diff --git a/crates/topos-tce-storage/src/tests/mod.rs b/crates/topos-tce-storage/src/tests/mod.rs index 939fbf568..03255a1db 100644 --- a/crates/topos-tce-storage/src/tests/mod.rs +++ b/crates/topos-tce-storage/src/tests/mod.rs @@ -21,6 +21,7 @@ use self::support::store; use topos_test_sdk::certificates::create_certificate_chain; use topos_test_sdk::constants::*; +mod checkpoints; mod db_columns; mod pending_certificates; mod position; diff --git a/crates/topos-tce-storage/src/validator/mod.rs b/crates/topos-tce-storage/src/validator/mod.rs index 4fd438613..4d05233ac 100644 --- a/crates/topos-tce-storage/src/validator/mod.rs +++ b/crates/topos-tce-storage/src/validator/mod.rs @@ -29,7 +29,7 @@ use topos_core::{ }, uci::{Certificate, CertificateId, SubnetId, INITIAL_CERTIFICATE_ID}, }; -use tracing::{debug, info, instrument}; +use tracing::{debug, error, info, instrument}; use crate::{ errors::{InternalStorageError, StorageError}, @@ -334,16 +334,25 @@ impl ValidatorStore { .get(certificate_id)?) } + /// Returns the difference between the `from` list of [ProofOfDelivery] and the local head + /// checkpoint. This is used to define the list of certificates that are missing between the + /// `from` and the local head checkpoint. + /// The maximum number of [ProofOfDelivery] returned per [SubnetId] is 100. + /// If the `from` is missing a local subnet, the list of [ProofOfDelivery] for this subnet will + /// start from [Position] `0`. pub fn get_checkpoint_diff( &self, - from: Vec, + from: &[ProofOfDelivery], ) -> Result>, StorageError> { // Parse the from in order to extract the different position per subnets - let mut from_positions: HashMap> = from - .into_iter() - .map(|v| (v.delivery_position.subnet_id, vec![v])) + let from_positions: HashMap = from + .iter() + .map(|v| (v.delivery_position.subnet_id, v)) .collect(); + let mut output: HashMap> = + from_positions.keys().map(|s| (*s, vec![])).collect(); + // Request the local head checkpoint let subnets: HashMap = self .fullnode_store @@ -356,16 +365,16 @@ impl ValidatorStore { // For every local known subnets we want to iterate and check if there // is a delta between the from_position and our head position. for (subnet, local_position) in subnets { - let entry = from_positions.entry(subnet).or_default(); - - let certs: Vec<_> = if let Some(position) = entry.pop() { + let certs: Vec<_> = if let Some(position) = from_positions.get(&subnet) { if local_position <= position.delivery_position.position { continue; } + self.fullnode_store .perpetual_tables .streams - .prefix_iter_at(&subnet, &position)? + .prefix_iter(&(&subnet, &position.delivery_position.position))? + .skip(1) .take(100) .map(|(_, v)| v) .collect() @@ -373,7 +382,7 @@ impl ValidatorStore { self.fullnode_store .perpetual_tables .streams - .prefix_iter(&subnet)? + .prefix_iter(&(&subnet, Position::ZERO))? .take(100) .map(|(_, v)| v) .collect() @@ -391,10 +400,17 @@ impl ValidatorStore { subnet, proofs.len() ); - entry.extend_from_slice(&proofs[..]); + + if let Some(old_value) = output.insert(subnet, proofs) { + error!( + "Certificate Sync: This should not happen, we are overwriting a value during \ + sync of {subnet}. Overwriting {}", + old_value.len() + ); + } } - Ok(from_positions) + Ok(output) } #[cfg(test)] diff --git a/crates/topos-tce-synchronizer/src/checkpoints_collector/mod.rs b/crates/topos-tce-synchronizer/src/checkpoints_collector/mod.rs index 3f096db2d..99cb82ff6 100644 --- a/crates/topos-tce-synchronizer/src/checkpoints_collector/mod.rs +++ b/crates/topos-tce-synchronizer/src/checkpoints_collector/mod.rs @@ -26,7 +26,7 @@ use topos_core::{ use topos_p2p::{error::P2PError, NetworkClient, PeerId}; use topos_tce_storage::{errors::StorageError, store::ReadStore, validator::ValidatorStore}; -use tracing::{debug, error, warn}; +use tracing::{debug, error, info, warn}; use uuid::Uuid; mod config; @@ -128,7 +128,7 @@ impl CheckpointSynchronizer { &self, peer: PeerId, ) -> Result>, SyncError> { - let request_id: APIUuid = Uuid::new_v4().into(); + let request_id = Uuid::new_v4(); let checkpoint: Vec = { let certificate_ids = self @@ -148,12 +148,16 @@ impl CheckpointSynchronizer { .collect() }; + debug!( + "Asking {} for latest checkpoint (request_id: {}), with local checkpoint: {:?}", + peer, request_id, checkpoint + ); + let req = CheckpointRequest { - request_id: Some(request_id), + request_id: Some(request_id.into()), checkpoint, }; - debug!("Asking {} for latest checkpoint", peer); let mut client: SynchronizerServiceClient<_> = self .network .new_grpc_client::, SynchronizerServiceServer>(peer) @@ -167,6 +171,7 @@ impl CheckpointSynchronizer { .map(|v| { let subnet = SubnetId::from_str(&v.key[..]).map_err(|_| SyncError::UnableToParseSubnetId)?; + let proofs = v .value .into_iter() @@ -184,11 +189,14 @@ impl CheckpointSynchronizer { diff: HashMap>, ) -> Result>, SyncError> { let mut certs: HashSet = HashSet::new(); - for (_subnet, proofs) in diff { + for (subnet, proofs) in diff { let len = proofs.len(); let unverified_certs = self.store.insert_unverified_proofs(proofs)?; - debug!("Persist {} unverified proofs", len); + debug!( + "Persist {} unverified proof of delivery for {}", + len, subnet + ); certs.extend(&unverified_certs[..]); } @@ -254,6 +262,7 @@ impl CheckpointSynchronizer { let diff = self.ask_for_checkpoint(target_peer).await?; let certificates_to_catchup = self.insert_unverified_proofs(diff)?; + info!("Certificates to catchup: {}", certificates_to_catchup.len()); for certificates in certificates_to_catchup { let certificates = self.fetch_certificates(certificates).await?; @@ -267,6 +276,7 @@ impl CheckpointSynchronizer { let certificate_id = certificate.id; match store.synchronize_certificate(certificate).await { Ok(_) => debug!("Certificate {} synchronized", certificate_id), + Err(StorageError::InternalStorage(topos_tce_storage::errors::InternalStorageError::CertificateAlreadyExists)) => {} Err(e) => error!("Failed to sync because of: {:?}", e), } }); diff --git a/crates/topos-tce-synchronizer/src/lib.rs b/crates/topos-tce-synchronizer/src/lib.rs index d7c50a5af..0f09aaf07 100644 --- a/crates/topos-tce-synchronizer/src/lib.rs +++ b/crates/topos-tce-synchronizer/src/lib.rs @@ -28,7 +28,8 @@ use topos_core::{ uci::CertificateId, }; use topos_tce_storage::{store::ReadStore, validator::ValidatorStore}; -use tracing::{error, info, warn}; +use tracing::{debug, error, info, trace, warn}; +use uuid::Uuid; pub struct Synchronizer { pub(crate) shutdown: CancellationToken, @@ -143,6 +144,12 @@ impl GrpcSynchronizerService for SynchronizerService { request: Request, ) -> Result, Status> { let request = request.into_inner(); + let id = request + .request_id + .map(|id| id.into()) + .unwrap_or(Uuid::new_v4()); + debug!("Received request for checkpoint (request_id: {})", id); + let res: Result, _> = request .checkpoint .into_iter() @@ -151,41 +158,62 @@ impl GrpcSynchronizerService for SynchronizerService { let res = match res { Err(error) => { - error!("{}", error); + error!("Invalid checkpoint for request {}: {}", id, error); return Err(Status::invalid_argument("Invalid checkpoint")); } Ok(value) => value, }; - let diff = if let Ok(diff) = self.validator_store.get_checkpoint_diff(res) { - diff.into_iter() - .map(|(key, value)| { - let v: Vec<_> = value - .into_iter() - .map(|v| ProofOfDelivery { - delivery_position: Some(SourceStreamPosition { - source_subnet_id: Some(v.delivery_position.subnet_id.into()), - position: *v.delivery_position.position, - certificate_id: Some(v.certificate_id.into()), - }), - readies: v - .readies - .into_iter() - .map(|(ready, signature)| SignedReady { ready, signature }) - .collect(), - threshold: v.threshold, - }) - .collect(); - CheckpointMapFieldEntry { - key: key.to_string(), - value: v, - } - }) - .collect() - } else { - Vec::new() + debug!("Request {} contains {} proof_of_delivery", id, res.len()); + trace!("Request {} contains {:?}", id, res); + let diff = match self.validator_store.get_checkpoint_diff(&res) { + Ok(diff) => { + debug!( + "Fetched checkpoint diff from storage for request {}, got {:?}", + id, diff + ); + diff.into_iter() + .map(|(key, value)| { + let v: Vec<_> = value + .into_iter() + .map(|v| ProofOfDelivery { + delivery_position: Some(SourceStreamPosition { + source_subnet_id: Some(v.delivery_position.subnet_id.into()), + position: *v.delivery_position.position, + certificate_id: Some(v.certificate_id.into()), + }), + readies: v + .readies + .into_iter() + .map(|(ready, signature)| SignedReady { ready, signature }) + .collect(), + threshold: v.threshold, + }) + .collect(); + CheckpointMapFieldEntry { + key: key.to_string(), + value: v, + } + }) + .collect() + } + Err(error) => { + error!( + "Error while fetching checkpoint diff for request {}: {}", + id, error + ); + Vec::new() + } }; + debug!( + "Responding to request {} with checkpoint diff containing {:?}", + id, + diff.iter() + .map(|v| (v.key.clone(), v.value.len())) + .collect::>() + ); + let response = CheckpointResponse { request_id: request.request_id, checkpoint_diff: diff, diff --git a/crates/topos/src/options/input_format.rs b/crates/topos/src/options/input_format.rs index ef03646dd..969a620bb 100644 --- a/crates/topos/src/options/input_format.rs +++ b/crates/topos/src/options/input_format.rs @@ -6,9 +6,3 @@ pub(crate) enum InputFormat { Json, Plain, } - -pub(crate) trait Parser { - type Result; - - fn parse(&self, input: T) -> Self::Result; -}