Skip to content

Commit

Permalink
refactor(core): move block-strider traits to separate files
Browse files Browse the repository at this point in the history
  • Loading branch information
0xdeafbeef committed Feb 27, 2024
1 parent f151205 commit c8ca671
Show file tree
Hide file tree
Showing 7 changed files with 252 additions and 99 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 10 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
[workspace]
members = [
"consensus",
"core",
"network",
"simulator",
"storage",
"util",
"collator",
]
resolver = "2"
members = ["consensus", "core", "network", "simulator", "storage", "util", "collator"]

[profile.release]
debug = true
Expand Down Expand Up @@ -73,8 +81,8 @@ rest_pat_in_fully_bound_structs = "warn"
same_functions_in_if_condition = "warn"
semicolon_if_nothing_returned = "warn"
single_match_else = "warn"
string_add_assign = "warn"
string_add = "warn"
string_add_assign = "warn"
string_lit_as_bytes = "warn"
string_to_string = "warn"
todo = "warn"
Expand Down
5 changes: 3 additions & 2 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
[package]
description = "Basic functionality of peer."
edition = "2021"
name = "tycho-core"
version = "0.0.1"
edition = "2021"
description = "Basic functionality of peer."

[dependencies]
anyhow = "1.0.80"
Expand All @@ -11,6 +11,7 @@ castaway = "0.2"
everscale-types = "0.1.0-rc.6"
futures-util = "0.3.30"
tracing = "0.1.40"
tokio = { version = "1.36.0", features = ["full"] }

# local deps
tycho-network = { path = "../network", version = "=0.0.1" }
Expand Down
103 changes: 8 additions & 95 deletions core/src/block_strider/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};

use anyhow::Result;
use everscale_types::models::{Block, BlockId};
use futures_util::future::BoxFuture;

pub mod provider;
pub mod state;
pub mod subscriber;

use provider::BlockProvider;
use state::BlockStriderState;
use subscriber::BlockSubscriber;

pub struct BlockStriderBuilder<S, P, B>(BlockStrider<S, P, B>);

Expand Down Expand Up @@ -140,101 +144,10 @@ where
}
}

pub trait BlockStriderState: Send + Sync + 'static {
fn load_last_traversed_master_block_id(&self) -> BlockId;
fn is_traversed(&self, block_id: &BlockId) -> bool;
fn commit_traversed(&self, block_id: BlockId);
}

impl<T: BlockStriderState> BlockStriderState for Box<T> {
fn load_last_traversed_master_block_id(&self) -> BlockId {
<T as BlockStriderState>::load_last_traversed_master_block_id(self)
}

fn is_traversed(&self, block_id: &BlockId) -> bool {
<T as BlockStriderState>::is_traversed(self, block_id)
}

fn commit_traversed(&self, block_id: BlockId) {
<T as BlockStriderState>::commit_traversed(self, block_id);
}
}

/// Block provider *MUST* validate the block before returning it.
pub trait BlockProvider: Send + Sync + 'static {
type GetNextBlockFut<'a>: Future<Output = Option<Result<Block>>> + Send + 'a;
type GetBlockFut<'a>: Future<Output = Option<Result<Block>>> + Send + 'a;

fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a>;
fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a>;
}

impl<T: BlockProvider> BlockProvider for Box<T> {
type GetNextBlockFut<'a> = T::GetNextBlockFut<'a>;
type GetBlockFut<'a> = T::GetBlockFut<'a>;

fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
<T as BlockProvider>::get_next_block(self, prev_block_id)
}

fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> {
<T as BlockProvider>::get_block(self, block_id)
}
}

pub trait BlockSubscriber: Send + Sync + 'static {
type HandleBlockFut: Future<Output = Result<()>> + Send + 'static;

fn handle_block(&self, block: &Block) -> Self::HandleBlockFut;
}

impl<T: BlockSubscriber> BlockSubscriber for Box<T> {
type HandleBlockFut = T::HandleBlockFut;

fn handle_block(&self, block: &Block) -> Self::HandleBlockFut {
<T as BlockSubscriber>::handle_block(self, block)
}
}

fn get_shard_hashes(_block: &Block) -> impl IntoIterator<Item = BlockId> {
vec![].into_iter()
}

fn get_block_id(_block: &Block) -> BlockId {
unimplemented!()
}

// === Provider combinators ===
struct ChainBlockProvider<T1, T2> {
left: T1,
right: T2,
is_right: AtomicBool,
}

impl<T1: BlockProvider, T2: BlockProvider> BlockProvider for ChainBlockProvider<T1, T2> {
type GetNextBlockFut<'a> = BoxFuture<'a, Option<Result<Block>>>;
type GetBlockFut<'a> = BoxFuture<'a, Option<Result<Block>>>;

fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
Box::pin(async move {
if !self.is_right.load(Ordering::Acquire) {
let res = self.left.get_next_block(prev_block_id).await;
if res.is_some() {
return res;
}
self.is_right.store(true, Ordering::Release);
}
self.right.get_next_block(prev_block_id).await
})
}

fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> {
Box::pin(async {
let res = self.left.get_block(block_id).await;
if res.is_some() {
return res;
}
self.right.get_block(block_id).await
})
}
}
162 changes: 162 additions & 0 deletions core/src/block_strider/provider.rs

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions core/src/block_strider/state.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
use everscale_types::models::BlockId;

pub trait BlockStriderState: Send + Sync + 'static {
fn load_last_traversed_master_block_id(&self) -> BlockId;
fn is_traversed(&self, block_id: &BlockId) -> bool;
fn commit_traversed(&self, block_id: BlockId);
}

impl<T: BlockStriderState> BlockStriderState for Box<T> {
fn load_last_traversed_master_block_id(&self) -> BlockId {
<T as BlockStriderState>::load_last_traversed_master_block_id(self)
}

fn is_traversed(&self, block_id: &BlockId) -> bool {
<T as BlockStriderState>::is_traversed(self, block_id)
}

fn commit_traversed(&self, block_id: BlockId) {
<T as BlockStriderState>::commit_traversed(self, block_id);
}
}
36 changes: 36 additions & 0 deletions core/src/block_strider/subscriber.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use everscale_types::models::Block;
use futures_util::future;
use std::future::Future;

pub trait BlockSubscriber: Send + Sync + 'static {
type HandleBlockFut: Future<Output = anyhow::Result<()>> + Send + 'static;

fn handle_block(&self, block: &Block) -> Self::HandleBlockFut;
}

impl<T: BlockSubscriber> BlockSubscriber for Box<T> {
type HandleBlockFut = T::HandleBlockFut;

fn handle_block(&self, block: &Block) -> Self::HandleBlockFut {
<T as BlockSubscriber>::handle_block(self, block)
}
}

pub struct FanoutBlockSubscriber<T1, T2> {
pub left: T1,
pub right: T2,
}

impl<T1: BlockSubscriber, T2: BlockSubscriber> BlockSubscriber for FanoutBlockSubscriber<T1, T2> {
type HandleBlockFut = future::BoxFuture<'static, anyhow::Result<()>>;

fn handle_block(&self, block: &Block) -> Self::HandleBlockFut {
let left = self.left.handle_block(block);
let right = self.right.handle_block(block);

Box::pin(async move {
let (l, r) = future::join(left, right).await;
l.and(r)
})
}
}

0 comments on commit c8ca671

Please sign in to comment.