diff --git a/Cargo.lock b/Cargo.lock index ab42a8d77..b8ff98545 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1589,6 +1589,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + [[package]] name = "signature" version = "2.2.0" @@ -1861,7 +1870,9 @@ dependencies = [ "libc", "mio", "num_cpus", + "parking_lot", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.48.0", @@ -2031,6 +2042,7 @@ dependencies = [ "castaway", "everscale-types", "futures-util", + "tokio", "tracing", "tycho-network", "tycho-storage", diff --git a/Cargo.toml b/Cargo.toml index bd2ea64f2..2c71a97f5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,14 @@ [workspace] +members = [ + "consensus", + "core", + "network", + "simulator", + "storage", + "util", + "collator", +] resolver = "2" -members = ["consensus", "core", "network", "simulator", "storage", "util", "collator"] [profile.release] debug = true @@ -73,8 +81,8 @@ rest_pat_in_fully_bound_structs = "warn" same_functions_in_if_condition = "warn" semicolon_if_nothing_returned = "warn" single_match_else = "warn" -string_add_assign = "warn" string_add = "warn" +string_add_assign = "warn" string_lit_as_bytes = "warn" string_to_string = "warn" todo = "warn" diff --git a/core/Cargo.toml b/core/Cargo.toml index f6a6fc6a0..f5f49b53e 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -1,8 +1,8 @@ [package] +description = "Basic functionality of peer." +edition = "2021" name = "tycho-core" version = "0.0.1" -edition = "2021" -description = "Basic functionality of peer." [dependencies] anyhow = "1.0.80" @@ -11,6 +11,7 @@ castaway = "0.2" everscale-types = "0.1.0-rc.6" futures-util = "0.3.30" tracing = "0.1.40" +tokio = { version = "1.36.0", features = ["full"] } # local deps tycho-network = { path = "../network", version = "=0.0.1" } diff --git a/core/src/block_strider/mod.rs b/core/src/block_strider/mod.rs index d82565432..e2f60d022 100644 --- a/core/src/block_strider/mod.rs +++ b/core/src/block_strider/mod.rs @@ -1,9 +1,13 @@ -use std::future::Future; -use std::sync::atomic::{AtomicBool, Ordering}; - use anyhow::Result; use everscale_types::models::{Block, BlockId}; -use futures_util::future::BoxFuture; + +pub mod provider; +pub mod state; +pub mod subscriber; + +use provider::BlockProvider; +use state::BlockStriderState; +use subscriber::BlockSubscriber; pub struct BlockStriderBuilder(BlockStrider); @@ -140,62 +144,6 @@ where } } -pub trait BlockStriderState: Send + Sync + 'static { - fn load_last_traversed_master_block_id(&self) -> BlockId; - fn is_traversed(&self, block_id: &BlockId) -> bool; - fn commit_traversed(&self, block_id: BlockId); -} - -impl BlockStriderState for Box { - fn load_last_traversed_master_block_id(&self) -> BlockId { - ::load_last_traversed_master_block_id(self) - } - - fn is_traversed(&self, block_id: &BlockId) -> bool { - ::is_traversed(self, block_id) - } - - fn commit_traversed(&self, block_id: BlockId) { - ::commit_traversed(self, block_id); - } -} - -/// Block provider *MUST* validate the block before returning it. -pub trait BlockProvider: Send + Sync + 'static { - type GetNextBlockFut<'a>: Future>> + Send + 'a; - type GetBlockFut<'a>: Future>> + Send + 'a; - - fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a>; - fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a>; -} - -impl BlockProvider for Box { - type GetNextBlockFut<'a> = T::GetNextBlockFut<'a>; - type GetBlockFut<'a> = T::GetBlockFut<'a>; - - fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> { - ::get_next_block(self, prev_block_id) - } - - fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> { - ::get_block(self, block_id) - } -} - -pub trait BlockSubscriber: Send + Sync + 'static { - type HandleBlockFut: Future> + Send + 'static; - - fn handle_block(&self, block: &Block) -> Self::HandleBlockFut; -} - -impl BlockSubscriber for Box { - type HandleBlockFut = T::HandleBlockFut; - - fn handle_block(&self, block: &Block) -> Self::HandleBlockFut { - ::handle_block(self, block) - } -} - fn get_shard_hashes(_block: &Block) -> impl IntoIterator { vec![].into_iter() } @@ -203,38 +151,3 @@ fn get_shard_hashes(_block: &Block) -> impl IntoIterator { fn get_block_id(_block: &Block) -> BlockId { unimplemented!() } - -// === Provider combinators === -struct ChainBlockProvider { - left: T1, - right: T2, - is_right: AtomicBool, -} - -impl BlockProvider for ChainBlockProvider { - type GetNextBlockFut<'a> = BoxFuture<'a, Option>>; - type GetBlockFut<'a> = BoxFuture<'a, Option>>; - - fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> { - Box::pin(async move { - if !self.is_right.load(Ordering::Acquire) { - let res = self.left.get_next_block(prev_block_id).await; - if res.is_some() { - return res; - } - self.is_right.store(true, Ordering::Release); - } - self.right.get_next_block(prev_block_id).await - }) - } - - fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> { - Box::pin(async { - let res = self.left.get_block(block_id).await; - if res.is_some() { - return res; - } - self.right.get_block(block_id).await - }) - } -} diff --git a/core/src/block_strider/provider.rs b/core/src/block_strider/provider.rs new file mode 100644 index 000000000..edda7d7d5 --- /dev/null +++ b/core/src/block_strider/provider.rs @@ -0,0 +1,162 @@ +use everscale_types::models::{Block, BlockId}; +use futures_util::future::BoxFuture; +use std::future::Future; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +/// Block provider *MUST* validate the block before returning it. +pub trait BlockProvider: Send + Sync + 'static { + type GetNextBlockFut<'a>: Future>> + Send + 'a; + type GetBlockFut<'a>: Future>> + Send + 'a; + + fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a>; + fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a>; +} + +impl BlockProvider for Box { + type GetNextBlockFut<'a> = T::GetNextBlockFut<'a>; + type GetBlockFut<'a> = T::GetBlockFut<'a>; + + fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> { + ::get_next_block(self, prev_block_id) + } + + fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> { + ::get_block(self, block_id) + } +} + +impl BlockProvider for Arc { + type GetNextBlockFut<'a> = T::GetNextBlockFut<'a>; + type GetBlockFut<'a> = T::GetBlockFut<'a>; + + fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> { + ::get_next_block(self, prev_block_id) + } + + fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> { + ::get_block(self, block_id) + } +} + +// === Provider combinators === +struct ChainBlockProvider { + left: T1, + right: T2, + is_right: AtomicBool, +} + +impl BlockProvider for ChainBlockProvider { + type GetNextBlockFut<'a> = BoxFuture<'a, Option>>; + type GetBlockFut<'a> = BoxFuture<'a, Option>>; + + fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> { + Box::pin(async move { + if !self.is_right.load(Ordering::Acquire) { + let res = self.left.get_next_block(prev_block_id).await; + if res.is_some() { + return res; + } + self.is_right.store(true, Ordering::Release); + } + self.right.get_next_block(prev_block_id).await + }) + } + + fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> { + Box::pin(async { + let res = self.left.get_block(block_id).await; + if res.is_some() { + return res; + } + self.right.get_block(block_id).await + }) + } +} + +#[cfg(test)] +mod test { + use super::*; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; + + struct MockBlockProvider { + // let's give it some state, pretending it's useful + has_block: AtomicBool, + } + + impl BlockProvider for MockBlockProvider { + type GetNextBlockFut<'a> = BoxFuture<'a, Option>>; + type GetBlockFut<'a> = BoxFuture<'a, Option>>; + + fn get_next_block<'a>(&'a self, _prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> { + Box::pin(async { + if self.has_block.load(Ordering::Acquire) { + Some(Ok(get_empty_block())) + } else { + None + } + }) + } + + fn get_block<'a>(&'a self, _block_id: &'a BlockId) -> Self::GetBlockFut<'a> { + Box::pin(async { + if self.has_block.load(Ordering::Acquire) { + Some(Ok(get_empty_block())) + } else { + None + } + }) + } + } + + #[tokio::test] + async fn chain_block_provider_switches_providers_correctly() { + let left_provider = Arc::new(MockBlockProvider { + has_block: AtomicBool::new(true), + }); + let right_provider = Arc::new(MockBlockProvider { + has_block: AtomicBool::new(false), + }); + + let chain_provider = ChainBlockProvider { + left: Arc::clone(&left_provider), + right: Arc::clone(&right_provider), + is_right: AtomicBool::new(false), + }; + + chain_provider + .get_next_block(&get_default_block_id()) + .await + .unwrap() + .unwrap(); + + // Now let's pretend the left provider ran out of blocks. + left_provider.has_block.store(false, Ordering::Release); + right_provider.has_block.store(true, Ordering::Release); + + chain_provider + .get_next_block(&get_default_block_id()) + .await + .unwrap() + .unwrap(); + + // End of blocks stream for both providers + left_provider.has_block.store(false, Ordering::Release); + right_provider.has_block.store(false, Ordering::Release); + + assert!(chain_provider + .get_next_block(&get_default_block_id()) + .await + .is_none()); + } + + fn get_empty_block() -> Block { + let block = ""; + everscale_types::boc::BocRepr::decode_base64(block).unwrap() + } + + fn get_default_block_id() -> BlockId { + BlockId::default() + } +} diff --git a/core/src/block_strider/state.rs b/core/src/block_strider/state.rs new file mode 100644 index 000000000..f6eef8363 --- /dev/null +++ b/core/src/block_strider/state.rs @@ -0,0 +1,21 @@ +use everscale_types::models::BlockId; + +pub trait BlockStriderState: Send + Sync + 'static { + fn load_last_traversed_master_block_id(&self) -> BlockId; + fn is_traversed(&self, block_id: &BlockId) -> bool; + fn commit_traversed(&self, block_id: BlockId); +} + +impl BlockStriderState for Box { + fn load_last_traversed_master_block_id(&self) -> BlockId { + ::load_last_traversed_master_block_id(self) + } + + fn is_traversed(&self, block_id: &BlockId) -> bool { + ::is_traversed(self, block_id) + } + + fn commit_traversed(&self, block_id: BlockId) { + ::commit_traversed(self, block_id); + } +} diff --git a/core/src/block_strider/subscriber.rs b/core/src/block_strider/subscriber.rs new file mode 100644 index 000000000..88bfa3f8b --- /dev/null +++ b/core/src/block_strider/subscriber.rs @@ -0,0 +1,36 @@ +use everscale_types::models::Block; +use futures_util::future; +use std::future::Future; + +pub trait BlockSubscriber: Send + Sync + 'static { + type HandleBlockFut: Future> + Send + 'static; + + fn handle_block(&self, block: &Block) -> Self::HandleBlockFut; +} + +impl BlockSubscriber for Box { + type HandleBlockFut = T::HandleBlockFut; + + fn handle_block(&self, block: &Block) -> Self::HandleBlockFut { + ::handle_block(self, block) + } +} + +pub struct FanoutBlockSubscriber { + pub left: T1, + pub right: T2, +} + +impl BlockSubscriber for FanoutBlockSubscriber { + type HandleBlockFut = future::BoxFuture<'static, anyhow::Result<()>>; + + fn handle_block(&self, block: &Block) -> Self::HandleBlockFut { + let left = self.left.handle_block(block); + let right = self.right.handle_block(block); + + Box::pin(async move { + let (l, r) = future::join(left, right).await; + l.and(r) + }) + } +}