diff --git a/collator/src/collator/do_collate.rs b/collator/src/collator/do_collate.rs index fd569710c..0614e4b8c 100644 --- a/collator/src/collator/do_collate.rs +++ b/collator/src/collator/do_collate.rs @@ -309,6 +309,9 @@ impl CollatorStdImpl { let read_ext_messages_elapsed = exec_manager.read_ext_messages_total_elapsed(); metrics::histogram!("tycho_do_collate_read_ext_msgs_time", labels) .record(read_ext_messages_elapsed); + let add_to_message_groups_elapsed = exec_manager.add_to_message_groups_total_elapsed(); + metrics::histogram!("tycho_do_collate_add_to_msg_groups_time", labels) + .record(add_to_message_groups_elapsed); metrics::histogram!("tycho_do_collate_exec_msgs_total_time", labels) .record(execute_msgs_total_elapsed); @@ -526,6 +529,7 @@ impl CollatorStdImpl { read_existing = %format_duration(read_existing_messages_elapsed), read_ext = %format_duration(read_ext_messages_elapsed), read_new = %format_duration(read_new_messages_elapsed), + add_to_groups = %format_duration(add_to_message_groups_elapsed), exec_msgs_total = %format_duration(execute_msgs_total_elapsed), process_txs_total = %format_duration(process_txs_total_elapsed), @@ -573,6 +577,8 @@ impl CollatorStdImpl { collation_data: &mut BlockCollationData, continue_from_read_to: bool, ) -> Result<(Vec>, bool)> { + let labels = [("workchain", shard_id.workchain().to_string())]; + tracing::info!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS, "shard: {}, count: {}", shard_id, count, ); @@ -649,7 +655,12 @@ impl CollatorStdImpl { let next_chain_time = collation_data.gen_utime as u64 * 1000 + collation_data.gen_utime_ms as u64; if next_chain_time - anchor.chain_time > expire_timeout { - let iter = anchor.iter_externals(0); + let iter_from = if key == was_read_to.0 { + was_read_to.1 as usize + } else { + 0 + }; + let iter = anchor.iter_externals(iter_from); let mut expired_msgs_count = 0; for ext_msg in iter { tracing::trace!(target: tracing_targets::COLLATOR, @@ -658,11 +669,12 @@ impl CollatorStdImpl { expired_msgs_count += 1; } - let labels = &[("workchain", shard_id.workchain().to_string())]; - metrics::counter!("tycho_do_collate_ext_msgs_expired_count", labels) + metrics::counter!("tycho_do_collate_ext_msgs_expired_count", &labels) .increment(expired_msgs_count); + metrics::gauge!("tycho_collator_ext_msgs_imported_queue_size", &labels) + .decrement(expired_msgs_count as f64); - // skip and remove fully expired anchor + // skip and remove expired anchor let _ = anchors_cache.remove(next_idx); tracing::debug!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS, "anchor with key {} fully skipped due to expiration, removed from anchors cache", key, @@ -699,12 +711,14 @@ impl CollatorStdImpl { ); // get iterator and read messages + let mut msgs_read_from_last_anchor = 0; let mut msgs_collected_from_last_anchor = 0; let iter = anchor.iter_externals(msgs_read_offset_in_last_anchor as usize); for ext_msg in iter { tracing::trace!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS, "read ext_msg dst: {}", ext_msg.info.dst, ); + msgs_read_from_last_anchor += 1; if total_msgs_collected < count { msgs_read_offset_in_last_anchor += 1; } @@ -732,6 +746,10 @@ impl CollatorStdImpl { } } } + + metrics::gauge!("tycho_collator_ext_msgs_imported_queue_size", &labels) + .decrement(msgs_read_from_last_anchor as f64); + tracing::debug!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS, "{} externals collected from anchor {}, msgs_read_offset_in_last_anchor: {}", msgs_collected_from_last_anchor, key, msgs_read_offset_in_last_anchor, diff --git a/collator/src/collator/execution_manager.rs b/collator/src/collator/execution_manager.rs index 326dd073c..ae3d5966d 100644 --- a/collator/src/collator/execution_manager.rs +++ b/collator/src/collator/execution_manager.rs @@ -51,6 +51,8 @@ pub(super) struct ExecutionManager { read_new_messages_total_elapsed: Duration, /// sum total time of reading external messages read_ext_messages_total_elapsed: Duration, + /// sum total time of adding messages to groups + add_to_message_groups_total_elapsed: Duration, } pub(super) struct MessagesExecutor { @@ -92,6 +94,7 @@ impl ExecutionManager { read_existing_messages_total_elapsed: Duration::ZERO, read_new_messages_total_elapsed: Duration::ZERO, read_ext_messages_total_elapsed: Duration::ZERO, + add_to_message_groups_total_elapsed: Duration::ZERO, } } @@ -106,6 +109,7 @@ impl ExecutionManager { self.read_existing_messages_total_elapsed = Duration::ZERO; self.read_new_messages_total_elapsed = Duration::ZERO; self.read_ext_messages_total_elapsed = Duration::ZERO; + self.add_to_message_groups_total_elapsed = Duration::ZERO; let current_iterator_positions = std::mem::take(&mut self.current_iterator_positions); QueueIteratorAdapter::new( @@ -137,6 +141,10 @@ impl ExecutionManager { self.read_ext_messages_total_elapsed } + pub fn add_to_message_groups_total_elapsed(&self) -> Duration { + self.add_to_message_groups_total_elapsed + } + #[tracing::instrument(skip_all)] pub async fn get_next_message_group( &mut self, @@ -166,6 +174,7 @@ impl ExecutionManager { } let timer = std::time::Instant::now(); + let mut add_to_groups_elapsed = Duration::ZERO; // read messages from iterator and fill messages groups // until the first group fully loaded @@ -176,6 +185,7 @@ impl ExecutionManager { existing_internals_read_count += 1; + let timer_add_to_groups = std::time::Instant::now(); self.message_groups.add_message(Box::new(ParsedMessage { info: MsgInfo::Int(int_msg.message_with_source.message.info.clone()), dst_in_current_shard: true, @@ -185,6 +195,7 @@ impl ExecutionManager { same_shard: int_msg.message_with_source.shard_id == self.shard_id, }), })); + add_to_groups_elapsed += timer_add_to_groups.elapsed(); if self.message_groups.messages_count() >= self.messages_buffer_limit { tracing::debug!(target: tracing_targets::COLLATOR, @@ -217,6 +228,8 @@ impl ExecutionManager { } self.read_existing_messages_total_elapsed += timer.elapsed(); + self.read_existing_messages_total_elapsed -= add_to_groups_elapsed; + self.add_to_message_groups_total_elapsed += add_to_groups_elapsed; // when message_groups buffer is empty and no more existing internals in current iterator // then set all read messages as processed @@ -257,6 +270,7 @@ impl ExecutionManager { if group_opt.is_none() && self.process_ext_messages && !self.process_new_messages { let timer = std::time::Instant::now(); + let mut add_to_groups_elapsed = Duration::ZERO; let mut externals_read_count = 0; while collator.has_pending_externals { @@ -269,9 +283,11 @@ impl ExecutionManager { externals_read_count += ext_msgs.len() as u64; + let timer_add_to_groups = std::time::Instant::now(); for ext_msg in ext_msgs { self.message_groups.add_message(ext_msg); } + add_to_groups_elapsed += timer_add_to_groups.elapsed(); if self.message_groups.messages_count() >= self.messages_buffer_limit { tracing::debug!(target: tracing_targets::COLLATOR, @@ -304,6 +320,8 @@ impl ExecutionManager { } self.read_ext_messages_total_elapsed += timer.elapsed(); + self.read_ext_messages_total_elapsed -= add_to_groups_elapsed; + self.add_to_message_groups_total_elapsed += add_to_groups_elapsed; if self.message_groups.is_empty() && !collator.has_pending_externals { tracing::debug!(target: tracing_targets::COLLATOR, @@ -332,14 +350,15 @@ impl ExecutionManager { .try_update_new_messages_read_to(max_new_message_key_to_current_shard)?; let timer = std::time::Instant::now(); + let mut add_to_groups_elapsed = Duration::ZERO; let mut new_internals_read_count = 0; while let Some(int_msg) = mq_iterator_adapter.next_new_message()? { assert!(int_msg.is_new); new_internals_read_count += 1; - collation_data.read_new_msgs_from_iterator += 1; + let timer_add_to_groups = std::time::Instant::now(); self.message_groups.add_message(Box::new(ParsedMessage { info: MsgInfo::Int(int_msg.message_with_source.message.info.clone()), dst_in_current_shard: true, @@ -347,6 +366,7 @@ impl ExecutionManager { special_origin: None, dequeued: None, })); + add_to_groups_elapsed += timer_add_to_groups.elapsed(); if self.message_groups.messages_count() >= self.messages_buffer_limit { tracing::debug!(target: tracing_targets::COLLATOR, @@ -382,6 +402,8 @@ impl ExecutionManager { } self.read_new_messages_total_elapsed += timer.elapsed(); + self.read_new_messages_total_elapsed -= add_to_groups_elapsed; + self.add_to_message_groups_total_elapsed += add_to_groups_elapsed; // actually, we process all message groups with new messages in one step, // so we update internals processed_upto each step diff --git a/collator/src/collator/mod.rs b/collator/src/collator/mod.rs index f07c0c201..358751ddd 100644 --- a/collator/src/collator/mod.rs +++ b/collator/src/collator/mod.rs @@ -513,10 +513,7 @@ impl CollatorStdImpl { ) -> Result<()> { let labels = [("workchain", self.shard_id.workchain().to_string())]; - let histogram = HistogramGuardWithLabels::begin( - "tycho_collator_import_next_anchors_on_init_time", - &labels, - ); + let timer = std::time::Instant::now(); let mut next_anchor = self .mpool_adapter @@ -524,9 +521,12 @@ impl CollatorStdImpl { .await? .unwrap(); + #[allow(dead_code)] #[derive(Debug)] struct AnchorInfo { - externals_count: usize, + id: MempoolAnchorId, + ct: u64, + externals: usize, } let mut anchors: Vec = vec![]; @@ -545,7 +545,11 @@ impl CollatorStdImpl { has_externals, })); - anchors.push(AnchorInfo { externals_count }); + anchors.push(AnchorInfo { + id: next_anchor.id, + ct: next_anchor.chain_time, + externals: externals_count, + }); while last_block_chain_time > next_anchor_chain_time { next_anchor = self.mpool_adapter.get_next_anchor(last_anchor_id).await?; @@ -563,19 +567,26 @@ impl CollatorStdImpl { has_externals, })); - anchors.push(AnchorInfo { externals_count }); + anchors.push(AnchorInfo { + id: next_anchor.id, + ct: next_anchor.chain_time, + externals: externals_count, + }); } + let imported_count: usize = anchors.iter().map(|a| a.externals).sum(); metrics::counter!("tycho_collator_ext_msgs_imported_count", &labels) - .increment(anchors.iter().map(|a| a.externals_count).sum::() as u64); + .increment(imported_count as u64); + metrics::gauge!("tycho_collator_ext_msgs_imported_queue_size", &labels) + .increment(imported_count as f64); self.last_imported_anchor_id = Some(last_anchor_id); self.last_imported_anchor_chain_time = Some(next_anchor_chain_time); self.last_imported_anchor_author = Some(next_anchor.author); tracing::debug!(target: tracing_targets::COLLATOR, - elapsed = histogram.finish().as_millis(), - "Collator (block_id={}): init: imported anchors on init ({:?})", + elapsed = timer.elapsed().as_millis(), + "Collator (block_id={}): init: imported anchors on init: {:?}", self.next_block_id_short, anchors.as_slice() ); diff --git a/collator/src/collator/types.rs b/collator/src/collator/types.rs index c97416971..cc15b0063 100644 --- a/collator/src/collator/types.rs +++ b/collator/src/collator/types.rs @@ -951,6 +951,11 @@ impl MessageGroups { loop { let group_entry = self.groups.entry(offset).or_default(); + if group_entry.is_full { + offset += 1; + continue; + } + let group_len = group_entry.inner.len(); match group_entry.inner.entry(account_id) { Entry::Vacant(entry) => { diff --git a/scripts/gen-dashboard.py b/scripts/gen-dashboard.py index eb220ccd3..fe42f3ba1 100644 --- a/scripts/gen-dashboard.py +++ b/scripts/gen-dashboard.py @@ -719,11 +719,21 @@ def collator_message_metrics() -> RowPanel: "All executed msgs count", labels_selectors=['workchain=~"$workchain"'], ), + create_gauge_panel( + "tycho_collator_ext_msgs_imported_queue_size", + "Ext msgs imported queue size", + labels=['workchain=~"$workchain"'], + ), create_counter_panel( "tycho_collator_ext_msgs_imported_count", "Imported Ext msgs count from mempool", labels_selectors=['workchain=~"$workchain"'], ), + create_counter_panel( + "tycho_do_collate_ext_msgs_expired_count", + "Ext msgs expired count", + labels_selectors=['workchain=~"$workchain"'], + ), create_counter_panel( "tycho_do_collate_msgs_read_count_ext", "Read Ext msgs count", @@ -739,11 +749,6 @@ def collator_message_metrics() -> RowPanel: "Ext msgs error count", labels_selectors=['workchain=~"$workchain"'], ), - create_counter_panel( - "tycho_do_collate_ext_msgs_expired_count", - "Ext msgs expired count", - labels_selectors=['workchain=~"$workchain"'], - ), create_counter_panel( "tycho_do_collate_msgs_read_count_int", "Read Int msgs count", @@ -862,6 +867,11 @@ def collator_core_operations_metrics() -> RowPanel: "Execution time: incl Fill messages: read new", labels=['workchain=~"$workchain"'], ), + create_heatmap_panel( + "tycho_do_collate_add_to_msg_groups_time", + "Execution time: incl Fill messages: add to msg groups", + labels=['workchain=~"$workchain"'], + ), create_heatmap_panel( "tycho_do_collate_exec_msgs_total_time", "Execution time: incl Execute messages",