Skip to content

Commit

Permalink
refactor(core): change block strider interface
Browse files Browse the repository at this point in the history
  • Loading branch information
Rexagon committed Feb 26, 2024
1 parent 09678de commit 25f34d9
Show file tree
Hide file tree
Showing 5 changed files with 201 additions and 96 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ edition = "2021"
description = "Basic functionality of peer."

[dependencies]
async-trait = "0.1.77"
anyhow = "1.0.80"
async-trait = "0.1.77"
castaway = "0.2"
everscale-types = "0.1.0-rc.6"
tracing = "0.1.40"
futures-util = "0.3.30"

tracing = "0.1.40"

# local deps
tycho-network = { path = "../network", version = "=0.0.1" }
Expand Down
274 changes: 189 additions & 85 deletions core/src/block_strider/mod.rs
Original file line number Diff line number Diff line change
@@ -1,136 +1,240 @@
use async_trait::async_trait;

use everscale_types::models::{Block, BlockId};
use futures_util::stream::FuturesUnordered;
use futures_util::StreamExt;
use std::future::Future;
use std::sync::atomic::{AtomicBool, Ordering};

#[async_trait]
pub trait OnStriderStep {
async fn handle_block(&self, block: &Block) -> anyhow::Result<()>;
}

/// Block provider *MUST* validate the block before returning it.
pub trait BlockProvider {
type GetNextBlockFut: Future<Output = ProviderResult>;
type GetBlockFut: Future<Output = ProviderResult>;
use anyhow::Result;
use everscale_types::models::{Block, BlockId};
use futures_util::future::BoxFuture;

fn get_next_block(&self, prev_block_id: &BlockId) -> Self::GetNextBlockFut;
fn get_block(&self, block_id: &BlockId) -> Self::GetBlockFut;
pub struct BlockStriderBuilder<S, P, B>(BlockStrider<S, P, B>);

fn status(&self) -> ProviderStatus;
impl<T2, T3> BlockStriderBuilder<(), T2, T3> {
pub fn with_state<S: BlockStriderState>(self, state: S) -> BlockStriderBuilder<S, T2, T3> {
BlockStriderBuilder(BlockStrider {
state,
provider: self.0.provider,
subscriber: self.0.subscriber,
})
}
}

pub trait PersistenceProvider {
fn load_last_traversed_master_block_seqno(&self) -> BlockId;
fn commit_last_traversed_master_block_seqno(&self, block_id: BlockId);

fn shard_block_traversed(&self, block_id: &BlockId) -> bool;
fn commit_shard_block_traversed(&self, block_id: BlockId);
impl<T1, T3> BlockStriderBuilder<T1, (), T3> {
pub fn with_provider<P: BlockProvider>(self, provider: P) -> BlockStriderBuilder<T1, P, T3> {
BlockStriderBuilder(BlockStrider {
state: self.0.state,
provider,
subscriber: self.0.subscriber,
})
}
}

// drop all shard blocks in the same shard with seqno < block_id
fn gc_shard_blocks(&self, block_id: &BlockId);
impl<T1, T2> BlockStriderBuilder<T1, T2, ()> {
pub fn with_subscriber<B: BlockStriderState>(
self,
subscriber: B,
) -> BlockStriderBuilder<T1, T2, B> {
BlockStriderBuilder(BlockStrider {
state: self.0.state,
provider: self.0.provider,
subscriber,
})
}
}

pub enum ProviderResult {
Error(anyhow::Error),
NotFound, // or should provider backoff until the block is found?
Found(Block),
impl<S, P, B> BlockStriderBuilder<S, P, B>
where
S: BlockStriderState,
P: BlockProvider,
B: BlockSubscriber,
{
pub fn build(self) -> BlockStrider<S, P, B> {
self.0
}
}

#[derive(Debug, PartialEq)]
pub enum ProviderStatus {
Ready,
NotReady,
pub struct BlockStrider<S, P, B> {
state: S,
provider: P,
subscriber: B,
}

pub struct StriderBuilder<Provider> {
subscribers: Vec<Box<dyn OnStriderStep>>,
persistence_provider: Provider, // or it also should be a vec?
impl BlockStrider<(), (), ()> {
pub fn builder() -> BlockStriderBuilder<(), (), ()> {
BlockStriderBuilder(BlockStrider {
state: (),
provider: (),
subscriber: (),
})
}
}

impl<Provider> StriderBuilder<Provider>
impl<S, P, B> BlockStrider<S, P, B>
where
Provider: PersistenceProvider,
S: BlockStriderState,
P: BlockProvider,
B: BlockSubscriber,
{
pub fn new(persistence_provider: Provider) -> Self {
Self {
subscribers: Vec::new(),
persistence_provider,
}
}

pub fn add_subscriber(&mut self, subscriber: Box<dyn OnStriderStep>) {
self.subscribers.push(subscriber);
}

// this function gurarantees at least once delivery
pub async fn start(self, block_provider: impl BlockProvider) {
loop {
let master_block = self.fetch_next_master_block(&block_provider).await;
/// Walks through blocks and handles them.
///
/// Stops either when the provider is exhausted or it can't provide a requested block.
pub async fn run(self) -> Result<()> {
tracing::info!("block strider loop started");

while let Some(master_block) = self.fetch_next_master_block().await {
// TODO: replace with block stuff
let master_id = get_block_id(&master_block);
let shard_hashes = get_shard_hashes(&master_block);

for hash in shard_hashes {
if !self.persistence_provider.shard_block_traversed(&hash) {
let block = self.fetch_block(&hash, &block_provider).await;
let mut subscribers: FuturesUnordered<_> = self
.subscribers
.iter()
.map(|subscriber| subscriber.handle_block(&block))
.collect();
// wait for all subscribers to finish
while subscribers.next().await.is_some() {}
self.persistence_provider.commit_shard_block_traversed(hash);
if !self.state.is_traversed(&hash) {
let block = self.fetch_block(&hash).await?;

if let Err(e) = self.subscriber.handle_block(&block).await {
tracing::error!("error while handling block: {e:?}");
// TODO: retry + backoff?
}

self.state.commit_traversed(hash);
}
}
self.persistence_provider
.commit_last_traversed_master_block_seqno(master_id);

self.state.commit_traversed(master_id);
}

tracing::info!("block strider loop finished");
Ok(())
}

async fn fetch_next_master_block(&self, block_provider: &impl BlockProvider) -> Block {
let last_traversed_master_block = self
.persistence_provider
.load_last_traversed_master_block_seqno();
async fn fetch_next_master_block(&self) -> Option<Block> {
let last_traversed_master_block = self.state.load_last_traversed_master_block_id();
loop {
match block_provider
match self
.provider
.get_next_block(&last_traversed_master_block)
.await
.await?
{
ProviderResult::Error(e) => {
Ok(block) => break Some(block),
Err(e) => {
tracing::error!(
?last_traversed_master_block,
"error while fetching master block: {:?}",
e
"error while fetching master block: {e:?}",
);
// TODO: backoff
}
ProviderResult::NotFound => {
tracing::info!(?last_traversed_master_block, "master block not found");
}
ProviderResult::Found(block) => break block,
}
}
}

async fn fetch_block(&self, id: &BlockId, block_provider: &impl BlockProvider) -> Block {
async fn fetch_block(&self, block_id: &BlockId) -> Result<Block> {
loop {
match block_provider.get_block(id).await {
ProviderResult::Error(e) => {
tracing::error!("error while fetching block: {:?}", e);
match self.provider.get_block(block_id).await {
Some(Ok(block)) => break Ok(block),
Some(Err(e)) => {
tracing::error!("error while fetching block: {e:?}");
// TODO: backoff
}
ProviderResult::NotFound => {
tracing::info!(?id, "no block found");
None => {
anyhow::bail!("block not found: {block_id}")
}
ProviderResult::Found(block) => break block,
}
}
}
}

pub trait BlockStriderState: Send + Sync + 'static {
fn load_last_traversed_master_block_id(&self) -> BlockId;
fn is_traversed(&self, block_id: &BlockId) -> bool;
fn commit_traversed(&self, block_id: BlockId);
}

impl<T: BlockStriderState> BlockStriderState for Box<T> {
fn load_last_traversed_master_block_id(&self) -> BlockId {
<T as BlockStriderState>::load_last_traversed_master_block_id(self)
}

fn is_traversed(&self, block_id: &BlockId) -> bool {
<T as BlockStriderState>::is_traversed(self, block_id)
}

fn commit_traversed(&self, block_id: BlockId) {
<T as BlockStriderState>::commit_traversed(self, block_id);
}
}

/// Block provider *MUST* validate the block before returning it.
pub trait BlockProvider: Send + Sync + 'static {
type GetNextBlockFut<'a>: Future<Output = Option<Result<Block>>> + Send + 'a;
type GetBlockFut<'a>: Future<Output = Option<Result<Block>>> + Send + 'a;

fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a>;
fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a>;
}

impl<T: BlockProvider> BlockProvider for Box<T> {
type GetNextBlockFut<'a> = T::GetNextBlockFut<'a>;
type GetBlockFut<'a> = T::GetBlockFut<'a>;

fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
<T as BlockProvider>::get_next_block(self, prev_block_id)
}

fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> {
<T as BlockProvider>::get_block(self, block_id)
}
}

pub trait BlockSubscriber: Send + Sync + 'static {
type HandleBlockFut: Future<Output = Result<()>> + Send + 'static;

fn handle_block(&self, block: &Block) -> Self::HandleBlockFut;
}

impl<T: BlockSubscriber> BlockSubscriber for Box<T> {
type HandleBlockFut = T::HandleBlockFut;

fn handle_block(&self, block: &Block) -> Self::HandleBlockFut {
<T as BlockSubscriber>::handle_block(self, block)
}
}

fn get_shard_hashes(_block: &Block) -> impl IntoIterator<Item = BlockId> {
vec![].into_iter()
}

fn get_block_id(_block: &Block) -> BlockId {
unimplemented!()
}

// === Provider combinators ===
struct ChainBlockProvider<T1, T2> {
left: T1,
right: T2,
is_right: AtomicBool,
}

impl<T1: BlockProvider, T2: BlockProvider> BlockProvider for ChainBlockProvider<T1, T2> {
type GetNextBlockFut<'a> = BoxFuture<'a, Option<Result<Block>>>;
type GetBlockFut<'a> = BoxFuture<'a, Option<Result<Block>>>;

fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
Box::pin(async move {
if !self.is_right.load(Ordering::Acquire) {
let res = self.left.get_next_block(prev_block_id).await;
if res.is_some() {
return res;
}
self.is_right.store(true, Ordering::Release);
}
self.right.get_next_block(prev_block_id).await
})
}

fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> {
Box::pin(async {
let res = self.left.get_block(block_id).await;
if res.is_some() {
return res;
}
self.right.get_block(block_id).await
})
}
}
6 changes: 3 additions & 3 deletions network/src/util/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,9 @@ where
Q: Send + 'static,
{
type QueryResponse = Q;
type OnQueryFuture = BoxFutureOrNoop<Option<Self::QueryResponse>>;
type OnMessageFuture = BoxFutureOrNoop<()>;
type OnDatagramFuture = BoxFutureOrNoop<()>;
type OnQueryFuture = BoxFutureOrNoop<'static, Option<Self::QueryResponse>>;
type OnMessageFuture = BoxFutureOrNoop<'static, ()>;
type OnDatagramFuture = BoxFutureOrNoop<'static, ()>;

fn on_query(&self, req: Request) -> Self::OnQueryFuture {
match find_handler(&req, &self.inner.query_handlers, &self.inner.services) {
Expand Down
10 changes: 5 additions & 5 deletions util/src/futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ use std::task::{Context, Poll};
use futures_util::future::BoxFuture;
use futures_util::{Future, FutureExt};

pub enum BoxFutureOrNoop<T> {
Boxed(BoxFuture<'static, T>),
pub enum BoxFutureOrNoop<'a, T> {
Boxed(BoxFuture<'a, T>),
Noop,
}

impl<T: 'static> BoxFutureOrNoop<T> {
impl<T: 'static> BoxFutureOrNoop<'static, T> {
#[inline]
pub fn future<F>(f: F) -> Self
where
Expand All @@ -22,7 +22,7 @@ impl<T: 'static> BoxFutureOrNoop<T> {
}
}

impl Future for BoxFutureOrNoop<()> {
impl<'a> Future for BoxFutureOrNoop<'a, ()> {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand All @@ -33,7 +33,7 @@ impl Future for BoxFutureOrNoop<()> {
}
}

impl<T> Future for BoxFutureOrNoop<Option<T>> {
impl<'a, T> Future for BoxFutureOrNoop<'a, Option<T>> {
type Output = Option<T>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Expand Down

0 comments on commit 25f34d9

Please sign in to comment.