From 3d588d7f621abb95d3a84d5df804a415d1411d96 Mon Sep 17 00:00:00 2001 From: Vladimir Petrzhikovskii Date: Thu, 22 Feb 2024 18:33:14 +0100 Subject: [PATCH 1/8] feat(core): add block strider --- Cargo.lock | 16 ++++ core/Cargo.toml | 7 +- core/src/block_strider/mod.rs | 136 ++++++++++++++++++++++++++++++++++ core/src/lib.rs | 2 +- 4 files changed, 159 insertions(+), 2 deletions(-) create mode 100644 core/src/block_strider/mod.rs diff --git a/Cargo.lock b/Cargo.lock index c223b1954..b65eee847 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -138,6 +138,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "async-trait" +version = "0.1.77" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.50", +] + [[package]] name = "autocfg" version = "1.2.0" @@ -2165,6 +2176,11 @@ dependencies = [ name = "tycho-core" version = "0.0.1" dependencies = [ + "anyhow", + "async-trait", + "everscale-types", + "futures-util", + "tracing", "tycho-network", "tycho-storage", "tycho-util", diff --git a/core/Cargo.toml b/core/Cargo.toml index 189a5aad6..695dcff88 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -9,7 +9,12 @@ repository.workspace = true license.workspace = true [dependencies] -# crates.io deps +async-trait = "0.1.77" +anyhow = "1.0.80" +everscale-types = "0.1.0-rc.6" +tracing = "0.1.40" +futures-util = "0.3.30" + # local deps tycho-network = { workspace = true } diff --git a/core/src/block_strider/mod.rs b/core/src/block_strider/mod.rs new file mode 100644 index 000000000..a53a1e357 --- /dev/null +++ b/core/src/block_strider/mod.rs @@ -0,0 +1,136 @@ +use async_trait::async_trait; + +use everscale_types::models::{Block, BlockId}; +use futures_util::stream::FuturesUnordered; +use futures_util::StreamExt; +use std::future::Future; + +#[async_trait] +pub trait OnStriderStep { + async fn handle_block(&self, block: &Block) -> anyhow::Result<()>; +} + +/// Block provider *MUST* validate the block before returning it. +pub trait BlockProvider { + type GetNextBlockFut: Future; + type GetBlockFut: Future; + + fn get_next_block(&self, prev_block_id: &BlockId) -> Self::GetNextBlockFut; + fn get_block(&self, block_id: &BlockId) -> Self::GetBlockFut; + + fn status(&self) -> ProviderStatus; +} + +pub trait PersistenceProvider { + fn load_last_traversed_master_block_seqno(&self) -> BlockId; + fn commit_last_traversed_master_block_seqno(&self, block_id: BlockId); + + fn shard_block_traversed(&self, block_id: &BlockId) -> bool; + fn commit_shard_block_traversed(&self, block_id: BlockId); + + // drop all shard blocks in the same shard with seqno < block_id + fn gc_shard_blocks(&self, block_id: &BlockId); +} + +pub enum ProviderResult { + Error(anyhow::Error), + NotFound, // or should provider backoff until the block is found? + Found(Block), +} + +#[derive(Debug, PartialEq)] +pub enum ProviderStatus { + Ready, + NotReady, +} + +pub struct StriderBuilder { + subscribers: Vec>, + persistence_provider: Provider, // or it also should be a vec? +} + +impl StriderBuilder +where + Provider: PersistenceProvider, +{ + pub fn new(persistence_provider: Provider) -> Self { + Self { + subscribers: Vec::new(), + persistence_provider, + } + } + + pub fn add_subscriber(&mut self, subscriber: Box) { + self.subscribers.push(subscriber); + } + + // this function gurarantees at least once delivery + pub async fn start(self, block_provider: impl BlockProvider) { + loop { + let master_block = self.fetch_next_master_block(&block_provider).await; + let master_id = get_block_id(&master_block); + let shard_hashes = get_shard_hashes(&master_block); + for hash in shard_hashes { + if !self.persistence_provider.shard_block_traversed(&hash) { + let block = self.fetch_block(&hash, &block_provider).await; + let mut subscribers: FuturesUnordered<_> = self + .subscribers + .iter() + .map(|subscriber| subscriber.handle_block(&block)) + .collect(); + // wait for all subscribers to finish + while subscribers.next().await.is_some() {} + self.persistence_provider.commit_shard_block_traversed(hash); + } + } + self.persistence_provider + .commit_last_traversed_master_block_seqno(master_id); + } + } + + async fn fetch_next_master_block(&self, block_provider: &impl BlockProvider) -> Block { + let last_traversed_master_block = self + .persistence_provider + .load_last_traversed_master_block_seqno(); + loop { + match block_provider + .get_next_block(&last_traversed_master_block) + .await + { + ProviderResult::Error(e) => { + tracing::error!( + ?last_traversed_master_block, + "error while fetching master block: {:?}", + e + ); + } + ProviderResult::NotFound => { + tracing::info!(?last_traversed_master_block, "master block not found"); + } + ProviderResult::Found(block) => break block, + } + } + } + + async fn fetch_block(&self, id: &BlockId, block_provider: &impl BlockProvider) -> Block { + loop { + match block_provider.get_block(id).await { + ProviderResult::Error(e) => { + tracing::error!("error while fetching block: {:?}", e); + } + ProviderResult::NotFound => { + tracing::info!(?id, "no block found"); + } + ProviderResult::Found(block) => break block, + } + } + } +} + +fn get_shard_hashes(_block: &Block) -> impl IntoIterator { + vec![].into_iter() +} + +fn get_block_id(_block: &Block) -> BlockId { + unimplemented!() +} diff --git a/core/src/lib.rs b/core/src/lib.rs index 8b1378917..071271136 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1 +1 @@ - +pub mod block_strider; From b01d70f9c9bf684811809c0cc3b7997b743ba549 Mon Sep 17 00:00:00 2001 From: Ivan Kalinin Date: Mon, 26 Feb 2024 19:13:32 +0100 Subject: [PATCH 2/8] refactor(core): change block strider interface --- Cargo.lock | 1 + core/Cargo.toml | 6 +- core/src/block_strider/mod.rs | 274 +++++++++++++++++++++++----------- 3 files changed, 193 insertions(+), 88 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b65eee847..dab8a8073 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2178,6 +2178,7 @@ version = "0.0.1" dependencies = [ "anyhow", "async-trait", + "castaway", "everscale-types", "futures-util", "tracing", diff --git a/core/Cargo.toml b/core/Cargo.toml index 695dcff88..6935686e2 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -9,12 +9,12 @@ repository.workspace = true license.workspace = true [dependencies] -async-trait = "0.1.77" anyhow = "1.0.80" +async-trait = "0.1.77" +castaway = "0.2" everscale-types = "0.1.0-rc.6" -tracing = "0.1.40" futures-util = "0.3.30" - +tracing = "0.1.40" # local deps tycho-network = { workspace = true } diff --git a/core/src/block_strider/mod.rs b/core/src/block_strider/mod.rs index a53a1e357..d82565432 100644 --- a/core/src/block_strider/mod.rs +++ b/core/src/block_strider/mod.rs @@ -1,132 +1,201 @@ -use async_trait::async_trait; - -use everscale_types::models::{Block, BlockId}; -use futures_util::stream::FuturesUnordered; -use futures_util::StreamExt; use std::future::Future; +use std::sync::atomic::{AtomicBool, Ordering}; -#[async_trait] -pub trait OnStriderStep { - async fn handle_block(&self, block: &Block) -> anyhow::Result<()>; -} - -/// Block provider *MUST* validate the block before returning it. -pub trait BlockProvider { - type GetNextBlockFut: Future; - type GetBlockFut: Future; +use anyhow::Result; +use everscale_types::models::{Block, BlockId}; +use futures_util::future::BoxFuture; - fn get_next_block(&self, prev_block_id: &BlockId) -> Self::GetNextBlockFut; - fn get_block(&self, block_id: &BlockId) -> Self::GetBlockFut; +pub struct BlockStriderBuilder(BlockStrider); - fn status(&self) -> ProviderStatus; +impl BlockStriderBuilder<(), T2, T3> { + pub fn with_state(self, state: S) -> BlockStriderBuilder { + BlockStriderBuilder(BlockStrider { + state, + provider: self.0.provider, + subscriber: self.0.subscriber, + }) + } } -pub trait PersistenceProvider { - fn load_last_traversed_master_block_seqno(&self) -> BlockId; - fn commit_last_traversed_master_block_seqno(&self, block_id: BlockId); - - fn shard_block_traversed(&self, block_id: &BlockId) -> bool; - fn commit_shard_block_traversed(&self, block_id: BlockId); +impl BlockStriderBuilder { + pub fn with_provider(self, provider: P) -> BlockStriderBuilder { + BlockStriderBuilder(BlockStrider { + state: self.0.state, + provider, + subscriber: self.0.subscriber, + }) + } +} - // drop all shard blocks in the same shard with seqno < block_id - fn gc_shard_blocks(&self, block_id: &BlockId); +impl BlockStriderBuilder { + pub fn with_subscriber( + self, + subscriber: B, + ) -> BlockStriderBuilder { + BlockStriderBuilder(BlockStrider { + state: self.0.state, + provider: self.0.provider, + subscriber, + }) + } } -pub enum ProviderResult { - Error(anyhow::Error), - NotFound, // or should provider backoff until the block is found? - Found(Block), +impl BlockStriderBuilder +where + S: BlockStriderState, + P: BlockProvider, + B: BlockSubscriber, +{ + pub fn build(self) -> BlockStrider { + self.0 + } } -#[derive(Debug, PartialEq)] -pub enum ProviderStatus { - Ready, - NotReady, +pub struct BlockStrider { + state: S, + provider: P, + subscriber: B, } -pub struct StriderBuilder { - subscribers: Vec>, - persistence_provider: Provider, // or it also should be a vec? +impl BlockStrider<(), (), ()> { + pub fn builder() -> BlockStriderBuilder<(), (), ()> { + BlockStriderBuilder(BlockStrider { + state: (), + provider: (), + subscriber: (), + }) + } } -impl StriderBuilder +impl BlockStrider where - Provider: PersistenceProvider, + S: BlockStriderState, + P: BlockProvider, + B: BlockSubscriber, { - pub fn new(persistence_provider: Provider) -> Self { - Self { - subscribers: Vec::new(), - persistence_provider, - } - } - - pub fn add_subscriber(&mut self, subscriber: Box) { - self.subscribers.push(subscriber); - } - - // this function gurarantees at least once delivery - pub async fn start(self, block_provider: impl BlockProvider) { - loop { - let master_block = self.fetch_next_master_block(&block_provider).await; + /// Walks through blocks and handles them. + /// + /// Stops either when the provider is exhausted or it can't provide a requested block. + pub async fn run(self) -> Result<()> { + tracing::info!("block strider loop started"); + + while let Some(master_block) = self.fetch_next_master_block().await { + // TODO: replace with block stuff let master_id = get_block_id(&master_block); let shard_hashes = get_shard_hashes(&master_block); + for hash in shard_hashes { - if !self.persistence_provider.shard_block_traversed(&hash) { - let block = self.fetch_block(&hash, &block_provider).await; - let mut subscribers: FuturesUnordered<_> = self - .subscribers - .iter() - .map(|subscriber| subscriber.handle_block(&block)) - .collect(); - // wait for all subscribers to finish - while subscribers.next().await.is_some() {} - self.persistence_provider.commit_shard_block_traversed(hash); + if !self.state.is_traversed(&hash) { + let block = self.fetch_block(&hash).await?; + + if let Err(e) = self.subscriber.handle_block(&block).await { + tracing::error!("error while handling block: {e:?}"); + // TODO: retry + backoff? + } + + self.state.commit_traversed(hash); } } - self.persistence_provider - .commit_last_traversed_master_block_seqno(master_id); + + self.state.commit_traversed(master_id); } + + tracing::info!("block strider loop finished"); + Ok(()) } - async fn fetch_next_master_block(&self, block_provider: &impl BlockProvider) -> Block { - let last_traversed_master_block = self - .persistence_provider - .load_last_traversed_master_block_seqno(); + async fn fetch_next_master_block(&self) -> Option { + let last_traversed_master_block = self.state.load_last_traversed_master_block_id(); loop { - match block_provider + match self + .provider .get_next_block(&last_traversed_master_block) - .await + .await? { - ProviderResult::Error(e) => { + Ok(block) => break Some(block), + Err(e) => { tracing::error!( ?last_traversed_master_block, - "error while fetching master block: {:?}", - e + "error while fetching master block: {e:?}", ); + // TODO: backoff } - ProviderResult::NotFound => { - tracing::info!(?last_traversed_master_block, "master block not found"); - } - ProviderResult::Found(block) => break block, } } } - async fn fetch_block(&self, id: &BlockId, block_provider: &impl BlockProvider) -> Block { + async fn fetch_block(&self, block_id: &BlockId) -> Result { loop { - match block_provider.get_block(id).await { - ProviderResult::Error(e) => { - tracing::error!("error while fetching block: {:?}", e); + match self.provider.get_block(block_id).await { + Some(Ok(block)) => break Ok(block), + Some(Err(e)) => { + tracing::error!("error while fetching block: {e:?}"); + // TODO: backoff } - ProviderResult::NotFound => { - tracing::info!(?id, "no block found"); + None => { + anyhow::bail!("block not found: {block_id}") } - ProviderResult::Found(block) => break block, } } } } +pub trait BlockStriderState: Send + Sync + 'static { + fn load_last_traversed_master_block_id(&self) -> BlockId; + fn is_traversed(&self, block_id: &BlockId) -> bool; + fn commit_traversed(&self, block_id: BlockId); +} + +impl BlockStriderState for Box { + fn load_last_traversed_master_block_id(&self) -> BlockId { + ::load_last_traversed_master_block_id(self) + } + + fn is_traversed(&self, block_id: &BlockId) -> bool { + ::is_traversed(self, block_id) + } + + fn commit_traversed(&self, block_id: BlockId) { + ::commit_traversed(self, block_id); + } +} + +/// Block provider *MUST* validate the block before returning it. +pub trait BlockProvider: Send + Sync + 'static { + type GetNextBlockFut<'a>: Future>> + Send + 'a; + type GetBlockFut<'a>: Future>> + Send + 'a; + + fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a>; + fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a>; +} + +impl BlockProvider for Box { + type GetNextBlockFut<'a> = T::GetNextBlockFut<'a>; + type GetBlockFut<'a> = T::GetBlockFut<'a>; + + fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> { + ::get_next_block(self, prev_block_id) + } + + fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> { + ::get_block(self, block_id) + } +} + +pub trait BlockSubscriber: Send + Sync + 'static { + type HandleBlockFut: Future> + Send + 'static; + + fn handle_block(&self, block: &Block) -> Self::HandleBlockFut; +} + +impl BlockSubscriber for Box { + type HandleBlockFut = T::HandleBlockFut; + + fn handle_block(&self, block: &Block) -> Self::HandleBlockFut { + ::handle_block(self, block) + } +} + fn get_shard_hashes(_block: &Block) -> impl IntoIterator { vec![].into_iter() } @@ -134,3 +203,38 @@ fn get_shard_hashes(_block: &Block) -> impl IntoIterator { fn get_block_id(_block: &Block) -> BlockId { unimplemented!() } + +// === Provider combinators === +struct ChainBlockProvider { + left: T1, + right: T2, + is_right: AtomicBool, +} + +impl BlockProvider for ChainBlockProvider { + type GetNextBlockFut<'a> = BoxFuture<'a, Option>>; + type GetBlockFut<'a> = BoxFuture<'a, Option>>; + + fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> { + Box::pin(async move { + if !self.is_right.load(Ordering::Acquire) { + let res = self.left.get_next_block(prev_block_id).await; + if res.is_some() { + return res; + } + self.is_right.store(true, Ordering::Release); + } + self.right.get_next_block(prev_block_id).await + }) + } + + fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> { + Box::pin(async { + let res = self.left.get_block(block_id).await; + if res.is_some() { + return res; + } + self.right.get_block(block_id).await + }) + } +} From e9ec58714eb28e334e1d624037e392ce94d2e8d1 Mon Sep 17 00:00:00 2001 From: Vladimir Petrzhikovskii Date: Tue, 27 Feb 2024 11:07:47 +0100 Subject: [PATCH 3/8] refactor(core): move block-strider traits to separate files --- Cargo.lock | 3 +- Cargo.toml | 1 + core/Cargo.toml | 13 ++- core/src/block_strider/mod.rs | 103 ++--------------- core/src/block_strider/provider.rs | 162 +++++++++++++++++++++++++++ core/src/block_strider/state.rs | 21 ++++ core/src/block_strider/subscriber.rs | 36 ++++++ 7 files changed, 237 insertions(+), 102 deletions(-) create mode 100644 core/src/block_strider/provider.rs create mode 100644 core/src/block_strider/state.rs create mode 100644 core/src/block_strider/subscriber.rs diff --git a/Cargo.lock b/Cargo.lock index dab8a8073..8fb6170fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -146,7 +146,7 @@ checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" dependencies = [ "proc-macro2", "quote", - "syn 2.0.50", + "syn 2.0.58", ] [[package]] @@ -2181,6 +2181,7 @@ dependencies = [ "castaway", "everscale-types", "futures-util", + "tokio", "tracing", "tycho-network", "tycho-storage", diff --git a/Cargo.toml b/Cargo.toml index 2f05c0161..af5350f16 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,6 +25,7 @@ members = [ ahash = "0.8" anyhow = "1.0.79" arc-swap = "1.6.0" +async-trait = "0.1" base64 = "0.22.0" bincode = "1.3" bumpalo = "3.14.0" diff --git a/core/Cargo.toml b/core/Cargo.toml index 6935686e2..a94c8a4fe 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -9,12 +9,13 @@ repository.workspace = true license.workspace = true [dependencies] -anyhow = "1.0.80" -async-trait = "0.1.77" -castaway = "0.2" -everscale-types = "0.1.0-rc.6" -futures-util = "0.3.30" -tracing = "0.1.40" +anyhow = { workspace = true } +async-trait = { workspace = true } +castaway = { workspace = true } +everscale-types = { workspace = true } +futures-util = { workspace = true } +tracing = { workspace = true } +tokio = { workspace = true, features = ["rt"] } # local deps tycho-network = { workspace = true } diff --git a/core/src/block_strider/mod.rs b/core/src/block_strider/mod.rs index d82565432..e2f60d022 100644 --- a/core/src/block_strider/mod.rs +++ b/core/src/block_strider/mod.rs @@ -1,9 +1,13 @@ -use std::future::Future; -use std::sync::atomic::{AtomicBool, Ordering}; - use anyhow::Result; use everscale_types::models::{Block, BlockId}; -use futures_util::future::BoxFuture; + +pub mod provider; +pub mod state; +pub mod subscriber; + +use provider::BlockProvider; +use state::BlockStriderState; +use subscriber::BlockSubscriber; pub struct BlockStriderBuilder(BlockStrider); @@ -140,62 +144,6 @@ where } } -pub trait BlockStriderState: Send + Sync + 'static { - fn load_last_traversed_master_block_id(&self) -> BlockId; - fn is_traversed(&self, block_id: &BlockId) -> bool; - fn commit_traversed(&self, block_id: BlockId); -} - -impl BlockStriderState for Box { - fn load_last_traversed_master_block_id(&self) -> BlockId { - ::load_last_traversed_master_block_id(self) - } - - fn is_traversed(&self, block_id: &BlockId) -> bool { - ::is_traversed(self, block_id) - } - - fn commit_traversed(&self, block_id: BlockId) { - ::commit_traversed(self, block_id); - } -} - -/// Block provider *MUST* validate the block before returning it. -pub trait BlockProvider: Send + Sync + 'static { - type GetNextBlockFut<'a>: Future>> + Send + 'a; - type GetBlockFut<'a>: Future>> + Send + 'a; - - fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a>; - fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a>; -} - -impl BlockProvider for Box { - type GetNextBlockFut<'a> = T::GetNextBlockFut<'a>; - type GetBlockFut<'a> = T::GetBlockFut<'a>; - - fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> { - ::get_next_block(self, prev_block_id) - } - - fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> { - ::get_block(self, block_id) - } -} - -pub trait BlockSubscriber: Send + Sync + 'static { - type HandleBlockFut: Future> + Send + 'static; - - fn handle_block(&self, block: &Block) -> Self::HandleBlockFut; -} - -impl BlockSubscriber for Box { - type HandleBlockFut = T::HandleBlockFut; - - fn handle_block(&self, block: &Block) -> Self::HandleBlockFut { - ::handle_block(self, block) - } -} - fn get_shard_hashes(_block: &Block) -> impl IntoIterator { vec![].into_iter() } @@ -203,38 +151,3 @@ fn get_shard_hashes(_block: &Block) -> impl IntoIterator { fn get_block_id(_block: &Block) -> BlockId { unimplemented!() } - -// === Provider combinators === -struct ChainBlockProvider { - left: T1, - right: T2, - is_right: AtomicBool, -} - -impl BlockProvider for ChainBlockProvider { - type GetNextBlockFut<'a> = BoxFuture<'a, Option>>; - type GetBlockFut<'a> = BoxFuture<'a, Option>>; - - fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> { - Box::pin(async move { - if !self.is_right.load(Ordering::Acquire) { - let res = self.left.get_next_block(prev_block_id).await; - if res.is_some() { - return res; - } - self.is_right.store(true, Ordering::Release); - } - self.right.get_next_block(prev_block_id).await - }) - } - - fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> { - Box::pin(async { - let res = self.left.get_block(block_id).await; - if res.is_some() { - return res; - } - self.right.get_block(block_id).await - }) - } -} diff --git a/core/src/block_strider/provider.rs b/core/src/block_strider/provider.rs new file mode 100644 index 000000000..edda7d7d5 --- /dev/null +++ b/core/src/block_strider/provider.rs @@ -0,0 +1,162 @@ +use everscale_types::models::{Block, BlockId}; +use futures_util::future::BoxFuture; +use std::future::Future; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; + +/// Block provider *MUST* validate the block before returning it. +pub trait BlockProvider: Send + Sync + 'static { + type GetNextBlockFut<'a>: Future>> + Send + 'a; + type GetBlockFut<'a>: Future>> + Send + 'a; + + fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a>; + fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a>; +} + +impl BlockProvider for Box { + type GetNextBlockFut<'a> = T::GetNextBlockFut<'a>; + type GetBlockFut<'a> = T::GetBlockFut<'a>; + + fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> { + ::get_next_block(self, prev_block_id) + } + + fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> { + ::get_block(self, block_id) + } +} + +impl BlockProvider for Arc { + type GetNextBlockFut<'a> = T::GetNextBlockFut<'a>; + type GetBlockFut<'a> = T::GetBlockFut<'a>; + + fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> { + ::get_next_block(self, prev_block_id) + } + + fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> { + ::get_block(self, block_id) + } +} + +// === Provider combinators === +struct ChainBlockProvider { + left: T1, + right: T2, + is_right: AtomicBool, +} + +impl BlockProvider for ChainBlockProvider { + type GetNextBlockFut<'a> = BoxFuture<'a, Option>>; + type GetBlockFut<'a> = BoxFuture<'a, Option>>; + + fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> { + Box::pin(async move { + if !self.is_right.load(Ordering::Acquire) { + let res = self.left.get_next_block(prev_block_id).await; + if res.is_some() { + return res; + } + self.is_right.store(true, Ordering::Release); + } + self.right.get_next_block(prev_block_id).await + }) + } + + fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> { + Box::pin(async { + let res = self.left.get_block(block_id).await; + if res.is_some() { + return res; + } + self.right.get_block(block_id).await + }) + } +} + +#[cfg(test)] +mod test { + use super::*; + use std::sync::atomic::{AtomicBool, Ordering}; + use std::sync::Arc; + + struct MockBlockProvider { + // let's give it some state, pretending it's useful + has_block: AtomicBool, + } + + impl BlockProvider for MockBlockProvider { + type GetNextBlockFut<'a> = BoxFuture<'a, Option>>; + type GetBlockFut<'a> = BoxFuture<'a, Option>>; + + fn get_next_block<'a>(&'a self, _prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> { + Box::pin(async { + if self.has_block.load(Ordering::Acquire) { + Some(Ok(get_empty_block())) + } else { + None + } + }) + } + + fn get_block<'a>(&'a self, _block_id: &'a BlockId) -> Self::GetBlockFut<'a> { + Box::pin(async { + if self.has_block.load(Ordering::Acquire) { + Some(Ok(get_empty_block())) + } else { + None + } + }) + } + } + + #[tokio::test] + async fn chain_block_provider_switches_providers_correctly() { + let left_provider = Arc::new(MockBlockProvider { + has_block: AtomicBool::new(true), + }); + let right_provider = Arc::new(MockBlockProvider { + has_block: AtomicBool::new(false), + }); + + let chain_provider = ChainBlockProvider { + left: Arc::clone(&left_provider), + right: Arc::clone(&right_provider), + is_right: AtomicBool::new(false), + }; + + chain_provider + .get_next_block(&get_default_block_id()) + .await + .unwrap() + .unwrap(); + + // Now let's pretend the left provider ran out of blocks. + left_provider.has_block.store(false, Ordering::Release); + right_provider.has_block.store(true, Ordering::Release); + + chain_provider + .get_next_block(&get_default_block_id()) + .await + .unwrap() + .unwrap(); + + // End of blocks stream for both providers + left_provider.has_block.store(false, Ordering::Release); + right_provider.has_block.store(false, Ordering::Release); + + assert!(chain_provider + .get_next_block(&get_default_block_id()) + .await + .is_none()); + } + + fn get_empty_block() -> Block { + let block = ""; + everscale_types::boc::BocRepr::decode_base64(block).unwrap() + } + + fn get_default_block_id() -> BlockId { + BlockId::default() + } +} diff --git a/core/src/block_strider/state.rs b/core/src/block_strider/state.rs new file mode 100644 index 000000000..f6eef8363 --- /dev/null +++ b/core/src/block_strider/state.rs @@ -0,0 +1,21 @@ +use everscale_types::models::BlockId; + +pub trait BlockStriderState: Send + Sync + 'static { + fn load_last_traversed_master_block_id(&self) -> BlockId; + fn is_traversed(&self, block_id: &BlockId) -> bool; + fn commit_traversed(&self, block_id: BlockId); +} + +impl BlockStriderState for Box { + fn load_last_traversed_master_block_id(&self) -> BlockId { + ::load_last_traversed_master_block_id(self) + } + + fn is_traversed(&self, block_id: &BlockId) -> bool { + ::is_traversed(self, block_id) + } + + fn commit_traversed(&self, block_id: BlockId) { + ::commit_traversed(self, block_id); + } +} diff --git a/core/src/block_strider/subscriber.rs b/core/src/block_strider/subscriber.rs new file mode 100644 index 000000000..88bfa3f8b --- /dev/null +++ b/core/src/block_strider/subscriber.rs @@ -0,0 +1,36 @@ +use everscale_types::models::Block; +use futures_util::future; +use std::future::Future; + +pub trait BlockSubscriber: Send + Sync + 'static { + type HandleBlockFut: Future> + Send + 'static; + + fn handle_block(&self, block: &Block) -> Self::HandleBlockFut; +} + +impl BlockSubscriber for Box { + type HandleBlockFut = T::HandleBlockFut; + + fn handle_block(&self, block: &Block) -> Self::HandleBlockFut { + ::handle_block(self, block) + } +} + +pub struct FanoutBlockSubscriber { + pub left: T1, + pub right: T2, +} + +impl BlockSubscriber for FanoutBlockSubscriber { + type HandleBlockFut = future::BoxFuture<'static, anyhow::Result<()>>; + + fn handle_block(&self, block: &Block) -> Self::HandleBlockFut { + let left = self.left.handle_block(block); + let right = self.right.handle_block(block); + + Box::pin(async move { + let (l, r) = future::join(left, right).await; + l.and(r) + }) + } +} From fd848bffe2a527419763b9962e057e2245138d84 Mon Sep 17 00:00:00 2001 From: Vladimir Petrzhikovskii Date: Tue, 27 Feb 2024 14:08:13 +0100 Subject: [PATCH 4/8] refactor(core): replace mocks with block-utils in block-strider --- Cargo.lock | 5 +- block-util/src/block/block_stuff.rs | 4 + core/Cargo.toml | 3 + core/src/block_strider/mod.rs | 202 ++++++++++++++--- core/src/block_strider/provider.rs | 22 +- core/src/block_strider/state.rs | 22 ++ core/src/block_strider/subscriber.rs | 21 +- core/src/block_strider/test_provider.rs | 279 ++++++++++++++++++++++++ util/Cargo.toml | 4 + util/src/lib.rs | 8 + 10 files changed, 530 insertions(+), 40 deletions(-) create mode 100644 core/src/block_strider/test_provider.rs diff --git a/Cargo.lock b/Cargo.lock index 8fb6170fc..43027e6c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -140,9 +140,9 @@ dependencies = [ [[package]] name = "async-trait" -version = "0.1.77" +version = "0.1.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" +checksum = "a507401cad91ec6a857ed5513a2073c82a9b9048762b885bb98655b306964681" dependencies = [ "proc-macro2", "quote", @@ -2298,6 +2298,7 @@ dependencies = [ "thiserror", "tokio", "tracing", + "tracing-subscriber", ] [[package]] diff --git a/block-util/src/block/block_stuff.rs b/block-util/src/block/block_stuff.rs index 21dfe5191..8fcca5586 100644 --- a/block-util/src/block/block_stuff.rs +++ b/block-util/src/block/block_stuff.rs @@ -16,6 +16,10 @@ pub struct BlockStuff { } impl BlockStuff { + pub fn with_block(id: BlockId, block: Block) -> Self { + Self { id, block } + } + pub fn deserialize_checked(id: BlockId, data: &[u8]) -> Result { let file_hash = sha2::Sha256::digest(data); anyhow::ensure!( diff --git a/core/Cargo.toml b/core/Cargo.toml index a94c8a4fe..b5cebe425 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -22,5 +22,8 @@ tycho-network = { workspace = true } tycho-storage = { workspace = true } tycho-util = { workspace = true } +[dev-dependencies] +tycho-util = { path = "../util", features = ["test-util"] } + [lints] workspace = true diff --git a/core/src/block_strider/mod.rs b/core/src/block_strider/mod.rs index e2f60d022..ba9a641e9 100644 --- a/core/src/block_strider/mod.rs +++ b/core/src/block_strider/mod.rs @@ -1,13 +1,22 @@ -use anyhow::Result; -use everscale_types::models::{Block, BlockId}; +use anyhow::{Context, Result}; +use everscale_types::models::{BlockId, PrevBlockRef}; +use futures_util::future::BoxFuture; +use futures_util::stream::FuturesOrdered; +use futures_util::{FutureExt, TryStreamExt}; +use itertools::Itertools; pub mod provider; pub mod state; pub mod subscriber; +#[cfg(test)] +mod test_provider; + use provider::BlockProvider; use state::BlockStriderState; use subscriber::BlockSubscriber; +use tycho_block_util::block::BlockStuff; +use tycho_util::FastDashMap; pub struct BlockStriderBuilder(BlockStrider); @@ -32,7 +41,7 @@ impl BlockStriderBuilder { } impl BlockStriderBuilder { - pub fn with_subscriber( + pub fn with_subscriber( self, subscriber: B, ) -> BlockStriderBuilder { @@ -83,33 +92,102 @@ where pub async fn run(self) -> Result<()> { tracing::info!("block strider loop started"); + let mut map = BlocksGraph::new(); while let Some(master_block) = self.fetch_next_master_block().await { - // TODO: replace with block stuff - let master_id = get_block_id(&master_block); - let shard_hashes = get_shard_hashes(&master_block); - - for hash in shard_hashes { - if !self.state.is_traversed(&hash) { - let block = self.fetch_block(&hash).await?; + let master_id = master_block.id(); + tracing::debug!(id=?master_id, "Fetched next master block"); + let extra = master_block.block().load_extra()?; + dbg!(&extra); + let mc_extra = extra.load_custom()?.context("not a master block")?; + let shard_hashes = mc_extra.shards.latest_blocks(); + // todo: is order important? + let mut futures = FuturesOrdered::new(); - if let Err(e) = self.subscriber.handle_block(&block).await { - tracing::error!("error while handling block: {e:?}"); - // TODO: retry + backoff? - } - - self.state.commit_traversed(hash); - } + for shard_block_id in shard_hashes { + let this = &self; + let blocks_graph = ↦ + let block_id = shard_block_id.expect("Invalid shard block id"); + futures.push_back(async move { + this.find_prev_shard_blocks(block_id, blocks_graph).await + }); } - - self.state.commit_traversed(master_id); + let blocks: Vec<_> = futures + .try_collect() + .await + .expect("failed to collect shard blocks"); + let blocks = blocks.into_iter().flatten().collect_vec(); + map.set_bottom_blocks(blocks); + map.walk_topo(&self.subscriber, &self.state).await; + self.state.commit_traversed(*master_id); } tracing::info!("block strider loop finished"); Ok(()) } - async fn fetch_next_master_block(&self) -> Option { + fn find_prev_shard_blocks<'a>( + &'a self, + mut shard_block_id: BlockId, + blocks: &'a BlocksGraph, + ) -> BoxFuture<'a, Result>> { + async move { + let mut prev_shard_block_id = shard_block_id; + while !self.state.is_traversed(&shard_block_id) { + prev_shard_block_id = shard_block_id; + let block = self + .fetch_block(&shard_block_id) + .await + .expect("provider failed to fetch shard block"); + let info = block.block().load_info()?; + shard_block_id = match info.load_prev_ref()? { + PrevBlockRef::Single(id) => { + let id = BlockId { + shard: info.shard, + seqno: id.seqno, + root_hash: id.root_hash, + file_hash: id.file_hash, + }; + blocks.add_connection(id, shard_block_id); + id + } + PrevBlockRef::AfterMerge { left, right } => { + let (left_shard, right_shard) = + info.shard.split().expect("split on unsplitable shard"); + let left = BlockId { + shard: left_shard, + seqno: left.seqno, + root_hash: left.root_hash, + file_hash: left.file_hash, + }; + let right = BlockId { + shard: right_shard, + seqno: right.seqno, + root_hash: right.root_hash, + file_hash: right.file_hash, + }; + blocks.add_connection(left, shard_block_id); + blocks.add_connection(right, shard_block_id); + + return futures_util::try_join!( + self.find_prev_shard_blocks(left, blocks), + self.find_prev_shard_blocks(right, blocks) + ) + .map(|(mut left, right)| { + left.extend(right); + left + }); + } + }; + blocks.store_block(block); + } + Ok(vec![prev_shard_block_id]) + } + .boxed() + } + + async fn fetch_next_master_block(&self) -> Option { let last_traversed_master_block = self.state.load_last_traversed_master_block_id(); + tracing::debug!(?last_traversed_master_block, "Fetching next master block"); loop { match self .provider @@ -128,7 +206,7 @@ where } } - async fn fetch_block(&self, block_id: &BlockId) -> Result { + async fn fetch_block(&self, block_id: &BlockId) -> Result { loop { match self.provider.get_block(block_id).await { Some(Ok(block)) => break Ok(block), @@ -144,10 +222,84 @@ where } } -fn get_shard_hashes(_block: &Block) -> impl IntoIterator { - vec![].into_iter() +struct BlocksGraph { + block_store_map: FastDashMap, + connections: FastDashMap, + bottom_blocks: Vec, } -fn get_block_id(_block: &Block) -> BlockId { - unimplemented!() +impl BlocksGraph { + fn new() -> Self { + Self { + block_store_map: FastDashMap::default(), + connections: FastDashMap::default(), + bottom_blocks: Vec::new(), + } + } + + fn store_block(&self, block: BlockStuff) { + self.block_store_map.insert(*block.id(), block); + } + + // connection between the block and it child + fn add_connection(&self, id: BlockId, prev: BlockId) { + self.connections.insert(id, prev); + } + + fn set_bottom_blocks(&mut self, blocks: Vec) { + self.bottom_blocks = blocks; + } + + async fn walk_topo(&mut self, subscriber: &Sub, state: &dyn BlockStriderState) + where + Sub: BlockSubscriber + Send + Sync + 'static, + { + let mut next_blocks = Vec::with_capacity(self.bottom_blocks.len()); + loop { + if self.bottom_blocks.is_empty() { + break; + } + self.bottom_blocks.sort_unstable(); + for block_id in &self.bottom_blocks { + let block = self + .block_store_map + .get(block_id) + .expect("should be in map"); + subscriber + .handle_block(&block) + .await + .expect("subscriber failed"); + state.commit_traversed(*block_id); + let next_block = self.connections.get(block_id); + if let Some(next_block) = next_block { + next_blocks.push(*next_block.key()); + } + } + std::mem::swap(&mut next_blocks, &mut self.bottom_blocks); + next_blocks.clear(); + } + } +} + +#[cfg(test)] +mod test { + use super::state::InMemoryBlockStriderState; + use super::subscriber::PrintSubscriber; + use super::test_provider::TestBlockProvider; + use crate::block_strider::BlockStrider; + + #[tokio::test] + async fn test_block_strider() { + tycho_util::init_logger(); + let provider = TestBlockProvider::new(10); + let subscriber = PrintSubscriber; + let state = InMemoryBlockStriderState::default(); + + let strider = BlockStrider::builder() + .with_state(state) + .with_provider(provider) + .with_subscriber(subscriber) + .build(); + strider.run().await.unwrap(); + } } diff --git a/core/src/block_strider/provider.rs b/core/src/block_strider/provider.rs index edda7d7d5..3eb079631 100644 --- a/core/src/block_strider/provider.rs +++ b/core/src/block_strider/provider.rs @@ -1,13 +1,16 @@ -use everscale_types::models::{Block, BlockId}; +use everscale_types::models::BlockId; use futures_util::future::BoxFuture; use std::future::Future; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; +use tycho_block_util::block::BlockStuff; + +pub type OptionalBlockStuff = Option>; /// Block provider *MUST* validate the block before returning it. pub trait BlockProvider: Send + Sync + 'static { - type GetNextBlockFut<'a>: Future>> + Send + 'a; - type GetBlockFut<'a>: Future>> + Send + 'a; + type GetNextBlockFut<'a>: Future + Send + 'a; + type GetBlockFut<'a>: Future + Send + 'a; fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a>; fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a>; @@ -47,8 +50,8 @@ struct ChainBlockProvider { } impl BlockProvider for ChainBlockProvider { - type GetNextBlockFut<'a> = BoxFuture<'a, Option>>; - type GetBlockFut<'a> = BoxFuture<'a, Option>>; + type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>; + type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>; fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> { Box::pin(async move { @@ -86,8 +89,8 @@ mod test { } impl BlockProvider for MockBlockProvider { - type GetNextBlockFut<'a> = BoxFuture<'a, Option>>; - type GetBlockFut<'a> = BoxFuture<'a, Option>>; + type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>; + type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>; fn get_next_block<'a>(&'a self, _prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> { Box::pin(async { @@ -151,9 +154,10 @@ mod test { .is_none()); } - fn get_empty_block() -> Block { + fn get_empty_block() -> BlockStuff { let block = ""; - everscale_types::boc::BocRepr::decode_base64(block).unwrap() + let block = everscale_types::boc::BocRepr::decode_base64(block).unwrap(); + BlockStuff::with_block(get_default_block_id(), block) } fn get_default_block_id() -> BlockId { diff --git a/core/src/block_strider/state.rs b/core/src/block_strider/state.rs index f6eef8363..7b229eea2 100644 --- a/core/src/block_strider/state.rs +++ b/core/src/block_strider/state.rs @@ -19,3 +19,25 @@ impl BlockStriderState for Box { ::commit_traversed(self, block_id); } } + +#[cfg(test)] +#[derive(Default)] +pub struct InMemoryBlockStriderState { + last_traversed_master_block_id: BlockId, + traversed_blocks: tycho_util::FastDashSet, +} + +#[cfg(test)] +impl BlockStriderState for InMemoryBlockStriderState { + fn load_last_traversed_master_block_id(&self) -> BlockId { + self.last_traversed_master_block_id + } + + fn is_traversed(&self, block_id: &BlockId) -> bool { + self.traversed_blocks.contains(block_id) + } + + fn commit_traversed(&self, block_id: BlockId) { + self.traversed_blocks.insert(block_id); + } +} diff --git a/core/src/block_strider/subscriber.rs b/core/src/block_strider/subscriber.rs index 88bfa3f8b..261dfba08 100644 --- a/core/src/block_strider/subscriber.rs +++ b/core/src/block_strider/subscriber.rs @@ -1,17 +1,17 @@ -use everscale_types::models::Block; use futures_util::future; use std::future::Future; +use tycho_block_util::block::BlockStuff; pub trait BlockSubscriber: Send + Sync + 'static { type HandleBlockFut: Future> + Send + 'static; - fn handle_block(&self, block: &Block) -> Self::HandleBlockFut; + fn handle_block(&self, block: &BlockStuff) -> Self::HandleBlockFut; } impl BlockSubscriber for Box { type HandleBlockFut = T::HandleBlockFut; - fn handle_block(&self, block: &Block) -> Self::HandleBlockFut { + fn handle_block(&self, block: &BlockStuff) -> Self::HandleBlockFut { ::handle_block(self, block) } } @@ -24,7 +24,7 @@ pub struct FanoutBlockSubscriber { impl BlockSubscriber for FanoutBlockSubscriber { type HandleBlockFut = future::BoxFuture<'static, anyhow::Result<()>>; - fn handle_block(&self, block: &Block) -> Self::HandleBlockFut { + fn handle_block(&self, block: &BlockStuff) -> Self::HandleBlockFut { let left = self.left.handle_block(block); let right = self.right.handle_block(block); @@ -34,3 +34,16 @@ impl BlockSubscriber for FanoutBlockSu }) } } + +#[cfg(test)] +pub struct PrintSubscriber; + +#[cfg(test)] +impl BlockSubscriber for PrintSubscriber { + type HandleBlockFut = future::Ready>; + + fn handle_block(&self, block: &BlockStuff) -> Self::HandleBlockFut { + println!("Handling block: {:?}", block.id()); + future::ready(Ok(())) + } +} diff --git a/core/src/block_strider/test_provider.rs b/core/src/block_strider/test_provider.rs new file mode 100644 index 000000000..2d1636cbd --- /dev/null +++ b/core/src/block_strider/test_provider.rs @@ -0,0 +1,279 @@ +use super::BlockProvider; +use crate::block_strider::provider::OptionalBlockStuff; +use everscale_types::cell::{Cell, CellFamily, Store}; +use everscale_types::dict::{AugDict, Dict}; +use everscale_types::merkle::MerkleUpdate; +use everscale_types::models::{ + Block, BlockExtra, BlockId, BlockInfo, BlockRef, CurrencyCollection, Lazy, McBlockExtra, + PrevBlockRef, ShardDescription, ShardFees, ShardHashes, ShardIdent, ValueFlow, +}; +use everscale_types::prelude::HashBytes; +use std::collections::HashMap; +use tycho_block_util::block::BlockStuff; + +const ZERO_HASH: HashBytes = HashBytes([0; 32]); + +impl BlockProvider for TestBlockProvider { + // type GetNextBlockFut<'a>: Future + Send + 'a; + type GetNextBlockFut<'a> = futures_util::future::Ready; + type GetBlockFut<'a> = futures_util::future::Ready; + + fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> { + let next_id = self + .master_blocks + .iter() + .find(|id| id.shard == prev_block_id.shard && id.seqno == prev_block_id.seqno + 1); + futures_util::future::ready(next_id.and_then(|id| { + self.blocks + .get(id) + .map(|b| Ok(BlockStuff::with_block(id.clone(), b.clone()))) + })) + } + + fn get_block(&self, id: &BlockId) -> Self::GetBlockFut<'_> { + futures_util::future::ready( + self.blocks + .get(id) + .map(|b| Ok(BlockStuff::with_block(id.clone(), b.clone()))), + ) + } +} + +pub struct TestBlockProvider { + master_blocks: Vec, + blocks: HashMap, +} + +impl TestBlockProvider { + pub fn new(num_master_blocks: u32) -> Self { + let (blocks, master_blocks) = create_block_chain(num_master_blocks); + Self { + blocks, + master_blocks, + } + } +} + +fn create_block_chain(num_blocks: u32) -> (HashMap, Vec) { + let mut blocks = HashMap::new(); + let mut master_block_ids = Vec::new(); + let mut prev_shard_block_ref = zero_ref(); + let mut prev_block_ref = zero_ref(); + + for seqno in 1..=num_blocks { + master_block( + seqno, + &mut prev_block_ref, + &mut prev_shard_block_ref, + &mut blocks, + &mut master_block_ids, + ); + } + (blocks, master_block_ids) +} + +fn master_block( + seqno: u32, + prev_block_ref: &mut PrevBlockRef, + prev_shard_block_ref: &mut PrevBlockRef, + blocks: &mut HashMap, + master_ids: &mut Vec, +) { + let (block_ref, block_info) = block_info(prev_block_ref, seqno); + *prev_block_ref = PrevBlockRef::Single(block_ref.clone()); + + let shard_block_ids = link_shard_blocks(prev_shard_block_ref, 2, blocks); + let block_extra = McBlockExtra { + shards: ShardHashes::from_shards(shard_block_ids).unwrap(), + fees: ShardFees { + root: None, + fees: Default::default(), + create: Default::default(), + }, + prev_block_signatures: Default::default(), + recover_create_msg: None, + mint_msg: None, + copyleft_msgs: Default::default(), + config: None, + }; + let master_extra = Some(Lazy::new(&block_extra).unwrap()); + insert_block( + seqno, + block_info, + block_ref, + blocks, + Some(master_ids), + master_extra, + ); +} + +fn insert_block( + seqno: u32, + block_info: BlockInfo, + block_ref: BlockRef, + blocks: &mut HashMap, + master_ids: Option<&mut Vec>, + mc_extra: Option>, +) { + let block = Block { + global_id: 0, + info: Lazy::new(&block_info).unwrap(), + value_flow: default_cc(), + state_update: Lazy::new(&MerkleUpdate::default()).unwrap(), + out_msg_queue_updates: None, + extra: extra(mc_extra), + }; + let id = BlockId { + shard: ShardIdent::default(), + seqno, + root_hash: block_ref.root_hash, + file_hash: block_ref.file_hash, + }; + blocks.insert(id.clone(), block); + if let Some(master_ids) = master_ids { + master_ids.push(id); + } +} + +fn extra(custom: Option>) -> Lazy { + Lazy::new(&BlockExtra { + in_msg_description: Default::default(), + out_msg_description: Default::default(), + account_blocks: Lazy::new(&AugDict::new()).unwrap(), + rand_seed: Default::default(), + created_by: Default::default(), + custom, + }) + .unwrap() +} + +fn link_shard_blocks( + prev_block_ref: &mut PrevBlockRef, + chain_len: u32, + blocks: &mut HashMap, +) -> Vec<(ShardIdent, ShardDescription)> { + let starting_seqno = match &prev_block_ref { + PrevBlockRef::Single(s) => s.seqno, + PrevBlockRef::AfterMerge { .. } => { + unreachable!() + } + }; + let mut last_ref = None; + for seqno in starting_seqno..starting_seqno + chain_len { + let (block_ref, info) = block_info(prev_block_ref, seqno); + + last_ref = Some(( + ShardIdent::BASECHAIN, + ShardDescription { + seqno, + reg_mc_seqno: 0, + start_lt: 0, + end_lt: 0, + root_hash: block_ref.root_hash, + file_hash: block_ref.file_hash, + before_split: false, + before_merge: false, + want_split: false, + want_merge: false, + nx_cc_updated: false, + next_catchain_seqno: 0, + next_validator_shard: 0, + min_ref_mc_seqno: 0, + gen_utime: 0, + split_merge_at: None, + fees_collected: Default::default(), + funds_created: Default::default(), + copyleft_rewards: Default::default(), + proof_chain: None, + }, + )); + insert_block(seqno, info, block_ref.clone(), blocks, None, None); + *prev_block_ref = PrevBlockRef::Single(block_ref); + } + vec![last_ref.unwrap()] +} + +fn default_cc() -> Lazy { + let def_cc = CurrencyCollection::default(); + Lazy::new(&ValueFlow { + from_prev_block: def_cc.clone(), + to_next_block: def_cc.clone(), + imported: def_cc.clone(), + exported: def_cc.clone(), + fees_collected: def_cc.clone(), + fees_imported: def_cc.clone(), + recovered: def_cc.clone(), + created: def_cc.clone(), + minted: def_cc.clone(), + copyleft_rewards: Dict::new(), + }) + .unwrap() +} + +fn block_info(prev_block_ref: &PrevBlockRef, seqno: u32) -> (BlockRef, BlockInfo) { + let prev_block_ref = encode_ref(prev_block_ref.clone()); + let block_info = BlockInfo { + version: 0, + after_merge: false, + before_split: false, + after_split: false, + want_split: false, + want_merge: false, + key_block: false, + flags: 0, + seqno, + vert_seqno: 0, + shard: Default::default(), + gen_utime: 0, + start_lt: 0, + end_lt: 0, + gen_validator_list_hash_short: 0, + gen_catchain_seqno: 0, + min_ref_mc_seqno: 0, + prev_key_block_seqno: 0, + gen_software: Default::default(), + master_ref: None, + prev_ref: prev_block_ref, + prev_vert_ref: None, + }; + ( + BlockRef { + end_lt: 0, + seqno, + root_hash: seqno_to_hash(seqno), + file_hash: seqno_to_hash(seqno), + }, + block_info, + ) +} + +fn seqno_to_hash(i: u32) -> HashBytes { + let mut bytes = [0; 32]; + bytes[0] = i as u8; + HashBytes::from(bytes) +} + +fn zero_ref() -> PrevBlockRef { + PrevBlockRef::Single(BlockRef { + end_lt: 0, + seqno: 0, + root_hash: ZERO_HASH, + file_hash: ZERO_HASH, + }) +} + +fn encode_ref(prev_block_ref: PrevBlockRef) -> Cell { + let mut builder = everscale_types::cell::CellBuilder::new(); + match prev_block_ref { + PrevBlockRef::Single(r) => { + r.store_into(&mut builder, &mut Cell::empty_context()) + .unwrap(); + } + PrevBlockRef::AfterMerge { left, right } => { + let context = &mut Cell::empty_context(); + left.store_into(&mut builder, context).unwrap(); + right.store_into(&mut builder, context).unwrap(); + } + } + builder.build().unwrap() +} diff --git a/util/Cargo.toml b/util/Cargo.toml index b7ba4f49d..dc8e80899 100644 --- a/util/Cargo.toml +++ b/util/Cargo.toml @@ -22,9 +22,13 @@ serde = { workspace = true, features = ["derive"] } thiserror = { workspace = true } tokio = { workspace = true, features = ["time", "sync", "rt"] } tracing = { workspace = true } +tracing-subscriber = { workspace = true, optional = true } [dev-dependencies] tokio = { workspace = true, features = ["time", "sync", "rt-multi-thread", "macros"] } +[features] +test-util = ["tracing-subscriber"] + [lints] workspace = true diff --git a/util/src/lib.rs b/util/src/lib.rs index 40cbda55e..50a8907f6 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -138,3 +138,11 @@ mod tests { assert_eq!(one.unwrap().0, value.0); } } + +#[cfg(feature = "test-util")] +pub fn init_logger() { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::new("debug")) + .try_init() + .ok(); +} From 99a6385f4b851a032035205483e469da5c94adcd Mon Sep 17 00:00:00 2001 From: Vladimir Petrzhikovskii Date: Tue, 19 Mar 2024 18:07:19 +0100 Subject: [PATCH 5/8] test(strider): wip --- core/src/block_strider/mod.rs | 11 +++++++---- core/src/block_strider/state.rs | 20 +++++++++++++++++--- core/src/block_strider/test_provider.rs | 19 ++++++++++++++++--- 3 files changed, 40 insertions(+), 10 deletions(-) diff --git a/core/src/block_strider/mod.rs b/core/src/block_strider/mod.rs index ba9a641e9..58b48c94f 100644 --- a/core/src/block_strider/mod.rs +++ b/core/src/block_strider/mod.rs @@ -97,8 +97,9 @@ where let master_id = master_block.id(); tracing::debug!(id=?master_id, "Fetched next master block"); let extra = master_block.block().load_extra()?; - dbg!(&extra); - let mc_extra = extra.load_custom()?.context("not a master block")?; + let mc_extra = extra + .load_custom()? + .with_context(|| format!("failed to load custom for block: {:?}", master_id))?; let shard_hashes = mc_extra.shards.latest_blocks(); // todo: is order important? let mut futures = FuturesOrdered::new(); @@ -291,9 +292,11 @@ mod test { #[tokio::test] async fn test_block_strider() { tycho_util::init_logger(); - let provider = TestBlockProvider::new(10); + let provider = TestBlockProvider::new(3); + provider.validate(); + let subscriber = PrintSubscriber; - let state = InMemoryBlockStriderState::default(); + let state = InMemoryBlockStriderState::new(provider.first_master_block()); let strider = BlockStrider::builder() .with_state(state) diff --git a/core/src/block_strider/state.rs b/core/src/block_strider/state.rs index 7b229eea2..241b05f13 100644 --- a/core/src/block_strider/state.rs +++ b/core/src/block_strider/state.rs @@ -1,4 +1,5 @@ use everscale_types::models::BlockId; +use parking_lot::Mutex; pub trait BlockStriderState: Send + Sync + 'static { fn load_last_traversed_master_block_id(&self) -> BlockId; @@ -21,16 +22,25 @@ impl BlockStriderState for Box { } #[cfg(test)] -#[derive(Default)] pub struct InMemoryBlockStriderState { - last_traversed_master_block_id: BlockId, + last_traversed_master_block_id: Mutex, traversed_blocks: tycho_util::FastDashSet, } +#[cfg(test)] +impl InMemoryBlockStriderState { + pub fn new(id: BlockId) -> Self { + Self { + last_traversed_master_block_id: Mutex::new(id), + traversed_blocks: tycho_util::FastDashSet::default(), + } + } +} + #[cfg(test)] impl BlockStriderState for InMemoryBlockStriderState { fn load_last_traversed_master_block_id(&self) -> BlockId { - self.last_traversed_master_block_id + *self.last_traversed_master_block_id.lock() } fn is_traversed(&self, block_id: &BlockId) -> bool { @@ -38,6 +48,10 @@ impl BlockStriderState for InMemoryBlockStriderState { } fn commit_traversed(&self, block_id: BlockId) { + if block_id.is_masterchain() { + *self.last_traversed_master_block_id.lock() = block_id; + } + self.traversed_blocks.insert(block_id); } } diff --git a/core/src/block_strider/test_provider.rs b/core/src/block_strider/test_provider.rs index 2d1636cbd..e1bef5066 100644 --- a/core/src/block_strider/test_provider.rs +++ b/core/src/block_strider/test_provider.rs @@ -22,11 +22,11 @@ impl BlockProvider for TestBlockProvider { let next_id = self .master_blocks .iter() - .find(|id| id.shard == prev_block_id.shard && id.seqno == prev_block_id.seqno + 1); + .find(|id| id.seqno == prev_block_id.seqno + 1); futures_util::future::ready(next_id.and_then(|id| { self.blocks .get(id) - .map(|b| Ok(BlockStuff::with_block(id.clone(), b.clone()))) + .map(|b| Ok(BlockStuff::with_block(*id, b.clone()))) })) } @@ -34,7 +34,7 @@ impl BlockProvider for TestBlockProvider { futures_util::future::ready( self.blocks .get(id) - .map(|b| Ok(BlockStuff::with_block(id.clone(), b.clone()))), + .map(|b| Ok(BlockStuff::with_block(*id, b.clone()))), ) } } @@ -52,6 +52,19 @@ impl TestBlockProvider { master_blocks, } } + + pub fn first_master_block(&self) -> BlockId { + *self.master_blocks.first().unwrap() + } + + pub fn validate(&self) { + for master in &self.master_blocks { + tracing::info!("Loading extra for block {:?}", master); + let block = self.blocks.get(master).unwrap(); + let extra = block.load_extra().unwrap(); + extra.load_custom().unwrap().expect("validation failed"); + } + } } fn create_block_chain(num_blocks: u32) -> (HashMap, Vec) { From dd6e9ae6f8e17834a0908f62f6493d5af5815da0 Mon Sep 17 00:00:00 2001 From: Alexey Pashinov Date: Wed, 27 Mar 2024 11:43:52 +0100 Subject: [PATCH 6/8] refactor(core): fix tests for block strider --- core/src/block_strider/mod.rs | 8 +++++++- core/src/block_strider/state.rs | 5 ++++- core/src/block_strider/test_provider.rs | 21 +++++++++++++-------- 3 files changed, 24 insertions(+), 10 deletions(-) diff --git a/core/src/block_strider/mod.rs b/core/src/block_strider/mod.rs index 58b48c94f..d42bf9354 100644 --- a/core/src/block_strider/mod.rs +++ b/core/src/block_strider/mod.rs @@ -133,8 +133,14 @@ where ) -> BoxFuture<'a, Result>> { async move { let mut prev_shard_block_id = shard_block_id; + while !self.state.is_traversed(&shard_block_id) { + if shard_block_id.seqno == 0 { + break; + } + prev_shard_block_id = shard_block_id; + let block = self .fetch_block(&shard_block_id) .await @@ -273,7 +279,7 @@ impl BlocksGraph { state.commit_traversed(*block_id); let next_block = self.connections.get(block_id); if let Some(next_block) = next_block { - next_blocks.push(*next_block.key()); + next_blocks.push(*next_block.value()); } } std::mem::swap(&mut next_blocks, &mut self.bottom_blocks); diff --git a/core/src/block_strider/state.rs b/core/src/block_strider/state.rs index 241b05f13..ee31821db 100644 --- a/core/src/block_strider/state.rs +++ b/core/src/block_strider/state.rs @@ -30,9 +30,12 @@ pub struct InMemoryBlockStriderState { #[cfg(test)] impl InMemoryBlockStriderState { pub fn new(id: BlockId) -> Self { + let mut traversed_blocks = tycho_util::FastDashSet::default(); + traversed_blocks.insert(id); + Self { last_traversed_master_block_id: Mutex::new(id), - traversed_blocks: tycho_util::FastDashSet::default(), + traversed_blocks, } } } diff --git a/core/src/block_strider/test_provider.rs b/core/src/block_strider/test_provider.rs index e1bef5066..6a6e0c405 100644 --- a/core/src/block_strider/test_provider.rs +++ b/core/src/block_strider/test_provider.rs @@ -92,7 +92,7 @@ fn master_block( blocks: &mut HashMap, master_ids: &mut Vec, ) { - let (block_ref, block_info) = block_info(prev_block_ref, seqno); + let (block_ref, block_info) = block_info(prev_block_ref, seqno, true); *prev_block_ref = PrevBlockRef::Single(block_ref.clone()); let shard_block_ids = link_shard_blocks(prev_shard_block_ref, 2, blocks); @@ -134,10 +134,10 @@ fn insert_block( value_flow: default_cc(), state_update: Lazy::new(&MerkleUpdate::default()).unwrap(), out_msg_queue_updates: None, - extra: extra(mc_extra), + extra: extra(mc_extra.clone()), }; let id = BlockId { - shard: ShardIdent::default(), + shard: block_info.shard, seqno, root_hash: block_ref.root_hash, file_hash: block_ref.file_hash, @@ -172,9 +172,8 @@ fn link_shard_blocks( } }; let mut last_ref = None; - for seqno in starting_seqno..starting_seqno + chain_len { - let (block_ref, info) = block_info(prev_block_ref, seqno); - + for seqno in starting_seqno + 1..starting_seqno + chain_len { + let (block_ref, info) = block_info(prev_block_ref, seqno, false); last_ref = Some(( ShardIdent::BASECHAIN, ShardDescription { @@ -223,7 +222,13 @@ fn default_cc() -> Lazy { .unwrap() } -fn block_info(prev_block_ref: &PrevBlockRef, seqno: u32) -> (BlockRef, BlockInfo) { +fn block_info(prev_block_ref: &PrevBlockRef, seqno: u32, is_mc: bool) -> (BlockRef, BlockInfo) { + let shard = if is_mc { + ShardIdent::MASTERCHAIN + } else { + ShardIdent::BASECHAIN + }; + let prev_block_ref = encode_ref(prev_block_ref.clone()); let block_info = BlockInfo { version: 0, @@ -236,7 +241,7 @@ fn block_info(prev_block_ref: &PrevBlockRef, seqno: u32) -> (BlockRef, BlockInfo flags: 0, seqno, vert_seqno: 0, - shard: Default::default(), + shard, gen_utime: 0, start_lt: 0, end_lt: 0, From f70df9a5bd23b02f38cf74a3750d72bb1e06ed1f Mon Sep 17 00:00:00 2001 From: Vladimir Petrzhikovskii Date: Thu, 4 Apr 2024 15:14:02 +0200 Subject: [PATCH 7/8] refactor(block-strider): resolve rebase issues --- core/src/block_strider/state.rs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/core/src/block_strider/state.rs b/core/src/block_strider/state.rs index ee31821db..6b3308097 100644 --- a/core/src/block_strider/state.rs +++ b/core/src/block_strider/state.rs @@ -1,5 +1,4 @@ use everscale_types::models::BlockId; -use parking_lot::Mutex; pub trait BlockStriderState: Send + Sync + 'static { fn load_last_traversed_master_block_id(&self) -> BlockId; @@ -23,18 +22,18 @@ impl BlockStriderState for Box { #[cfg(test)] pub struct InMemoryBlockStriderState { - last_traversed_master_block_id: Mutex, + last_traversed_master_block_id: parking_lot::Mutex, traversed_blocks: tycho_util::FastDashSet, } #[cfg(test)] impl InMemoryBlockStriderState { pub fn new(id: BlockId) -> Self { - let mut traversed_blocks = tycho_util::FastDashSet::default(); + let traversed_blocks = tycho_util::FastDashSet::default(); traversed_blocks.insert(id); Self { - last_traversed_master_block_id: Mutex::new(id), + last_traversed_master_block_id: parking_lot::Mutex::new(id), traversed_blocks, } } From 1c190838a69265ee6680ed155439878a74d9352f Mon Sep 17 00:00:00 2001 From: Ivan Kalinin Date: Tue, 9 Apr 2024 19:00:27 +0200 Subject: [PATCH 8/8] fix(core): fix build --- Cargo.lock | 19 +++++++++++++++---- Cargo.toml | 2 ++ core/Cargo.toml | 7 +++++-- 3 files changed, 22 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 43027e6c4..5a077e243 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -648,8 +648,7 @@ dependencies = [ [[package]] name = "everscale-types" version = "0.1.0-rc.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff3c058b07bdb5414da10bc8a2489715e31b0c3f4274a213c1a23831e9d94e91" +source = "git+https://github.com/broxus/everscale-types.git?branch=0xdeafbeef/push-yntmntzvxrlu#9ef94cf9f1042d0605d2cc3325fdc570c1092bcd" dependencies = [ "ahash", "base64 0.21.7", @@ -658,6 +657,7 @@ dependencies = [ "everscale-crypto", "everscale-types-proc", "hex", + "itertools", "once_cell", "serde", "sha2", @@ -669,8 +669,7 @@ dependencies = [ [[package]] name = "everscale-types-proc" version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "323d8b61c76be2c16eb2d72d007f1542fdeb3760fdf2e2cae219fc0da3db0c09" +source = "git+https://github.com/broxus/everscale-types.git?branch=0xdeafbeef/push-yntmntzvxrlu#9ef94cf9f1042d0605d2cc3325fdc570c1092bcd" dependencies = [ "proc-macro2", "quote", @@ -815,6 +814,15 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "itertools" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba291022dbbd398a455acf126c1e341954079855bc60dfdda641363bd6922569" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.11" @@ -2181,8 +2189,11 @@ dependencies = [ "castaway", "everscale-types", "futures-util", + "itertools", + "parking_lot", "tokio", "tracing", + "tycho-block-util", "tycho-network", "tycho-storage", "tycho-util", diff --git a/Cargo.toml b/Cargo.toml index af5350f16..773233388 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ fdlimit = "0.3.0" futures-util = "0.3" hex = "0.4" humantime = "2" +itertools = "0.12" libc = "0.2" moka = { version = "0.12", features = ["sync"] } num-traits = "0.2.18" @@ -98,6 +99,7 @@ debug = true # NOTE: use crates.io dependency when it is released # https://github.com/sagebind/castaway/issues/18 castaway = { git = "https://github.com/sagebind/castaway.git" } +everscale-types = { git = "https://github.com/broxus/everscale-types.git", branch = "0xdeafbeef/push-yntmntzvxrlu" } [workspace.lints.rust] future_incompatible = "warn" diff --git a/core/Cargo.toml b/core/Cargo.toml index b5cebe425..e0ba3defd 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -14,16 +14,19 @@ async-trait = { workspace = true } castaway = { workspace = true } everscale-types = { workspace = true } futures-util = { workspace = true } -tracing = { workspace = true } +itertools = { workspace = true } +parking_lot = { workspace = true } tokio = { workspace = true, features = ["rt"] } +tracing = { workspace = true } # local deps +tycho-block-util = { workspace = true } tycho-network = { workspace = true } tycho-storage = { workspace = true } tycho-util = { workspace = true } [dev-dependencies] -tycho-util = { path = "../util", features = ["test-util"] } +tycho-util = { workspace = true, features = ["test-util"] } [lints] workspace = true