diff --git a/core/src/block_strider/mod.rs b/core/src/block_strider/mod.rs index c5f3108bf..900ff3500 100644 --- a/core/src/block_strider/mod.rs +++ b/core/src/block_strider/mod.rs @@ -22,7 +22,7 @@ use crate::block_strider::state_applier::ShardStateUpdater; use provider::BlockProvider; use state::BlockStriderState; use subscriber::BlockSubscriber; -use tycho_block_util::block::BlockStuff; +use tycho_block_util::block::BlockStuffAug; use tycho_block_util::state::MinRefMcStateTracker; use tycho_storage::Storage; use tycho_util::FastDashMap; @@ -221,7 +221,7 @@ where .boxed() } - async fn fetch_next_master_block(&self) -> Option { + async fn fetch_next_master_block(&self) -> Option { let last_traversed_master_block = self.state.load_last_traversed_master_block_id(); tracing::debug!(?last_traversed_master_block, "Fetching next master block"); loop { @@ -242,7 +242,7 @@ where } } - async fn fetch_block(&self, block_id: &BlockId) -> Result { + async fn fetch_block(&self, block_id: &BlockId) -> Result { loop { match self.provider.get_block(block_id).await { Some(Ok(block)) => break Ok(block), @@ -259,7 +259,7 @@ where } struct BlocksGraph { - block_store_map: FastDashMap, + block_store_map: FastDashMap, connections: FastDashMap, bottom_blocks: Vec, } @@ -273,7 +273,7 @@ impl BlocksGraph { } } - fn store_block(&self, block: BlockStuff) { + fn store_block(&self, block: BlockStuffAug) { self.block_store_map.insert(*block.id(), block); } diff --git a/core/src/block_strider/provider.rs b/core/src/block_strider/provider.rs index 3eb079631..34a65f3f4 100644 --- a/core/src/block_strider/provider.rs +++ b/core/src/block_strider/provider.rs @@ -3,9 +3,9 @@ use futures_util::future::BoxFuture; use std::future::Future; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use tycho_block_util::block::BlockStuff; +use tycho_block_util::block::BlockStuffAug; -pub type OptionalBlockStuff = Option>; +pub type OptionalBlockStuff = Option>; /// Block provider *MUST* validate the block before returning it. pub trait BlockProvider: Send + Sync + 'static { @@ -154,10 +154,13 @@ mod test { .is_none()); } - fn get_empty_block() -> BlockStuff { - let block = ""; - let block = everscale_types::boc::BocRepr::decode_base64(block).unwrap(); - BlockStuff::with_block(get_default_block_id(), block) + fn get_empty_block() -> BlockStuffAug { + let block_data = include_bytes!("../../tests/empty_block.bin"); + let block = everscale_types::boc::BocRepr::decode(block_data).unwrap(); + BlockStuffAug::new( + BlockStuff::with_block(get_default_block_id(), block), + block_data.as_slice(), + ) } fn get_default_block_id() -> BlockId { diff --git a/core/src/block_strider/state_applier.rs b/core/src/block_strider/state_applier.rs index 3fd7aa0c8..e34f356bf 100644 --- a/core/src/block_strider/state_applier.rs +++ b/core/src/block_strider/state_applier.rs @@ -5,7 +5,7 @@ use std::sync::Arc; use anyhow::{Context, Result}; use futures_util::FutureExt; -use tycho_block_util::block::BlockStuff; +use tycho_block_util::block::{BlockStuff, BlockStuffAug}; use tycho_block_util::state::{MinRefMcStateTracker, ShardStateStuff}; use tycho_storage::{BlockHandle, BlockMetaData, Storage}; @@ -43,8 +43,8 @@ where fn handle_block( &self, - block: &BlockStuff, - _state: Option<&ShardStateStuff>, + block: &BlockStuffAug, + _state: Option<&Arc>, ) -> Self::HandleBlockFut { tracing::info!(id = ?block.id(), "applying block"); let block = block.clone(); @@ -53,7 +53,7 @@ where let subscriber = self.state_subscriber.clone(); async move { - let block_h = Self::get_block_handle(&block, &storage)?; + let block_h = Self::get_block_handle(&block, &storage).await?; let (prev_id, _prev_id_2) = block //todo: handle merge .construct_prev_id() @@ -69,7 +69,7 @@ where let new_state = Self::compute_and_store_state_update( &block, &min_ref_mc_state_tracker, - storage, + &storage, &block_h, prev_state, ) @@ -103,6 +103,12 @@ where let elapsed = start.elapsed(); metrics::histogram!("tycho_subscriber_handle_block_seconds").record(elapsed); + block_h.meta().set_is_applied(); + storage + .block_handle_storage() + .store_handle(&block_h) + .context("Failed to store block handle")?; + Ok(()) } .boxed() @@ -113,17 +119,20 @@ impl ShardStateUpdater where S: BlockSubscriber, { - fn get_block_handle(block: &BlockStuff, storage: &Arc) -> Result> { + async fn get_block_handle( + block: &BlockStuffAug, + storage: &Arc, + ) -> Result> { let info = block .block() .info .load() .context("Failed to load block info")?; - let (block_h, _) = storage - .block_handle_storage() - .create_or_load_handle( - block.id(), + let h = storage + .block_storage() + .store_block_data( + block, BlockMetaData { is_key_block: info.key_block, gen_utime: info.gen_utime, @@ -138,18 +147,18 @@ where .context("Failed to process master ref")?, }, ) - .context("Failed to create or load block handle")?; + .await?; - Ok(block_h) + Ok(h.handle) } async fn compute_and_store_state_update( block: &BlockStuff, min_ref_mc_state_tracker: &MinRefMcStateTracker, - storage: Arc, + storage: &Arc, block_h: &Arc, prev_state: Arc, - ) -> Result { + ) -> Result> { let update = block .block() .load_state_update() @@ -169,7 +178,7 @@ where .await .context("Failed to store new state")?; - Ok(new_state) + Ok(Arc::new(new_state)) } } diff --git a/core/src/block_strider/subscriber.rs b/core/src/block_strider/subscriber.rs index f9c5da4bd..d95543a0b 100644 --- a/core/src/block_strider/subscriber.rs +++ b/core/src/block_strider/subscriber.rs @@ -1,7 +1,9 @@ -use futures_util::future; use std::future::Future; +use std::sync::Arc; + +use futures_util::future; -use tycho_block_util::block::BlockStuff; +use tycho_block_util::block::BlockStuffAug; use tycho_block_util::state::ShardStateStuff; pub trait BlockSubscriber: Send + Sync + 'static { @@ -9,8 +11,8 @@ pub trait BlockSubscriber: Send + Sync + 'static { fn handle_block( &self, - block: &BlockStuff, - state: Option<&ShardStateStuff>, + block: &BlockStuffAug, + state: Option<&Arc>, ) -> Self::HandleBlockFut; } @@ -19,8 +21,8 @@ impl BlockSubscriber for Box { fn handle_block( &self, - block: &BlockStuff, - state: Option<&ShardStateStuff>, + block: &BlockStuffAug, + state: Option<&Arc>, ) -> Self::HandleBlockFut { ::handle_block(self, block, state) } @@ -36,8 +38,8 @@ impl BlockSubscriber for FanoutBlockSu fn handle_block( &self, - block: &BlockStuff, - state: Option<&ShardStateStuff>, + block: &BlockStuffAug, + state: Option<&Arc>, ) -> Self::HandleBlockFut { let left = self.left.handle_block(block, state); let right = self.right.handle_block(block, state); @@ -60,8 +62,8 @@ pub mod test { fn handle_block( &self, - block: &BlockStuff, - _state: Option<&ShardStateStuff>, + block: &BlockStuffAug, + _state: Option<&Arc>, ) -> Self::HandleBlockFut { tracing::info!("handling block: {:?}", block.id()); future::ready(Ok(())) diff --git a/core/src/block_strider/test_provider/archive_provider.rs b/core/src/block_strider/test_provider/archive_provider.rs index 3310dcf32..c7fd739e2 100644 --- a/core/src/block_strider/test_provider/archive_provider.rs +++ b/core/src/block_strider/test_provider/archive_provider.rs @@ -10,7 +10,7 @@ use futures_util::FutureExt; use sha2::Digest; use tycho_block_util::archive::{ArchiveEntryId, ArchiveReader}; -use tycho_block_util::block::BlockStuff; +use tycho_block_util::block::{BlockStuff, BlockStuffAug}; use crate::block_strider::provider::{BlockProvider, OptionalBlockStuff}; @@ -33,10 +33,12 @@ impl BlockProvider for ArchiveProvider { } fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> { - futures_util::future::ready( - self.get_block_by_id(block_id) - .map(|b| (Ok(BlockStuff::with_block(*block_id, b)))), - ) + futures_util::future::ready(self.get_block_by_id(block_id).map(|b| { + Ok(BlockStuffAug::new( + BlockStuff::with_block(*block_id, b.clone()), + everscale_types::boc::BocRepr::encode(b).unwrap(), + )) + })) .boxed() } } diff --git a/core/src/block_strider/test_provider/mod.rs b/core/src/block_strider/test_provider/mod.rs index 94a4585c6..13db51306 100644 --- a/core/src/block_strider/test_provider/mod.rs +++ b/core/src/block_strider/test_provider/mod.rs @@ -9,7 +9,7 @@ use everscale_types::models::{ }; use everscale_types::prelude::HashBytes; use std::collections::HashMap; -use tycho_block_util::block::BlockStuff; +use tycho_block_util::block::{BlockStuff, BlockStuffAug}; pub mod archive_provider; @@ -26,18 +26,22 @@ impl BlockProvider for TestBlockProvider { .iter() .find(|id| id.seqno == prev_block_id.seqno + 1); futures_util::future::ready(next_id.and_then(|id| { - self.blocks - .get(id) - .map(|b| Ok(BlockStuff::with_block(*id, b.clone()))) + self.blocks.get(id).map(|b| { + Ok(BlockStuffAug::new( + BlockStuff::with_block(*id, b.clone()), + everscale_types::boc::BocRepr::encode(b).unwrap(), + )) + }) })) } fn get_block(&self, id: &BlockId) -> Self::GetBlockFut<'_> { - futures_util::future::ready( - self.blocks - .get(id) - .map(|b| Ok(BlockStuff::with_block(*id, b.clone()))), - ) + futures_util::future::ready(self.blocks.get(id).map(|b| { + Ok(BlockStuffAug::new( + BlockStuff::with_block(*id, b.clone()), + everscale_types::boc::BocRepr::encode(b).unwrap(), + )) + })) } } diff --git a/core/tests/00001 b/core/tests/00001 old mode 100755 new mode 100644 diff --git a/core/tests/empty_block.bin b/core/tests/empty_block.bin new file mode 100644 index 000000000..b18ea1e7c Binary files /dev/null and b/core/tests/empty_block.bin differ