From 3a24cb8c05ff963697755abd8c3c3a363d54d89f Mon Sep 17 00:00:00 2001 From: Vitaly Terekhov Date: Wed, 26 Jun 2024 17:15:57 +0000 Subject: [PATCH 1/8] feature(collation-manager): prepared for async execution --- collator/src/manager/mod.rs | 590 +++++++++++++++++++--------------- collator/src/manager/types.rs | 13 +- collator/src/utils/shard.rs | 26 +- util/src/lib.rs | 2 + 4 files changed, 359 insertions(+), 272 deletions(-) diff --git a/collator/src/manager/mod.rs b/collator/src/manager/mod.rs index 9106d7ee3..9d307106b 100644 --- a/collator/src/manager/mod.rs +++ b/collator/src/manager/mod.rs @@ -7,13 +7,14 @@ use anyhow::{anyhow, bail, Result}; use async_trait::async_trait; use everscale_crypto::ed25519::KeyPair; use everscale_types::models::{BlockId, BlockIdShort, ShardIdent}; +use parking_lot::{Mutex, RwLock}; use tycho_block_util::block::ValidatorSubsetInfo; use tycho_block_util::state::ShardStateStuff; use tycho_util::metrics::HistogramGuard; -use tycho_util::FastHashMap; +use tycho_util::{DashMapEntry, FastDashMap, FastHashMap, FastHashSet}; use self::types::{ - BlockCacheKey, BlockCandidateContainer, BlockCandidateToSend, BlocksCache, + BlockCacheKey, BlockCandidateContainer, BlockCandidateToSend, BlocksCache, ChainTimesSyncState, McBlockSubgraphToSend, SendSyncStatus, }; use self::utils::find_us_in_collators_set; @@ -79,20 +80,18 @@ where collator_factory: CF, validator: Arc, - active_collation_sessions: FastHashMap>, - collation_sessions_to_finish: FastHashMap>, - active_collators: FastHashMap, - collators_to_stop: FastHashMap, + active_collation_sessions: RwLock>>, + collation_sessions_to_finish: FastDashMap>, + active_collators: FastDashMap>, + collators_to_stop: FastDashMap>, blocks_cache: BlocksCache, - last_processed_mc_block_id: Option, + last_processed_mc_block_id: Mutex>, /// id of last master block collated by ourselves - last_collated_mc_block_id: Option, - /// chain time for next master block to be collated - next_mc_block_chain_time: u64, + last_collated_mc_block_id: Mutex>, - last_collated_chain_times_by_shards: FastHashMap>, + chain_times_sync_state: Mutex, #[cfg(any(test, feature = "test"))] test_validators_keypairs: Vec>, @@ -113,6 +112,17 @@ where } } +#[async_trait] +impl MempoolEventListener for CollationManager +where + CF: CollatorFactory, + V: Validator, +{ + async fn on_new_anchor(&self, anchor: Arc) -> Result<()> { + self.process_new_anchor_from_mempool(anchor).await + } +} + #[async_trait] impl StateNodeEventListener for AsyncQueuedDispatcher> where @@ -144,6 +154,32 @@ where } } +#[async_trait] +impl StateNodeEventListener for CollationManager +where + CF: CollatorFactory, + V: Validator, +{ + async fn on_block_accepted(&self, _block_id: &BlockId) -> Result<()> { + // TODO: remove accepted block from cache + // STUB: do nothing, currently we remove block from cache when it sent to state node + Ok(()) + } + + async fn on_block_accepted_external(&self, state: &ShardStateStuff) -> Result<()> { + // TODO: should store block info from blockchain if it was not already collated + // and validated by ourself. 
Will use this info for faster validation further: + // will consider that just collated block is already validated if it have the + // same root hash and file hash + if state.block_id().is_masterchain() { + let mc_data = McData::load_from_state(state)?; + self.process_mc_block_from_bc(mc_data).await + } else { + Ok(()) + } + } +} + #[async_trait] impl CollatorEventListener for AsyncQueuedDispatcher> where @@ -182,6 +218,32 @@ where } } +#[async_trait] +impl CollatorEventListener for CollationManager +where + CF: CollatorFactory, + V: Validator, +{ + async fn on_skipped_anchor( + &self, + next_block_id_short: BlockIdShort, + anchor: Arc, + force_mc_block: bool, + ) -> Result<()> { + self.process_skipped_anchor(next_block_id_short, anchor, force_mc_block) + .await + } + + async fn on_block_candidate(&self, collation_result: BlockCollationResult) -> Result<()> { + self.process_collated_block_candidate(collation_result) + .await + } + + async fn on_collator_stopped(&self, stop_key: CollationSessionId) -> Result<()> { + self.process_collator_stopped(stop_key).await + } +} + impl CollationManager where CF: CollatorFactory, @@ -227,18 +289,17 @@ where mq_adapter: mq_adapter.clone(), collator_factory, validator, - active_collation_sessions: FastHashMap::default(), - collation_sessions_to_finish: FastHashMap::default(), - active_collators: FastHashMap::default(), - collators_to_stop: FastHashMap::default(), - blocks_cache: BlocksCache::default(), + active_collation_sessions: Default::default(), + collation_sessions_to_finish: Default::default(), + active_collators: Default::default(), + collators_to_stop: Default::default(), - last_processed_mc_block_id: None, - last_collated_mc_block_id: None, - next_mc_block_chain_time: 0, + blocks_cache: BlocksCache::default(), - last_collated_chain_times_by_shards: FastHashMap::default(), + last_processed_mc_block_id: Default::default(), + last_collated_mc_block_id: Default::default(), + chain_times_sync_state: Default::default(), #[cfg(any(test, feature = "test"))] test_validators_keypairs, @@ -276,23 +337,25 @@ where } /// Return last collated master block id - fn last_collated_mc_block_id(&self) -> Option<&BlockId> { - self.last_collated_mc_block_id.as_ref() + fn get_last_collated_mc_block_id(&self) -> Option { + *self.last_collated_mc_block_id.lock() } - fn set_last_collated_mc_block_id(&mut self, block_id: BlockId) { - self.last_collated_mc_block_id = Some(block_id); + fn set_last_collated_mc_block_id(&self, block_id: BlockId) { + let mut guard = self.last_collated_mc_block_id.lock(); + guard.replace(block_id); } - fn last_processed_mc_block_id(&self) -> Option<&BlockId> { - self.last_processed_mc_block_id.as_ref() - } - fn set_last_processed_mc_block_id(&mut self, block_id: BlockId) { - self.last_processed_mc_block_id = Some(block_id); + fn get_last_processed_mc_block_id(&self) -> Option { + *self.last_processed_mc_block_id.lock() } /// Prunes the cache of last collated chain times - fn process_last_collated_mc_block_chain_time(&mut self, chain_time: u64) { - for (_, last_collated_chain_times) in self.last_collated_chain_times_by_shards.iter_mut() { + fn process_last_collated_mc_block_chain_time(&self, chain_time: u64) { + let mut chain_times_guard = self.chain_times_sync_state.lock(); + for (_, last_collated_chain_times) in chain_times_guard + .last_collated_chain_times_by_shards + .iter_mut() + { last_collated_chain_times.retain(|(ct, _)| ct > &chain_time); } } @@ -300,10 +363,7 @@ where /// (TODO) Check sync status between mempool and blockchain state 
/// and pause collation when we are far behind other nodesб /// jusct sync blcoks from blockchain - pub async fn process_new_anchor_from_mempool( - &mut self, - _anchor: Arc, - ) -> Result<()> { + pub async fn process_new_anchor_from_mempool(&self, _anchor: Arc) -> Result<()> { // TODO: make real implementation, currently does nothing Ok(()) } @@ -348,14 +408,14 @@ where /// 2. Skip if it was already processed before /// 3. Skip if waiting for the first own master block collation less then `max_mc_block_delta_from_bc_to_await_own` fn check_should_process_mc_block_from_bc(&self, mc_block_id: &BlockId) -> bool { - let last_collated_mc_block_id_opt = self.last_collated_mc_block_id(); - let last_processed_mc_block_id_opt = self.last_processed_mc_block_id(); + let last_collated_mc_block_id_opt = self.get_last_collated_mc_block_id(); + let last_processed_mc_block_id_opt = self.get_last_processed_mc_block_id(); if last_collated_mc_block_id_opt.is_some() { // when we have last own collated master block then skip if incoming one is equal // or not far ahead from last own collated // then will wait for next own collated master block let (seqno_delta, is_equal) = - Self::compare_mc_block_with(mc_block_id, self.last_collated_mc_block_id()); + Self::compare_mc_block_with(mc_block_id, last_collated_mc_block_id_opt.as_ref()); if is_equal || seqno_delta <= self.config.max_mc_block_delta_from_bc_to_await_own { tracing::info!( target: tracing_targets::COLLATION_MANAGER, @@ -383,8 +443,10 @@ where // then we should wait for the first own collated master block // but not more then `max_mc_block_delta_from_bc_to_await_own` if last_processed_mc_block_id_opt.is_some() { - let (seqno_delta, is_equal) = - Self::compare_mc_block_with(mc_block_id, last_processed_mc_block_id_opt); + let (seqno_delta, is_equal) = Self::compare_mc_block_with( + mc_block_id, + last_processed_mc_block_id_opt.as_ref(), + ); let already_processed_before = is_equal || seqno_delta < 0; if already_processed_before { tracing::info!( @@ -413,6 +475,19 @@ where true } + fn check_should_process_and_update_last_processed_mc_block(&self, block_id: &BlockId) -> bool { + let mut guard = self.last_processed_mc_block_id.lock(); + let last_processed_mc_block_id_opt = guard.as_ref(); + let (seqno_delta, is_equal) = + Self::compare_mc_block_with(block_id, last_processed_mc_block_id_opt); + if seqno_delta < 0 || is_equal { + false + } else { + guard.replace(*block_id); + true + } + } + /// Returns: (seqno delta from other, true - if equal) fn compare_mc_block_with( mc_block_id: &BlockId, @@ -438,25 +513,6 @@ where (seqno_delta, is_equal) } - /// * TRUE - provided `mc_block_id` is before or equal to last processed - /// * FALSE - provided `mc_block_id` is ahead of last processed - fn _check_if_mc_block_not_ahead_last_processed(&self, mc_block_id: &BlockId) -> bool { - // TODO: consider block shard? - let last_processed_mc_block_id_opt = self.last_processed_mc_block_id(); - let is_not_ahead = matches!(last_processed_mc_block_id_opt, Some(last_processed_mc_block_id) - if mc_block_id.seqno < last_processed_mc_block_id.seqno - || mc_block_id == last_processed_mc_block_id); - if is_not_ahead { - tracing::info!( - target: tracing_targets::COLLATION_MANAGER, - "mc block ({}) is NOT AHEAD of last processed ({:?})", - mc_block_id.as_short_id(), - self.last_processed_mc_block_id().map(|b| b.as_short_id()), - ); - } - is_not_ahead - } - /// Check if collation sessions initialized and try to force refresh them if they not. /// This needed when start from zerostate. 
State node adapter will be initialized after /// zerostate load and won't fire `[StateNodeListener::on_mc_block_event()]` for the 1 block. @@ -465,7 +521,7 @@ where pub async fn check_refresh_collation_sessions(&self) -> Result<()> { // the sessions list is not enpty so the collation process was already started from // actual state or incoming master block from blockchain - if !self.active_collation_sessions.is_empty() { + if !self.active_collation_sessions.read().is_empty() { tracing::info!(target: tracing_targets::COLLATION_MANAGER, "Collation sessions already activated"); return Ok(()); } @@ -499,7 +555,7 @@ where /// then start missing sessions for these shards, or refresh existing. /// For each shard run collation process if current node is included in collators subset. #[tracing::instrument(skip_all, fields(mc_block_id = %mc_data.block_id.as_short_id()))] - pub async fn refresh_collation_sessions(&mut self, mc_data: Arc) -> Result<()> { + pub async fn refresh_collation_sessions(&self, mc_data: Arc) -> Result<()> { tracing::debug!( target: tracing_targets::COLLATION_MANAGER, "Trying to refresh collation sessions by mc state for block ({})...", @@ -526,18 +582,15 @@ where // do not re-process this master block if it is lower then last processed or equal to it // but process a new version of block with the same seqno - let processing_mc_block_id = mc_data.block_id; - let (seqno_delta, is_equal) = - Self::compare_mc_block_with(&processing_mc_block_id, self.last_processed_mc_block_id()); - if seqno_delta < 0 || is_equal { + if !self.check_should_process_and_update_last_processed_mc_block(&mc_data.block_id) { return Ok(()); } - tracing::debug!(target: tracing_targets::COLLATION_MANAGER, "mc_state_extra: {:?}", mc_data); + tracing::debug!(target: tracing_targets::COLLATION_MANAGER, "mc_data: {:?}", mc_data); // get new shards info from updated master state let mut new_shards_info = HashMap::new(); - new_shards_info.insert(ShardIdent::MASTERCHAIN, vec![processing_mc_block_id]); + new_shards_info.insert(ShardIdent::MASTERCHAIN, vec![mc_data.block_id]); for shard in mc_data.shards.iter() { let (shard_id, descr) = shard?; let top_block = BlockId { @@ -551,16 +604,21 @@ where } // update shards in msgs queue - let current_shards_ids = self.active_collation_sessions.keys().collect(); + let active_shards_ids: Vec<_> = self + .active_collation_sessions + .read() + .keys() + .cloned() + .collect(); let new_shards_ids: Vec<&ShardIdent> = new_shards_info.keys().collect(); tracing::debug!( target: tracing_targets::COLLATION_MANAGER, "Detecting split/merge actions to move from current shards {:?} to new shards {:?}...", - current_shards_ids, + active_shards_ids.as_slice(), new_shards_ids ); - let split_merge_actions = calc_split_merge_actions(current_shards_ids, new_shards_ids)?; + let split_merge_actions = calc_split_merge_actions(&active_shards_ids, new_shards_ids)?; if !split_merge_actions.is_empty() { tracing::info!( target: tracing_targets::COLLATION_MANAGER, @@ -582,31 +640,43 @@ where let mut sessions_to_start = vec![]; let mut to_finish_sessions = HashMap::new(); let mut to_stop_collators = HashMap::new(); - for shard_info in new_shards_info { - if let Some(existing_session) = - self.active_collation_sessions.remove_entry(&shard_info.0) - { - if existing_session.1.seqno() >= new_session_seqno { - sessions_to_keep.insert(shard_info.0, existing_session.1); - } else { - sessions_to_start.push(shard_info); - to_finish_sessions - .insert((existing_session.0, new_session_seqno), existing_session.1); + 
{ + let mut active_collation_sessions_guard = self.active_collation_sessions.write(); + let mut missed_shards_ids: FastHashSet<_> = active_shards_ids.into_iter().collect(); + for shard_info in new_shards_info { + missed_shards_ids.remove(&shard_info.0); + match active_collation_sessions_guard.entry(shard_info.0) { + Entry::Occupied(entry) => { + let existing_session = entry.get().clone(); + if existing_session.seqno() >= new_session_seqno { + sessions_to_keep.insert(shard_info.0, existing_session); + } else { + to_finish_sessions + .insert((shard_info.0, new_session_seqno), existing_session); + sessions_to_start.push(shard_info); + entry.remove(); + } + } + Entry::Vacant(_) => { + sessions_to_start.push(shard_info); + } } - } else { - sessions_to_start.push(shard_info); } - } - // if we still have some active sessions that do not match with new shards - // then we need to finish them and stop their collators - for current_active_session in self.active_collation_sessions.drain() { - to_finish_sessions.insert( - (current_active_session.0, new_session_seqno), - current_active_session.1, - ); - if let Some(collator) = self.active_collators.remove(¤t_active_session.0) { - to_stop_collators.insert((current_active_session.0, new_session_seqno), collator); + // if we still have some active sessions that do not match with new shards + // then we need to finish them and stop their collators + for shard_id in missed_shards_ids { + // TODO: we should remove session from active and add to finished in one atomic operation + // to not to miss session on processing block candidate that could be from old session + if let Some(current_active_session) = + active_collation_sessions_guard.remove(&shard_id) + { + to_finish_sessions + .insert((shard_id, new_session_seqno), current_active_session); + if let Some(collator) = self.active_collators.remove(&shard_id) { + to_stop_collators.insert((shard_id, new_session_seqno), collator.1); + } + } } } @@ -626,15 +696,13 @@ where let cc_config = mc_data.config.get_catchain_config()?; // update master state in existing collators and resume collation - for (shard_id, session_info) in sessions_to_keep { - self.active_collation_sessions - .insert(shard_id, session_info); - + for (shard_id, _) in sessions_to_keep { // if there is no active collator then current node does not collate this shard // so we do not need to do anything let Some(collator) = self.active_collators.get(&shard_id) else { continue; }; + let collator = collator.value().clone(); if shard_id.is_masterchain() { tracing::debug!( @@ -709,7 +777,7 @@ where if let Some(_local_pubkey) = local_pubkey_opt { let prev_seqno = prev_blocks_ids.iter().map(|b| b.seqno).max().unwrap_or(0); - if let Entry::Vacant(entry) = self.active_collators.entry(shard_id) { + if let DashMapEntry::Vacant(entry) = self.active_collators.entry(shard_id) { tracing::info!( target: tracing_targets::COLLATION_MANAGER, "There is no active collator for collation session {}. 
Will start it", @@ -729,7 +797,7 @@ where mc_data: mc_data.clone(), }) .await; - entry.insert(collator); + entry.insert(Arc::new(collator)); } // notify validator, it will start overlay initialization @@ -749,12 +817,13 @@ where shard_id, ); if let Some(collator) = self.active_collators.remove(&shard_id) { - to_stop_collators.insert((shard_id, new_session_seqno), collator); + to_stop_collators.insert((shard_id, new_session_seqno), collator.1); } } // TODO: possibly do not need to store collation sessions if we do not collate in them self.active_collation_sessions + .write() .insert(shard_id, new_session_info); } @@ -793,9 +862,6 @@ where self.collators_to_stop.insert(stop_key, collator); } - // store last processed master block id to avoid processing it again - self.set_last_processed_mc_block_id(processing_mc_block_id); - Ok(()) // finally we will have initialized `active_collation_sessions` and `active_collators` @@ -804,7 +870,7 @@ where /// Execute collation session finalization routines pub async fn finish_collation_session( - &mut self, + &self, _session_info: Arc, finish_key: CollationSessionId, ) -> Result<()> { @@ -816,7 +882,7 @@ where } /// Remove stopped collator from cache - pub async fn process_collator_stopped(&mut self, stop_key: CollationSessionId) -> Result<()> { + pub async fn process_collator_stopped(&self, stop_key: CollationSessionId) -> Result<()> { tracing::debug!(target: tracing_targets::COLLATION_MANAGER, "process_collator_stopped: {:?}", stop_key, ); @@ -839,7 +905,7 @@ where ), )] pub async fn process_collated_block_candidate( - &mut self, + &self, collation_result: BlockCollationResult, ) -> Result<()> { tracing::debug!(target: tracing_targets::COLLATION_MANAGER, @@ -854,6 +920,7 @@ where // find session related to this block by shard let session_info = self .active_collation_sessions + .read() .get(&block_id.shard) .ok_or(anyhow!( "There is no active collation session for the shard that block belongs to" @@ -963,7 +1030,7 @@ where #[tracing::instrument(skip_all, fields(block_id = %next_block_id_short, ct = anchor.chain_time, force_mc_block))] async fn process_skipped_anchor( - &mut self, + &self, next_block_id_short: BlockIdShort, anchor: Arc, force_mc_block: bool, @@ -984,7 +1051,7 @@ where /// 2. If true, schedule master block collation /// 3. 
If no, schedule next collation attempt in current shard async fn collate_mc_block_by_interval_or_continue_shard_collation( - &mut self, + &self, shard_id: ShardIdent, chain_time: u64, force_mc_block: bool, @@ -1025,7 +1092,7 @@ where /// Returns: (`should_collate_mc_block`, `next_mc_block_chain_time`) /// * `next_mc_block_chain_time.is_some()` when master collation condition met in every shard fn update_last_collated_chain_time_and_check_should_collate_mc_block( - &mut self, + &self, shard_id: ShardIdent, chain_time: u64, force_mc_block: bool, @@ -1038,7 +1105,8 @@ where "tycho_collator_update_last_collated_chain_time_and_check_should_collate_mc_block_time", ); - let last_collated_chain_times = self + let mut chain_times_guard = self.chain_times_sync_state.lock(); + let last_collated_chain_times = chain_times_guard .last_collated_chain_times_by_shards .entry(shard_id) .or_default(); @@ -1057,7 +1125,7 @@ where } else { // check if master block interval elapsed in current shard let chain_time_elapsed = chain_time - .checked_sub(self.next_mc_block_chain_time) + .checked_sub(chain_times_guard.mc_block_latest_chain_time) .unwrap_or_default(); let mc_block_interval_elapsed = chain_time_elapsed > mc_block_min_interval_ms; if mc_block_interval_elapsed { @@ -1081,16 +1149,23 @@ where // if master block should be collated in every shard let mut first_elapsed_chain_times = vec![]; let mut should_collate_in_every_shard = true; - for active_shard in self.active_collation_sessions.keys() { - if let Some(last_collated_chain_times) = - self.last_collated_chain_times_by_shards.get(active_shard) + let active_shards = self + .active_collation_sessions + .read() + .keys() + .cloned() + .collect::>(); + for active_shard in active_shards { + if let Some(last_collated_chain_times) = chain_times_guard + .last_collated_chain_times_by_shards + .get(&active_shard) { if let Some((chain_time_that_elapsed, _)) = last_collated_chain_times .iter() .find(|(chain_time, force)| { *force || (*chain_time) - .checked_sub(self.next_mc_block_chain_time) + .checked_sub(chain_times_guard.mc_block_latest_chain_time) .unwrap_or_default() > mc_block_min_interval_ms }) @@ -1142,8 +1217,7 @@ where .blocks_cache .shards .iter() - .filter_map(|(_, shard_cache)| shard_cache.last_key_value().map(|(_, v)| v.key())) - .cloned() + .filter_map(|shard_cache| shard_cache.value().last_key_value().map(|(_, v)| *v.key())) .collect::>(); let mut result = HashMap::new(); @@ -1203,7 +1277,7 @@ where /// (TODO) Enqueue master block collation task. Will determine top shard blocks for this collation async fn enqueue_mc_block_collation( - &mut self, + &self, next_mc_block_chain_time: u64, trigger_block_id_opt: Option, ) -> Result<()> { @@ -1227,18 +1301,13 @@ where trigger_block_id_opt, )?; - // TODO: We should somehow collect externals for masterchain during the shard blocks collation - // or pull them directly when collating master - - self.next_mc_block_chain_time = next_mc_block_chain_time; - mc_collator .equeue_do_collate(next_mc_block_chain_time, top_shard_blocks_info) .await?; tracing::debug!(target: tracing_targets::COLLATION_MANAGER, "Master block collation enqueued: (block_id={} ct={})", - self.last_collated_mc_block_id() + self.get_last_collated_mc_block_id() .map(|id| BlockIdShort { shard: id.shard, seqno: id.seqno + 1 }.to_string()) .unwrap_or_default(), next_mc_block_chain_time, @@ -1275,7 +1344,7 @@ where /// 2. 
Execute processing for master or shard block #[tracing::instrument(skip_all, fields(block_id = %block_id.as_short_id()))] pub async fn process_validated_block( - &mut self, + &self, block_id: BlockId, status: ValidationStatus, ) -> Result<()> { @@ -1314,7 +1383,7 @@ where } /// Store block in a cache structure that allow to append signatures - fn store_candidate(&mut self, candidate: Box) -> Result<()> { + fn store_candidate(&self, candidate: Box) -> Result<()> { // TODO: in future we may store to cache a block received from blockchain before, // then it will exist in cache when we try to store collated candidate // but the `root_hash` may differ, so we have to handle such a case @@ -1329,7 +1398,7 @@ where .cloned() .collect::>(); while let Some(prev_shard_block_key) = prev_shard_blocks_keys.pop_front() { - if let Some(shard_cache) = self + if let Some(mut shard_cache) = self .blocks_cache .shards .get_mut(&prev_shard_block_key.shard) @@ -1359,7 +1428,7 @@ where ); } } else { - let shard_cache = self + let mut shard_cache = self .blocks_cache .shards .entry(block_container.key().shard) @@ -1379,73 +1448,93 @@ where /// Find block candidate in cache, append signatures info and return updated fn store_block_validation_result( - &mut self, + &self, block_id: BlockId, validation_result: ValidationStatus, ) -> bool { - if let Some(block_container) = if block_id.is_masterchain() { - self.blocks_cache.master.get_mut(&block_id.as_short_id()) - } else { - self.blocks_cache - .shards - .get_mut(&block_id.shard) - .and_then(|shard_cache| shard_cache.get_mut(&block_id.seqno)) - } { - let (is_valid, already_synced, signatures) = match validation_result { - ValidationStatus::Skipped => (true, true, Default::default()), - ValidationStatus::Complete(signatures) => (true, false, signatures), - }; - - block_container.set_validation_result(is_valid, already_synced, signatures); + let (is_valid, already_synced, signatures) = match validation_result { + ValidationStatus::Skipped => (true, true, Default::default()), + ValidationStatus::Complete(signatures) => (true, false, signatures), + }; - true - } else { - false + if block_id.is_masterchain() { + if let Some(mut block_container) = + self.blocks_cache.master.get_mut(&block_id.as_short_id()) + { + block_container.set_validation_result(is_valid, already_synced, signatures); + return true; + } + } else if let Some(mut shard_cache) = self.blocks_cache.shards.get_mut(&block_id.shard) { + if let Some(block_container) = shard_cache.get_mut(&block_id.seqno) { + block_container.set_validation_result(is_valid, already_synced, signatures); + return true; + } } + false } - /// Find shard block in cache and then get containing master block if link exists - fn find_containing_mc_block( - &self, - shard_block_id: &BlockId, - ) -> Option<&BlockCandidateContainer> { + /// Find shard block in cache and then get containing master block id if link exists + fn find_containing_mc_block(&self, shard_block_id: &BlockId) -> Option<(BlockId, bool)> { // TODO: handle when master block link exist but there is not block itself - if let Some(mc_block_key) = self - .blocks_cache - .shards - .get(&shard_block_id.shard) - .and_then(|shard_cache| shard_cache.get(&shard_block_id.seqno)) - .and_then(|sbc| sbc.containing_mc_block) - { - self.blocks_cache.master.get(&mc_block_key) - } else { - None + if let Some(shard_cache) = self.blocks_cache.shards.get(&shard_block_id.shard) { + if let Some(mc_block_key) = shard_cache + .value() + .get(&shard_block_id.seqno) + .and_then(|sbc| 
sbc.containing_mc_block) + { + let res = self + .blocks_cache + .master + .get(&mc_block_key) + .map(|block_container| { + (*block_container.block_id(), block_container.is_valid()) + }); + return res; + } } + None } /// Find all shard blocks that form master block subgraph. /// Then extract and return them if all are valid - fn extract_mc_block_subgraph_if_valid( - &mut self, + async fn extract_mc_block_subgraph_if_valid( + &self, block_id: &BlockId, ) -> Result> { // 1. Find current master block - let mc_block_container = self - .blocks_cache - .master - .get_mut(&block_id.as_short_id()) - .ok_or_else(|| { - anyhow!( - "Master block ({}) not found in cache!", - block_id.as_short_id() - ) - })?; - if !mc_block_container.is_valid() { - return Ok(None); - } + let (mc_block_container_key, mc_block_candidate_to_send, mut prev_shard_blocks_keys) = { + let mut mc_block_container = self + .blocks_cache + .master + .get_mut(&block_id.as_short_id()) + .ok_or_else(|| { + anyhow!( + "Master block ({}) not found in cache!", + block_id.as_short_id() + ) + })?; + if !mc_block_container.is_valid() { + return Ok(None); + } + + let mc_block_candidate_to_send = mc_block_container.extract_entry_for_sending()?; + + let prev_shard_blocks_keys = mc_block_container + .top_shard_blocks_keys() + .iter() + .cloned() + .collect::>(); + + ( + *mc_block_container.key(), + mc_block_candidate_to_send, + prev_shard_blocks_keys, + ) + }; + let mut subgraph = McBlockSubgraphToSend { mc_block: BlockCandidateToSend { - entry: mc_block_container.extract_entry_for_sending()?, + entry: mc_block_candidate_to_send, send_sync_status: SendSyncStatus::Sending, }, shard_blocks: vec![], @@ -1453,13 +1542,9 @@ where // 3. By the top shard blocks info find shard blocks of current master block // 4. Recursively find prev shard blocks until the end or top shard blocks of prev master reached - let mut prev_shard_blocks_keys = mc_block_container - .top_shard_blocks_keys() - .iter() - .cloned() - .collect::>(); + let mut not_all_blocks_valid = false; while let Some(prev_shard_block_key) = prev_shard_blocks_keys.pop_front() { - let shard_cache = self + let mut shard_cache = self .blocks_cache .shards .get_mut(&prev_shard_block_key.shard) @@ -1468,19 +1553,12 @@ where })?; if let Some(shard_block_container) = shard_cache.get_mut(&prev_shard_block_key.seqno) { // if shard block included in current master block subgraph - if matches!(shard_block_container.containing_mc_block, Some(containing_mc_block_key) if &containing_mc_block_key == mc_block_container.key()) + if matches!(shard_block_container.containing_mc_block, Some(containing_mc_block_key) if containing_mc_block_key == mc_block_container_key) { - // 5. If master block and all shard blocks valid the extract them from entries and return + // 5. 
If master block and all shard blocks valid then extract them from entries and return if !shard_block_container.is_valid() { - tracing::debug!( - target: tracing_targets::COLLATION_MANAGER, - "Not all blocks are valid in master block ({}) subgraph", - block_id.as_short_id(), - ); - let mut blocks_to_restore = vec![subgraph.mc_block]; - blocks_to_restore.append(&mut subgraph.shard_blocks); - self.restore_blocks_in_cache(blocks_to_restore)?; - return Ok(None); + not_all_blocks_valid = true; + break; } subgraph.shard_blocks.push(BlockCandidateToSend { entry: shard_block_container.extract_entry_for_sending()?, @@ -1495,59 +1573,68 @@ where } } - let _tracing_shard_blocks_descr = subgraph - .shard_blocks - .iter() - .map(|sb| sb.entry.key.to_string()) - .collect::>(); + if not_all_blocks_valid { + let mut blocks_to_restore = vec![subgraph.mc_block]; + blocks_to_restore.append(&mut subgraph.shard_blocks); + self.restore_blocks_in_cache_async(blocks_to_restore) + .await?; + return Ok(None); + } + tracing::debug!( target: tracing_targets::COLLATION_MANAGER, "Extracted valid master block ({}) subgraph for sending to sync: {:?}", block_id.as_short_id(), - _tracing_shard_blocks_descr.as_slice(), + subgraph + .shard_blocks + .iter() + .map(|sb| sb.entry.key.to_string()) + .collect::>().as_slice(), ); Ok(Some(subgraph)) } /// Remove block entries from cache and compact cache - async fn cleanup_blocks_from_cache(&mut self, blocks_keys: Vec) -> Result<()> { - let _tracing_blocks_descr = blocks_keys - .iter() - .map(|key| key.to_string()) - .collect::>(); + async fn cleanup_blocks_from_cache(&self, blocks_keys: Vec) -> Result<()> { tracing::debug!( target: tracing_targets::COLLATION_MANAGER, "Cleaning up blocks from cache: {:?}", - _tracing_blocks_descr.as_slice(), + blocks_keys + .iter() + .map(|key| key.to_string()) + .collect::>() + .as_slice(), ); - for block_key in blocks_keys { + for block_key in blocks_keys.iter() { if block_key.shard.is_masterchain() { - self.blocks_cache.master.remove(&block_key); - } else if let Some(shard_cache) = self.blocks_cache.shards.get_mut(&block_key.shard) { + self.blocks_cache.master.remove(block_key); + } else if let Some(mut shard_cache) = self.blocks_cache.shards.get_mut(&block_key.shard) + { shard_cache.remove(&block_key.seqno); } } tracing::debug!( target: tracing_targets::COLLATION_MANAGER, "Blocks cleaned up from cache: {:?}", - _tracing_blocks_descr.as_slice(), + blocks_keys + .iter() + .map(|key| key.to_string()) + .collect::>() + .as_slice(), ); Ok(()) } async fn restore_blocks_in_cache_async( - &mut self, + &self, blocks_to_restore: Vec, ) -> Result<()> { self.restore_blocks_in_cache(blocks_to_restore) } /// Find and restore block entries in cache updating sync statuses - fn restore_blocks_in_cache( - &mut self, - blocks_to_restore: Vec, - ) -> Result<()> { + fn restore_blocks_in_cache(&self, blocks_to_restore: Vec) -> Result<()> { let _tracing_blocks_descr = blocks_to_restore .iter() .map(|b| b.entry.key.to_string()) @@ -1559,24 +1646,32 @@ where ); for block in blocks_to_restore { // find block in cache - let block_container = if block.entry.key.shard.is_masterchain() { - self.blocks_cache + if block.entry.key.shard.is_masterchain() { + let mut block_container = self + .blocks_cache .master .get_mut(&block.entry.key) .ok_or_else(|| { anyhow!("Master block ({}) not found in cache!", block.entry.key) - })? 
+ })?; + block_container.restore_entry(block.entry, block.send_sync_status)?; } else { - self.blocks_cache + let mut shard_cache = self + .blocks_cache .shards .get_mut(&block.entry.key.shard) - .and_then(|shard_cache| shard_cache.get_mut(&block.entry.key.seqno)) .ok_or_else(|| { + anyhow!( + "Shard blocks map ({}) not found in cache!", + block.entry.key.shard + ) + })?; + let block_container = + shard_cache.get_mut(&block.entry.key.seqno).ok_or_else(|| { anyhow!("Shard block ({}) not found in cache!", block.entry.key) - })? + })?; + block_container.restore_entry(block.entry, block.send_sync_status)?; }; - // restore entry and update sync status - block_container.restore_entry(block.entry, block.send_sync_status)?; } tracing::debug!( target: tracing_targets::COLLATION_MANAGER, @@ -1589,7 +1684,7 @@ where /// Process validated and valid master block /// 1. Check if all included shard blocks validated, return if not /// 2. Send master and shard blocks to state node to sync - async fn process_valid_master_block(&mut self, block_id: &BlockId) -> Result<()> { + async fn process_valid_master_block(&self, block_id: &BlockId) -> Result<()> { tracing::debug!( target: tracing_targets::COLLATION_MANAGER, "Start processing validated and valid master block ({})...", @@ -1603,7 +1698,7 @@ where let mut sync_elapsed = Default::default(); // extract master block with all shard blocks if valid, and process them - if let Some(mc_block_subgraph) = self.extract_mc_block_subgraph_if_valid(block_id)? { + if let Some(mc_block_subgraph) = self.extract_mc_block_subgraph_if_valid(block_id).await? { extract_elapsed = histogram_extract.finish(); let timer = std::time::Instant::now(); @@ -1611,23 +1706,14 @@ where blocks_to_send.reverse(); blocks_to_send.push(mc_block_subgraph.mc_block); - // spawn async task to send all shard and master blocks - let _join_handle = tokio::spawn({ - let dispatcher = (*self.dispatcher).clone(); - let mq_adapter = self.mq_adapter.clone(); - let state_node_adapter = self.state_node_adapter.clone(); - async move { - Self::send_blocks_to_sync( - dispatcher, - mq_adapter, - state_node_adapter, - blocks_to_send, - ) - .await - } - }); - // TODO: make proper panic and error processing without waiting for spawned task - // join_handle.await??; + // send all shard and master blocks + self.send_blocks_to_sync( + (*self.dispatcher).clone(), + self.mq_adapter.clone(), + self.state_node_adapter.clone(), + blocks_to_send, + ) + .await?; sync_elapsed = timer.elapsed(); } else { @@ -1649,10 +1735,9 @@ where } /// 1. Try find master block info and execute [`CollationProcessor::process_valid_master_block`] - async fn process_valid_shard_block(&mut self, block_id: &BlockId) -> Result<()> { - if let Some(mc_block_container) = self.find_containing_mc_block(block_id) { - let mc_block_id = *mc_block_container.block_id(); - if mc_block_container.is_valid() { + async fn process_valid_shard_block(&self, block_id: &BlockId) -> Result<()> { + if let Some((mc_block_id, is_valid)) = self.find_containing_mc_block(block_id) { + if is_valid { tracing::debug!( target: tracing_targets::COLLATION_MANAGER, "Found containing master block ({}) for just validated shard block ({}) in cache", @@ -1684,6 +1769,7 @@ where /// 4. Return all blocks to cache if got error (separate task will try to resend further) /// 5. 
Return `Error` if it seems to be unrecoverable async fn send_blocks_to_sync( + &self, dispatcher: AsyncQueuedDispatcher, mq_adapter: Arc, state_node_adapter: Arc, @@ -1783,12 +1869,7 @@ where "All blocks were successfully sent to sync. Will cleanup them from cache: {:?}", _tracing_sent_blocks_descr.as_slice(), ); - dispatcher - .enqueue_task(method_to_async_task_closure!( - cleanup_blocks_from_cache, - sent_blocks_keys - )) - .await?; + self.cleanup_blocks_from_cache(sent_blocks_keys).await?; } commit_diffs_elapsed = histogram.finish(); @@ -1801,12 +1882,7 @@ where _tracing_blocks_to_send_descr.as_slice(), ); // queue blocks restore task - dispatcher - .enqueue_task(method_to_async_task_closure!( - restore_blocks_in_cache_async, - blocks_to_send - )) - .await?; + self.restore_blocks_in_cache_async(blocks_to_send).await?; // TODO: should implement resending for restored blocks } diff --git a/collator/src/manager/types.rs b/collator/src/manager/types.rs index b12fe10d3..b434557a4 100644 --- a/collator/src/manager/types.rs +++ b/collator/src/manager/types.rs @@ -3,17 +3,24 @@ use std::collections::BTreeMap; use anyhow::{anyhow, bail, Result}; use everscale_types::models::{Block, BlockId, BlockIdShort, ShardIdent}; use tycho_network::PeerId; -use tycho_util::FastHashMap; +use tycho_util::{FastDashMap, FastHashMap}; use crate::types::{ArcSignature, BlockCandidate, BlockStuffForSync}; pub(super) type BlockCacheKey = BlockIdShort; pub(super) type BlockSeqno = u32; +#[derive(Default)] +pub(super) struct ChainTimesSyncState { + /// latest known chain time for master block: last imported or next to be collated + pub mc_block_latest_chain_time: u64, + pub last_collated_chain_times_by_shards: FastHashMap>, +} + #[derive(Default)] pub(super) struct BlocksCache { - pub master: BTreeMap, - pub shards: FastHashMap>, + pub master: FastDashMap, + pub shards: FastDashMap>, } pub struct BlockCandidateEntry { diff --git a/collator/src/utils/shard.rs b/collator/src/utils/shard.rs index 3940a5437..cde72e641 100644 --- a/collator/src/utils/shard.rs +++ b/collator/src/utils/shard.rs @@ -22,7 +22,7 @@ enum CalcSplitMergeStep<'a> { /// Calculate the list of split/merge actions that are needed /// to move from the current shards set to a new pub fn calc_split_merge_actions( - from_current_shards: Vec<&ShardIdent>, + from_current_shards: &[ShardIdent], to_new_shards: Vec<&ShardIdent>, ) -> Result> { // TODO: not the best code, possibly needs refactoring @@ -35,7 +35,7 @@ pub fn calc_split_merge_actions( planned_actions.extend( from_current_shards .iter() - .map(|&sh| CalcSplitMergeStep::CheckAction(*sh, None, None)), + .map(|&sh| CalcSplitMergeStep::CheckAction(sh, None, None)), ); } @@ -137,26 +137,28 @@ mod tests { println!("shard split 2 {}", shard_a0); println!("shard split 2 {}", shard_e0); - let shards_1 = vec![&shard_80]; - let actions = calc_split_merge_actions(vec![], shards_1.clone()).unwrap(); + let shards_1_r = vec![&shard_80]; + let shards_1_l = &[shard_80]; + let actions = calc_split_merge_actions(&[], shards_1_r.clone()).unwrap(); println!("split/merge actions from [] to [1]: {:?}", actions); - let shards_4 = vec![&shard_20, &shard_60, &shard_a0, &shard_e0]; - let actions = calc_split_merge_actions(vec![], shards_4.clone()).unwrap(); + let shards_4_r = vec![&shard_20, &shard_60, &shard_a0, &shard_e0]; + let actions = calc_split_merge_actions(&[], shards_4_r.clone()).unwrap(); println!("split/merge actions from [] to [4]: {:?}", actions); - let actions = calc_split_merge_actions(shards_1.clone(), 
shards_4.clone()).unwrap(); + let actions = calc_split_merge_actions(shards_1_l, shards_4_r.clone()).unwrap(); println!("split/merge actions from [1] to [4]: {:?}", actions); - let shards_2 = vec![&shard_40, &shard_c0]; - let actions = calc_split_merge_actions(shards_2.clone(), shards_4.clone()).unwrap(); + let shards_2_l = &[shard_40, shard_c0]; + let actions = calc_split_merge_actions(shards_2_l, shards_4_r.clone()).unwrap(); println!("split/merge actions from [2] to [4]: {:?}", actions); - let shards_3 = vec![&shard_40, &shard_a0, &shard_e0]; - let actions = calc_split_merge_actions(shards_2.clone(), shards_3.clone()).unwrap(); + let shards_3_r = vec![&shard_40, &shard_a0, &shard_e0]; + let shards_3_l = &[shard_40, shard_a0, shard_e0]; + let actions = calc_split_merge_actions(shards_2_l, shards_3_r.clone()).unwrap(); println!("split/merge actions from [2] to [3]: {:?}", actions); - let actions = calc_split_merge_actions(shards_3.clone(), shards_4.clone()).unwrap(); + let actions = calc_split_merge_actions(shards_3_l, shards_4_r.clone()).unwrap(); println!("split/merge actions from [3] to [4]: {:?}", actions); } } diff --git a/util/src/lib.rs b/util/src/lib.rs index e23bc79b3..0b9b5da82 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -50,6 +50,8 @@ mod util { pub(crate) mod wake_list; } +pub use dashmap::mapref::entry::Entry as DashMapEntry; + pub type FastDashMap = dashmap::DashMap; pub type FastDashSet = dashmap::DashSet; pub type FastHashMap = HashMap; From d978f850abda625a684b911c09363d088cdcf7b4 Mon Sep 17 00:00:00 2001 From: Vitaly Terekhov Date: Thu, 27 Jun 2024 18:00:53 +0000 Subject: [PATCH 2/8] feature(collation-manager): implemented async dispatcher to spawn tasks execution in parallel --- collator/src/collator/mod.rs | 16 +-- collator/src/manager/mod.rs | 69 +++++------ collator/src/utils/async_dispatcher.rs | 115 ++++++++++++++++++ collator/src/utils/async_queued_dispatcher.rs | 8 +- collator/src/utils/mod.rs | 1 + 5 files changed, 157 insertions(+), 52 deletions(-) create mode 100644 collator/src/utils/async_dispatcher.rs diff --git a/collator/src/collator/mod.rs b/collator/src/collator/mod.rs index 826ec6a5b..4906974b4 100644 --- a/collator/src/collator/mod.rs +++ b/collator/src/collator/mod.rs @@ -23,9 +23,9 @@ use crate::types::{ TopBlockDescription, }; use crate::utils::async_queued_dispatcher::{ - AsyncQueuedDispatcher, STANDARD_DISPATCHER_QUEUE_BUFFER_SIZE, + AsyncQueuedDispatcher, STANDARD_QUEUED_DISPATCHER_BUFFER_SIZE, }; -use crate::{method_to_async_task_closure, tracing_targets}; +use crate::{method_to_queued_async_closure, tracing_targets}; mod build_block; mod do_collate; @@ -135,7 +135,7 @@ impl Collator for AsyncQueuedDispatcher { } async fn equeue_update_mc_data_and_try_collate(&self, mc_data: Arc) -> Result<()> { - self.enqueue_task(method_to_async_task_closure!( + self.enqueue_task(method_to_queued_async_closure!( update_mc_data_and_try_collate, mc_data )) @@ -143,7 +143,7 @@ impl Collator for AsyncQueuedDispatcher { } async fn equeue_try_collate(&self) -> Result<()> { - self.enqueue_task(method_to_async_task_closure!(try_collate,)) + self.enqueue_task(method_to_queued_async_closure!(try_collate,)) .await } @@ -152,7 +152,7 @@ impl Collator for AsyncQueuedDispatcher { next_chain_time: u64, top_shard_blocks_info: Vec, ) -> Result<()> { - self.enqueue_task(method_to_async_task_closure!( + self.enqueue_task(method_to_queued_async_closure!( wait_state_and_do_collate, next_chain_time, top_shard_blocks_info @@ -219,7 +219,7 @@ impl CollatorStdImpl { // 
create dispatcher for own async tasks queue let (dispatcher, receiver) = - AsyncQueuedDispatcher::new(STANDARD_DISPATCHER_QUEUE_BUFFER_SIZE); + AsyncQueuedDispatcher::new(STANDARD_QUEUED_DISPATCHER_BUFFER_SIZE); let exec_manager = ExecutionManager::new( shard_id, @@ -268,7 +268,7 @@ impl CollatorStdImpl { // equeue first initialization task // sending to the receiver here cannot return Error because it is guaranteed not closed or dropped dispatcher - .enqueue_task(method_to_async_task_closure!( + .enqueue_task(method_to_queued_async_closure!( init, prev_blocks_ids, mc_data, @@ -343,7 +343,7 @@ impl CollatorStdImpl { // enqueue collation attempt of next block self.dispatcher - .enqueue_task(method_to_async_task_closure!(try_collate,)) + .enqueue_task(method_to_queued_async_closure!(try_collate,)) .await?; tracing::info!(target: tracing_targets::COLLATOR, "Collator (block_id={}): init: collation attempt enqueued", self.next_block_id_short, diff --git a/collator/src/manager/mod.rs b/collator/src/manager/mod.rs index 9d307106b..047c13bc9 100644 --- a/collator/src/manager/mod.rs +++ b/collator/src/manager/mod.rs @@ -26,13 +26,11 @@ use crate::types::{ BlockCandidate, BlockCollationResult, CollationConfig, CollationSessionId, CollationSessionInfo, McData, ProofFunds, TopBlockDescription, }; -use crate::utils::async_queued_dispatcher::{ - AsyncQueuedDispatcher, STANDARD_DISPATCHER_QUEUE_BUFFER_SIZE, -}; +use crate::utils::async_dispatcher::{AsyncDispatcher, STANDARD_ASYNC_DISPATCHER_BUFFER_SIZE}; use crate::utils::schedule_async_action; use crate::utils::shard::calc_split_merge_actions; use crate::validator::{AddSession, ValidationStatus, Validator}; -use crate::{method_to_async_task_closure, tracing_targets}; +use crate::{method_to_async_closure, tracing_targets}; mod types; mod utils; @@ -41,14 +39,14 @@ pub struct RunningCollationManager where CF: CollatorFactory, { - dispatcher: AsyncQueuedDispatcher>, + dispatcher: AsyncDispatcher>, state_node_adapter: Arc, mpool_adapter: Arc, mq_adapter: Arc, } impl RunningCollationManager { - pub fn dispatcher(&self) -> &AsyncQueuedDispatcher> { + pub fn dispatcher(&self) -> &AsyncDispatcher> { &self.dispatcher } @@ -72,7 +70,7 @@ where keypair: Arc, config: Arc, - dispatcher: Arc>, + dispatcher: Arc>, state_node_adapter: Arc, mpool_adapter: Arc, mq_adapter: Arc, @@ -98,13 +96,13 @@ where } #[async_trait] -impl MempoolEventListener for AsyncQueuedDispatcher> +impl MempoolEventListener for AsyncDispatcher> where CF: CollatorFactory, V: Validator, { async fn on_new_anchor(&self, anchor: Arc) -> Result<()> { - self.enqueue_task(method_to_async_task_closure!( + self.spawn_task(method_to_async_closure!( process_new_anchor_from_mempool, anchor )) @@ -124,7 +122,7 @@ where } #[async_trait] -impl StateNodeEventListener for AsyncQueuedDispatcher> +impl StateNodeEventListener for AsyncDispatcher> where CF: CollatorFactory, V: Validator, @@ -142,12 +140,8 @@ where // same root hash and file hash if state.block_id().is_masterchain() { let mc_data = McData::load_from_state(state)?; - - self.enqueue_task(method_to_async_task_closure!( - process_mc_block_from_bc, - mc_data - )) - .await + self.spawn_task(method_to_async_closure!(process_mc_block_from_bc, mc_data)) + .await } else { Ok(()) } @@ -181,7 +175,7 @@ where } #[async_trait] -impl CollatorEventListener for AsyncQueuedDispatcher> +impl CollatorEventListener for AsyncDispatcher> where CF: CollatorFactory, V: Validator, @@ -192,7 +186,7 @@ where anchor: Arc, force_mc_block: bool, ) -> Result<()> { - 
self.enqueue_task(method_to_async_task_closure!( + self.spawn_task(method_to_async_closure!( process_skipped_anchor, next_block_id_short, anchor, @@ -202,7 +196,7 @@ where } async fn on_block_candidate(&self, collation_result: BlockCollationResult) -> Result<()> { - self.enqueue_task(method_to_async_task_closure!( + self.spawn_task(method_to_async_closure!( process_collated_block_candidate, collation_result )) @@ -210,11 +204,8 @@ where } async fn on_collator_stopped(&self, stop_key: CollationSessionId) -> Result<()> { - self.enqueue_task(method_to_async_task_closure!( - process_collator_stopped, - stop_key - )) - .await + self.spawn_task(method_to_async_closure!(process_collator_stopped, stop_key)) + .await } } @@ -266,9 +257,11 @@ where { tracing::info!(target: tracing_targets::COLLATION_MANAGER, "Creating collation manager..."); - // create dispatcher for own async tasks queue - let (dispatcher, receiver) = - AsyncQueuedDispatcher::new(STANDARD_DISPATCHER_QUEUE_BUFFER_SIZE); + // create dispatcher for own tasks + let (dispatcher, spawn_receiver) = AsyncDispatcher::new( + "collation_manager_async_dispatcher", + STANDARD_ASYNC_DISPATCHER_BUFFER_SIZE, + ); let arc_dispatcher = Arc::new(dispatcher.clone()); // create state node adapter @@ -304,8 +297,8 @@ where #[cfg(any(test, feature = "test"))] test_validators_keypairs, }; - AsyncQueuedDispatcher::run(processor, receiver); - tracing::trace!(target: tracing_targets::COLLATION_MANAGER, "Tasks queue dispatcher started"); + arc_dispatcher.run(Arc::new(processor), spawn_receiver); + tracing::trace!(target: tracing_targets::COLLATION_MANAGER, "Tasks dispatchers started"); // start other async processes @@ -317,9 +310,7 @@ where tokio::time::Duration::from_secs(10), || async move { arc_dispatcher - .enqueue_task(method_to_async_task_closure!( - check_refresh_collation_sessions, - )) + .spawn_task(method_to_async_closure!(check_refresh_collation_sessions,)) .await }, "CollationProcessor::check_refresh_collation_sessions()".into(), @@ -385,17 +376,17 @@ where // when state received execute master block processing routines let mpool_adapter = self.mpool_adapter.clone(); - let dispatcher = self.dispatcher.clone(); tracing::info!( target: tracing_targets::COLLATION_MANAGER, "Processing requested mc state for block ({})...", mc_data.block_id.as_short_id() ); + Self::notify_mempool_about_mc_block(mpool_adapter, &mc_data.block_id).await?; - dispatcher - .enqueue_task(method_to_async_task_closure!( + self.dispatcher + .spawn_task(method_to_async_closure!( refresh_collation_sessions, mc_data )) @@ -840,7 +831,7 @@ where self.collation_sessions_to_finish .insert(finish_key, session_info.clone()); self.dispatcher - .enqueue_task(method_to_async_task_closure!( + .spawn_task(method_to_async_closure!( finish_collation_session, session_info, finish_key @@ -950,7 +941,7 @@ where let status = validator.validate(session_seqno, &block_id).await.unwrap(); _ = dispatcher - .enqueue_task(method_to_async_task_closure!( + .spawn_task(method_to_async_closure!( process_validated_block, block_id, status @@ -991,7 +982,7 @@ where Self::notify_mempool_about_mc_block(self.mpool_adapter.clone(), &block_id).await?; self.dispatcher - .enqueue_task(method_to_async_task_closure!( + .spawn_task(method_to_async_closure!( refresh_collation_sessions, mc_data )) @@ -1708,7 +1699,6 @@ where // send all shard and master blocks self.send_blocks_to_sync( - (*self.dispatcher).clone(), self.mq_adapter.clone(), self.state_node_adapter.clone(), blocks_to_send, @@ -1770,7 +1760,6 @@ where /// 
5. Return `Error` if it seems to be unrecoverable async fn send_blocks_to_sync( &self, - dispatcher: AsyncQueuedDispatcher, mq_adapter: Arc, state_node_adapter: Arc, mut blocks_to_send: Vec, diff --git a/collator/src/utils/async_dispatcher.rs b/collator/src/utils/async_dispatcher.rs new file mode 100644 index 000000000..d2b087386 --- /dev/null +++ b/collator/src/utils/async_dispatcher.rs @@ -0,0 +1,115 @@ +use std::pin::Pin; +use std::sync::Arc; + +use anyhow::{anyhow, Result}; +use futures_util::future::Future; +use futures_util::stream::FuturesUnordered; +use futures_util::StreamExt; +use tokio::sync::mpsc; + +pub const STANDARD_ASYNC_DISPATCHER_BUFFER_SIZE: usize = 100; + +pub type Task = Box) -> Fut + Send>; +pub type Fut = Pin> + Send>>; + +pub struct AsyncDispatcher { + descr: String, + spawn_sender: mpsc::Sender>, +} + +impl Clone for AsyncDispatcher { + fn clone(&self) -> Self { + Self { + descr: self.descr.clone(), + spawn_sender: self.spawn_sender.clone(), + } + } +} + +impl AsyncDispatcher +where + W: Send + Sync + 'static, +{ + pub fn new(descr: &str, queue_buffer_size: usize) -> (Self, mpsc::Receiver>) { + let (spawn_sender, spawn_receiver) = mpsc::channel::>(queue_buffer_size); + let dispatcher = Self { + descr: descr.to_owned(), + spawn_sender, + }; + (dispatcher, spawn_receiver) + } + + pub fn run(&self, worker: Arc, mut spawn_receiver: mpsc::Receiver>) { + let dispatcher_descr = self.descr.clone(); + tokio::spawn(async move { + let mut futures = FuturesUnordered::new(); + loop { + tokio::select! { + task_opt = spawn_receiver.recv() => match task_opt { + Some(task) => { + let worker = worker.clone(); + futures.push(async move { task(worker).await }); + } + None => { + panic!("async dispatcher: {}: tasks spawn channel closed!", dispatcher_descr) + } + }, + task_res = async { + if futures.is_empty() { + futures_util::future::pending::>().await + } else { + futures.next().await.unwrap() + } + } => { + if let Err(err) = task_res { + panic!( + "async dispatcher: {}: task result error! {:?}", + dispatcher_descr, err, + ) + } + } + } + } + }); + } + + pub async fn spawn_task(&self, task: AsyncTask) -> Result<()> + where + AsyncTask: FnOnce(Arc) -> Fut + Send + 'static, + { + self.spawn_sender + .send(Box::new(task)) + .await + .map_err(|err| { + anyhow!( + "async dispatcher: {}: tasks receiver dropped {err:?}", + self.descr, + ) + })?; + Ok(()) + } + + pub fn spawn_task_blocking(&self, task: AsyncTask) -> Result<()> + where + AsyncTask: FnOnce(Arc) -> Fut + Send + 'static, + { + self.spawn_sender + .blocking_send(Box::new(task)) + .map_err(|err| { + anyhow!( + "async dispatcher: {}: tasks receiver dropped {err:?}", + self.descr, + ) + })?; + Ok(()) + } +} + +#[macro_export] +macro_rules! method_to_async_closure { + ($method:ident, $($arg:expr),*) => { + move |worker| { + Box::pin(async move { worker.$method($($arg),*).await }) + } + }; +} diff --git a/collator/src/utils/async_queued_dispatcher.rs b/collator/src/utils/async_queued_dispatcher.rs index a310522d4..0e55edd46 100644 --- a/collator/src/utils/async_queued_dispatcher.rs +++ b/collator/src/utils/async_queued_dispatcher.rs @@ -7,7 +7,7 @@ use tokio::sync::{mpsc, oneshot}; use super::task_descr::{TaskDesc, TaskResponder}; use crate::tracing_targets; -pub const STANDARD_DISPATCHER_QUEUE_BUFFER_SIZE: usize = 100; +pub const STANDARD_QUEUED_DISPATCHER_BUFFER_SIZE: usize = 100; type AsyncTaskDesc = TaskDesc< dyn FnOnce(W) -> Pin)> + Send>> + Send, @@ -189,7 +189,7 @@ where } #[macro_export] -macro_rules! 
method_to_async_task_closure { +macro_rules! method_to_queued_async_closure { ($method:ident, $($arg:expr),*) => { (stringify!($method), #[allow(unused_mut)] @@ -205,7 +205,7 @@ macro_rules! method_to_async_task_closure { #[cfg(test)] #[tokio::test] async fn test() { - use crate::method_to_async_task_closure; + use crate::method_to_queued_async_closure; struct Worker {} impl Worker { @@ -220,7 +220,7 @@ async fn test() { // use marco to just call a worker method let _ = dispatcher - .enqueue_task(method_to_async_task_closure!(action, "test1")) + .enqueue_task(method_to_queued_async_closure!(action, "test1")) .await; // or build a closure by yourself diff --git a/collator/src/utils/mod.rs b/collator/src/utils/mod.rs index b8538dbc0..0b820a7cb 100644 --- a/collator/src/utils/mod.rs +++ b/collator/src/utils/mod.rs @@ -1,4 +1,5 @@ mod async_action; +pub mod async_dispatcher; pub mod async_queued_dispatcher; mod enum_try_into; pub mod shard; From 2ccaa1672d90eb7a4829d7c49bbb0db574564763 Mon Sep 17 00:00:00 2001 From: Vitaly Terekhov Date: Fri, 28 Jun 2024 10:20:40 +0000 Subject: [PATCH 3/8] refactor(collator): removed some clippy warnings --- collator/src/collator/types.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collator/src/collator/types.rs b/collator/src/collator/types.rs index 883cbfc60..0e9866e0b 100644 --- a/collator/src/collator/types.rs +++ b/collator/src/collator/types.rs @@ -831,7 +831,7 @@ impl ShardAccountStuff { ); anyhow::ensure!( - old_lib_descr.publishers.get(&self.account_addr)?.is_none(), + old_lib_descr.publishers.get(self.account_addr)?.is_none(), "cannot add public library {key} of account {} because this public library's \ LibDescr record already lists this account as a publisher", self.account_addr, From 9f6162345cab4ae05616a9c66cbf3378e7ac5623 Mon Sep 17 00:00:00 2001 From: Vitaly Terekhov Date: Fri, 28 Jun 2024 12:10:36 +0000 Subject: [PATCH 4/8] fix(tycho-collator): fixed tests and clippy warnings --- .../src/collator/tests/do_collate_tests.rs | 20 +++++++++++++++++++ collator/src/collator/types.rs | 1 - 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/collator/src/collator/tests/do_collate_tests.rs b/collator/src/collator/tests/do_collate_tests.rs index 994bd3ee1..cbd1022bc 100644 --- a/collator/src/collator/tests/do_collate_tests.rs +++ b/collator/src/collator/tests/do_collate_tests.rs @@ -8,6 +8,26 @@ use crate::collator::CollatorStdImpl; use crate::mempool::make_stub_anchor; use crate::test_utils::try_init_test_tracing; +fn get_test_block_limits() -> BlockLimits { + BlockLimits { + bytes: everscale_types::models::BlockParamLimits { + underload: 1_000_000, + soft_limit: 2_000_000, + hard_limit: 3_000_000, + }, + gas: everscale_types::models::BlockParamLimits { + underload: 1_000_000, + soft_limit: 2_000_000, + hard_limit: 3_000_000, + }, + lt_delta: everscale_types::models::BlockParamLimits { + underload: 1_000_000, + soft_limit: 2_000_000, + hard_limit: 3_000_000, + }, + } +} + #[test] fn test_read_next_externals() { try_init_test_tracing(tracing_subscriber::filter::LevelFilter::TRACE); diff --git a/collator/src/collator/types.rs b/collator/src/collator/types.rs index 0e9866e0b..aba43a65c 100644 --- a/collator/src/collator/types.rs +++ b/collator/src/collator/types.rs @@ -500,7 +500,6 @@ impl BlockLimitStats { #[derive(Debug, Clone, Copy, Eq, Ord, PartialEq, PartialOrd)] pub enum BlockLimitsLevel { - Underload, Soft, Hard, } From da9838887306ffdb410756b0e6f9f8a80cf91115 Mon Sep 17 00:00:00 2001 From: Vitaly Terekhov Date: Fri, 
28 Jun 2024 12:11:23 +0000 Subject: [PATCH 5/8] refactor(collation-manager): simplified queued dispatcher --- collator/src/utils/async_queued_dispatcher.rs | 20 +++++++++++--- collator/src/utils/task_descr.rs | 26 ++++++++++++------- 2 files changed, 32 insertions(+), 14 deletions(-) diff --git a/collator/src/utils/async_queued_dispatcher.rs b/collator/src/utils/async_queued_dispatcher.rs index 0e55edd46..1e7e0b6aa 100644 --- a/collator/src/utils/async_queued_dispatcher.rs +++ b/collator/src/utils/async_queued_dispatcher.rs @@ -1,7 +1,11 @@ use std::future::Future; use std::pin::Pin; +use std::sync::atomic::Ordering; +use std::sync::Arc; +use std::usize; use anyhow::{anyhow, Result}; +use metrics::atomics::AtomicU64; use tokio::sync::{mpsc, oneshot}; use super::task_descr::{TaskDesc, TaskResponder}; @@ -15,12 +19,14 @@ type AsyncTaskDesc = TaskDesc< >; pub struct AsyncQueuedDispatcher { + task_id_counter: Arc, tasks_queue: mpsc::Sender>, } impl Clone for AsyncQueuedDispatcher { fn clone(&self) -> Self { Self { + task_id_counter: self.task_id_counter.clone(), tasks_queue: self.tasks_queue.clone(), } } @@ -34,6 +40,7 @@ where pub fn new(queue_buffer_size: usize) -> (Self, mpsc::Receiver>) { let (sender, receiver) = mpsc::channel::>(queue_buffer_size); let dispatcher = Self { + task_id_counter: Arc::new(AtomicU64::default()), tasks_queue: sender, }; (dispatcher, receiver) @@ -85,6 +92,7 @@ where Self::run(worker, receiver); Self { + task_id_counter: Arc::new(AtomicU64::default()), tasks_queue: sender, } } @@ -121,7 +129,8 @@ where impl FnOnce(W) -> Pin)> + Send>> + Send + 'static, ), ) -> Result<()> { - let task = AsyncTaskDesc::::create(task_descr, Box::new(task_fn)); + let id = self.task_id_counter.fetch_add(1, Ordering::Release); + let task = AsyncTaskDesc::::create(id, task_descr, Box::new(task_fn)); self._enqueue_task(task).await } @@ -132,7 +141,8 @@ where impl FnOnce(W) -> Pin)> + Send>> + Send + 'static, ), ) -> Result<()> { - let task = AsyncTaskDesc::::create(task_descr, Box::new(task_fn)); + let id = self.task_id_counter.fetch_add(1, Ordering::Release); + let task = AsyncTaskDesc::::create(id, task_descr, Box::new(task_fn)); self._enqueue_task_blocking(task) } @@ -143,8 +153,9 @@ where impl FnOnce(W) -> Pin)> + Send>> + Send + 'static, ), ) -> Result>> { + let id = self.task_id_counter.fetch_add(1, Ordering::Release); let (task, receiver) = - AsyncTaskDesc::::create_with_responder(task_descr, Box::new(task_fn)); + AsyncTaskDesc::::create_with_responder(id, task_descr, Box::new(task_fn)); let (task_id, task_descr) = (task.id(), task.get_descr()); tracing::trace!( target: tracing_targets::ASYNC_QUEUE_DISPATCHER, @@ -169,8 +180,9 @@ where impl FnOnce(W) -> Pin)> + Send>> + Send + 'static, ), ) -> Result { + let id = self.task_id_counter.fetch_add(1, Ordering::Release); let (task, receiver) = - AsyncTaskDesc::::create_with_responder(task_descr, Box::new(task_fn)); + AsyncTaskDesc::::create_with_responder(id, task_descr, Box::new(task_fn)); let (task_id, task_descr) = (task.id(), task.get_descr()); tracing::trace!( target: tracing_targets::ASYNC_QUEUE_DISPATCHER, diff --git a/collator/src/utils/task_descr.rs b/collator/src/utils/task_descr.rs index 7596fd6e6..6fa192e88 100644 --- a/collator/src/utils/task_descr.rs +++ b/collator/src/utils/task_descr.rs @@ -1,34 +1,33 @@ use std::future::Future; use tokio::sync::oneshot; -use tycho_util::time::now_millis; pub struct TaskDesc { id: u64, descr: String, - closure: Box, // closure for execution - _creation_time: std::time::SystemTime, // 
time of task creation + closure: Box, // closure for execution responder: Option>, } impl TaskDesc { - pub fn create(descr: &str, closure: Box) -> Self { - // TODO: better to use global atomic counter + pub fn create(id: u64, descr: &str, closure: Box) -> Self { Self { - id: now_millis(), + id, descr: descr.into(), closure, - _creation_time: std::time::SystemTime::now(), responder: None, } } - pub fn create_with_responder(descr: &str, closure: Box) -> (Self, oneshot::Receiver) { + pub fn create_with_responder( + id: u64, + descr: &str, + closure: Box, + ) -> (Self, oneshot::Receiver) { let (sender, receiver) = oneshot::channel::(); let task = Self { - id: now_millis(), + id, descr: descr.into(), closure, - _creation_time: std::time::SystemTime::now(), responder: Some(sender), }; (task, receiver) @@ -189,6 +188,7 @@ mod tests { #[test] fn void_task_without_responder() { let task = TaskDesc::create( + 1, "task descr", Box::new(|| { println!("task executed"); @@ -211,6 +211,7 @@ mod tests { #[tokio::test] async fn void_task_with_responder() { let (task, receiver) = TaskDesc::create_with_responder( + 1, "task descr", Box::new(|| { println!("task executed"); @@ -238,6 +239,7 @@ mod tests { #[test] fn returning_task_without_responder() { let task = TaskDesc::create( + 1, "task descr", Box::new(|| { println!("task executed"); @@ -264,6 +266,7 @@ mod tests { #[tokio::test] async fn returning_task_with_responder() { let (task, receiver) = TaskDesc::create_with_responder( + 1, "task descr", Box::new(|| { println!("task executed"); @@ -302,6 +305,7 @@ mod tests { async fn returning_task_with_responder_and_dropped_receiver() { let task = { let (task, _receiver) = TaskDesc::create_with_responder( + 1, "task descr", Box::new(|| { println!("task executed"); @@ -339,6 +343,7 @@ mod tests { #[tokio::test] async fn async_void_task_with_responder() { let (task, receiver) = TaskDesc::create_with_responder( + 1, "task descr", Box::new(|| { println!("task executed"); @@ -368,6 +373,7 @@ mod tests { #[tokio::test] async fn returning_void_task_with_responder() { let (task, receiver) = TaskDesc::create_with_responder( + 1, "task descr", Box::new(|| { println!("task executed"); From e8f8fe81f4616184c2ebf2ee731da68c806924e5 Mon Sep 17 00:00:00 2001 From: Vitaly Terekhov Date: Mon, 15 Jul 2024 14:39:38 +0000 Subject: [PATCH 6/8] refactor(collation-manager): removed unused implementations --- collator/src/manager/mod.rs | 65 +------------------------------------ 1 file changed, 1 insertion(+), 64 deletions(-) diff --git a/collator/src/manager/mod.rs b/collator/src/manager/mod.rs index 047c13bc9..428c0f582 100644 --- a/collator/src/manager/mod.rs +++ b/collator/src/manager/mod.rs @@ -110,17 +110,6 @@ where } } -#[async_trait] -impl MempoolEventListener for CollationManager -where - CF: CollatorFactory, - V: Validator, -{ - async fn on_new_anchor(&self, anchor: Arc) -> Result<()> { - self.process_new_anchor_from_mempool(anchor).await - } -} - #[async_trait] impl StateNodeEventListener for AsyncDispatcher> where @@ -148,32 +137,6 @@ where } } -#[async_trait] -impl StateNodeEventListener for CollationManager -where - CF: CollatorFactory, - V: Validator, -{ - async fn on_block_accepted(&self, _block_id: &BlockId) -> Result<()> { - // TODO: remove accepted block from cache - // STUB: do nothing, currently we remove block from cache when it sent to state node - Ok(()) - } - - async fn on_block_accepted_external(&self, state: &ShardStateStuff) -> Result<()> { - // TODO: should store block info from blockchain if it was not 
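The id change running through PATCH 5/8 replaces the old millisecond-timestamp task ids with a per-dispatcher AtomicU64 that every clone of the dispatcher shares, so two tasks enqueued within the same millisecond can no longer receive the same id. A minimal sketch of the pattern, assuming illustrative names (`Dispatcher`, `next_task_id`) rather than the crate's actual API, and using `Ordering::Relaxed` where the patch itself uses `Release`:

    use std::sync::atomic::{AtomicU64, Ordering};
    use std::sync::Arc;

    // Every clone shares the same counter, so ids stay unique across threads.
    #[derive(Clone)]
    struct Dispatcher {
        task_id_counter: Arc<AtomicU64>,
    }

    impl Dispatcher {
        fn new() -> Self {
            Self {
                task_id_counter: Arc::new(AtomicU64::new(0)),
            }
        }

        fn next_task_id(&self) -> u64 {
            // A unique-id counter has no other data to synchronize,
            // so Relaxed ordering is sufficient for this sketch.
            self.task_id_counter.fetch_add(1, Ordering::Relaxed)
        }
    }

    fn main() {
        let a = Dispatcher::new();
        let b = a.clone();
        assert_eq!(a.next_task_id(), 0);
        assert_eq!(b.next_task_id(), 1);
    }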
already collated - // and validated by ourself. Will use this info for faster validation further: - // will consider that just collated block is already validated if it have the - // same root hash and file hash - if state.block_id().is_masterchain() { - let mc_data = McData::load_from_state(state)?; - self.process_mc_block_from_bc(mc_data).await - } else { - Ok(()) - } - } -} - #[async_trait] impl CollatorEventListener for AsyncDispatcher> where @@ -209,32 +172,6 @@ where } } -#[async_trait] -impl CollatorEventListener for CollationManager -where - CF: CollatorFactory, - V: Validator, -{ - async fn on_skipped_anchor( - &self, - next_block_id_short: BlockIdShort, - anchor: Arc, - force_mc_block: bool, - ) -> Result<()> { - self.process_skipped_anchor(next_block_id_short, anchor, force_mc_block) - .await - } - - async fn on_block_candidate(&self, collation_result: BlockCollationResult) -> Result<()> { - self.process_collated_block_candidate(collation_result) - .await - } - - async fn on_collator_stopped(&self, stop_key: CollationSessionId) -> Result<()> { - self.process_collator_stopped(stop_key).await - } -} - impl CollationManager where CF: CollatorFactory, @@ -430,7 +367,7 @@ where } else { // When we do not have last own collated master block then check last processed master block // If None then we should process incoming master block anyway to init collation process - // If we have already processed some previous incoming master block and colaltions were started + // If we have already processed some previous incoming master block and collations were started // then we should wait for the first own collated master block // but not more then `max_mc_block_delta_from_bc_to_await_own` if last_processed_mc_block_id_opt.is_some() { From 69a1df1fe8905b6f675c4ffda5c051aad5af7677 Mon Sep 17 00:00:00 2001 From: Vitaly Terekhov Date: Fri, 26 Jul 2024 04:39:15 +0000 Subject: [PATCH 7/8] fix(collation-manager): fix deadlocks --- Cargo.toml | 2 +- collator/src/manager/mod.rs | 69 ++++++++++++++----------------------- 2 files changed, 27 insertions(+), 44 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2862ba18c..033eaffce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -55,7 +55,7 @@ libc = "0.2" metrics = "0.23" metrics-exporter-prometheus = "0.15" moka = { version = "0.12", features = ["sync"] } -parking_lot = "0.12.1" +parking_lot = { version = "0.12.1", features = ["send_guard"] } parking_lot_core = "0.9.9" pin-project-lite = "0.2" pkcs8 = "0.10" diff --git a/collator/src/manager/mod.rs b/collator/src/manager/mod.rs index 428c0f582..bfcc5c009 100644 --- a/collator/src/manager/mod.rs +++ b/collator/src/manager/mod.rs @@ -11,7 +11,7 @@ use parking_lot::{Mutex, RwLock}; use tycho_block_util::block::ValidatorSubsetInfo; use tycho_block_util::state::ShardStateStuff; use tycho_util::metrics::HistogramGuard; -use tycho_util::{DashMapEntry, FastDashMap, FastHashMap, FastHashSet}; +use tycho_util::{FastDashMap, FastHashMap, FastHashSet}; use self::types::{ BlockCacheKey, BlockCandidateContainer, BlockCandidateToSend, BlocksCache, ChainTimesSyncState, @@ -322,12 +322,7 @@ where Self::notify_mempool_about_mc_block(mpool_adapter, &mc_data.block_id).await?; - self.dispatcher - .spawn_task(method_to_async_closure!( - refresh_collation_sessions, - mc_data - )) - .await?; + self.refresh_collation_sessions(mc_data).await?; Ok(()) } @@ -627,10 +622,9 @@ where for (shard_id, _) in sessions_to_keep { // if there is no active collator then current node does not collate this shard // so we do not need to do 
anything - let Some(collator) = self.active_collators.get(&shard_id) else { + let Some(collator) = self.active_collators.get(&shard_id).map(|r| r.clone()) else { continue; }; - let collator = collator.value().clone(); if shard_id.is_masterchain() { tracing::debug!( @@ -705,7 +699,7 @@ where if let Some(_local_pubkey) = local_pubkey_opt { let prev_seqno = prev_blocks_ids.iter().map(|b| b.seqno).max().unwrap_or(0); - if let DashMapEntry::Vacant(entry) = self.active_collators.entry(shard_id) { + if !self.active_collators.contains_key(&shard_id) { tracing::info!( target: tracing_targets::COLLATION_MANAGER, "There is no active collator for collation session {}. Will start it", @@ -725,7 +719,8 @@ where mc_data: mc_data.clone(), }) .await; - entry.insert(Arc::new(collator)); + + self.active_collators.insert(shard_id, Arc::new(collator)); } // notify validator, it will start overlay initialization @@ -767,12 +762,7 @@ where for (finish_key, session_info) in to_finish_sessions { self.collation_sessions_to_finish .insert(finish_key, session_info.clone()); - self.dispatcher - .spawn_task(method_to_async_closure!( - finish_collation_session, - session_info, - finish_key - )) + self.finish_collation_session(session_info, finish_key) .await?; } @@ -846,14 +836,16 @@ where let block_id = *collation_result.candidate.block.id(); // find session related to this block by shard - let session_info = self + let Some(session_info) = self .active_collation_sessions .read() .get(&block_id.shard) - .ok_or(anyhow!( + .cloned() + else { + anyhow::bail!( "There is no active collation session for the shard that block belongs to" - ))? - .clone(); + ); + }; let candidate_chain_time = collation_result.candidate.chain_time; @@ -918,12 +910,7 @@ where Self::notify_mempool_about_mc_block(self.mpool_adapter.clone(), &block_id).await?; - self.dispatcher - .spawn_task(method_to_async_closure!( - refresh_collation_sessions, - mc_data - )) - .await?; + self.refresh_collation_sessions(mc_data).await?; } } else { // when candidate is shard @@ -1214,7 +1201,11 @@ where let _histogram = HistogramGuard::begin("tycho_collator_enqueue_mc_block_collation_time"); // get masterchain collator if exists - let Some(mc_collator) = self.active_collators.get(&ShardIdent::MASTERCHAIN) else { + let Some(mc_collator) = self + .active_collators + .get(&ShardIdent::MASTERCHAIN) + .map(|r| r.clone()) + else { bail!("Masterchain collator is not started yet!"); }; @@ -1246,7 +1237,7 @@ where async fn enqueue_try_collate(&self, shard_id: &ShardIdent) -> Result<()> { // get collator if exists - let Some(collator) = self.active_collators.get(shard_id) else { + let Some(collator) = self.active_collators.get(shard_id).map(|r| r.clone()) else { tracing::warn!( target: tracing_targets::COLLATION_MANAGER, "Node does not collate blocks for shard {}", @@ -1425,7 +1416,7 @@ where /// Find all shard blocks that form master block subgraph. 
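The recurring `.get(..).map(|r| r.clone())` edits in PATCH 7/8 all apply one rule: clone the Arc out of the map guard before the first `.await`, so that no shard of the concurrent map stays locked across a suspension point. A minimal sketch of that rule, assuming `dashmap` and `tokio` as stand-ins for the crate's FastDashMap and runtime, with `Collator::try_collate` as a placeholder for the real async call:

    use std::sync::Arc;

    use dashmap::DashMap;

    struct Collator;

    impl Collator {
        async fn try_collate(&self) {
            // stand-in for the real async work
        }
    }

    async fn try_collate_shard(collators: &DashMap<u32, Arc<Collator>>, shard: u32) {
        // `r.clone()` clones the Arc value behind the guard; the guard itself
        // is dropped at the end of the closure, before the await below.
        let Some(collator) = collators.get(&shard).map(|r| r.clone()) else {
            return;
        };
        collator.try_collate().await;
    }

    #[tokio::main]
    async fn main() {
        let collators = DashMap::new();
        collators.insert(0u32, Arc::new(Collator));
        try_collate_shard(&collators, 0).await;
    }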
/// Then extract and return them if all are valid - async fn extract_mc_block_subgraph_if_valid( + fn extract_mc_block_subgraph_if_valid( &self, block_id: &BlockId, ) -> Result> { @@ -1504,8 +1495,7 @@ where if not_all_blocks_valid { let mut blocks_to_restore = vec![subgraph.mc_block]; blocks_to_restore.append(&mut subgraph.shard_blocks); - self.restore_blocks_in_cache_async(blocks_to_restore) - .await?; + self.restore_blocks_in_cache(blocks_to_restore)?; return Ok(None); } @@ -1524,7 +1514,7 @@ where } /// Remove block entries from cache and compact cache - async fn cleanup_blocks_from_cache(&self, blocks_keys: Vec) -> Result<()> { + fn cleanup_blocks_from_cache(&self, blocks_keys: Vec) -> Result<()> { tracing::debug!( target: tracing_targets::COLLATION_MANAGER, "Cleaning up blocks from cache: {:?}", @@ -1554,13 +1544,6 @@ where Ok(()) } - async fn restore_blocks_in_cache_async( - &self, - blocks_to_restore: Vec, - ) -> Result<()> { - self.restore_blocks_in_cache(blocks_to_restore) - } - /// Find and restore block entries in cache updating sync statuses fn restore_blocks_in_cache(&self, blocks_to_restore: Vec) -> Result<()> { let _tracing_blocks_descr = blocks_to_restore @@ -1626,7 +1609,7 @@ where let mut sync_elapsed = Default::default(); // extract master block with all shard blocks if valid, and process them - if let Some(mc_block_subgraph) = self.extract_mc_block_subgraph_if_valid(block_id).await? { + if let Some(mc_block_subgraph) = self.extract_mc_block_subgraph_if_valid(block_id)? { extract_elapsed = histogram_extract.finish(); let timer = std::time::Instant::now(); @@ -1795,7 +1778,7 @@ where "All blocks were successfully sent to sync. Will cleanup them from cache: {:?}", _tracing_sent_blocks_descr.as_slice(), ); - self.cleanup_blocks_from_cache(sent_blocks_keys).await?; + self.cleanup_blocks_from_cache(sent_blocks_keys)?; } commit_diffs_elapsed = histogram.finish(); @@ -1808,7 +1791,7 @@ where _tracing_blocks_to_send_descr.as_slice(), ); // queue blocks restore task - self.restore_blocks_in_cache_async(blocks_to_send).await?; + self.restore_blocks_in_cache(blocks_to_send)?; // TODO: should implement resending for restored blocks } From e0c07703557b8ae8a7530fd4abff39bbb89301f2 Mon Sep 17 00:00:00 2001 From: Vitaly Terekhov Date: Sun, 28 Jul 2024 08:48:35 +0000 Subject: [PATCH 8/8] fix(collation-manager): update mc_block_latest_chain_time when it calculated --- collator/src/manager/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/collator/src/manager/mod.rs b/collator/src/manager/mod.rs index bfcc5c009..fd08e65a8 100644 --- a/collator/src/manager/mod.rs +++ b/collator/src/manager/mod.rs @@ -1110,6 +1110,7 @@ where will collate next master block with chain time {}ms", mc_block_min_interval_ms, max_first_chain_time_that_elapsed, ); + chain_times_guard.mc_block_latest_chain_time = max_first_chain_time_that_elapsed; return ( should_collate_mc_block, Some(max_first_chain_time_that_elapsed),
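The one-line change in PATCH 8/8 records the chosen chain time inside the same `chain_times_sync_state` lock that produced the collation decision, so the stored `mc_block_latest_chain_time` cannot lag behind a decision that was just returned. A minimal sketch of that idea, with an intentionally simplified decision rule (the real rule also accounts for `mc_block_min_interval_ms` and the elapsed chain times per shard):

    use parking_lot::Mutex;

    #[derive(Default)]
    struct ChainTimesSyncState {
        mc_block_latest_chain_time: u64,
    }

    struct Manager {
        chain_times_sync_state: Mutex<ChainTimesSyncState>,
    }

    impl Manager {
        // Decide and record under one guard so readers never observe a
        // decision whose chain time has not been stored yet.
        fn decide_collate(&self, candidate_chain_time: u64) -> Option<u64> {
            let mut guard = self.chain_times_sync_state.lock();
            if candidate_chain_time > guard.mc_block_latest_chain_time {
                guard.mc_block_latest_chain_time = candidate_chain_time;
                Some(candidate_chain_time)
            } else {
                None
            }
        }
    }

    fn main() {
        let manager = Manager {
            chain_times_sync_state: Mutex::new(ChainTimesSyncState::default()),
        };
        assert_eq!(manager.decide_collate(100), Some(100));
        assert_eq!(manager.decide_collate(50), None);
    }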