From a9b2f3e62d29c72fd7edb82991b1c7a1ce1bc29e Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Thu, 11 Jul 2024 15:36:10 +0300 Subject: [PATCH 1/7] type-safe validate --- node/src/store.rs | 13 ++- node/src/store/in_memory_store.rs | 25 ++--- node/src/store/indexed_db_store.rs | 2 +- node/src/store/redb_store.rs | 26 ++--- node/src/store/utils.rs | 172 ++++++++++++++++++++--------- node/src/syncer.rs | 10 +- node/src/test_utils.rs | 14 ++- node/tests/header_ex.rs | 6 +- node/tests/node.rs | 3 +- 9 files changed, 169 insertions(+), 102 deletions(-) diff --git a/node/src/store.rs b/node/src/store.rs index 4921961c2..e0d7e2315 100644 --- a/node/src/store.rs +++ b/node/src/store.rs @@ -15,7 +15,10 @@ use serde::{Deserialize, Serialize}; use thiserror::Error; pub use crate::block_ranges::{BlockRange, BlockRanges, BlockRangesError}; -pub use crate::store::utils::{ExtendedHeaderGeneratorExt, VerifiedExtendedHeaders}; +pub use crate::store::utils::{ + IntoValidExtendedHeadersChain, ValidExtendedHeadersChain, ValidatedExtendedHeaders, + VerifiedExtendedHeaders, +}; pub use in_memory_store::InMemoryStore; #[cfg(target_arch = "wasm32")] @@ -144,14 +147,11 @@ pub trait Store: Send + Sync + Debug { /// `Ok(None)` indicates that header is in the store but sampling metadata is not set yet. async fn get_sampling_metadata(&self, height: u64) -> Result>; - /// Insert a range of headers into the store. + /// Insert a chain of headers into the store. /// /// New insertion should pass all the constraints in [`BlockRanges::check_insertion_constraints`], /// additionaly it should be [`ExtendedHeader::verify`]ed against neighbor headers. - async fn insert(&self, headers: R) -> Result<()> - where - R: TryInto + Send, - StoreError: From<>::Error>; + async fn insert(&self, headers: impl IntoValidExtendedHeadersChain) -> Result<()>; /// Returns a list of header ranges currenty held in store. async fn get_stored_header_ranges(&self) -> Result; @@ -315,6 +315,7 @@ fn to_headers_range(bounds: impl RangeBounds, last_index: u64) -> Result(&self, headers: R) -> Result<()> - where - R: TryInto + Send, - StoreError: From<>::Error>, - { - let headers = headers.try_into()?; + pub(crate) async fn insert(&self, headers: impl IntoValidExtendedHeadersChain) -> Result<()> { + let headers = headers.into_valid_chain().await?; self.inner.write().await.insert(headers).await?; self.header_added_notifier.notify_waiters(); Ok(()) @@ -165,7 +164,7 @@ impl InMemoryStoreInner { .ok_or(StoreError::LostHash(hash)) } - async fn insert(&mut self, headers: VerifiedExtendedHeaders) -> Result<()> { + async fn insert(&mut self, headers: ValidExtendedHeadersChain) -> Result<()> { let (Some(head), Some(tail)) = (headers.as_ref().first(), headers.as_ref().last()) else { return Ok(()); }; @@ -363,12 +362,8 @@ impl Store for InMemoryStore { self.contains_height(height).await } - async fn insert(&self, header: R) -> Result<()> - where - R: TryInto + Send, - StoreError: From<>::Error>, - { - self.insert(header).await + async fn insert(&self, headers: impl IntoValidExtendedHeadersChain) -> Result<()> { + self.insert(headers).await } async fn update_sampling_metadata( diff --git a/node/src/store/indexed_db_store.rs b/node/src/store/indexed_db_store.rs index debff02bd..0ed0f8cc3 100644 --- a/node/src/store/indexed_db_store.rs +++ b/node/src/store/indexed_db_store.rs @@ -709,7 +709,7 @@ mod v3 { #[cfg(test)] pub mod tests { use super::*; - use crate::store::utils::ExtendedHeaderGeneratorExt; + use crate::test_utils::ExtendedHeaderGeneratorExt; use celestia_types::test_utils::ExtendedHeaderGenerator; use function_name::named; use wasm_bindgen_test::wasm_bindgen_test; diff --git a/node/src/store/redb_store.rs b/node/src/store/redb_store.rs index 0bf533979..fcc7c4c58 100644 --- a/node/src/store/redb_store.rs +++ b/node/src/store/redb_store.rs @@ -18,8 +18,9 @@ use tracing::warn; use tracing::{debug, trace}; use crate::block_ranges::BlockRanges; -use crate::store::utils::VerifiedExtendedHeaders; -use crate::store::{Result, SamplingMetadata, SamplingStatus, Store, StoreError}; +use crate::store::{ + IntoValidExtendedHeadersChain, Result, SamplingMetadata, SamplingStatus, Store, StoreError, +}; const SCHEMA_VERSION: u64 = 2; @@ -243,12 +244,8 @@ impl RedbStore { .unwrap_or(false) } - async fn insert(&self, headers: R) -> Result<()> - where - R: TryInto + Send, - StoreError: From<>::Error>, - { - let headers = headers.try_into()?; + async fn insert(&self, headers: impl IntoValidExtendedHeadersChain) -> Result<()> { + let headers = headers.into_valid_chain().await?; self.write_tx(move |tx| { let headers = headers.as_ref(); @@ -465,11 +462,7 @@ impl Store for RedbStore { self.contains_height(height).await } - async fn insert(&self, headers: R) -> Result<()> - where - R: TryInto + Send, - StoreError: From<>::Error>, - { + async fn insert(&self, headers: impl IntoValidExtendedHeadersChain) -> Result<()> { self.insert(headers).await } @@ -678,13 +671,10 @@ fn migrate_v1_to_v2( #[cfg(test)] pub mod tests { - use crate::store::ExtendedHeaderGeneratorExt; - use super::*; - - use std::path::Path; - + use crate::test_utils::ExtendedHeaderGeneratorExt; use celestia_types::test_utils::ExtendedHeaderGenerator; + use std::path::Path; use tempfile::TempDir; #[tokio::test] diff --git a/node/src/store/utils.rs b/node/src/store/utils.rs index d50a670e6..7158b61f9 100644 --- a/node/src/store/utils.rs +++ b/node/src/store/utils.rs @@ -1,7 +1,6 @@ +use std::future::Future; use std::ops::RangeInclusive; -#[cfg(any(test, feature = "test-utils"))] -use celestia_types::test_utils::ExtendedHeaderGenerator; use celestia_types::ExtendedHeader; use crate::block_ranges::{BlockRange, BlockRangeExt}; @@ -59,98 +58,163 @@ fn get_most_recent_missing_range( penultimate_range_end + 1..=store_head_range.start().saturating_sub(1) } -/// Span of header that's been verified internally -#[derive(Clone)] +pub struct ValidatedExtendedHeaders(Vec); pub struct VerifiedExtendedHeaders(Vec); -impl IntoIterator for VerifiedExtendedHeaders { - type Item = ExtendedHeader; - type IntoIter = std::vec::IntoIter; +/// Holds a validated and verified chain of ExtendedHeader. +pub struct ValidExtendedHeadersChain(Vec); - fn into_iter(self) -> Self::IntoIter { - self.0.into_iter() +pub trait IntoValidExtendedHeadersChain: Send { + fn into_valid_chain( + self, + ) -> impl Future> + Send; +} + +impl ValidatedExtendedHeaders { + pub async fn new(headers: Vec) -> Result { + validate_headers(&headers).await?; + Ok(ValidatedExtendedHeaders(headers)) + } + + pub unsafe fn new_unchecked(headers: Vec) -> Self { + ValidatedExtendedHeaders(headers) } } -impl<'a> TryFrom<&'a [ExtendedHeader]> for VerifiedExtendedHeaders { - type Error = celestia_types::Error; +impl VerifiedExtendedHeaders { + pub fn new(headers: Vec) -> Result { + verify_headers(&headers)?; + Ok(VerifiedExtendedHeaders(headers)) + } - fn try_from(value: &'a [ExtendedHeader]) -> Result { - value.to_vec().try_into() + pub unsafe fn new_unchecked(headers: Vec) -> Self { + VerifiedExtendedHeaders(headers) } } -impl From for Vec { - fn from(value: VerifiedExtendedHeaders) -> Self { - value.0 +impl ValidExtendedHeadersChain { + pub async fn new(headers: Vec) -> Result { + verify_headers(&headers)?; + validate_headers(&headers).await?; + Ok(ValidExtendedHeadersChain(headers)) + } + + /// Create a new instance out of pre-checked vec of headers + /// + /// # Safety + /// + /// This function may produce invalid `ValidExtendedHeadersChain`, if passed range + /// is not validated manually + pub unsafe fn new_unchecked(headers: Vec) -> Self { + ValidExtendedHeadersChain(headers) + } + + pub async fn from_verified( + headers: VerifiedExtendedHeaders, + ) -> Result { + // Headers are already verified but not validated. + validate_headers(&headers.0).await?; + Ok(ValidExtendedHeadersChain(headers.0)) + } + + pub fn from_validated( + headers: ValidatedExtendedHeaders, + ) -> Result { + // Headers are already validated but not verified. + verify_headers(&headers.0)?; + Ok(ValidExtendedHeadersChain(headers.0)) } } -impl AsRef<[ExtendedHeader]> for VerifiedExtendedHeaders { +/* +impl IntoIterator for ValidExtendedHeadersChain { + type Item = &ExtendedHeader; + type IntoIter<'a> = std::slice::Iter<'a, ExtendedHeader>; + + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } +} +*/ + +impl IntoIterator for ValidExtendedHeadersChain { + type Item = ExtendedHeader; + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() + } +} + +impl AsRef<[ExtendedHeader]> for ValidExtendedHeadersChain { fn as_ref(&self) -> &[ExtendedHeader] { &self.0 } } -/// 1-length hedaer span is internally verified, this is valid -impl From<[ExtendedHeader; 1]> for VerifiedExtendedHeaders { - fn from(value: [ExtendedHeader; 1]) -> Self { - Self(value.into()) +impl IntoValidExtendedHeadersChain for ValidatedExtendedHeaders { + async fn into_valid_chain(self) -> Result { + ValidExtendedHeadersChain::from_validated(self) } } -impl From for VerifiedExtendedHeaders { - fn from(value: ExtendedHeader) -> Self { - Self(vec![value]) +impl IntoValidExtendedHeadersChain for VerifiedExtendedHeaders { + async fn into_valid_chain(self) -> Result { + ValidExtendedHeadersChain::from_verified(self).await } } -impl<'a> From<&'a ExtendedHeader> for VerifiedExtendedHeaders { - fn from(value: &ExtendedHeader) -> Self { - Self(vec![value.to_owned()]) +impl IntoValidExtendedHeadersChain for ValidExtendedHeadersChain { + async fn into_valid_chain(self) -> Result { + // Headers are already verified and validated. + Ok(self) } } -impl TryFrom> for VerifiedExtendedHeaders { - type Error = celestia_types::Error; - - fn try_from(headers: Vec) -> Result { - let Some(head) = headers.first() else { - return Ok(VerifiedExtendedHeaders(Vec::default())); - }; +impl IntoValidExtendedHeadersChain for ExtendedHeader { + async fn into_valid_chain(self) -> Result { + ValidExtendedHeadersChain::new(vec![self]).await + } +} - head.verify_adjacent_range(&headers[1..])?; +impl<'a> IntoValidExtendedHeadersChain for &'a ExtendedHeader { + async fn into_valid_chain(self) -> Result { + ValidExtendedHeadersChain::new(vec![self.to_owned()]).await + } +} - Ok(Self(headers)) +impl IntoValidExtendedHeadersChain for Vec { + async fn into_valid_chain(self) -> Result { + ValidExtendedHeadersChain::new(self).await } } -impl VerifiedExtendedHeaders { - /// Create a new instance out of pre-checked vec of headers - /// - /// # Safety - /// - /// This function may produce invalid `VerifiedExtendedHeaders`, if passed range is not - /// validated manually - pub unsafe fn new_unchecked(headers: Vec) -> Self { - Self(headers) +impl<'a> IntoValidExtendedHeadersChain for &'a [ExtendedHeader] { + async fn into_valid_chain(self) -> Result { + ValidExtendedHeadersChain::new(self.into()).await } } -/// Extends test header generator for easier insertion into the store -pub trait ExtendedHeaderGeneratorExt { - /// Generate next amount verified headers - fn next_many_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders; +impl IntoValidExtendedHeadersChain for [ExtendedHeader; N] { + async fn into_valid_chain(self) -> Result { + ValidExtendedHeadersChain::new(self.into()).await + } } -#[cfg(any(test, feature = "test-utils"))] -impl ExtendedHeaderGeneratorExt for ExtendedHeaderGenerator { - fn next_many_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders { - unsafe { VerifiedExtendedHeaders::new_unchecked(self.next_many(amount)) } +impl<'a, const N: usize> IntoValidExtendedHeadersChain for &'a [ExtendedHeader; N] { + async fn into_valid_chain(self) -> Result { + ValidExtendedHeadersChain::new(self.into()).await } } -#[allow(unused)] +pub(crate) fn verify_headers(headers: &[ExtendedHeader]) -> celestia_types::Result<()> { + let Some(head) = headers.first() else { + return Ok(()); + }; + + head.verify_adjacent_range(&headers[1..]) +} + pub(crate) async fn validate_headers(headers: &[ExtendedHeader]) -> celestia_types::Result<()> { for headers in headers.chunks(VALIDATIONS_PER_YIELD) { for header in headers { diff --git a/node/src/syncer.rs b/node/src/syncer.rs index 094a052ae..c9e4fde46 100644 --- a/node/src/syncer.rs +++ b/node/src/syncer.rs @@ -30,7 +30,7 @@ use crate::events::{EventPublisher, NodeEvent}; use crate::executor::{sleep, spawn, spawn_cancellable, Interval}; use crate::p2p::{P2p, P2pError}; use crate::store::utils::calculate_range_to_fetch; -use crate::store::{Store, StoreError}; +use crate::store::{Store, StoreError, ValidExtendedHeadersChain, ValidatedExtendedHeaders}; use crate::utils::OneshotSenderExt; type Result = std::result::Result; @@ -415,8 +415,9 @@ where if let Ok(store_head_height) = self.store.head_height().await { // If our new header is adjacent to the HEAD of the store if store_head_height + 1 == new_head_height { - // Header is already verified by HeaderSub and will be validated against previous - // head on insert + // HeaderSub already validated the header. + let new_head = unsafe { ValidatedExtendedHeaders::new_unchecked(vec![new_head]) }; + if self.store.insert(new_head).await.is_ok() { self.event_pub.send(NodeEvent::AddedHeaderFromHeaderSub { height: new_head_height, @@ -538,6 +539,9 @@ where } }; + // HeaderEx already validated the headers (but not verified them). + let headers = unsafe { ValidatedExtendedHeaders::new_unchecked(headers) }; + if let Err(e) = self.store.insert(headers).await { self.event_pub.send(NodeEvent::FetchingHeadersFailed { from_height, diff --git a/node/src/test_utils.rs b/node/src/test_utils.rs index 8c1122a1d..cea31cc85 100644 --- a/node/src/test_utils.rs +++ b/node/src/test_utils.rs @@ -16,7 +16,7 @@ use crate::{ node::NodeConfig, p2p::{P2pCmd, P2pError}, peer_tracker::PeerTrackerInfo, - store::{ExtendedHeaderGeneratorExt, InMemoryStore}, + store::{InMemoryStore, ValidExtendedHeadersChain}, utils::OneshotResultSender, }; @@ -27,6 +27,18 @@ pub(crate) use tokio::test as async_test; #[cfg(target_arch = "wasm32")] pub(crate) use wasm_bindgen_test::wasm_bindgen_test as async_test; +/// Extends test header generator for easier insertion into the store +pub trait ExtendedHeaderGeneratorExt { + /// Generate next amount verified headers + fn next_many_verified(&mut self, amount: u64) -> ValidExtendedHeadersChain; +} + +impl ExtendedHeaderGeneratorExt for ExtendedHeaderGenerator { + fn next_many_verified(&mut self, amount: u64) -> ValidExtendedHeadersChain { + unsafe { ValidExtendedHeadersChain::new_unchecked(self.next_many(amount)) } + } +} + /// Generate a store pre-filled with headers. pub async fn gen_filled_store(amount: u64) -> (InMemoryStore, ExtendedHeaderGenerator) { let s = InMemoryStore::new(); diff --git a/node/tests/header_ex.rs b/node/tests/header_ex.rs index 80f319f96..c1abd9a42 100644 --- a/node/tests/header_ex.rs +++ b/node/tests/header_ex.rs @@ -6,7 +6,7 @@ use celestia_types::test_utils::{invalidate, unverify}; use lumina_node::{ node::{Node, NodeConfig, NodeError}, p2p::{HeaderExError, P2pError}, - store::{Store, VerifiedExtendedHeaders}, + store::{Store, ValidExtendedHeadersChain}, test_utils::{gen_filled_store, listening_test_node_config, test_node_config}, }; use tokio::time::{sleep, timeout}; @@ -254,7 +254,7 @@ async fn replaced_header_server_store() { server_headers[10] = replaced_header.clone(); server_store - .insert(unsafe { VerifiedExtendedHeaders::new_unchecked(server_headers.clone()) }) + .insert(unsafe { ValidExtendedHeadersChain::new_unchecked(server_headers.clone()) }) .await .unwrap(); @@ -373,7 +373,7 @@ async fn unverified_header_server_store() { unverify(&mut server_headers[10]); server_store - .insert(unsafe { VerifiedExtendedHeaders::new_unchecked(server_headers.clone()) }) + .insert(unsafe { ValidExtendedHeadersChain::new_unchecked(server_headers.clone()) }) .await .unwrap(); diff --git a/node/tests/node.rs b/node/tests/node.rs index 7c2fc6b39..56cc50261 100644 --- a/node/tests/node.rs +++ b/node/tests/node.rs @@ -11,9 +11,10 @@ use futures::StreamExt; use libp2p::swarm::NetworkBehaviour; use libp2p::{gossipsub, identity, noise, ping, tcp, yamux, Multiaddr, SwarmBuilder}; use lumina_node::node::{Node, NodeConfig}; -use lumina_node::store::{ExtendedHeaderGeneratorExt, InMemoryStore, Store}; +use lumina_node::store::{InMemoryStore, Store}; use lumina_node::test_utils::{ gen_filled_store, listening_test_node_config, test_node_config, test_node_config_with_keypair, + ExtendedHeaderGeneratorExt, }; use rand::Rng; use tokio::{select, spawn, sync::mpsc, time::sleep}; From 0b45e3d2e8f4793cc9936f82600e5f942ac095d3 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Thu, 11 Jul 2024 23:31:00 +0300 Subject: [PATCH 2/7] wip --- node/src/p2p.rs | 9 +- node/src/store.rs | 7 +- node/src/store/in_memory_store.rs | 2 +- node/src/store/utils.rs | 319 +++++++++++++++++++++++------- node/src/syncer.rs | 9 +- 5 files changed, 273 insertions(+), 73 deletions(-) diff --git a/node/src/p2p.rs b/node/src/p2p.rs index 38407088e..4a991f129 100644 --- a/node/src/p2p.rs +++ b/node/src/p2p.rs @@ -63,7 +63,7 @@ use crate::p2p::shwap::{namespaced_data_cid, row_cid, sample_cid, ShwapMultihash use crate::p2p::swarm::new_swarm; use crate::peer_tracker::PeerTracker; use crate::peer_tracker::PeerTrackerInfo; -use crate::store::Store; +use crate::store::{Store, ValidatedExtendedHeaders}; use crate::utils::{ celestia_protocol_id, fraudsub_ident_topic, gossipsub_ident_topic, MultiaddrExt, OneshotResultSender, OneshotResultSenderExt, OneshotSenderExt, @@ -191,7 +191,7 @@ pub(crate) enum P2pCmd { }, HeaderExRequest { request: HeaderRequest, - respond_to: OneshotResultSender, P2pError>, + respond_to: OneshotResultSender, }, Listeners { respond_to: oneshot::Sender>, @@ -340,7 +340,10 @@ impl P2p { } /// Send a request on the `header-ex` protocol. - pub async fn header_ex_request(&self, request: HeaderRequest) -> Result> { + pub async fn header_ex_request( + &self, + request: HeaderRequest, + ) -> Result { let (tx, rx) = oneshot::channel(); self.send_command(P2pCmd::HeaderExRequest { diff --git a/node/src/store.rs b/node/src/store.rs index e0d7e2315..b450c33d2 100644 --- a/node/src/store.rs +++ b/node/src/store.rs @@ -579,10 +579,13 @@ mod tests { let mut dup_header = s.get_by_height(99).await.unwrap(); dup_header.header.height = Height::from(102u32); + dup_header.commit.height = Height::from(102u32); assert!(matches!( - s.insert(dup_header).await, - Err(StoreError::HashExists(_)) + dbg!(s.insert(dup_header).await), + Err(StoreError::CelestiaTypes( + celestia_types::Error::Validation(_) + )) )); } diff --git a/node/src/store/in_memory_store.rs b/node/src/store/in_memory_store.rs index 52099d515..2f4caef19 100644 --- a/node/src/store/in_memory_store.rs +++ b/node/src/store/in_memory_store.rs @@ -87,7 +87,7 @@ impl InMemoryStore { } pub(crate) async fn insert(&self, headers: impl IntoValidExtendedHeadersChain) -> Result<()> { - let headers = headers.into_valid_chain().await?; + let headers = headers.into_valid_chain().await.map_err(|e| dbg!(e))?; self.inner.write().await.insert(headers).await?; self.header_added_notifier.notify_waiters(); Ok(()) diff --git a/node/src/store/utils.rs b/node/src/store/utils.rs index 7158b61f9..c1bcde8b0 100644 --- a/node/src/store/utils.rs +++ b/node/src/store/utils.rs @@ -1,6 +1,8 @@ use std::future::Future; -use std::ops::RangeInclusive; +use std::mem; +use std::ops::{Deref, RangeInclusive}; +use celestia_proto::header::pb::ExtendedHeader; use celestia_types::ExtendedHeader; use crate::block_ranges::{BlockRange, BlockRangeExt}; @@ -58,71 +60,219 @@ fn get_most_recent_missing_range( penultimate_range_end + 1..=store_head_range.start().saturating_sub(1) } +#[repr(transparent)] +pub struct ValidatedExtendedHeader(ExtendedHeader); pub struct ValidatedExtendedHeaders(Vec); pub struct VerifiedExtendedHeaders(Vec); -/// Holds a validated and verified chain of ExtendedHeader. -pub struct ValidExtendedHeadersChain(Vec); - -pub trait IntoValidExtendedHeadersChain: Send { - fn into_valid_chain( +pub trait IntoVerifiedExtendedHeaders: Send { + fn into_verified( self, - ) -> impl Future> + Send; + ) -> impl Future> + Send; +} + +impl ValidatedExtendedHeader { + pub fn new(header: ExtendedHeader) -> celestia_types::Result { + header.validate()?; + Ok(ValidatedExtendedHeader(header)) + } + + /// Create a new instance out of pre-checked vec of headers + /// + /// # Safety + /// + /// This function may produce invalid `ValidatedExtendedHeader`, if passed range + /// is not validated manually. + pub unsafe fn new_unchecked(header: ExtendedHeader) -> Self { + ValidatedExtendedHeader(header) + } + + pub fn into_inner(self) -> Self { + self.0 + } } impl ValidatedExtendedHeaders { - pub async fn new(headers: Vec) -> Result { - validate_headers(&headers).await?; + pub async fn new(headers: Vec) -> celestia_types::Result { + validate_headers(&headers)?; Ok(ValidatedExtendedHeaders(headers)) } + /// Create a new instance out of pre-checked vec of headers + /// + /// # Safety + /// + /// This function may produce invalid `ValidatedExtendedHeaders`, if passed range + /// is not validated manually. pub unsafe fn new_unchecked(headers: Vec) -> Self { ValidatedExtendedHeaders(headers) } + + pub fn into_inner(self) -> Self { + self.0 + } } impl VerifiedExtendedHeaders { - pub fn new(headers: Vec) -> Result { + pub async fn new(headers: Vec) -> celestia_types::Result { + let headers = ValidatedExtendedHeaders::new(headers).await?; + ValidatedExtendedHeaders::from_validated(headers) + } + + pub fn from_validated( + headers: impl Into, + ) -> celestia_types::Result { + let headers = headers.into().0; verify_headers(&headers)?; Ok(VerifiedExtendedHeaders(headers)) } + /// Create a new instance out of pre-checked vec of headers + /// + /// # Safety + /// + /// This function may produce invalid `VerifiedExtendedHeaders`, if passed range + /// is not validated and verified manually. pub unsafe fn new_unchecked(headers: Vec) -> Self { VerifiedExtendedHeaders(headers) } + + pub fn into_inner(self) -> Self { + self.0 + } } -impl ValidExtendedHeadersChain { - pub async fn new(headers: Vec) -> Result { - verify_headers(&headers)?; - validate_headers(&headers).await?; - Ok(ValidExtendedHeadersChain(headers)) +impl IntoIterator for ValidatedExtendedHeader { + type Item = ExtendedHeader; + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() } +} - /// Create a new instance out of pre-checked vec of headers - /// - /// # Safety - /// - /// This function may produce invalid `ValidExtendedHeadersChain`, if passed range - /// is not validated manually - pub unsafe fn new_unchecked(headers: Vec) -> Self { - ValidExtendedHeadersChain(headers) +impl IntoIterator for VerifiedExtendedHeaders { + type Item = ExtendedHeader; + type IntoIter = std::vec::IntoIter; + + fn into_iter(self) -> Self::IntoIter { + self.0.into_iter() } +} - pub async fn from_verified( - headers: VerifiedExtendedHeaders, - ) -> Result { - // Headers are already verified but not validated. - validate_headers(&headers.0).await?; - Ok(ValidExtendedHeadersChain(headers.0)) +impl Deref for ValidatedExtendedHeader { + type Target = ExtendedHeader; + + fn deref(&self) -> &Self::Target { + &self.0 } +} - pub fn from_validated( - headers: ValidatedExtendedHeaders, - ) -> Result { - // Headers are already validated but not verified. - verify_headers(&headers.0)?; - Ok(ValidExtendedHeadersChain(headers.0)) +impl Deref for ValidatedExtendedHeaders { + type Target = [ExtendedHeader]; + + fn deref(&self) -> &Self::Target { + &self.0[..] + } +} + +impl Deref for VerifiedExtendedHeaders { + type Target = [ExtendedHeader]; + + fn deref(&self) -> &Self::Target { + &self.0[..] + } +} + +impl AsRef for ValidatedExtendedHeader { + fn as_ref(&self) -> &ExtendedHeader { + &self.0 + } +} + +impl AsRef<[ExtendedHeader]> for ValidatedExtendedHeaders { + fn as_ref(&self) -> &ExtendedHeader { + &self.0[..] + } +} + +impl AsRef<[ExtendedHeader]> for VerifiedExtendedHeaders { + fn as_ref(&self) -> &ExtendedHeader { + &self.0[..] + } +} + +impl From for ExtendedHeader { + fn from(header: VerifiedExtendedHeader) -> Self { + header.0 + } +} + +impl From for Vec { + fn from(headers: ValidatedExtendedHeaders) -> Self { + headers.0 + } +} + +impl From for Vec { + fn from(headers: ValidatedExtendedHeaders) -> Self { + headers.0 + } +} + +impl From for ValidatedExtendedHeaders { + fn from(header: ValidatedExtendedHeader) -> Self { + ValidatedExtendedHeaders(vec![header.0]) + } +} + +impl<'a> From<&'a ValidatedExtendedHeader> for ValidatedExtendedHeaders { + fn from(header: &'a ValidatedExtendedHeader) -> Self { + self.to_owned().into() + } +} + +impl<'a> From<&'a mut ValidatedExtendedHeader> for ValidatedExtendedHeaders { + fn from(header: &'a mut ValidatedExtendedHeader) -> Self { + self.to_owned().into() + } +} + +impl From> for ValidatedExtendedHeaders { + fn from(headers: Vec) -> Self { + // SAFETY: It is safe to transmute because of `repr(transparent)`. + let headers = unsafe { mem::transmute(headers) }; + ValidatedExtendedHeaders(headers) + } +} + +impl<'a> From<&'a [ValidatedExtendedHeader]> for ValidatedExtendedHeaders { + fn from(headers: &'a [ValidatedExtendedHeader]) -> Self { + Vec::from(headers).into() + } +} + +impl<'a> From<&'a mut [ValidatedExtendedHeader]> for ValidatedExtendedHeaders { + fn from(headers: &'a mut [ValidatedExtendedHeader]) -> Self { + Vec::from(headers).into() + } +} + +impl<'a, const N: usize> From<[ValidatedExtendedHeader; N]> for ValidatedExtendedHeaders { + fn from(headers: [ValidatedExtendedHeader; N]) -> Self { + Vec::from(headers).into() + } +} + +impl<'a, const N: usize> From<&'a [ValidatedExtendedHeader; N]> for ValidatedExtendedHeaders { + fn from(headers: [ValidatedExtendedHeader; N]) -> Self { + Vec::from(headers).into() + } +} + +impl<'a, const N: usize> From<&'a mut [ValidatedExtendedHeader; N]> for ValidatedExtendedHeaders { + fn from(headers: [ValidatedExtendedHeader; N]) -> Self { + Vec::from(headers).into() } } @@ -135,9 +285,8 @@ impl IntoIterator for ValidExtendedHeadersChain { self.0.into_iter() } } -*/ -impl IntoIterator for ValidExtendedHeadersChain { +impl IntoIterator for VerifiedExtendedHeaders { type Item = ExtendedHeader; type IntoIter = std::vec::IntoIter; @@ -148,62 +297,92 @@ impl IntoIterator for ValidExtendedHeadersChain { impl AsRef<[ExtendedHeader]> for ValidExtendedHeadersChain { fn as_ref(&self) -> &[ExtendedHeader] { - &self.0 + unsafe { mem::transmute(self.0.as_ref()) } + } +} +*/ + +impl IntoVerifiedExtendedHeaders for VerifiedExtendedHeaders { + async fn into_verified(self) -> celestia_types::Result { + self } } -impl IntoValidExtendedHeadersChain for ValidatedExtendedHeaders { - async fn into_valid_chain(self) -> Result { - ValidExtendedHeadersChain::from_validated(self) +impl IntoVerifiedExtendedHeaders for T +where + T: Into, +{ + async fn into_verified(self) -> celestia_types::Result { + VerifiedExtendedHeaders::from_validated(self.into()) } } -impl IntoValidExtendedHeadersChain for VerifiedExtendedHeaders { - async fn into_valid_chain(self) -> Result { - ValidExtendedHeadersChain::from_verified(self).await +impl IntoVerifiedExtendedHeaders for ExtendedHeader { + async fn into_verified(self) -> celestia_types::Result { + VerifiedExtendedHeaders::new(vec![self]).await } } -impl IntoValidExtendedHeadersChain for ValidExtendedHeadersChain { - async fn into_valid_chain(self) -> Result { - // Headers are already verified and validated. - Ok(self) +impl<'a> IntoVerifiedExtendedHeaders for &'a ExtendedHeader { + async fn into_verified(self) -> celestia_types::Result { + self.to_owned().into() } } -impl IntoValidExtendedHeadersChain for ExtendedHeader { - async fn into_valid_chain(self) -> Result { - ValidExtendedHeadersChain::new(vec![self]).await +impl<'a> IntoVerifiedExtendedHeaders for &'a mut ExtendedHeader { + async fn into_verified(self) -> celestia_types::Result { + self.to_owned().into() } } -impl<'a> IntoValidExtendedHeadersChain for &'a ExtendedHeader { - async fn into_valid_chain(self) -> Result { - ValidExtendedHeadersChain::new(vec![self.to_owned()]).await +impl IntoVerifiedExtendedHeaders for Vec { + async fn into_verified(self) -> celestia_types::Result { + VerifiedExtendedHeaders::new(self).await } } -impl IntoValidExtendedHeadersChain for Vec { - async fn into_valid_chain(self) -> Result { - ValidExtendedHeadersChain::new(self).await +impl<'a> IntoVerifiedExtendedHeaders for &'a [ExtendedHeader] { + async fn into_verified(self) -> celestia_types::Result { + // Validate and verify before we convert it to `Vec` + validate_headers(self).await?; + verify_headers(self)?; + VerifiedExtendedHeaders(Vec::from(self)) } } -impl<'a> IntoValidExtendedHeadersChain for &'a [ExtendedHeader] { - async fn into_valid_chain(self) -> Result { - ValidExtendedHeadersChain::new(self.into()).await +impl<'a> IntoVerifiedExtendedHeaders for &'a mut [ExtendedHeader] { + async fn into_verified(self) -> celestia_types::Result { + // Validate and verify before we convert it to `Vec` + validate_headers(self).await?; + verify_headers(self)?; + VerifiedExtendedHeaders(Vec::from(self)) } } -impl IntoValidExtendedHeadersChain for [ExtendedHeader; N] { - async fn into_valid_chain(self) -> Result { - ValidExtendedHeadersChain::new(self.into()).await +impl IntoVerifiedExtendedHeaders for [ExtendedHeader; N] { + async fn into_verified(self) -> celestia_types::Result { + // Validate and verify before we convert it to `Vec` + validate_headers(&self).await?; + verify_headers(&self)?; + VerifiedExtendedHeaders(Vec::from(self)) } } -impl<'a, const N: usize> IntoValidExtendedHeadersChain for &'a [ExtendedHeader; N] { - async fn into_valid_chain(self) -> Result { - ValidExtendedHeadersChain::new(self.into()).await +impl<'a, const N: usize> IntoVerifiedExtendedHeaders for &'a [ExtendedHeader; N] { + async fn into_verified(self) -> celestia_types::Result { + // Validate and verify before we convert it to `Vec` + validate_headers(&self).await?; + verify_headers(&self)?; + VerifiedExtendedHeaders(Vec::from(self)) + } +} + +impl<'a, const N: usize> IntoVerifiedExtendedHeaders for &'a mut [ExtendedHeader; N] { + async fn into_verified(self) -> celestia_types::Result { + // Validate and verify before we convert it to `Vec` + validate_headers(&self).await?; + verify_headers(&self)?; + VerifiedExtendedHeaders(Vec::from(self)) } } @@ -228,6 +407,14 @@ pub(crate) async fn validate_headers(headers: &[ExtendedHeader]) -> celestia_typ Ok(()) } +pub(crate) async fn validate_headers_vec( + headers: Vec, +) -> celestia_types::Result> { + validate_headers_slice(&headers).await?; + // SAFETY: It is safe to transmute because of `repr(transparent)`. + Ok(unsafe { mem::transmute(headers) }) +} + #[cfg(test)] mod tests { use super::*; diff --git a/node/src/syncer.rs b/node/src/syncer.rs index c9e4fde46..68f6753a0 100644 --- a/node/src/syncer.rs +++ b/node/src/syncer.rs @@ -592,6 +592,7 @@ mod tests { use crate::block_ranges::{BlockRange, BlockRangeExt}; use crate::events::EventChannel; use crate::p2p::header_session; + use crate::store::utils::validate_headers; use crate::store::InMemoryStore; use crate::test_utils::{async_test, gen_filled_store, MockP2pHandle}; use celestia_types::test_utils::ExtendedHeaderGenerator; @@ -888,6 +889,8 @@ mod tests { let headers = ExtendedHeaderGenerator::new().next_many(20); let headers_prime = ExtendedHeaderGenerator::new().next_many(20); + //validate_headers(&headers).await.unwrap(); + // Start Syncer and report last header as network head let (syncer, store, mut p2p_mock) = initialized_syncer(headers[19].clone()).await; @@ -895,13 +898,17 @@ mod tests { handle_session_batch(&mut p2p_mock, &headers_prime, 1..=19, true).await; // Syncer should not apply headers from invalid response + dbg!(1); assert_syncing(&syncer, &store, &[20..=20], 20).await; + dbg!(2); // Syncer requests missing headers again handle_session_batch(&mut p2p_mock, &headers, 1..=19, true).await; + dbg!(3); // With a correct resposne, syncer should update the store assert_syncing(&syncer, &store, &[1..=20], 20).await; + dbg!(4); } async fn assert_syncing( @@ -912,7 +919,7 @@ mod tests { ) { // Syncer receives responds on the same loop that receive other events. // Wait a bit to be processed. - sleep(Duration::from_millis(1)).await; + sleep(Duration::from_millis(10)).await; let store_ranges = store.get_stored_header_ranges().await.unwrap(); let syncing_info = syncer.info().await.unwrap(); From 3d456e44f58288091084fd3fb424743901345f48 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Fri, 12 Jul 2024 10:32:06 +0300 Subject: [PATCH 3/7] wip --- node/src/p2p.rs | 7 +- node/src/store.rs | 5 +- node/src/store/in_memory_store.rs | 12 ++-- node/src/store/redb_store.rs | 8 +-- node/src/store/utils.rs | 112 +++++++++++++++--------------- node/src/syncer.rs | 2 +- 6 files changed, 72 insertions(+), 74 deletions(-) diff --git a/node/src/p2p.rs b/node/src/p2p.rs index 4a991f129..f6db62727 100644 --- a/node/src/p2p.rs +++ b/node/src/p2p.rs @@ -191,7 +191,7 @@ pub(crate) enum P2pCmd { }, HeaderExRequest { request: HeaderRequest, - respond_to: OneshotResultSender, + respond_to: OneshotResultSender, P2pError>, }, Listeners { respond_to: oneshot::Sender>, @@ -340,10 +340,7 @@ impl P2p { } /// Send a request on the `header-ex` protocol. - pub async fn header_ex_request( - &self, - request: HeaderRequest, - ) -> Result { + pub async fn header_ex_request(&self, request: HeaderRequest) -> Result> { let (tx, rx) = oneshot::channel(); self.send_command(P2pCmd::HeaderExRequest { diff --git a/node/src/store.rs b/node/src/store.rs index b450c33d2..17aa6dfd0 100644 --- a/node/src/store.rs +++ b/node/src/store.rs @@ -16,8 +16,7 @@ use thiserror::Error; pub use crate::block_ranges::{BlockRange, BlockRanges, BlockRangesError}; pub use crate::store::utils::{ - IntoValidExtendedHeadersChain, ValidExtendedHeadersChain, ValidatedExtendedHeaders, - VerifiedExtendedHeaders, + IntoVerifiedExtendedHeaders, ValidatedExtendedHeaders, VerifiedExtendedHeaders, }; pub use in_memory_store::InMemoryStore; @@ -151,7 +150,7 @@ pub trait Store: Send + Sync + Debug { /// /// New insertion should pass all the constraints in [`BlockRanges::check_insertion_constraints`], /// additionaly it should be [`ExtendedHeader::verify`]ed against neighbor headers. - async fn insert(&self, headers: impl IntoValidExtendedHeadersChain) -> Result<()>; + async fn insert(&self, headers: impl IntoVerifiedExtendedHeaders) -> Result<()>; /// Returns a list of header ranges currenty held in store. async fn get_stored_header_ranges(&self) -> Result; diff --git a/node/src/store/in_memory_store.rs b/node/src/store/in_memory_store.rs index 2f4caef19..8d24886fa 100644 --- a/node/src/store/in_memory_store.rs +++ b/node/src/store/in_memory_store.rs @@ -11,10 +11,10 @@ use tracing::debug; use crate::block_ranges::BlockRanges; use crate::store::{ - IntoValidExtendedHeadersChain, Result, SamplingMetadata, SamplingStatus, Store, StoreError, + IntoVerifiedExtendedHeaders, Result, SamplingMetadata, SamplingStatus, Store, StoreError, }; -use super::utils::ValidExtendedHeadersChain; +use super::VerifiedExtendedHeaders; /// A non-persistent in memory [`Store`] implementation. #[derive(Debug)] @@ -86,8 +86,8 @@ impl InMemoryStore { self.inner.read().await.get_by_height(height) } - pub(crate) async fn insert(&self, headers: impl IntoValidExtendedHeadersChain) -> Result<()> { - let headers = headers.into_valid_chain().await.map_err(|e| dbg!(e))?; + pub(crate) async fn insert(&self, headers: impl IntoVerifiedExtendedHeaders) -> Result<()> { + let headers = headers.into_verified().await?; self.inner.write().await.insert(headers).await?; self.header_added_notifier.notify_waiters(); Ok(()) @@ -164,7 +164,7 @@ impl InMemoryStoreInner { .ok_or(StoreError::LostHash(hash)) } - async fn insert(&mut self, headers: ValidExtendedHeadersChain) -> Result<()> { + async fn insert(&mut self, headers: VerifiedExtendedHeaders) -> Result<()> { let (Some(head), Some(tail)) = (headers.as_ref().first(), headers.as_ref().last()) else { return Ok(()); }; @@ -362,7 +362,7 @@ impl Store for InMemoryStore { self.contains_height(height).await } - async fn insert(&self, headers: impl IntoValidExtendedHeadersChain) -> Result<()> { + async fn insert(&self, headers: impl IntoVerifiedExtendedHeaders) -> Result<()> { self.insert(headers).await } diff --git a/node/src/store/redb_store.rs b/node/src/store/redb_store.rs index fcc7c4c58..4846c4f6a 100644 --- a/node/src/store/redb_store.rs +++ b/node/src/store/redb_store.rs @@ -19,7 +19,7 @@ use tracing::{debug, trace}; use crate::block_ranges::BlockRanges; use crate::store::{ - IntoValidExtendedHeadersChain, Result, SamplingMetadata, SamplingStatus, Store, StoreError, + IntoVerifiedExtendedHeaders, Result, SamplingMetadata, SamplingStatus, Store, StoreError, }; const SCHEMA_VERSION: u64 = 2; @@ -244,8 +244,8 @@ impl RedbStore { .unwrap_or(false) } - async fn insert(&self, headers: impl IntoValidExtendedHeadersChain) -> Result<()> { - let headers = headers.into_valid_chain().await?; + async fn insert(&self, headers: impl IntoVerifiedExtendedHeaders) -> Result<()> { + let headers = headers.into_verified().await?; self.write_tx(move |tx| { let headers = headers.as_ref(); @@ -462,7 +462,7 @@ impl Store for RedbStore { self.contains_height(height).await } - async fn insert(&self, headers: impl IntoValidExtendedHeadersChain) -> Result<()> { + async fn insert(&self, headers: impl IntoVerifiedExtendedHeaders) -> Result<()> { self.insert(headers).await } diff --git a/node/src/store/utils.rs b/node/src/store/utils.rs index c1bcde8b0..218782597 100644 --- a/node/src/store/utils.rs +++ b/node/src/store/utils.rs @@ -2,7 +2,6 @@ use std::future::Future; use std::mem; use std::ops::{Deref, RangeInclusive}; -use celestia_proto::header::pb::ExtendedHeader; use celestia_types::ExtendedHeader; use crate::block_ranges::{BlockRange, BlockRangeExt}; @@ -61,8 +60,13 @@ fn get_most_recent_missing_range( } #[repr(transparent)] +#[derive(Clone, Debug)] pub struct ValidatedExtendedHeader(ExtendedHeader); + +#[derive(Clone, Debug)] pub struct ValidatedExtendedHeaders(Vec); + +#[derive(Clone, Debug)] pub struct VerifiedExtendedHeaders(Vec); pub trait IntoVerifiedExtendedHeaders: Send { @@ -87,14 +91,14 @@ impl ValidatedExtendedHeader { ValidatedExtendedHeader(header) } - pub fn into_inner(self) -> Self { + pub fn into_inner(self) -> ExtendedHeader { self.0 } } impl ValidatedExtendedHeaders { pub async fn new(headers: Vec) -> celestia_types::Result { - validate_headers(&headers)?; + validate_headers(&headers).await?; Ok(ValidatedExtendedHeaders(headers)) } @@ -108,7 +112,7 @@ impl ValidatedExtendedHeaders { ValidatedExtendedHeaders(headers) } - pub fn into_inner(self) -> Self { + pub fn into_inner(self) -> Vec { self.0 } } @@ -116,15 +120,12 @@ impl ValidatedExtendedHeaders { impl VerifiedExtendedHeaders { pub async fn new(headers: Vec) -> celestia_types::Result { let headers = ValidatedExtendedHeaders::new(headers).await?; - ValidatedExtendedHeaders::from_validated(headers) + VerifiedExtendedHeaders::from_validated(headers) } - pub fn from_validated( - headers: impl Into, - ) -> celestia_types::Result { - let headers = headers.into().0; - verify_headers(&headers)?; - Ok(VerifiedExtendedHeaders(headers)) + pub fn from_validated(headers: ValidatedExtendedHeaders) -> celestia_types::Result { + verify_headers(&headers.0)?; + Ok(VerifiedExtendedHeaders(headers.0)) } /// Create a new instance out of pre-checked vec of headers @@ -137,12 +138,12 @@ impl VerifiedExtendedHeaders { VerifiedExtendedHeaders(headers) } - pub fn into_inner(self) -> Self { + pub fn into_inner(self) -> Vec { self.0 } } -impl IntoIterator for ValidatedExtendedHeader { +impl IntoIterator for ValidatedExtendedHeaders { type Item = ExtendedHeader; type IntoIter = std::vec::IntoIter; @@ -191,19 +192,19 @@ impl AsRef for ValidatedExtendedHeader { } impl AsRef<[ExtendedHeader]> for ValidatedExtendedHeaders { - fn as_ref(&self) -> &ExtendedHeader { + fn as_ref(&self) -> &[ExtendedHeader] { &self.0[..] } } impl AsRef<[ExtendedHeader]> for VerifiedExtendedHeaders { - fn as_ref(&self) -> &ExtendedHeader { + fn as_ref(&self) -> &[ExtendedHeader] { &self.0[..] } } -impl From for ExtendedHeader { - fn from(header: VerifiedExtendedHeader) -> Self { +impl From for ExtendedHeader { + fn from(header: ValidatedExtendedHeader) -> Self { header.0 } } @@ -215,7 +216,7 @@ impl From for Vec { } impl From for Vec { - fn from(headers: ValidatedExtendedHeaders) -> Self { + fn from(headers: VerifiedExtendedHeaders) -> Self { headers.0 } } @@ -228,13 +229,13 @@ impl From for ValidatedExtendedHeaders { impl<'a> From<&'a ValidatedExtendedHeader> for ValidatedExtendedHeaders { fn from(header: &'a ValidatedExtendedHeader) -> Self { - self.to_owned().into() + header.to_owned().into() } } impl<'a> From<&'a mut ValidatedExtendedHeader> for ValidatedExtendedHeaders { fn from(header: &'a mut ValidatedExtendedHeader) -> Self { - self.to_owned().into() + header.to_owned().into() } } @@ -265,13 +266,13 @@ impl<'a, const N: usize> From<[ValidatedExtendedHeader; N]> for ValidatedExtende } impl<'a, const N: usize> From<&'a [ValidatedExtendedHeader; N]> for ValidatedExtendedHeaders { - fn from(headers: [ValidatedExtendedHeader; N]) -> Self { + fn from(headers: &'a [ValidatedExtendedHeader; N]) -> Self { Vec::from(headers).into() } } impl<'a, const N: usize> From<&'a mut [ValidatedExtendedHeader; N]> for ValidatedExtendedHeaders { - fn from(headers: [ValidatedExtendedHeader; N]) -> Self { + fn from(headers: &'a mut [ValidatedExtendedHeader; N]) -> Self { Vec::from(headers).into() } } @@ -304,34 +305,48 @@ impl AsRef<[ExtendedHeader]> for ValidExtendedHeadersChain { impl IntoVerifiedExtendedHeaders for VerifiedExtendedHeaders { async fn into_verified(self) -> celestia_types::Result { - self + Ok(self) } } -impl IntoVerifiedExtendedHeaders for T -where - T: Into, -{ +impl IntoVerifiedExtendedHeaders for ValidatedExtendedHeaders { async fn into_verified(self) -> celestia_types::Result { - VerifiedExtendedHeaders::from_validated(self.into()) + VerifiedExtendedHeaders::from_validated(self) + } +} + +impl<'a> IntoVerifiedExtendedHeaders for &'a ValidatedExtendedHeaders { + async fn into_verified(self) -> celestia_types::Result { + verify_headers(&self.0[..])?; + Ok(VerifiedExtendedHeaders(self.0.to_owned())) + } +} + +impl<'a> IntoVerifiedExtendedHeaders for &'a mut ValidatedExtendedHeaders { + async fn into_verified(self) -> celestia_types::Result { + verify_headers(&self.0[..])?; + Ok(VerifiedExtendedHeaders(self.0.to_owned())) } } impl IntoVerifiedExtendedHeaders for ExtendedHeader { async fn into_verified(self) -> celestia_types::Result { - VerifiedExtendedHeaders::new(vec![self]).await + self.validate()?; + Ok(VerifiedExtendedHeaders(vec![self])) } } impl<'a> IntoVerifiedExtendedHeaders for &'a ExtendedHeader { async fn into_verified(self) -> celestia_types::Result { - self.to_owned().into() + self.validate()?; + Ok(VerifiedExtendedHeaders(vec![self.to_owned()])) } } impl<'a> IntoVerifiedExtendedHeaders for &'a mut ExtendedHeader { async fn into_verified(self) -> celestia_types::Result { - self.to_owned().into() + self.validate()?; + Ok(VerifiedExtendedHeaders(vec![self.to_owned()])) } } @@ -343,46 +358,41 @@ impl IntoVerifiedExtendedHeaders for Vec { impl<'a> IntoVerifiedExtendedHeaders for &'a [ExtendedHeader] { async fn into_verified(self) -> celestia_types::Result { - // Validate and verify before we convert it to `Vec` validate_headers(self).await?; verify_headers(self)?; - VerifiedExtendedHeaders(Vec::from(self)) + Ok(VerifiedExtendedHeaders(Vec::from(self))) } } impl<'a> IntoVerifiedExtendedHeaders for &'a mut [ExtendedHeader] { async fn into_verified(self) -> celestia_types::Result { - // Validate and verify before we convert it to `Vec` validate_headers(self).await?; verify_headers(self)?; - VerifiedExtendedHeaders(Vec::from(self)) + Ok(VerifiedExtendedHeaders(Vec::from(self))) } } impl IntoVerifiedExtendedHeaders for [ExtendedHeader; N] { async fn into_verified(self) -> celestia_types::Result { - // Validate and verify before we convert it to `Vec` - validate_headers(&self).await?; - verify_headers(&self)?; - VerifiedExtendedHeaders(Vec::from(self)) + validate_headers(&self[..]).await?; + verify_headers(&self[..])?; + Ok(VerifiedExtendedHeaders(Vec::from(self))) } } impl<'a, const N: usize> IntoVerifiedExtendedHeaders for &'a [ExtendedHeader; N] { async fn into_verified(self) -> celestia_types::Result { - // Validate and verify before we convert it to `Vec` - validate_headers(&self).await?; - verify_headers(&self)?; - VerifiedExtendedHeaders(Vec::from(self)) + validate_headers(&self[..]).await?; + verify_headers(&self[..])?; + Ok(VerifiedExtendedHeaders(Vec::from(self))) } } impl<'a, const N: usize> IntoVerifiedExtendedHeaders for &'a mut [ExtendedHeader; N] { async fn into_verified(self) -> celestia_types::Result { - // Validate and verify before we convert it to `Vec` - validate_headers(&self).await?; - verify_headers(&self)?; - VerifiedExtendedHeaders(Vec::from(self)) + validate_headers(&self[..]).await?; + verify_headers(&self[..])?; + Ok(VerifiedExtendedHeaders(Vec::from(self))) } } @@ -407,14 +417,6 @@ pub(crate) async fn validate_headers(headers: &[ExtendedHeader]) -> celestia_typ Ok(()) } -pub(crate) async fn validate_headers_vec( - headers: Vec, -) -> celestia_types::Result> { - validate_headers_slice(&headers).await?; - // SAFETY: It is safe to transmute because of `repr(transparent)`. - Ok(unsafe { mem::transmute(headers) }) -} - #[cfg(test)] mod tests { use super::*; diff --git a/node/src/syncer.rs b/node/src/syncer.rs index 68f6753a0..3fefc6efb 100644 --- a/node/src/syncer.rs +++ b/node/src/syncer.rs @@ -30,7 +30,7 @@ use crate::events::{EventPublisher, NodeEvent}; use crate::executor::{sleep, spawn, spawn_cancellable, Interval}; use crate::p2p::{P2p, P2pError}; use crate::store::utils::calculate_range_to_fetch; -use crate::store::{Store, StoreError, ValidExtendedHeadersChain, ValidatedExtendedHeaders}; +use crate::store::{Store, StoreError, ValidatedExtendedHeaders}; use crate::utils::OneshotSenderExt; type Result = std::result::Result; From 917ba9b282bb86931112d51e00b7dcf8505239f7 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Fri, 12 Jul 2024 10:34:50 +0300 Subject: [PATCH 4/7] wip --- node/src/store/utils.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/node/src/store/utils.rs b/node/src/store/utils.rs index 218782597..894f073e8 100644 --- a/node/src/store/utils.rs +++ b/node/src/store/utils.rs @@ -115,6 +115,11 @@ impl ValidatedExtendedHeaders { pub fn into_inner(self) -> Vec { self.0 } + + pub fn into_validated_vec(self) -> Vec { + // SAFETY: It is safe to transmute because of `repr(transparent)`. + unsafe { mem::transmute(self.0) } + } } impl VerifiedExtendedHeaders { From 66da34d92c5e7a1b05f991f568ab228c5b2c2b06 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Fri, 12 Jul 2024 11:53:03 +0300 Subject: [PATCH 5/7] wip --- node/src/node.rs | 18 +++++--- node/src/p2p.rs | 72 ++++++++++++++++++-------------- node/src/p2p/header_ex.rs | 5 +-- node/src/p2p/header_ex/client.rs | 21 +++++----- node/src/p2p/header_ex/utils.rs | 12 ++++-- node/src/p2p/header_session.rs | 11 ++--- node/src/store.rs | 3 +- node/src/store/utils.rs | 43 +++++++++++++++++++ node/src/syncer.rs | 14 ++----- types/src/extended_header.rs | 12 ++++++ 10 files changed, 140 insertions(+), 71 deletions(-) diff --git a/node/src/node.rs b/node/src/node.rs index 0ce5a1be8..deb6be272 100644 --- a/node/src/node.rs +++ b/node/src/node.rs @@ -224,17 +224,17 @@ where /// Request the head header from the network. pub async fn request_head_header(&self) -> Result { - Ok(self.p2p.get_head_header().await?) + Ok(self.p2p.get_head_header().await?.into()) } /// Request a header for the block with a given hash from the network. pub async fn request_header_by_hash(&self, hash: &Hash) -> Result { - Ok(self.p2p.get_header(*hash).await?) + Ok(self.p2p.get_header(*hash).await?.into()) } /// Request a header for the block with a given height from the network. pub async fn request_header_by_height(&self, hash: u64) -> Result { - Ok(self.p2p.get_header_by_height(hash).await?) + Ok(self.p2p.get_header_by_height(hash).await?.into()) } /// Request headers in range (from, from + amount] from the network. @@ -245,7 +245,11 @@ where from: &ExtendedHeader, amount: u64, ) -> Result> { - Ok(self.p2p.get_verified_headers_range(from, amount).await?) + Ok(self + .p2p + .get_verified_headers_range(from, amount) + .await? + .into()) } /// Request a verified [`Row`] from the network. @@ -301,7 +305,11 @@ where /// Get the latest header announced in the network. pub fn get_network_head_header(&self) -> Option { - self.p2p.header_sub_watcher().borrow().clone() + self.p2p + .header_sub_watcher() + .borrow() + .clone() + .map(|h| h.into()) } /// Get the latest locally synced header. diff --git a/node/src/p2p.rs b/node/src/p2p.rs index f6db62727..20e38f5d8 100644 --- a/node/src/p2p.rs +++ b/node/src/p2p.rs @@ -63,7 +63,8 @@ use crate::p2p::shwap::{namespaced_data_cid, row_cid, sample_cid, ShwapMultihash use crate::p2p::swarm::new_swarm; use crate::peer_tracker::PeerTracker; use crate::peer_tracker::PeerTrackerInfo; -use crate::store::{Store, ValidatedExtendedHeaders}; +use crate::store::utils::ValidatedExtendedHeader; +use crate::store::{Store, ValidatedExtendedHeaders, VerifiedExtendedHeaders}; use crate::utils::{ celestia_protocol_id, fraudsub_ident_topic, gossipsub_ident_topic, MultiaddrExt, OneshotResultSender, OneshotResultSenderExt, OneshotSenderExt, @@ -157,7 +158,7 @@ impl From for P2pError { #[derive(Debug)] pub struct P2p { cmd_tx: mpsc::Sender, - header_sub_watcher: watch::Receiver>, + header_sub_watcher: watch::Receiver>, peer_tracker_info_watcher: watch::Receiver, local_peer_id: PeerId, } @@ -191,7 +192,7 @@ pub(crate) enum P2pCmd { }, HeaderExRequest { request: HeaderRequest, - respond_to: OneshotResultSender, P2pError>, + respond_to: OneshotResultSender, }, Listeners { respond_to: oneshot::Sender>, @@ -200,7 +201,7 @@ pub(crate) enum P2pCmd { respond_to: oneshot::Sender>, }, InitHeaderSub { - head: Box, + head: Box, }, SetPeerTrust { peer_id: PeerId, @@ -289,7 +290,7 @@ impl P2p { } /// Watcher for the latest verified network head headers announced on `header-sub`. - pub fn header_sub_watcher(&self) -> watch::Receiver> { + pub fn header_sub_watcher(&self) -> watch::Receiver> { self.header_sub_watcher.clone() } @@ -304,7 +305,7 @@ impl P2p { } /// Initializes `header-sub` protocol with a given `subjective_head`. - pub async fn init_header_sub(&self, head: ExtendedHeader) -> Result<()> { + pub async fn init_header_sub(&self, head: ValidatedExtendedHeader) -> Result<()> { self.send_command(P2pCmd::InitHeaderSub { head: Box::new(head), }) @@ -340,7 +341,10 @@ impl P2p { } /// Send a request on the `header-ex` protocol. - pub async fn header_ex_request(&self, request: HeaderRequest) -> Result> { + pub async fn header_ex_request( + &self, + request: HeaderRequest, + ) -> Result { let (tx, rx) = oneshot::channel(); self.send_command(P2pCmd::HeaderExRequest { @@ -353,35 +357,37 @@ impl P2p { } /// Request the head header on the `header-ex` protocol. - pub async fn get_head_header(&self) -> Result { + pub async fn get_head_header(&self) -> Result { self.get_header_by_height(0).await } /// Request the header by hash on the `header-ex` protocol. - pub async fn get_header(&self, hash: Hash) -> Result { + pub async fn get_header(&self, hash: Hash) -> Result { self.header_ex_request(HeaderRequest { data: Some(header_request::Data::Hash(hash.as_bytes().to_vec())), amount: 1, }) .await? + .into_validated_vec() .into_iter() .next() .ok_or(HeaderExError::HeaderNotFound.into()) } /// Request the header by height on the `header-ex` protocol. - pub async fn get_header_by_height(&self, height: u64) -> Result { + pub async fn get_header_by_height(&self, height: u64) -> Result { self.header_ex_request(HeaderRequest { data: Some(header_request::Data::Origin(height)), amount: 1, }) .await? + .into_validated_vec() .into_iter() .next() .ok_or(HeaderExError::HeaderNotFound.into()) } - /// Request the headers following the one given with the `header-ex` protocol. + /// Request the headers followingValidatedExtendedHeader the one given with the `header-ex` protocol. /// /// First header from the requested range will be verified against the provided one, /// then each subsequent is verified against the previous one. @@ -389,22 +395,26 @@ impl P2p { &self, from: &ExtendedHeader, amount: u64, - ) -> Result> { + ) -> Result { + if amount == 0 { + return Err(HeaderExError::InvalidRequest.into()); + } + // User can give us a bad header, so validate it. from.validate().map_err(|_| HeaderExError::InvalidRequest)?; let height = from.height().value() + 1; - let range = height..=height + amount - 1; let mut session = HeaderSession::new(range, self.cmd_tx.clone()); let headers = session.run().await?; - // `.validate()` is called on each header separately by `HeaderExClientHandler`. - // - // The last step is to verify that all headers are from the same chain - // and indeed connected with the next one. - from.verify_adjacent_range(&headers) + // Verify the first header, the rest will be verified + // in `VerifiedExtendedHeaders::from_validated`. + from.verify_adjacent(&headers[0]) + .map_err(|_| HeaderExError::InvalidResponse)?; + + let headers = VerifiedExtendedHeaders::from_validated(headers) .map_err(|_| HeaderExError::InvalidResponse)?; Ok(headers) @@ -417,7 +427,7 @@ impl P2p { pub(crate) async fn get_unverified_header_range( &self, range: BlockRange, - ) -> Result> { + ) -> Result { if range.is_empty() { return Err(HeaderExError::InvalidRequest.into()); } @@ -425,13 +435,6 @@ impl P2p { let mut session = HeaderSession::new(range, self.cmd_tx.clone()); let headers = session.run().await?; - let Some(head) = headers.first() else { - return Err(HeaderExError::InvalidResponse.into()); - }; - - head.verify_adjacent_range(&headers[1..]) - .map_err(|_| HeaderExError::InvalidResponse)?; - Ok(headers) } @@ -565,7 +568,7 @@ where bad_encoding_fraud_sub_topic: TopicHash, cmd_rx: mpsc::Receiver, peer_tracker: Arc, - header_sub_watcher: watch::Sender>, + header_sub_watcher: watch::Sender>, bitswap_queries: HashMap, P2pError>>, network_compromised_token: CancellationToken, store: Arc, @@ -579,7 +582,7 @@ where fn new( args: P2pArgs, cmd_rx: mpsc::Receiver, - header_sub_watcher: watch::Sender>, + header_sub_watcher: watch::Sender>, peer_tracker: Arc, ) -> Result { let local_peer_id = PeerId::from(args.local_keypair.public()); @@ -948,15 +951,20 @@ where } #[instrument(skip_all, fields(header = %head))] - fn on_init_header_sub(&mut self, head: ExtendedHeader) { + fn on_init_header_sub(&mut self, head: ValidatedExtendedHeader) { self.header_sub_watcher.send_replace(Some(head)); trace!("HeaderSub initialized"); } #[instrument(skip_all)] async fn on_header_sub_message(&mut self, data: &[u8]) -> gossipsub::MessageAcceptance { - let Ok(header) = ExtendedHeader::decode_and_validate(data) else { - trace!("Malformed or invalid header from header-sub"); + let Ok(header) = ExtendedHeader::decode(data) else { + trace!("Malformed header from header-sub"); + return gossipsub::MessageAcceptance::Reject; + }; + + let Ok(header) = ValidatedExtendedHeader::new(header) else { + trace!("Invalid header from header-sub"); return gossipsub::MessageAcceptance::Reject; }; @@ -1142,6 +1150,6 @@ where .build()) } -fn network_head_height(watcher: &watch::Sender>) -> Option { +fn network_head_height(watcher: &watch::Sender>) -> Option { watcher.borrow().as_ref().map(|header| header.height()) } diff --git a/node/src/p2p/header_ex.rs b/node/src/p2p/header_ex.rs index 79b18d1f7..34cd33a60 100644 --- a/node/src/p2p/header_ex.rs +++ b/node/src/p2p/header_ex.rs @@ -4,7 +4,6 @@ use std::task::{Context, Poll}; use async_trait::async_trait; use celestia_proto::p2p::pb::{HeaderRequest, HeaderResponse}; -use celestia_types::ExtendedHeader; use futures::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}; use libp2p::{ core::Endpoint, @@ -29,7 +28,7 @@ use crate::p2p::header_ex::client::HeaderExClientHandler; use crate::p2p::header_ex::server::HeaderExServerHandler; use crate::p2p::P2pError; use crate::peer_tracker::PeerTracker; -use crate::store::Store; +use crate::store::{Store, ValidatedExtendedHeaders}; use crate::utils::{protocol_id, OneshotResultSender}; /// Size limit of a request in bytes @@ -111,7 +110,7 @@ where pub(crate) fn send_request( &mut self, request: HeaderRequest, - respond_to: OneshotResultSender, P2pError>, + respond_to: OneshotResultSender, ) { self.client_handler .on_send_request(&mut self.req_resp, request, respond_to); diff --git a/node/src/p2p/header_ex/client.rs b/node/src/p2p/header_ex/client.rs index bdf215adc..fc3d3ba8d 100644 --- a/node/src/p2p/header_ex/client.rs +++ b/node/src/p2p/header_ex/client.rs @@ -7,7 +7,6 @@ use std::task::{Context, Poll}; use celestia_proto::p2p::pb::header_request::Data; use celestia_proto::p2p::pb::{HeaderRequest, HeaderResponse}; -use celestia_types::ExtendedHeader; use futures::future::join_all; use libp2p::request_response::{OutboundFailure, OutboundRequestId}; use libp2p::PeerId; @@ -19,7 +18,7 @@ use crate::p2p::header_ex::utils::{HeaderRequestExt, HeaderResponseExt}; use crate::p2p::header_ex::{HeaderExError, ReqRespBehaviour}; use crate::p2p::P2pError; use crate::peer_tracker::PeerTracker; -use crate::store::utils::VALIDATIONS_PER_YIELD; +use crate::store::utils::{ValidatedExtendedHeaders, VALIDATIONS_PER_YIELD}; use crate::utils::{OneshotResultSender, OneshotResultSenderExt}; const MAX_PEERS: usize = 10; @@ -34,7 +33,7 @@ where struct State { request: HeaderRequest, - respond_to: OneshotResultSender, P2pError>, + respond_to: OneshotResultSender, } pub(super) trait RequestSender { @@ -67,7 +66,7 @@ where &mut self, sender: &mut S, request: HeaderRequest, - respond_to: OneshotResultSender, P2pError>, + respond_to: OneshotResultSender, ) { if !request.is_valid() { respond_to.maybe_send_err(HeaderExError::InvalidRequest); @@ -87,7 +86,7 @@ where &mut self, sender: &mut S, request: HeaderRequest, - respond_to: OneshotResultSender, P2pError>, + respond_to: OneshotResultSender, ) { // Validate amount if usize::try_from(request.amount).is_err() { @@ -113,7 +112,7 @@ where &mut self, sender: &mut S, request: HeaderRequest, - respond_to: OneshotResultSender, P2pError>, + respond_to: OneshotResultSender, ) { const MIN_HEAD_RESPONSES: usize = 2; @@ -149,7 +148,7 @@ where .into_iter() // In case of HEAD all responses have only 1 header. // This was already enforced by `decode_and_verify_responses`. - .filter_map(|v| v.ok()?.ok()?.into_iter().next()) + .filter_map(|v| v.ok()?.ok()?.into_validated_vec().into_iter().next()) .collect(); let mut counter: HashMap<_, usize> = HashMap::new(); @@ -173,14 +172,14 @@ where // Return the header with the highest height that was received by at least 2 peers for resp in &resps { if counter[&resp.hash()] >= MIN_HEAD_RESPONSES { - respond_to.maybe_send_ok(vec![resp.to_owned()]); + respond_to.maybe_send_ok(resp.into()); return; } } // Otherwise return the header with the maximum height let resp = resps.into_iter().next().expect("no reposnes"); - respond_to.maybe_send_ok(vec![resp]); + respond_to.maybe_send_ok(resp.into()); }); } @@ -238,7 +237,7 @@ where async fn decode_and_verify_responses( request: &HeaderRequest, responses: &[HeaderResponse], -) -> Result, HeaderExError> { +) -> Result { if responses.is_empty() { return Err(HeaderExError::InvalidResponse); } @@ -298,7 +297,7 @@ async fn decode_and_verify_responses( _ => return Err(HeaderExError::InvalidResponse), } - Ok(headers) + Ok(headers.into()) } #[cfg(test)] diff --git a/node/src/p2p/header_ex/utils.rs b/node/src/p2p/header_ex/utils.rs index eedf8e9a1..2ab517997 100644 --- a/node/src/p2p/header_ex/utils.rs +++ b/node/src/p2p/header_ex/utils.rs @@ -6,6 +6,7 @@ use celestia_types::hash::Hash; use celestia_types::ExtendedHeader; use crate::p2p::header_ex::HeaderExError; +use crate::store::utils::ValidatedExtendedHeader; pub(crate) trait HeaderRequestExt { fn with_origin(origin: u64, amount: u64) -> HeaderRequest; @@ -51,18 +52,21 @@ impl HeaderRequestExt for HeaderRequest { } pub(super) trait HeaderResponseExt { - fn to_validated_extented_header(&self) -> Result; + fn to_validated_extented_header(&self) -> Result; fn not_found() -> HeaderResponse; fn invalid() -> HeaderResponse; } impl HeaderResponseExt for HeaderResponse { - fn to_validated_extented_header(&self) -> Result { + fn to_validated_extented_header(&self) -> Result { match self.status_code() { StatusCode::Invalid => Err(HeaderExError::InvalidResponse), StatusCode::NotFound => Err(HeaderExError::HeaderNotFound), - StatusCode::Ok => ExtendedHeader::decode_and_validate(&self.body[..]) - .map_err(|_| HeaderExError::InvalidResponse), + StatusCode::Ok => { + let header = ExtendedHeader::decode(&self.body[..]) + .map_err(|_| HeaderExError::InvalidResponse)?; + ValidatedExtendedHeader::new(header).map_err(|_| HeaderExError::InvalidResponse) + } } } diff --git a/node/src/p2p/header_session.rs b/node/src/p2p/header_session.rs index 5b3c4d15d..53e938d6b 100644 --- a/node/src/p2p/header_session.rs +++ b/node/src/p2p/header_session.rs @@ -1,5 +1,4 @@ use celestia_proto::p2p::pb::HeaderRequest; -use celestia_types::ExtendedHeader; use tokio::sync::{mpsc, oneshot}; use tracing::debug; @@ -7,6 +6,7 @@ use crate::block_ranges::{BlockRange, BlockRangeExt}; use crate::executor::spawn; use crate::p2p::header_ex::utils::HeaderRequestExt; use crate::p2p::{P2pCmd, P2pError}; +use crate::store::utils::ValidatedExtendedHeaders; pub(crate) const MIN_AMOUNT_PER_REQ: u64 = 8; pub(crate) const MAX_AMOUNT_PER_REQ: u64 = 64; @@ -17,8 +17,8 @@ type Result = std::result::Result; pub(crate) struct HeaderSession { to_fetch: Option, cmd_tx: mpsc::Sender, - response_tx: mpsc::Sender<(u64, u64, Result>)>, - response_rx: mpsc::Receiver<(u64, u64, Result>)>, + response_tx: mpsc::Sender<(u64, u64, Result)>, + response_rx: mpsc::Receiver<(u64, u64, Result)>, ongoing: usize, batch_size: u64, } @@ -49,7 +49,7 @@ impl HeaderSession { } } - pub(crate) async fn run(&mut self) -> Result> { + pub(crate) async fn run(&mut self) -> Result { let mut responses = Vec::new(); for _ in 0..MAX_CONCURRENT_REQS { @@ -61,6 +61,7 @@ impl HeaderSession { match res { Ok(headers) => { + let headers = headers.into_validated_vec(); let headers_len = headers.len() as u64; if headers_len > 0 { @@ -97,7 +98,7 @@ impl HeaderSession { Ok(responses.into_iter().flatten().collect()) } - async fn recv_response(&mut self) -> (u64, u64, Result>) { + async fn recv_response(&mut self) -> (u64, u64, Result) { let (height, requested_amount, res) = self.response_rx.recv().await.expect("channel never closes"); diff --git a/node/src/store.rs b/node/src/store.rs index 17aa6dfd0..6dbff7a65 100644 --- a/node/src/store.rs +++ b/node/src/store.rs @@ -16,7 +16,8 @@ use thiserror::Error; pub use crate::block_ranges::{BlockRange, BlockRanges, BlockRangesError}; pub use crate::store::utils::{ - IntoVerifiedExtendedHeaders, ValidatedExtendedHeaders, VerifiedExtendedHeaders, + IntoVerifiedExtendedHeaders, ValidatedExtendedHeader, ValidatedExtendedHeaders, + VerifiedExtendedHeaders, }; pub use in_memory_store::InMemoryStore; diff --git a/node/src/store/utils.rs b/node/src/store/utils.rs index 894f073e8..d1a7e303a 100644 --- a/node/src/store/utils.rs +++ b/node/src/store/utils.rs @@ -1,3 +1,4 @@ +use std::fmt::{self, Display}; use std::future::Future; use std::mem; use std::ops::{Deref, RangeInclusive}; @@ -146,6 +147,27 @@ impl VerifiedExtendedHeaders { pub fn into_inner(self) -> Vec { self.0 } + + pub fn into_validated_vec(self) -> Vec { + // SAFETY: It is safe to transmute because of `repr(transparent)`. + unsafe { mem::transmute(self.0) } + } +} + +impl Display for ValidatedExtendedHeader { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + Display::fmt(&self.0, f) + } +} + +impl FromIterator for ValidatedExtendedHeaders { + fn from_iter(iter: T) -> Self + where + T: IntoIterator, + { + let headers = iter.into_iter().map(|h| h.0).collect(); + ValidatedExtendedHeaders(headers) + } } impl IntoIterator for ValidatedExtendedHeaders { @@ -314,6 +336,27 @@ impl IntoVerifiedExtendedHeaders for VerifiedExtendedHeaders { } } +impl IntoVerifiedExtendedHeaders for ValidatedExtendedHeader { + async fn into_verified(self) -> celestia_types::Result { + // A single validated header has no neighbors so there nothing to verify. + Ok(VerifiedExtendedHeaders(vec![self.0])) + } +} + +impl<'a> IntoVerifiedExtendedHeaders for &'a ValidatedExtendedHeader { + async fn into_verified(self) -> celestia_types::Result { + // A single validated header has no neighbors so there nothing to verify. + Ok(VerifiedExtendedHeaders(vec![self.0.to_owned()])) + } +} + +impl<'a> IntoVerifiedExtendedHeaders for &'a mut ValidatedExtendedHeader { + async fn into_verified(self) -> celestia_types::Result { + // A single validated header has no neighbors so there nothing to verify. + Ok(VerifiedExtendedHeaders(vec![self.0.to_owned()])) + } +} + impl IntoVerifiedExtendedHeaders for ValidatedExtendedHeaders { async fn into_verified(self) -> celestia_types::Result { VerifiedExtendedHeaders::from_validated(self) diff --git a/node/src/syncer.rs b/node/src/syncer.rs index 3fefc6efb..69a0e4088 100644 --- a/node/src/syncer.rs +++ b/node/src/syncer.rs @@ -30,7 +30,7 @@ use crate::events::{EventPublisher, NodeEvent}; use crate::executor::{sleep, spawn, spawn_cancellable, Interval}; use crate::p2p::{P2p, P2pError}; use crate::store::utils::calculate_range_to_fetch; -use crate::store::{Store, StoreError, ValidatedExtendedHeaders}; +use crate::store::{Store, StoreError, ValidatedExtendedHeader, ValidatedExtendedHeaders}; use crate::utils::OneshotSenderExt; type Result = std::result::Result; @@ -178,7 +178,7 @@ where event_pub: EventPublisher, p2p: Arc, store: Arc, - header_sub_watcher: watch::Receiver>, + header_sub_watcher: watch::Receiver>, subjective_head_height: Option, batch_size: u64, ongoing_batch: Option, @@ -415,9 +415,6 @@ where if let Ok(store_head_height) = self.store.head_height().await { // If our new header is adjacent to the HEAD of the store if store_head_height + 1 == new_head_height { - // HeaderSub already validated the header. - let new_head = unsafe { ValidatedExtendedHeaders::new_unchecked(vec![new_head]) }; - if self.store.insert(new_head).await.is_ok() { self.event_pub.send(NodeEvent::AddedHeaderFromHeaderSub { height: new_head_height, @@ -440,7 +437,7 @@ where #[instrument(skip_all)] async fn fetch_next_batch( &mut self, - headers_tx: &mpsc::Sender<(Result, P2pError>, Duration)>, + headers_tx: &mpsc::Sender<(Result, Duration)>, ) { if self.ongoing_batch.is_some() { // Another batch is ongoing. We do not parallelize `Syncer` @@ -515,7 +512,7 @@ where #[instrument(skip_all)] async fn on_fetch_next_batch_result( &mut self, - res: Result, P2pError>, + res: Result, took: Duration, ) { let Some(ongoing) = self.ongoing_batch.take() else { @@ -539,9 +536,6 @@ where } }; - // HeaderEx already validated the headers (but not verified them). - let headers = unsafe { ValidatedExtendedHeaders::new_unchecked(headers) }; - if let Err(e) = self.store.insert(headers).await { self.event_pub.send(NodeEvent::FetchingHeadersFailed { from_height, diff --git a/types/src/extended_header.rs b/types/src/extended_header.rs index 7ddfa168e..aa18351de 100644 --- a/types/src/extended_header.rs +++ b/types/src/extended_header.rs @@ -264,6 +264,18 @@ impl ExtendedHeader { Ok(()) } + pub fn verify_adjacent(&self, untrusted: &ExtendedHeader) -> Result<()> { + if self.height().increment() != untrusted.height() { + bail_verification!( + "untrusted header height ({}) not adjacent to the current trusted ({})", + untrusted.height(), + self.height(), + ); + } + + self.verify(untrusted) + } + /// Verify a chain of adjacent untrusted headers. /// /// # Note From 7dca5bf6813df0a43b68db95caaa8647947c64b1 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Fri, 12 Jul 2024 12:01:14 +0300 Subject: [PATCH 6/7] wip --- node/src/store/utils.rs | 6 +++--- node/src/test_utils.rs | 36 ++++++++++++++++++++++++++---------- 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/node/src/store/utils.rs b/node/src/store/utils.rs index d1a7e303a..291c12145 100644 --- a/node/src/store/utils.rs +++ b/node/src/store/utils.rs @@ -61,13 +61,13 @@ fn get_most_recent_missing_range( } #[repr(transparent)] -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct ValidatedExtendedHeader(ExtendedHeader); -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct ValidatedExtendedHeaders(Vec); -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct VerifiedExtendedHeaders(Vec); pub trait IntoVerifiedExtendedHeaders: Send { diff --git a/node/src/test_utils.rs b/node/src/test_utils.rs index cea31cc85..98865c611 100644 --- a/node/src/test_utils.rs +++ b/node/src/test_utils.rs @@ -16,7 +16,9 @@ use crate::{ node::NodeConfig, p2p::{P2pCmd, P2pError}, peer_tracker::PeerTrackerInfo, - store::{InMemoryStore, ValidExtendedHeadersChain}, + store::{ + InMemoryStore, ValidatedExtendedHeader, ValidatedExtendedHeaders, VerifiedExtendedHeaders, + }, utils::OneshotResultSender, }; @@ -30,12 +32,19 @@ pub(crate) use wasm_bindgen_test::wasm_bindgen_test as async_test; /// Extends test header generator for easier insertion into the store pub trait ExtendedHeaderGeneratorExt { /// Generate next amount verified headers - fn next_many_verified(&mut self, amount: u64) -> ValidExtendedHeadersChain; + fn next_many_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders; + + /// Generate next amount validated headers + fn next_many_validated(&mut self, amount: u64) -> ValidatedExtendedHeaders; } impl ExtendedHeaderGeneratorExt for ExtendedHeaderGenerator { - fn next_many_verified(&mut self, amount: u64) -> ValidExtendedHeadersChain { - unsafe { ValidExtendedHeadersChain::new_unchecked(self.next_many(amount)) } + fn next_many_verified(&mut self, amount: u64) -> VerifiedExtendedHeaders { + unsafe { VerifiedExtendedHeaders::new_unchecked(self.next_many(amount)) } + } + + fn next_many_validated(&mut self, amount: u64) -> ValidatedExtendedHeaders { + unsafe { ValidatedExtendedHeaders::new_unchecked(self.next_many(amount)) } } } @@ -92,7 +101,7 @@ pub struct MockP2pHandle { #[allow(dead_code)] pub(crate) cmd_tx: mpsc::Sender, pub(crate) cmd_rx: mpsc::Receiver, - pub(crate) header_sub_tx: watch::Sender>, + pub(crate) header_sub_tx: watch::Sender>, pub(crate) peer_tracker_tx: watch::Sender, } @@ -121,7 +130,7 @@ impl MockP2pHandle { } /// Simulate a new header announced in the network. - pub fn announce_new_head(&self, header: ExtendedHeader) { + pub fn announce_new_head(&self, header: ValidatedExtendedHeader) { self.header_sub_tx.send_replace(Some(header)); } @@ -158,7 +167,7 @@ impl MockP2pHandle { &mut self, ) -> ( HeaderRequest, - OneshotResultSender, P2pError>, + OneshotResultSender, ) { match self.expect_cmd().await { P2pCmd::HeaderExRequest { @@ -174,7 +183,11 @@ impl MockP2pHandle { /// [`P2p`]: crate::p2p::P2p pub async fn expect_header_request_for_height_cmd( &mut self, - ) -> (u64, u64, OneshotResultSender, P2pError>) { + ) -> ( + u64, + u64, + OneshotResultSender, + ) { let (req, respond_to) = self.expect_header_request_cmd().await; match req.data { @@ -188,7 +201,10 @@ impl MockP2pHandle { /// [`P2p`]: crate::p2p::P2p pub async fn expect_header_request_for_hash_cmd( &mut self, - ) -> (Hash, OneshotResultSender, P2pError>) { + ) -> ( + Hash, + OneshotResultSender, + ) { let (req, respond_to) = self.expect_header_request_cmd().await; match req.data { @@ -204,7 +220,7 @@ impl MockP2pHandle { /// Assert that a header-sub initialization command was sent to the [`P2p`] worker. /// /// [`P2p`]: crate::p2p::P2p - pub async fn expect_init_header_sub(&mut self) -> ExtendedHeader { + pub async fn expect_init_header_sub(&mut self) -> ValidatedExtendedHeader { match self.expect_cmd().await { P2pCmd::InitHeaderSub { head } => *head, cmd => panic!("Expecting InitHeaderSub, but received: {cmd:?}"), From 5dce5fda472a9fb2acfea550beedd9447c2f4730 Mon Sep 17 00:00:00 2001 From: Yiannis Marangos Date: Fri, 12 Jul 2024 12:04:01 +0300 Subject: [PATCH 7/7] remove commented code --- node/src/store/utils.rs | 26 -------------------------- 1 file changed, 26 deletions(-) diff --git a/node/src/store/utils.rs b/node/src/store/utils.rs index 291c12145..7c8ec2966 100644 --- a/node/src/store/utils.rs +++ b/node/src/store/utils.rs @@ -304,32 +304,6 @@ impl<'a, const N: usize> From<&'a mut [ValidatedExtendedHeader; N]> for Validate } } -/* -impl IntoIterator for ValidExtendedHeadersChain { - type Item = &ExtendedHeader; - type IntoIter<'a> = std::slice::Iter<'a, ExtendedHeader>; - - fn into_iter(self) -> Self::IntoIter { - self.0.into_iter() - } -} - -impl IntoIterator for VerifiedExtendedHeaders { - type Item = ExtendedHeader; - type IntoIter = std::vec::IntoIter; - - fn into_iter(self) -> Self::IntoIter { - self.0.into_iter() - } -} - -impl AsRef<[ExtendedHeader]> for ValidExtendedHeadersChain { - fn as_ref(&self) -> &[ExtendedHeader] { - unsafe { mem::transmute(self.0.as_ref()) } - } -} -*/ - impl IntoVerifiedExtendedHeaders for VerifiedExtendedHeaders { async fn into_verified(self) -> celestia_types::Result { Ok(self)