Skip to content

Commit

Permalink
chore: debug 0.0.11 synchronization (#447)
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <[email protected]>
  • Loading branch information
Freyskeyd committed Feb 8, 2024
1 parent 62126e0 commit edf86ee
Show file tree
Hide file tree
Showing 9 changed files with 262 additions and 55 deletions.
26 changes: 26 additions & 0 deletions crates/topos-tce-api/src/graphql/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use async_graphql::{Context, EmptyMutation, Object, Schema, Subscription};
use async_trait::async_trait;
use futures::{Stream, StreamExt};
use tokio::sync::{mpsc, oneshot};
use topos_core::api::graphql::checkpoint::SourceStreamPosition;
use topos_core::api::graphql::errors::GraphQLServerError;
use topos_core::api::graphql::filter::SubnetFilter;
use topos_core::api::graphql::{
Expand Down Expand Up @@ -112,6 +113,31 @@ impl QueryRoot {
) -> Result<Certificate, GraphQLServerError> {
Self::certificate_by_id(ctx, certificate_id).await
}

/// This endpoint is used to get the current checkpoint of the source streams.
/// The checkpoint is the position of the last certificate delivered for each source stream.
async fn get_checkpoint(
&self,
ctx: &Context<'_>,
) -> Result<Vec<SourceStreamPosition>, GraphQLServerError> {
let store = ctx.data::<Arc<FullNodeStore>>().map_err(|_| {
tracing::error!("Failed to get store from context");

GraphQLServerError::ParseDataConnector
})?;

let checkpoint = store
.get_checkpoint()
.map_err(|_| GraphQLServerError::StorageError)?;

Ok(checkpoint
.iter()
.map(|(subnet_id, head)| SourceStreamPosition {
source_subnet_id: subnet_id.into(),
position: *head.position,
})
.collect())
}
}

pub struct SubscriptionRoot;
Expand Down
11 changes: 9 additions & 2 deletions crates/topos-tce-broadcast/src/double_echo/broadcast_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,13 +230,20 @@ impl BroadcastState {

fn reached_delivery_threshold(&self) -> bool {
// If reached the delivery threshold, I can deliver
match self
let delivery_threshold = match self
.subscriptions_view
.network_size
.checked_sub(self.subscriptions_view.ready.len())
{
Some(consumed) => consumed >= self.delivery_threshold,
None => false,
}
};

debug!(
"📝 Certificate {} reached Delivery threshold: {}",
&self.certificate.id, delivery_threshold
);

delivery_threshold
}
}
2 changes: 2 additions & 0 deletions crates/topos-tce-storage/src/rocks/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ where
fn iter_at<I: Serialize>(&'a self, index: &I) -> Result<Self::Iterator, InternalStorageError>;

/// Returns an Iterator over the whole CF with mode configured
#[allow(dead_code)]
fn iter_with_mode(
&'a self,
mode: IteratorMode<'_>,
Expand All @@ -29,6 +30,7 @@ where
) -> Result<Self::Iterator, InternalStorageError>;

/// Returns a prefixed Iterator over the CF starting from index
#[allow(dead_code)]
fn prefix_iter_at<P: Serialize, I: Serialize>(
&'a self,
prefix: &P,
Expand Down
123 changes: 123 additions & 0 deletions crates/topos-tce-storage/src/tests/checkpoints.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
use std::{collections::HashMap, sync::Arc};

use rstest::rstest;
use topos_core::uci::SubnetId;
use topos_test_sdk::{
certificates::create_certificate_chain,
constants::{SOURCE_SUBNET_ID_1, SOURCE_SUBNET_ID_2, TARGET_SUBNET_ID_1},
};

use super::support::store;
use crate::{
store::{ReadStore, WriteStore},
validator::ValidatorStore,
};

#[rstest]
#[tokio::test]
async fn get_checkpoint_for_two_subnets(store: Arc<ValidatorStore>) {
let certificates_a = create_certificate_chain(SOURCE_SUBNET_ID_1, &[TARGET_SUBNET_ID_1], 32);
let certificates_b = create_certificate_chain(SOURCE_SUBNET_ID_2, &[TARGET_SUBNET_ID_1], 24);

for cert in certificates_a {
_ = store.insert_certificate_delivered(&cert).await;
}

for cert in certificates_b {
_ = store.insert_certificate_delivered(&cert).await;
}

let checkpoint = store
.get_checkpoint()
.unwrap()
.into_iter()
.map(|(subnet, value)| (subnet, *value.position))
.collect::<HashMap<SubnetId, u64>>();

assert_eq!(checkpoint.len(), 2);
assert_eq!(*checkpoint.get(&SOURCE_SUBNET_ID_1).unwrap(), 31);
assert_eq!(*checkpoint.get(&SOURCE_SUBNET_ID_2).unwrap(), 23);
}

#[rstest]
#[tokio::test]
async fn get_checkpoint_diff_with_no_input(store: Arc<ValidatorStore>) {
let certificates_a = create_certificate_chain(SOURCE_SUBNET_ID_1, &[TARGET_SUBNET_ID_1], 32);
let certificates_b = create_certificate_chain(SOURCE_SUBNET_ID_2, &[TARGET_SUBNET_ID_1], 24);

for cert in certificates_a {
_ = store.insert_certificate_delivered(&cert).await;
}

for cert in certificates_b {
_ = store.insert_certificate_delivered(&cert).await;
}

let checkpoint = store
.get_checkpoint_diff(&[])
.unwrap()
.into_iter()
.map(|(subnet, proofs)| {
(
subnet,
proofs
.iter()
.map(|proof| *proof.delivery_position.position)
.collect::<Vec<_>>(),
)
})
.collect::<HashMap<SubnetId, _>>();

assert_eq!(checkpoint.len(), 2);
assert_eq!(
*checkpoint.get(&SOURCE_SUBNET_ID_1).unwrap(),
(0..=31).collect::<Vec<_>>()
);
assert_eq!(
*checkpoint.get(&SOURCE_SUBNET_ID_2).unwrap(),
(0..=23).collect::<Vec<_>>()
);
}

#[rstest]
#[tokio::test]
async fn get_checkpoint_diff_with_input(store: Arc<ValidatorStore>) {
let certificates_a = create_certificate_chain(SOURCE_SUBNET_ID_1, &[TARGET_SUBNET_ID_1], 32);
let certificates_b = create_certificate_chain(SOURCE_SUBNET_ID_2, &[TARGET_SUBNET_ID_1], 24);

let checkpoint = certificates_a.get(20).unwrap().proof_of_delivery.clone();
assert_eq!(*checkpoint.delivery_position.position, 20);

for cert in certificates_a {
_ = store.insert_certificate_delivered(&cert).await;
}

for cert in certificates_b {
_ = store.insert_certificate_delivered(&cert).await;
}

let checkpoint = store
.get_checkpoint_diff(&[checkpoint])
.unwrap()
.into_iter()
.map(|(subnet, proofs)| {
(
subnet,
proofs
.iter()
.map(|proof| *proof.delivery_position.position)
.collect::<Vec<_>>(),
)
})
.collect::<HashMap<SubnetId, _>>();

assert_eq!(checkpoint.len(), 2);
assert_eq!(
*checkpoint.get(&SOURCE_SUBNET_ID_1).unwrap(),
(21..=31).collect::<Vec<_>>()
);
assert_eq!(
*checkpoint.get(&SOURCE_SUBNET_ID_2).unwrap(),
(0..=23).collect::<Vec<_>>()
);
}
1 change: 1 addition & 0 deletions crates/topos-tce-storage/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use self::support::store;
use topos_test_sdk::certificates::create_certificate_chain;
use topos_test_sdk::constants::*;

mod checkpoints;
mod db_columns;
mod pending_certificates;
mod position;
Expand Down
40 changes: 28 additions & 12 deletions crates/topos-tce-storage/src/validator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use topos_core::{
},
uci::{Certificate, CertificateId, SubnetId, INITIAL_CERTIFICATE_ID},
};
use tracing::{debug, info, instrument};
use tracing::{debug, error, info, instrument};

use crate::{
errors::{InternalStorageError, StorageError},
Expand Down Expand Up @@ -334,16 +334,25 @@ impl ValidatorStore {
.get(certificate_id)?)
}

/// Returns the difference between the `from` list of [ProofOfDelivery] and the local head
/// checkpoint. This is used to define the list of certificates that are missing between the
/// `from` and the local head checkpoint.
/// The maximum number of [ProofOfDelivery] returned per [SubnetId] is 100.
/// If the `from` is missing a local subnet, the list of [ProofOfDelivery] for this subnet will
/// start from [Position] `0`.
pub fn get_checkpoint_diff(
&self,
from: Vec<ProofOfDelivery>,
from: &[ProofOfDelivery],
) -> Result<HashMap<SubnetId, Vec<ProofOfDelivery>>, StorageError> {
// Parse the from in order to extract the different position per subnets
let mut from_positions: HashMap<SubnetId, Vec<ProofOfDelivery>> = from
.into_iter()
.map(|v| (v.delivery_position.subnet_id, vec![v]))
let from_positions: HashMap<SubnetId, &ProofOfDelivery> = from
.iter()
.map(|v| (v.delivery_position.subnet_id, v))
.collect();

let mut output: HashMap<SubnetId, Vec<ProofOfDelivery>> =
from_positions.keys().map(|s| (*s, vec![])).collect();

// Request the local head checkpoint
let subnets: HashMap<SubnetId, Position> = self
.fullnode_store
Expand All @@ -356,24 +365,24 @@ impl ValidatorStore {
// For every local known subnets we want to iterate and check if there
// is a delta between the from_position and our head position.
for (subnet, local_position) in subnets {
let entry = from_positions.entry(subnet).or_default();

let certs: Vec<_> = if let Some(position) = entry.pop() {
let certs: Vec<_> = if let Some(position) = from_positions.get(&subnet) {
if local_position <= position.delivery_position.position {
continue;
}

self.fullnode_store
.perpetual_tables
.streams
.prefix_iter_at(&subnet, &position)?
.prefix_iter(&(&subnet, &position.delivery_position.position))?
.skip(1)
.take(100)
.map(|(_, v)| v)
.collect()
} else {
self.fullnode_store
.perpetual_tables
.streams
.prefix_iter(&subnet)?
.prefix_iter(&(&subnet, Position::ZERO))?
.take(100)
.map(|(_, v)| v)
.collect()
Expand All @@ -391,10 +400,17 @@ impl ValidatorStore {
subnet,
proofs.len()
);
entry.extend_from_slice(&proofs[..]);

if let Some(old_value) = output.insert(subnet, proofs) {
error!(
"Certificate Sync: This should not happen, we are overwriting a value during \
sync of {subnet}. Overwriting {}",
old_value.len()
);
}
}

Ok(from_positions)
Ok(output)
}

#[cfg(test)]
Expand Down
22 changes: 16 additions & 6 deletions crates/topos-tce-synchronizer/src/checkpoints_collector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use topos_core::{

use topos_p2p::{error::P2PError, NetworkClient, PeerId};
use topos_tce_storage::{errors::StorageError, store::ReadStore, validator::ValidatorStore};
use tracing::{debug, error, warn};
use tracing::{debug, error, info, warn};
use uuid::Uuid;

mod config;
Expand Down Expand Up @@ -128,7 +128,7 @@ impl CheckpointSynchronizer {
&self,
peer: PeerId,
) -> Result<HashMap<SubnetId, Vec<ProofOfDelivery>>, SyncError> {
let request_id: APIUuid = Uuid::new_v4().into();
let request_id = Uuid::new_v4();

let checkpoint: Vec<grpc::tce::v1::ProofOfDelivery> = {
let certificate_ids = self
Expand All @@ -148,12 +148,16 @@ impl CheckpointSynchronizer {
.collect()
};

debug!(
"Asking {} for latest checkpoint (request_id: {}), with local checkpoint: {:?}",
peer, request_id, checkpoint
);

let req = CheckpointRequest {
request_id: Some(request_id),
request_id: Some(request_id.into()),
checkpoint,
};

debug!("Asking {} for latest checkpoint", peer);
let mut client: SynchronizerServiceClient<_> = self
.network
.new_grpc_client::<SynchronizerServiceClient<_>, SynchronizerServiceServer<SynchronizerService>>(peer)
Expand All @@ -167,6 +171,7 @@ impl CheckpointSynchronizer {
.map(|v| {
let subnet =
SubnetId::from_str(&v.key[..]).map_err(|_| SyncError::UnableToParseSubnetId)?;

let proofs = v
.value
.into_iter()
Expand All @@ -184,11 +189,14 @@ impl CheckpointSynchronizer {
diff: HashMap<SubnetId, Vec<ProofOfDelivery>>,
) -> Result<Vec<Vec<CertificateId>>, SyncError> {
let mut certs: HashSet<CertificateId> = HashSet::new();
for (_subnet, proofs) in diff {
for (subnet, proofs) in diff {
let len = proofs.len();
let unverified_certs = self.store.insert_unverified_proofs(proofs)?;

debug!("Persist {} unverified proofs", len);
debug!(
"Persist {} unverified proof of delivery for {}",
len, subnet
);
certs.extend(&unverified_certs[..]);
}

Expand Down Expand Up @@ -254,6 +262,7 @@ impl CheckpointSynchronizer {
let diff = self.ask_for_checkpoint(target_peer).await?;

let certificates_to_catchup = self.insert_unverified_proofs(diff)?;
info!("Certificates to catchup: {}", certificates_to_catchup.len());

for certificates in certificates_to_catchup {
let certificates = self.fetch_certificates(certificates).await?;
Expand All @@ -267,6 +276,7 @@ impl CheckpointSynchronizer {
let certificate_id = certificate.id;
match store.synchronize_certificate(certificate).await {
Ok(_) => debug!("Certificate {} synchronized", certificate_id),
Err(StorageError::InternalStorage(topos_tce_storage::errors::InternalStorageError::CertificateAlreadyExists)) => {}
Err(e) => error!("Failed to sync because of: {:?}", e),
}
});
Expand Down
Loading

0 comments on commit edf86ee

Please sign in to comment.