diff --git a/cli/src/node/mod.rs b/cli/src/node/mod.rs index 17be65627..45a330281 100644 --- a/cli/src/node/mod.rs +++ b/cli/src/node/mod.rs @@ -629,6 +629,7 @@ impl Node { }, }, CollatorStdImplFactory, + self.state_tracker.clone(), #[cfg(test)] vec![], ); diff --git a/collator/src/collator/do_collate.rs b/collator/src/collator/do_collate.rs index 7b90ac0d4..e816787fa 100644 --- a/collator/src/collator/do_collate.rs +++ b/collator/src/collator/do_collate.rs @@ -547,10 +547,20 @@ impl CollatorStdImpl { }) .await?; - // update PrevData in working state - self.update_working_state(new_state_stuff)?; + // update PrevData [and McData] in working state + let new_mc_state_stuff_opt = if self.shard_id.is_masterchain() { + Some(new_state_stuff.clone()) + } else { + None + }; + self.next_block_id_short.seqno += 1; + self.update_working_state(new_mc_state_stuff_opt, vec![new_state_stuff], false)?; self.update_working_state_pending_internals(Some(has_pending_internals))?; + tracing::debug!(target: tracing_targets::COLLATOR, + "working state updated from just collated block", + ); + handle_block_candidate_elapsed = histogram.finish(); } diff --git a/collator/src/collator/mod.rs b/collator/src/collator/mod.rs index 04c41654b..b234933f7 100644 --- a/collator/src/collator/mod.rs +++ b/collator/src/collator/mod.rs @@ -301,7 +301,7 @@ impl CollatorStdImpl { let (mc_state, prev_states) = Self::load_init_states( self.state_node_adapter.clone(), self.shard_id, - prev_blocks_ids, + &prev_blocks_ids, mc_state, ) .await?; @@ -333,33 +333,29 @@ impl CollatorStdImpl { Ok(()) } - /// Update working state from new block and state after block collation - fn update_working_state(&mut self, new_state_stuff: ShardStateStuff) -> Result<()> { - self.next_block_id_short = BlockIdShort { - shard: new_state_stuff.block_id().shard, - seqno: new_state_stuff.block_id().seqno + 1, - }; - + /// Update working state + fn update_working_state( + &mut self, + mc_state: Option, + prev_states: Vec, + prev_states_loaded_from_storage: bool, + ) -> Result<()> { let working_state_mut = self .working_state .as_mut() .expect("should `init` collator before calling `update_working_state`"); - if new_state_stuff.block_id().shard.is_masterchain() { - let new_mc_data = McData::build(new_state_stuff.clone())?; + if let Some(new_mc_state) = mc_state { + let new_mc_data = McData::build(new_mc_state)?; working_state_mut.mc_data = new_mc_data; } - let prev_states = vec![new_state_stuff]; Self::check_prev_states_and_master(&working_state_mut.mc_data, &prev_states)?; let (new_prev_shard_data, usage_tree) = PrevData::build(prev_states, &self.state_tracker)?; working_state_mut.prev_shard_data = new_prev_shard_data; + working_state_mut.prev_states_loaded_from_storage = prev_states_loaded_from_storage; working_state_mut.usage_tree = usage_tree; - tracing::debug!(target: tracing_targets::COLLATOR, - "working state updated from just collated block", - ); - Ok(()) } @@ -377,39 +373,56 @@ impl CollatorStdImpl { Ok(()) } - /// Update McData in working state - #[tracing::instrument(skip_all, fields(block_id = %self.next_block_id_short))] - fn update_mc_data(&mut self, mc_state: ShardStateStuff) -> Result<()> { + /// Update McData and reload prev state from storage to reduce prev state runtime size + #[tracing::instrument(skip_all, fields( + block_id = %self.next_block_id_short, + mc_block_id = %new_mc_state.block_id().as_short_id(), + ))] + async fn update_mc_data(&mut self, new_mc_state: ShardStateStuff) -> Result<()> { let labels = [("workchain", self.shard_id.workchain().to_string())]; - let _histogram = HistogramGuardWithLabels::begin("tycho_collator_update_mc_data_time", &labels); - let mc_state_block_id_short = mc_state.block_id().as_short_id(); + let prev_states_loaded_from_storage = self.working_state().prev_states_loaded_from_storage; - let new_mc_data = McData::build(mc_state)?; + if prev_states_loaded_from_storage { + let working_state_mut = self + .working_state + .as_mut() + .expect("should `init` collator before calling `update_mc_data`"); - let working_state_mut = self - .working_state - .as_mut() - .expect("should `init` collator before calling `update_mc_data`"); + working_state_mut.mc_data = McData::build(new_mc_state)?; + + tracing::debug!(target: tracing_targets::COLLATOR, + "only McData updated in working state from new master state", + ); + } else { + let prev_blocks_ids = self.working_state().prev_shard_data.blocks_ids(); + let (mc_state, prev_states) = Self::load_init_states( + self.state_node_adapter.clone(), + self.shard_id, + prev_blocks_ids, + new_mc_state, + ) + .await?; - working_state_mut.mc_data = new_mc_data; + self.update_working_state(Some(mc_state), prev_states, true)?; - tracing::debug!(target: tracing_targets::COLLATOR, - "McData updated in working state from new master state (block_id={})", - mc_state_block_id_short, - ); + tracing::debug!(target: tracing_targets::COLLATOR, + "McData updated in working state from new master state and \ + shard prev state reloaded from storage", + ); + } Ok(()) } - /// Load required initial states: - /// master state + list of previous shard states + /// Load required previous shard states (for shardchain) + /// or just return master state (for masterchain) async fn load_init_states( state_node_adapter: Arc, shard_id: ShardIdent, - prev_blocks_ids: Vec, + prev_blocks_ids: &[BlockId], mc_state: ShardStateStuff, ) -> Result<(ShardStateStuff, Vec)> { // if current shard is a masterchain then can take current master state @@ -421,10 +434,10 @@ impl CollatorStdImpl { let mut prev_states = vec![]; for prev_block_id in prev_blocks_ids { // request state for prev block and wait for response - let state = state_node_adapter.load_state(&prev_block_id).await?; + let state = state_node_adapter.load_state(prev_block_id).await?; tracing::info!( target: tracing_targets::COLLATOR, - "To init working state loaded prev shard state for prev_block_id {}", + "Prev shard state loaded for prev_block_id {}", prev_block_id.as_short_id(), ); prev_states.push(state); @@ -444,8 +457,6 @@ impl CollatorStdImpl { prev_states: Vec, state_tracker: &MinRefMcStateTracker, ) -> Result { - // TODO: make real implementation - let mc_data = McData::build(mc_state)?; Self::check_prev_states_and_master(&mc_data, &prev_states)?; let (prev_shard_data, usage_tree) = PrevData::build(prev_states, state_tracker)?; @@ -453,6 +464,7 @@ impl CollatorStdImpl { let working_state = WorkingState { mc_data, prev_shard_data, + prev_states_loaded_from_storage: true, usage_tree, has_pending_internals: None, }; @@ -606,7 +618,7 @@ impl CollatorStdImpl { } async fn update_mc_data_and_try_collate(&mut self, mc_state: ShardStateStuff) -> Result<()> { - self.update_mc_data(mc_state)?; + self.update_mc_data(mc_state).await?; self.update_working_state_pending_internals(None)?; self.try_collate_next_shard_block_impl().await } diff --git a/collator/src/collator/types.rs b/collator/src/collator/types.rs index 8777931ed..5367de2f3 100644 --- a/collator/src/collator/types.rs +++ b/collator/src/collator/types.rs @@ -94,6 +94,7 @@ use crate::types::ProofFunds; pub(super) struct WorkingState { pub mc_data: McData, pub prev_shard_data: PrevData, + pub prev_states_loaded_from_storage: bool, pub usage_tree: UsageTree, pub has_pending_internals: Option, } diff --git a/collator/src/manager/mod.rs b/collator/src/manager/mod.rs index 14a4c84ea..911a3be3b 100644 --- a/collator/src/manager/mod.rs +++ b/collator/src/manager/mod.rs @@ -7,10 +7,12 @@ use anyhow::{anyhow, bail, Result}; use async_trait::async_trait; use everscale_crypto::ed25519::KeyPair; use everscale_types::models::{BlockId, BlockIdShort, ShardIdent}; +use humantime::format_duration; use tycho_block_util::block::ValidatorSubsetInfo; use tycho_block_util::state::{MinRefMcStateTracker, ShardStateStuff}; use tycho_util::metrics::HistogramGuard; use tycho_util::FastHashMap; +use types::BlockCandidateEntry; use self::types::{ BlockCacheKey, BlockCandidateContainer, BlockCandidateToSend, BlocksCache, @@ -219,6 +221,7 @@ where mpool_adapter_factory: MPF, validator_factory: VF, collator_factory: CF, + state_tracker: MinRefMcStateTracker, #[cfg(any(test, feature = "test"))] test_validators_keypairs: Vec>, ) -> RunningCollationManager where @@ -258,7 +261,7 @@ where mq_adapter: mq_adapter.clone(), collator_factory, validator, - state_tracker: MinRefMcStateTracker::default(), + state_tracker, active_collation_sessions: FastHashMap::default(), collation_sessions_to_finish: FastHashMap::default(), active_collators: FastHashMap::default(), @@ -878,8 +881,14 @@ where "Start processing block candidate", ); - let _histogram = - HistogramGuard::begin("tycho_collator_process_collated_block_candidate_time"); + let candidate_chain_time = collation_result.candidate.chain_time; + let candidate_id = collation_result.candidate.block_id; + + let labels = &[("workchain", candidate_id.shard.workchain().to_string())]; + let _histogram = HistogramGuard::begin_with_labels( + "tycho_collator_process_collated_block_candidate_time", + labels, + ); // find session related to this block by shard let session_info = self @@ -890,8 +899,39 @@ where ))? .clone(); - let candidate_chain_time = collation_result.candidate.chain_time; - let candidate_id = collation_result.candidate.block_id; + // pre-accept shard block to store new shard state + if !candidate_id.shard.is_masterchain() { + let histogram = HistogramGuard::begin_with_labels( + "tycho_collator_process_collated_block_candidate_pre_accept_total_time", + labels, + ); + + let histogram_build_block_stuff = HistogramGuard::begin_with_labels( + "tycho_collator_process_collated_block_candidate_pre_accept_build_block_stuff_time", + labels, + ); + let block_for_pre_accept = build_block_stuff_for_sync(&BlockCandidateEntry { + key: candidate_id.as_short_id(), + candidate: collation_result.candidate.clone(), + signatures: Default::default(), + })?; + let build_block_stuff_elapsed = histogram_build_block_stuff.finish(); + + let histogram_build_block_stuff = HistogramGuard::begin_with_labels( + "tycho_collator_process_collated_block_candidate_pre_accept_time", + labels, + ); + self.state_node_adapter + .pre_accept_block(block_for_pre_accept, &collation_result.new_state_stuff) + .await?; + + tracing::debug!(target: tracing_targets::COLLATION_MANAGER, + total_time = %format_duration(histogram.finish()), + build_block_stuff_time = %format_duration(build_block_stuff_elapsed), + pre_accept_time = %format_duration(histogram_build_block_stuff.finish()), + "Block pre-accepted", + ); + } tracing::debug!(target: tracing_targets::COLLATION_MANAGER, "Saving block candidate to cache...", diff --git a/collator/src/state_node.rs b/collator/src/state_node.rs index 4b529c934..7bc733ae2 100644 --- a/collator/src/state_node.rs +++ b/collator/src/state_node.rs @@ -55,6 +55,14 @@ pub trait StateNodeAdapter: Send + Sync + 'static { async fn load_block(&self, block_id: &BlockId) -> Result>; /// Return block handle by it's id from node local state async fn load_block_handle(&self, block_id: &BlockId) -> Result>; + + /// Just store updated state + async fn pre_accept_block( + &self, + block: BlockStuffForSync, + new_state: &ShardStateStuff, + ) -> Result<()>; + /// Accept block: /// 1. (TODO) Broadcast block to blockchain network /// 2. Provide block to the block strider @@ -125,6 +133,40 @@ impl StateNodeAdapter for StateNodeAdapterStdImpl { Ok(self.storage.block_handle_storage().load_handle(block_id)) } + async fn pre_accept_block( + &self, + block: BlockStuffForSync, + new_state: &ShardStateStuff, + ) -> Result<()> { + tracing::debug!(target: tracing_targets::STATE_NODE_ADAPTER, "Pre-accepting block {}", block.block_id.as_short_id()); + + // Store block and load handle + let block_storage = self.storage.block_storage(); + let block_stuff = &block.block_stuff_aug.data; + let archive_data = &block.block_stuff_aug.archive_data; + let info = block_stuff.load_info()?; + let mc_ref_seqno = info.min_ref_mc_seqno + 1; + let store_res = block_storage + .store_block_data(block_stuff, archive_data, BlockMetaData { + is_key_block: info.key_block, + gen_utime: info.gen_utime, + mc_ref_seqno, + }) + .await?; + let handle = store_res.handle; + + // Save state + let state_storage = self.storage.shard_state_storage(); + state_storage + .store_state(&handle, new_state) + .await + .context("Failed to store new state")?; + + tracing::debug!(target: tracing_targets::STATE_NODE_ADAPTER, "Block pre-accepted {}", block.block_id.as_short_id()); + + Ok(()) + } + async fn accept_block(&self, block: BlockStuffForSync) -> Result<()> { tracing::debug!(target: tracing_targets::STATE_NODE_ADAPTER, "Block accepted: {}", block.block_id.as_short_id()); diff --git a/collator/tests/collation_tests.rs b/collator/tests/collation_tests.rs index cfe104ff3..41c1e2a90 100644 --- a/collator/tests/collation_tests.rs +++ b/collator/tests/collation_tests.rs @@ -136,6 +136,8 @@ async fn test_collation_process_on_stubs() { let queue = queue_factory.create(); let message_queue_adapter = MessageQueueAdapterStdImpl::new(queue); + let state_tracker = MinRefMcStateTracker::default(); + let manager = CollationManager::start( node_1_keypair.clone(), config, @@ -147,6 +149,7 @@ async fn test_collation_process_on_stubs() { config: validator_config, }, CollatorStdImplFactory, + state_tracker.clone(), #[cfg(feature = "test")] vec![ node_1_keypair, @@ -164,11 +167,7 @@ async fn test_collation_process_on_stubs() { zerostate_id, storage.clone(), )) - .with_state_subscriber( - MinRefMcStateTracker::default(), - storage.clone(), - state_node_adapter, - ) + .with_state_subscriber(state_tracker.clone(), storage.clone(), state_node_adapter) .build(); let strider_handle = block_strider.run(); diff --git a/core/src/block_strider/state_applier.rs b/core/src/block_strider/state_applier.rs index f3d62e41e..f8b7160d0 100644 --- a/core/src/block_strider/state_applier.rs +++ b/core/src/block_strider/state_applier.rs @@ -49,10 +49,15 @@ where let state_storage = self.inner.storage.shard_state_storage(); // Load handle - let handle = self - .get_block_handle(&cx.mc_block_id, &cx.block, &cx.archive_data) + let (handle, new_or_updated) = self + .get_block_handle(cx.mc_block_id.seqno, &cx.block, &cx.archive_data) .await?; + // Do not need to store new state if block was not updated + if !new_or_updated { + // TODO: build ShardApplierPrepared from existing data from storage and return + } + // Load previous states let (prev_root_cell, handles) = { let (prev_id, prev_id_alt) = cx @@ -163,10 +168,10 @@ where async fn get_block_handle( &self, - mc_block_id: &BlockId, + mc_ref_seqno: u32, block: &BlockStuff, archive_data: &ArchiveData, - ) -> Result { + ) -> Result<(BlockHandle, bool)> { let block_storage = self.inner.storage.block_storage(); let info = block.load_info()?; @@ -174,11 +179,11 @@ where .store_block_data(block, archive_data, BlockMetaData { is_key_block: info.key_block, gen_utime: info.gen_utime, - mc_ref_seqno: mc_block_id.seqno, + mc_ref_seqno, }) .await?; - Ok(res.handle) + Ok((res.handle, res.new || res.updated)) } async fn compute_and_store_state_update( diff --git a/scripts/gen-dashboard.py b/scripts/gen-dashboard.py index c6b900394..c6c82d63f 100644 --- a/scripts/gen-dashboard.py +++ b/scripts/gen-dashboard.py @@ -748,6 +748,22 @@ def collator_do_collate() -> RowPanel: create_heatmap_panel( "tycho_collator_process_collated_block_candidate_time", "process collated block candidate", + labels=['workchain=~"$workchain"'], + ), + create_heatmap_panel( + "tycho_collator_process_collated_block_candidate_pre_accept_total_time", + "incl. pre-accept shard candidate: total", + labels=['workchain=~"$workchain"'], + ), + create_heatmap_panel( + "tycho_collator_process_collated_block_candidate_pre_accept_build_block_stuff_time", + "incl. pre-accept shard candidate: build block stuff for sync", + labels=['workchain=~"$workchain"'], + ), + create_heatmap_panel( + "tycho_collator_process_collated_block_candidate_pre_accept_time", + "incl. pre-accept shard candidate: store", + labels=['workchain=~"$workchain"'], ), create_heatmap_panel( "tycho_collator_update_last_collated_chain_time_and_check_should_collate_mc_block_time",