Skip to content

Commit

Permalink
feat(block-strider): state apply implimentation
Browse files Browse the repository at this point in the history
  • Loading branch information
Rexagon authored and 0xdeafbeef committed Apr 17, 2024
1 parent 34aa301 commit 3c785d9
Show file tree
Hide file tree
Showing 19 changed files with 557 additions and 68 deletions.
2 changes: 2 additions & 0 deletions .clippy.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
allow-print-in-tests = true
allow-dbg-in-tests = true
10 changes: 6 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ members = [

[workspace.dependencies]
# crates.io deps
aarc = "0.2"
ahash = "0.8"
anyhow = "1.0.79"
arc-swap = "1.6.0"
Expand Down Expand Up @@ -99,7 +100,7 @@ debug = true
# NOTE: use crates.io dependency when it is released
# https://github.com/sagebind/castaway/issues/18
castaway = { git = "https://github.com/sagebind/castaway.git" }
everscale-types = { git = "https://github.com/broxus/everscale-types.git", branch = "0xdeafbeef/push-yntmntzvxrlu" }
everscale-types = { git = "https://github.com/broxus/everscale-types.git", branch = "0xdeafbeef/push-xrvxlsnspsok" }

[workspace.lints.rust]
future_incompatible = "warn"
Expand Down Expand Up @@ -158,6 +159,8 @@ needless_for_each = "warn"
option_option = "warn"
path_buf_push_overwrite = "warn"
ptr_as_ptr = "warn"
print_stdout = "warn"
print_stderr = "warn"
rc_mutex = "warn"
ref_option_ref = "warn"
rest_pat_in_fully_bound_structs = "warn"
Expand Down
2 changes: 1 addition & 1 deletion block-util/src/block/top_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ impl TopBlocks {
self.contains_shard_seqno(&block_id.shard, block_id.seqno)
}

/// Checks whether the given pair of [`ton_block::ShardIdent`] and seqno
/// Checks whether the given pair of [`ShardIdent`] and seqno
/// is equal to or greater than the last block for the given shard.
///
/// NOTE: Specified shard could be split or merged
Expand Down
9 changes: 7 additions & 2 deletions block-util/src/state/shard_state_stuff.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,18 @@ impl ShardStateStuff {
let file_hash = sha2::Sha256::digest(bytes);
anyhow::ensure!(
id.file_hash.as_slice() == file_hash.as_slice(),
"file_hash mismatch for {id}"
"file_hash mismatch. Expected: {}, got: {}",
hex::encode(file_hash),
id.file_hash,
);

let root = Boc::decode(bytes)?;
anyhow::ensure!(
&id.root_hash == root.repr_hash(),
"root_hash mismatch for {id}"
"root_hash mismatch for {id}. Expected: {expected}, got: {got}",
id = id,
expected = id.root_hash,
got = root.repr_hash(),
);

Self::new(
Expand Down
5 changes: 4 additions & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,15 @@ license.workspace = true

[dependencies]
anyhow = { workspace = true }
async-trait = { workspace = true }
castaway = { workspace = true }
everscale-types = { workspace = true }
futures-util = { workspace = true }
itertools = { workspace = true }
parking_lot = { workspace = true }
tokio = { workspace = true, features = ["rt"] }
tracing = { workspace = true }
thiserror = { workspace = true }
sha2 = { workspace = true }

# local deps
tycho-block-util = { workspace = true }
Expand All @@ -27,6 +28,8 @@ tycho-util = { workspace = true }

[dev-dependencies]
tycho-util = { workspace = true, features = ["test"] }
tempfile = { workspace = true }
tracing-test = { workspace = true }

[lints]
workspace = true
103 changes: 60 additions & 43 deletions core/src/block_strider/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,24 @@ use futures_util::future::BoxFuture;
use futures_util::stream::FuturesOrdered;
use futures_util::{FutureExt, TryStreamExt};
use itertools::Itertools;
use std::sync::Arc;

pub mod provider;
pub mod state;
pub mod subscriber;

mod state_applier;

#[cfg(test)]
mod test_provider;
pub mod test_provider;

use crate::block_strider::state_applier::ShardStateUpdater;
use provider::BlockProvider;
use state::BlockStriderState;
use subscriber::BlockSubscriber;
use tycho_block_util::block::BlockStuff;
use tycho_block_util::state::MinRefMcStateTracker;
use tycho_storage::Storage;
use tycho_util::FastDashMap;

pub struct BlockStriderBuilder<S, P, B>(BlockStrider<S, P, B>);
Expand Down Expand Up @@ -59,9 +65,25 @@ where
P: BlockProvider,
B: BlockSubscriber,
{
pub fn build(self) -> BlockStrider<S, P, B> {
pub(crate) fn build(self) -> BlockStrider<S, P, B> {
self.0
}

pub(crate) fn build_with_state_applier(
self,
min_ref_mc_state_tracker: MinRefMcStateTracker,
storage: Arc<Storage>,
) -> BlockStrider<S, P, ShardStateUpdater<B>> {
BlockStrider {
state: self.0.state,
provider: self.0.provider,
subscriber: ShardStateUpdater::new(
min_ref_mc_state_tracker,
storage,
self.0.subscriber,
),
}
}
}

pub struct BlockStrider<S, P, B> {
Expand Down Expand Up @@ -133,61 +155,57 @@ where
) -> BoxFuture<'a, Result<Vec<BlockId>>> {
async move {
let mut prev_shard_block_id = shard_block_id;
let mut traversed_blocks = Vec::new();

while !self.state.is_traversed(&shard_block_id) {
if shard_block_id.seqno == 0 {
break;
}

tracing::debug!(id=?shard_block_id, "Finding prev shard blocks");
while shard_block_id.seqno > 0 && !self.state.is_traversed(&shard_block_id) {
prev_shard_block_id = shard_block_id;

let block = self
.fetch_block(&shard_block_id)
.await
.expect("provider failed to fetch shard block");
tracing::debug!(id=?block.id(), "Fetched shard block");
let info = block.block().load_info()?;
shard_block_id = match info.load_prev_ref()? {

match info.load_prev_ref()? {
PrevBlockRef::Single(id) => {
let id = BlockId {
shard: info.shard,
seqno: id.seqno,
root_hash: id.root_hash,
file_hash: id.file_hash,
let shard = if info.after_split {
info.shard
.merge()
.expect("Merge should succeed after split")
} else {
info.shard
};
blocks.add_connection(id, shard_block_id);
id
shard_block_id = id.as_block_id(shard);
blocks.add_connection(shard_block_id, prev_shard_block_id);
}
PrevBlockRef::AfterMerge { left, right } => {
let (left_shard, right_shard) =
info.shard.split().expect("split on unsplitable shard");
let left = BlockId {
shard: left_shard,
seqno: left.seqno,
root_hash: left.root_hash,
file_hash: left.file_hash,
};
let right = BlockId {
shard: right_shard,
seqno: right.seqno,
root_hash: right.root_hash,
file_hash: right.file_hash,
};
blocks.add_connection(left, shard_block_id);
blocks.add_connection(right, shard_block_id);

return futures_util::try_join!(
self.find_prev_shard_blocks(left, blocks),
self.find_prev_shard_blocks(right, blocks)
)
.map(|(mut left, right)| {
left.extend(right);
left
});
let left_block_id = left.as_block_id(left_shard);
let right_block_id = right.as_block_id(right_shard);
blocks.add_connection(left_block_id, prev_shard_block_id);
blocks.add_connection(right_block_id, prev_shard_block_id);

let left_blocks =
self.find_prev_shard_blocks(left_block_id, blocks).await?;
let right_blocks =
self.find_prev_shard_blocks(right_block_id, blocks).await?;
traversed_blocks.extend(left_blocks);
traversed_blocks.extend(right_blocks);
break;
}
};
}

blocks.store_block(block);
}
Ok(vec![prev_shard_block_id])

if prev_shard_block_id.seqno > 0 {
traversed_blocks.push(prev_shard_block_id);
}

Ok(traversed_blocks)
}
.boxed()
}
Expand Down Expand Up @@ -273,7 +291,7 @@ impl BlocksGraph {
.get(block_id)
.expect("should be in map");
subscriber
.handle_block(&block)
.handle_block(&block, None)
.await
.expect("subscriber failed");
state.commit_traversed(*block_id);
Expand All @@ -296,9 +314,8 @@ mod test {
use crate::block_strider::BlockStrider;

#[tokio::test]
#[tracing_test::traced_test]
async fn test_block_strider() {
tycho_util::test::init_logger("test_block_strider");

let provider = TestBlockProvider::new(3);
provider.validate();

Expand Down
28 changes: 28 additions & 0 deletions core/src/block_strider/state.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
use std::sync::Arc;

use everscale_types::models::BlockId;

use tycho_storage::Storage;

pub trait BlockStriderState: Send + Sync + 'static {
fn load_last_traversed_master_block_id(&self) -> BlockId;
fn is_traversed(&self, block_id: &BlockId) -> bool;
Expand All @@ -20,6 +24,30 @@ impl<T: BlockStriderState> BlockStriderState for Box<T> {
}
}

impl BlockStriderState for Arc<Storage> {
fn load_last_traversed_master_block_id(&self) -> BlockId {
self.node_state()
.load_last_mc_block_id()
.expect("Db is not initialized")
}

fn is_traversed(&self, block_id: &BlockId) -> bool {
self.block_handle_storage()
.load_handle(block_id)
.expect("db is dead")
.is_some()
}

fn commit_traversed(&self, block_id: BlockId) {
if block_id.is_masterchain() {
self.node_state()
.store_last_mc_block_id(&block_id)
.expect("db is dead");
}
// other blocks are stored with state applier: todo rework this?
}
}

#[cfg(test)]
pub struct InMemoryBlockStriderState {
last_traversed_master_block_id: parking_lot::Mutex<BlockId>,
Expand Down
Loading

0 comments on commit 3c785d9

Please sign in to comment.