Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): add block strider #11

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,17 @@ edition = "2021"
description = "Basic functionality of peer."

[dependencies]
# crates.io deps
anyhow = "1.0.80"
async-trait = "0.1.77"
castaway = "0.2"
everscale-types = "0.1.0-rc.6"
futures-util = "0.3.30"
tracing = "0.1.40"

# local deps
tycho-network = { path = "../network", version = "=0.0.1" }
tycho-storage = { path = "../storage", version = "=0.0.1" }
tycho-util = { path = "../util", version = "=0.0.1" }

[lints]
workspace= true
workspace = true
240 changes: 240 additions & 0 deletions core/src/block_strider/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};

use anyhow::Result;
use everscale_types::models::{Block, BlockId};
use futures_util::future::BoxFuture;

pub struct BlockStriderBuilder<S, P, B>(BlockStrider<S, P, B>);

impl<T2, T3> BlockStriderBuilder<(), T2, T3> {
pub fn with_state<S: BlockStriderState>(self, state: S) -> BlockStriderBuilder<S, T2, T3> {
BlockStriderBuilder(BlockStrider {
state,
provider: self.0.provider,
subscriber: self.0.subscriber,
})
}
}

impl<T1, T3> BlockStriderBuilder<T1, (), T3> {
pub fn with_provider<P: BlockProvider>(self, provider: P) -> BlockStriderBuilder<T1, P, T3> {
BlockStriderBuilder(BlockStrider {
state: self.0.state,
provider,
subscriber: self.0.subscriber,
})
}
}

impl<T1, T2> BlockStriderBuilder<T1, T2, ()> {
pub fn with_subscriber<B: BlockStriderState>(
self,
subscriber: B,
) -> BlockStriderBuilder<T1, T2, B> {
BlockStriderBuilder(BlockStrider {
state: self.0.state,
provider: self.0.provider,
subscriber,
})
}
}

impl<S, P, B> BlockStriderBuilder<S, P, B>
where
S: BlockStriderState,
P: BlockProvider,
B: BlockSubscriber,
{
pub fn build(self) -> BlockStrider<S, P, B> {
self.0
}
}

pub struct BlockStrider<S, P, B> {
state: S,
provider: P,
subscriber: B,
}

impl BlockStrider<(), (), ()> {
pub fn builder() -> BlockStriderBuilder<(), (), ()> {
BlockStriderBuilder(BlockStrider {
state: (),
provider: (),
subscriber: (),
})
}
}

impl<S, P, B> BlockStrider<S, P, B>
where
S: BlockStriderState,
P: BlockProvider,
B: BlockSubscriber,
{
/// Walks through blocks and handles them.
///
/// Stops either when the provider is exhausted or it can't provide a requested block.
pub async fn run(self) -> Result<()> {
tracing::info!("block strider loop started");

while let Some(master_block) = self.fetch_next_master_block().await {
// TODO: replace with block stuff
let master_id = get_block_id(&master_block);
let shard_hashes = get_shard_hashes(&master_block);

for hash in shard_hashes {
if !self.state.is_traversed(&hash) {
let block = self.fetch_block(&hash).await?;

if let Err(e) = self.subscriber.handle_block(&block).await {
tracing::error!("error while handling block: {e:?}");
// TODO: retry + backoff?
}

self.state.commit_traversed(hash);
}
}

self.state.commit_traversed(master_id);
}

tracing::info!("block strider loop finished");
Ok(())
}

async fn fetch_next_master_block(&self) -> Option<Block> {
let last_traversed_master_block = self.state.load_last_traversed_master_block_id();
loop {
match self
.provider
.get_next_block(&last_traversed_master_block)
.await?
{
Ok(block) => break Some(block),
Err(e) => {
tracing::error!(
?last_traversed_master_block,
"error while fetching master block: {e:?}",
);
// TODO: backoff
}
}
}
}

async fn fetch_block(&self, block_id: &BlockId) -> Result<Block> {
loop {
match self.provider.get_block(block_id).await {
Some(Ok(block)) => break Ok(block),
Some(Err(e)) => {
tracing::error!("error while fetching block: {e:?}");
// TODO: backoff
}
None => {
anyhow::bail!("block not found: {block_id}")
}
}
}
}
}

pub trait BlockStriderState: Send + Sync + 'static {
fn load_last_traversed_master_block_id(&self) -> BlockId;
fn is_traversed(&self, block_id: &BlockId) -> bool;
fn commit_traversed(&self, block_id: BlockId);
}

impl<T: BlockStriderState> BlockStriderState for Box<T> {
fn load_last_traversed_master_block_id(&self) -> BlockId {
<T as BlockStriderState>::load_last_traversed_master_block_id(self)
}

fn is_traversed(&self, block_id: &BlockId) -> bool {
<T as BlockStriderState>::is_traversed(self, block_id)
}

fn commit_traversed(&self, block_id: BlockId) {
<T as BlockStriderState>::commit_traversed(self, block_id);
}
}

/// Block provider *MUST* validate the block before returning it.
pub trait BlockProvider: Send + Sync + 'static {
type GetNextBlockFut<'a>: Future<Output = Option<Result<Block>>> + Send + 'a;
type GetBlockFut<'a>: Future<Output = Option<Result<Block>>> + Send + 'a;

fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a>;
fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a>;
}

impl<T: BlockProvider> BlockProvider for Box<T> {
type GetNextBlockFut<'a> = T::GetNextBlockFut<'a>;
type GetBlockFut<'a> = T::GetBlockFut<'a>;

fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
<T as BlockProvider>::get_next_block(self, prev_block_id)
}

fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> {
<T as BlockProvider>::get_block(self, block_id)
}
}

pub trait BlockSubscriber: Send + Sync + 'static {
type HandleBlockFut: Future<Output = Result<()>> + Send + 'static;

fn handle_block(&self, block: &Block) -> Self::HandleBlockFut;
}

impl<T: BlockSubscriber> BlockSubscriber for Box<T> {
type HandleBlockFut = T::HandleBlockFut;

fn handle_block(&self, block: &Block) -> Self::HandleBlockFut {
<T as BlockSubscriber>::handle_block(self, block)
}
}

fn get_shard_hashes(_block: &Block) -> impl IntoIterator<Item = BlockId> {
vec![].into_iter()
}

fn get_block_id(_block: &Block) -> BlockId {
unimplemented!()
}

// === Provider combinators ===
struct ChainBlockProvider<T1, T2> {
left: T1,
right: T2,
is_right: AtomicBool,
}

impl<T1: BlockProvider, T2: BlockProvider> BlockProvider for ChainBlockProvider<T1, T2> {
type GetNextBlockFut<'a> = BoxFuture<'a, Option<Result<Block>>>;
type GetBlockFut<'a> = BoxFuture<'a, Option<Result<Block>>>;

fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
Box::pin(async move {
if !self.is_right.load(Ordering::Acquire) {
let res = self.left.get_next_block(prev_block_id).await;
if res.is_some() {
return res;
}
self.is_right.store(true, Ordering::Release);
}
self.right.get_next_block(prev_block_id).await
})
}

fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> {
Box::pin(async {
let res = self.left.get_block(block_id).await;
if res.is_some() {
return res;
}
self.right.get_block(block_id).await
})
}
}
2 changes: 1 addition & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1 +1 @@

pub mod block_strider;
Loading