diff --git a/Cargo.lock b/Cargo.lock index be80f1e42..ab42a8d77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -163,6 +163,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "async-trait" +version = "0.1.77" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "autocfg" version = "1.1.0" @@ -430,6 +441,15 @@ version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "19d374276b40fb8bbdee95aef7c7fa6b5316ec764510eb64b8dd0e2ed0d7e7f5" +[[package]] +name = "crc32c" +version = "0.6.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89254598aa9b9fa608de44b3ae54c810f0f06d755e24c50177f1f8f31ff50ce2" +dependencies = [ + "rustc_version", +] + [[package]] name = "crossbeam-channel" version = "0.5.11" @@ -607,6 +627,38 @@ dependencies = [ "tl-proto", ] +[[package]] +name = "everscale-types" +version = "0.1.0-rc.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff3c058b07bdb5414da10bc8a2489715e31b0c3f4274a213c1a23831e9d94e91" +dependencies = [ + "ahash", + "base64", + "bitflags 2.4.2", + "crc32c", + "everscale-crypto", + "everscale-types-proc", + "hex", + "once_cell", + "serde", + "sha2", + "smallvec", + "thiserror", + "tl-proto", +] + +[[package]] +name = "everscale-types-proc" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "323d8b61c76be2c16eb2d72d007f1542fdeb3760fdf2e2cae219fc0da3db0c09" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "fastrand" version = "2.0.1" @@ -1974,6 +2026,12 @@ dependencies = [ name = "tycho-core" version = "0.0.1" dependencies = [ + "anyhow", + "async-trait", + "castaway", + "everscale-types", + "futures-util", + "tracing", "tycho-network", "tycho-storage", "tycho-util", diff --git a/core/Cargo.toml b/core/Cargo.toml index d9fab0eff..f6a6fc6a0 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -5,7 +5,12 @@ edition = "2021" description = "Basic functionality of peer." [dependencies] -# crates.io deps +anyhow = "1.0.80" +async-trait = "0.1.77" +castaway = "0.2" +everscale-types = "0.1.0-rc.6" +futures-util = "0.3.30" +tracing = "0.1.40" # local deps tycho-network = { path = "../network", version = "=0.0.1" } @@ -13,4 +18,4 @@ tycho-storage = { path = "../storage", version = "=0.0.1" } tycho-util = { path = "../util", version = "=0.0.1" } [lints] -workspace= true \ No newline at end of file +workspace = true diff --git a/core/src/block_strider/mod.rs b/core/src/block_strider/mod.rs new file mode 100644 index 000000000..d82565432 --- /dev/null +++ b/core/src/block_strider/mod.rs @@ -0,0 +1,240 @@ +use std::future::Future; +use std::sync::atomic::{AtomicBool, Ordering}; + +use anyhow::Result; +use everscale_types::models::{Block, BlockId}; +use futures_util::future::BoxFuture; + +pub struct BlockStriderBuilder(BlockStrider); + +impl BlockStriderBuilder<(), T2, T3> { + pub fn with_state(self, state: S) -> BlockStriderBuilder { + BlockStriderBuilder(BlockStrider { + state, + provider: self.0.provider, + subscriber: self.0.subscriber, + }) + } +} + +impl BlockStriderBuilder { + pub fn with_provider(self, provider: P) -> BlockStriderBuilder { + BlockStriderBuilder(BlockStrider { + state: self.0.state, + provider, + subscriber: self.0.subscriber, + }) + } +} + +impl BlockStriderBuilder { + pub fn with_subscriber( + self, + subscriber: B, + ) -> BlockStriderBuilder { + BlockStriderBuilder(BlockStrider { + state: self.0.state, + provider: self.0.provider, + subscriber, + }) + } +} + +impl BlockStriderBuilder +where + S: BlockStriderState, + P: BlockProvider, + B: BlockSubscriber, +{ + pub fn build(self) -> BlockStrider { + self.0 + } +} + +pub struct BlockStrider { + state: S, + provider: P, + subscriber: B, +} + +impl BlockStrider<(), (), ()> { + pub fn builder() -> BlockStriderBuilder<(), (), ()> { + BlockStriderBuilder(BlockStrider { + state: (), + provider: (), + subscriber: (), + }) + } +} + +impl BlockStrider +where + S: BlockStriderState, + P: BlockProvider, + B: BlockSubscriber, +{ + /// Walks through blocks and handles them. + /// + /// Stops either when the provider is exhausted or it can't provide a requested block. + pub async fn run(self) -> Result<()> { + tracing::info!("block strider loop started"); + + while let Some(master_block) = self.fetch_next_master_block().await { + // TODO: replace with block stuff + let master_id = get_block_id(&master_block); + let shard_hashes = get_shard_hashes(&master_block); + + for hash in shard_hashes { + if !self.state.is_traversed(&hash) { + let block = self.fetch_block(&hash).await?; + + if let Err(e) = self.subscriber.handle_block(&block).await { + tracing::error!("error while handling block: {e:?}"); + // TODO: retry + backoff? + } + + self.state.commit_traversed(hash); + } + } + + self.state.commit_traversed(master_id); + } + + tracing::info!("block strider loop finished"); + Ok(()) + } + + async fn fetch_next_master_block(&self) -> Option { + let last_traversed_master_block = self.state.load_last_traversed_master_block_id(); + loop { + match self + .provider + .get_next_block(&last_traversed_master_block) + .await? + { + Ok(block) => break Some(block), + Err(e) => { + tracing::error!( + ?last_traversed_master_block, + "error while fetching master block: {e:?}", + ); + // TODO: backoff + } + } + } + } + + async fn fetch_block(&self, block_id: &BlockId) -> Result { + loop { + match self.provider.get_block(block_id).await { + Some(Ok(block)) => break Ok(block), + Some(Err(e)) => { + tracing::error!("error while fetching block: {e:?}"); + // TODO: backoff + } + None => { + anyhow::bail!("block not found: {block_id}") + } + } + } + } +} + +pub trait BlockStriderState: Send + Sync + 'static { + fn load_last_traversed_master_block_id(&self) -> BlockId; + fn is_traversed(&self, block_id: &BlockId) -> bool; + fn commit_traversed(&self, block_id: BlockId); +} + +impl BlockStriderState for Box { + fn load_last_traversed_master_block_id(&self) -> BlockId { + ::load_last_traversed_master_block_id(self) + } + + fn is_traversed(&self, block_id: &BlockId) -> bool { + ::is_traversed(self, block_id) + } + + fn commit_traversed(&self, block_id: BlockId) { + ::commit_traversed(self, block_id); + } +} + +/// Block provider *MUST* validate the block before returning it. +pub trait BlockProvider: Send + Sync + 'static { + type GetNextBlockFut<'a>: Future>> + Send + 'a; + type GetBlockFut<'a>: Future>> + Send + 'a; + + fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a>; + fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a>; +} + +impl BlockProvider for Box { + type GetNextBlockFut<'a> = T::GetNextBlockFut<'a>; + type GetBlockFut<'a> = T::GetBlockFut<'a>; + + fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> { + ::get_next_block(self, prev_block_id) + } + + fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> { + ::get_block(self, block_id) + } +} + +pub trait BlockSubscriber: Send + Sync + 'static { + type HandleBlockFut: Future> + Send + 'static; + + fn handle_block(&self, block: &Block) -> Self::HandleBlockFut; +} + +impl BlockSubscriber for Box { + type HandleBlockFut = T::HandleBlockFut; + + fn handle_block(&self, block: &Block) -> Self::HandleBlockFut { + ::handle_block(self, block) + } +} + +fn get_shard_hashes(_block: &Block) -> impl IntoIterator { + vec![].into_iter() +} + +fn get_block_id(_block: &Block) -> BlockId { + unimplemented!() +} + +// === Provider combinators === +struct ChainBlockProvider { + left: T1, + right: T2, + is_right: AtomicBool, +} + +impl BlockProvider for ChainBlockProvider { + type GetNextBlockFut<'a> = BoxFuture<'a, Option>>; + type GetBlockFut<'a> = BoxFuture<'a, Option>>; + + fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> { + Box::pin(async move { + if !self.is_right.load(Ordering::Acquire) { + let res = self.left.get_next_block(prev_block_id).await; + if res.is_some() { + return res; + } + self.is_right.store(true, Ordering::Release); + } + self.right.get_next_block(prev_block_id).await + }) + } + + fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> { + Box::pin(async { + let res = self.left.get_block(block_id).await; + if res.is_some() { + return res; + } + self.right.get_block(block_id).await + }) + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index 8b1378917..071271136 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1 +1 @@ - +pub mod block_strider;