From 25f34d99f210d87fe9193cd909aad35c760e9182 Mon Sep 17 00:00:00 2001 From: Ivan Kalinin Date: Mon, 26 Feb 2024 19:13:32 +0100 Subject: [PATCH] refactor(core): change block strider interface --- Cargo.lock | 1 + core/Cargo.toml | 6 +- core/src/block_strider/mod.rs | 274 +++++++++++++++++++++++----------- network/src/util/router.rs | 6 +- util/src/futures.rs | 10 +- 5 files changed, 201 insertions(+), 96 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 292ef38d6..ab42a8d77 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2028,6 +2028,7 @@ version = "0.0.1" dependencies = [ "anyhow", "async-trait", + "castaway", "everscale-types", "futures-util", "tracing", diff --git a/core/Cargo.toml b/core/Cargo.toml index f539a0504..f6a6fc6a0 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -5,12 +5,12 @@ edition = "2021" description = "Basic functionality of peer." [dependencies] -async-trait = "0.1.77" anyhow = "1.0.80" +async-trait = "0.1.77" +castaway = "0.2" everscale-types = "0.1.0-rc.6" -tracing = "0.1.40" futures-util = "0.3.30" - +tracing = "0.1.40" # local deps tycho-network = { path = "../network", version = "=0.0.1" } diff --git a/core/src/block_strider/mod.rs b/core/src/block_strider/mod.rs index a53a1e357..d82565432 100644 --- a/core/src/block_strider/mod.rs +++ b/core/src/block_strider/mod.rs @@ -1,132 +1,201 @@ -use async_trait::async_trait; - -use everscale_types::models::{Block, BlockId}; -use futures_util::stream::FuturesUnordered; -use futures_util::StreamExt; use std::future::Future; +use std::sync::atomic::{AtomicBool, Ordering}; -#[async_trait] -pub trait OnStriderStep { - async fn handle_block(&self, block: &Block) -> anyhow::Result<()>; -} - -/// Block provider *MUST* validate the block before returning it. -pub trait BlockProvider { - type GetNextBlockFut: Future; - type GetBlockFut: Future; +use anyhow::Result; +use everscale_types::models::{Block, BlockId}; +use futures_util::future::BoxFuture; - fn get_next_block(&self, prev_block_id: &BlockId) -> Self::GetNextBlockFut; - fn get_block(&self, block_id: &BlockId) -> Self::GetBlockFut; +pub struct BlockStriderBuilder(BlockStrider); - fn status(&self) -> ProviderStatus; +impl BlockStriderBuilder<(), T2, T3> { + pub fn with_state(self, state: S) -> BlockStriderBuilder { + BlockStriderBuilder(BlockStrider { + state, + provider: self.0.provider, + subscriber: self.0.subscriber, + }) + } } -pub trait PersistenceProvider { - fn load_last_traversed_master_block_seqno(&self) -> BlockId; - fn commit_last_traversed_master_block_seqno(&self, block_id: BlockId); - - fn shard_block_traversed(&self, block_id: &BlockId) -> bool; - fn commit_shard_block_traversed(&self, block_id: BlockId); +impl BlockStriderBuilder { + pub fn with_provider(self, provider: P) -> BlockStriderBuilder { + BlockStriderBuilder(BlockStrider { + state: self.0.state, + provider, + subscriber: self.0.subscriber, + }) + } +} - // drop all shard blocks in the same shard with seqno < block_id - fn gc_shard_blocks(&self, block_id: &BlockId); +impl BlockStriderBuilder { + pub fn with_subscriber( + self, + subscriber: B, + ) -> BlockStriderBuilder { + BlockStriderBuilder(BlockStrider { + state: self.0.state, + provider: self.0.provider, + subscriber, + }) + } } -pub enum ProviderResult { - Error(anyhow::Error), - NotFound, // or should provider backoff until the block is found? - Found(Block), +impl BlockStriderBuilder +where + S: BlockStriderState, + P: BlockProvider, + B: BlockSubscriber, +{ + pub fn build(self) -> BlockStrider { + self.0 + } } -#[derive(Debug, PartialEq)] -pub enum ProviderStatus { - Ready, - NotReady, +pub struct BlockStrider { + state: S, + provider: P, + subscriber: B, } -pub struct StriderBuilder { - subscribers: Vec>, - persistence_provider: Provider, // or it also should be a vec? +impl BlockStrider<(), (), ()> { + pub fn builder() -> BlockStriderBuilder<(), (), ()> { + BlockStriderBuilder(BlockStrider { + state: (), + provider: (), + subscriber: (), + }) + } } -impl StriderBuilder +impl BlockStrider where - Provider: PersistenceProvider, + S: BlockStriderState, + P: BlockProvider, + B: BlockSubscriber, { - pub fn new(persistence_provider: Provider) -> Self { - Self { - subscribers: Vec::new(), - persistence_provider, - } - } - - pub fn add_subscriber(&mut self, subscriber: Box) { - self.subscribers.push(subscriber); - } - - // this function gurarantees at least once delivery - pub async fn start(self, block_provider: impl BlockProvider) { - loop { - let master_block = self.fetch_next_master_block(&block_provider).await; + /// Walks through blocks and handles them. + /// + /// Stops either when the provider is exhausted or it can't provide a requested block. + pub async fn run(self) -> Result<()> { + tracing::info!("block strider loop started"); + + while let Some(master_block) = self.fetch_next_master_block().await { + // TODO: replace with block stuff let master_id = get_block_id(&master_block); let shard_hashes = get_shard_hashes(&master_block); + for hash in shard_hashes { - if !self.persistence_provider.shard_block_traversed(&hash) { - let block = self.fetch_block(&hash, &block_provider).await; - let mut subscribers: FuturesUnordered<_> = self - .subscribers - .iter() - .map(|subscriber| subscriber.handle_block(&block)) - .collect(); - // wait for all subscribers to finish - while subscribers.next().await.is_some() {} - self.persistence_provider.commit_shard_block_traversed(hash); + if !self.state.is_traversed(&hash) { + let block = self.fetch_block(&hash).await?; + + if let Err(e) = self.subscriber.handle_block(&block).await { + tracing::error!("error while handling block: {e:?}"); + // TODO: retry + backoff? + } + + self.state.commit_traversed(hash); } } - self.persistence_provider - .commit_last_traversed_master_block_seqno(master_id); + + self.state.commit_traversed(master_id); } + + tracing::info!("block strider loop finished"); + Ok(()) } - async fn fetch_next_master_block(&self, block_provider: &impl BlockProvider) -> Block { - let last_traversed_master_block = self - .persistence_provider - .load_last_traversed_master_block_seqno(); + async fn fetch_next_master_block(&self) -> Option { + let last_traversed_master_block = self.state.load_last_traversed_master_block_id(); loop { - match block_provider + match self + .provider .get_next_block(&last_traversed_master_block) - .await + .await? { - ProviderResult::Error(e) => { + Ok(block) => break Some(block), + Err(e) => { tracing::error!( ?last_traversed_master_block, - "error while fetching master block: {:?}", - e + "error while fetching master block: {e:?}", ); + // TODO: backoff } - ProviderResult::NotFound => { - tracing::info!(?last_traversed_master_block, "master block not found"); - } - ProviderResult::Found(block) => break block, } } } - async fn fetch_block(&self, id: &BlockId, block_provider: &impl BlockProvider) -> Block { + async fn fetch_block(&self, block_id: &BlockId) -> Result { loop { - match block_provider.get_block(id).await { - ProviderResult::Error(e) => { - tracing::error!("error while fetching block: {:?}", e); + match self.provider.get_block(block_id).await { + Some(Ok(block)) => break Ok(block), + Some(Err(e)) => { + tracing::error!("error while fetching block: {e:?}"); + // TODO: backoff } - ProviderResult::NotFound => { - tracing::info!(?id, "no block found"); + None => { + anyhow::bail!("block not found: {block_id}") } - ProviderResult::Found(block) => break block, } } } } +pub trait BlockStriderState: Send + Sync + 'static { + fn load_last_traversed_master_block_id(&self) -> BlockId; + fn is_traversed(&self, block_id: &BlockId) -> bool; + fn commit_traversed(&self, block_id: BlockId); +} + +impl BlockStriderState for Box { + fn load_last_traversed_master_block_id(&self) -> BlockId { + ::load_last_traversed_master_block_id(self) + } + + fn is_traversed(&self, block_id: &BlockId) -> bool { + ::is_traversed(self, block_id) + } + + fn commit_traversed(&self, block_id: BlockId) { + ::commit_traversed(self, block_id); + } +} + +/// Block provider *MUST* validate the block before returning it. +pub trait BlockProvider: Send + Sync + 'static { + type GetNextBlockFut<'a>: Future>> + Send + 'a; + type GetBlockFut<'a>: Future>> + Send + 'a; + + fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a>; + fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a>; +} + +impl BlockProvider for Box { + type GetNextBlockFut<'a> = T::GetNextBlockFut<'a>; + type GetBlockFut<'a> = T::GetBlockFut<'a>; + + fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> { + ::get_next_block(self, prev_block_id) + } + + fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> { + ::get_block(self, block_id) + } +} + +pub trait BlockSubscriber: Send + Sync + 'static { + type HandleBlockFut: Future> + Send + 'static; + + fn handle_block(&self, block: &Block) -> Self::HandleBlockFut; +} + +impl BlockSubscriber for Box { + type HandleBlockFut = T::HandleBlockFut; + + fn handle_block(&self, block: &Block) -> Self::HandleBlockFut { + ::handle_block(self, block) + } +} + fn get_shard_hashes(_block: &Block) -> impl IntoIterator { vec![].into_iter() } @@ -134,3 +203,38 @@ fn get_shard_hashes(_block: &Block) -> impl IntoIterator { fn get_block_id(_block: &Block) -> BlockId { unimplemented!() } + +// === Provider combinators === +struct ChainBlockProvider { + left: T1, + right: T2, + is_right: AtomicBool, +} + +impl BlockProvider for ChainBlockProvider { + type GetNextBlockFut<'a> = BoxFuture<'a, Option>>; + type GetBlockFut<'a> = BoxFuture<'a, Option>>; + + fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> { + Box::pin(async move { + if !self.is_right.load(Ordering::Acquire) { + let res = self.left.get_next_block(prev_block_id).await; + if res.is_some() { + return res; + } + self.is_right.store(true, Ordering::Release); + } + self.right.get_next_block(prev_block_id).await + }) + } + + fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> { + Box::pin(async { + let res = self.left.get_block(block_id).await; + if res.is_some() { + return res; + } + self.right.get_block(block_id).await + }) + } +} diff --git a/network/src/util/router.rs b/network/src/util/router.rs index 7a445643b..1ca9b428e 100644 --- a/network/src/util/router.rs +++ b/network/src/util/router.rs @@ -96,9 +96,9 @@ where Q: Send + 'static, { type QueryResponse = Q; - type OnQueryFuture = BoxFutureOrNoop>; - type OnMessageFuture = BoxFutureOrNoop<()>; - type OnDatagramFuture = BoxFutureOrNoop<()>; + type OnQueryFuture = BoxFutureOrNoop<'static, Option>; + type OnMessageFuture = BoxFutureOrNoop<'static, ()>; + type OnDatagramFuture = BoxFutureOrNoop<'static, ()>; fn on_query(&self, req: Request) -> Self::OnQueryFuture { match find_handler(&req, &self.inner.query_handlers, &self.inner.services) { diff --git a/util/src/futures.rs b/util/src/futures.rs index 66fd9c5fa..d708d22ea 100644 --- a/util/src/futures.rs +++ b/util/src/futures.rs @@ -4,12 +4,12 @@ use std::task::{Context, Poll}; use futures_util::future::BoxFuture; use futures_util::{Future, FutureExt}; -pub enum BoxFutureOrNoop { - Boxed(BoxFuture<'static, T>), +pub enum BoxFutureOrNoop<'a, T> { + Boxed(BoxFuture<'a, T>), Noop, } -impl BoxFutureOrNoop { +impl BoxFutureOrNoop<'static, T> { #[inline] pub fn future(f: F) -> Self where @@ -22,7 +22,7 @@ impl BoxFutureOrNoop { } } -impl Future for BoxFutureOrNoop<()> { +impl<'a> Future for BoxFutureOrNoop<'a, ()> { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { @@ -33,7 +33,7 @@ impl Future for BoxFutureOrNoop<()> { } } -impl Future for BoxFutureOrNoop> { +impl<'a, T> Future for BoxFutureOrNoop<'a, Option> { type Output = Option; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll {