From b21fa57fb84bda5f7b2d298af9e4a3a872bc8a83 Mon Sep 17 00:00:00 2001 From: serejkaaa512 <5125402@mail.ru> Date: Tue, 9 Jul 2024 13:12:36 +0300 Subject: [PATCH] feat(collator): add import externals on hot start --- collator/src/collator/mod.rs | 127 +++++++++++++++++++++++++++------ collator/src/collator/types.rs | 7 +- 2 files changed, 108 insertions(+), 26 deletions(-) diff --git a/collator/src/collator/mod.rs b/collator/src/collator/mod.rs index 54f77b18f..eb0d44d20 100644 --- a/collator/src/collator/mod.rs +++ b/collator/src/collator/mod.rs @@ -321,6 +321,20 @@ impl CollatorStdImpl { ); let working_state = Self::build_and_validate_working_state(mc_state, prev_states, &self.state_tracker)?; + if let Some(anchor_id) = working_state + .prev_shard_data + .processed_upto() + .externals + .as_ref() + .map(|upto| upto.processed_to.0) + { + self.import_anchors_on_hot_start( + anchor_id, + working_state.prev_shard_data.gen_chain_time(), + ) + .await?; + } + self.set_working_state(working_state); self.timer = std::time::Instant::now(); @@ -480,8 +494,6 @@ impl CollatorStdImpl { /// /// Returns: (`next_anchor`, `has_externals`) async fn import_next_anchor(&mut self) -> Result<(Arc, bool)> { - // TODO: make real implementation - let labels = [("workchain", self.shard_id.workchain().to_string())]; let _histogram = @@ -489,26 +501,10 @@ impl CollatorStdImpl { let timer = std::time::Instant::now(); - // TODO: use get_next_anchor() only once - let next_anchor = if let Some(prev_anchor_id) = self.last_imported_anchor_id { - self.mpool_adapter.get_next_anchor(prev_anchor_id).await? - } else { - let anchor_id_opt = self - .working_state() - .prev_shard_data - .processed_upto() - .externals - .as_ref() - .map(|upto| upto.processed_to.0); - match anchor_id_opt { - Some(anchor_id) => self - .mpool_adapter - .get_anchor_by_id(anchor_id) - .await? - .unwrap(), - None => self.mpool_adapter.get_next_anchor(0).await?, - } - }; + let next_anchor = self + .mpool_adapter + .get_next_anchor(self.last_imported_anchor_id.unwrap_or_default()) + .await?; let externals_count = next_anchor.externals_count_for(&self.shard_id); let has_externals = externals_count > 0; @@ -543,6 +539,91 @@ impl CollatorStdImpl { Ok((next_anchor, has_externals)) } + /// 1. Get anchor from `externals_processed_upto` + /// 2. Get next anchors until last_chain_time + /// 3. Store anchors in cache and return it + /// + /// Returns: (`next_anchor`, `has_externals`) + async fn import_anchors_on_hot_start( + &mut self, + processed_upto_anchor_id: u32, + last_block_chain_time: u64, + ) -> Result<()> { + let labels = [("workchain", self.shard_id.workchain().to_string())]; + + let _histogram = HistogramGuardWithLabels::begin( + "tycho_collator_import_next_anchors_hot_start_time", + &labels, + ); + + let timer = std::time::Instant::now(); + + let mut next_anchor = self + .mpool_adapter + .get_anchor_by_id(processed_upto_anchor_id) + .await? + .unwrap(); + + let mut anchors_id = vec![]; + let mut anchors_chain_time = vec![]; + let mut anchors_external_count = vec![]; + let mut externals_count = next_anchor.externals_count_for(&self.shard_id); + let has_externals = externals_count > 0; + if has_externals { + self.has_pending_externals = true; + } + + let mut next_anchor_chain_time = next_anchor.chain_time(); + let mut last_anchor_id = processed_upto_anchor_id; + self.anchors_cache + .push_back((processed_upto_anchor_id, CachedMempoolAnchor { + anchor: next_anchor.clone(), + has_externals, + })); + + anchors_id.push(last_anchor_id); + anchors_chain_time.push(next_anchor_chain_time); + anchors_external_count.push(externals_count); + while last_block_chain_time > next_anchor_chain_time { + next_anchor = self.mpool_adapter.get_next_anchor(last_anchor_id).await?; + + externals_count = next_anchor.externals_count_for(&self.shard_id); + let has_externals = externals_count > 0; + if has_externals { + self.has_pending_externals = true; + } + + next_anchor_chain_time = next_anchor.chain_time(); + last_anchor_id = next_anchor.id(); + self.anchors_cache + .push_back((processed_upto_anchor_id, CachedMempoolAnchor { + anchor: next_anchor.clone(), + has_externals, + })); + + anchors_id.push(last_anchor_id); + anchors_chain_time.push(next_anchor_chain_time); + anchors_external_count.push(externals_count); + } + + metrics::counter!("tycho_collator_ext_msgs_imported_count", &labels) + .increment(anchors_external_count.iter().sum::() as u64); + + self.last_imported_anchor_id = Some(last_anchor_id); + self.last_imported_anchor_chain_time = Some(next_anchor_chain_time); + self.last_imported_anchor_author = Some(next_anchor.author()); + + tracing::debug!(target: tracing_targets::COLLATOR, + elapsed = timer.elapsed().as_millis(), + "imported next anchors on hot start (ids: {:?}, chain_times: {:?}, externals: {:?})", + anchors_id, + anchors_chain_time, + anchors_external_count, + ); + + Ok(()) + } + fn get_last_imported_anchor_chain_time(&self) -> u64 { self.last_imported_anchor_chain_time.unwrap() } @@ -656,7 +737,7 @@ impl CollatorStdImpl { tracing::info!(target: tracing_targets::COLLATOR, "there are unprocessed internals from previous block, will collate next block", ); - let next_chain_time = self.working_state().prev_shard_data.gen_chain_time() as u64; + let next_chain_time = self.working_state().prev_shard_data.gen_chain_time(); self.do_collate(next_chain_time, None).await?; } else { // otherwise import next anchor and return it notify to manager diff --git a/collator/src/collator/types.rs b/collator/src/collator/types.rs index 474b4cec6..b4d7fa552 100644 --- a/collator/src/collator/types.rs +++ b/collator/src/collator/types.rs @@ -188,7 +188,7 @@ pub(super) struct PrevData { pure_states: Vec, pure_state_root: Cell, - gen_chain_time: u32, + gen_chain_time: u64, gen_lt: u64, total_validator_fees: CurrencyCollection, gas_used_from_last_anchor: u64, @@ -221,6 +221,7 @@ impl PrevData { )?]; let gen_utime = observable_states[0].state().gen_utime; + let gen_utime_ms = observable_states[0].state().gen_utime_ms; let gen_lt = observable_states[0].state().gen_lt; let observable_accounts = observable_states[0].state().load_accounts()?; let total_validator_fees = observable_states[0].state().total_validator_fees.clone(); @@ -237,7 +238,7 @@ impl PrevData { pure_states: pure_prev_states, pure_state_root: pure_prev_state_root.clone(), - gen_chain_time: gen_utime, + gen_chain_time: gen_utime as u64 * 1000 + gen_utime_ms as u64, gen_lt, total_validator_fees, gas_used_from_last_anchor, @@ -295,7 +296,7 @@ impl PrevData { &self.pure_state_root } - pub fn gen_chain_time(&self) -> u32 { + pub fn gen_chain_time(&self) -> u64 { self.gen_chain_time }