diff --git a/collator/src/collator/do_collate.rs b/collator/src/collator/do_collate.rs index 0f5e8dd34..0614e4b8c 100644 --- a/collator/src/collator/do_collate.rs +++ b/collator/src/collator/do_collate.rs @@ -577,6 +577,8 @@ impl CollatorStdImpl { collation_data: &mut BlockCollationData, continue_from_read_to: bool, ) -> Result<(Vec>, bool)> { + let labels = [("workchain", shard_id.workchain().to_string())]; + tracing::info!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS, "shard: {}, count: {}", shard_id, count, ); @@ -653,7 +655,12 @@ impl CollatorStdImpl { let next_chain_time = collation_data.gen_utime as u64 * 1000 + collation_data.gen_utime_ms as u64; if next_chain_time - anchor.chain_time > expire_timeout { - let iter = anchor.iter_externals(0); + let iter_from = if key == was_read_to.0 { + was_read_to.1 as usize + } else { + 0 + }; + let iter = anchor.iter_externals(iter_from); let mut expired_msgs_count = 0; for ext_msg in iter { tracing::trace!(target: tracing_targets::COLLATOR, @@ -662,11 +669,12 @@ impl CollatorStdImpl { expired_msgs_count += 1; } - let labels = &[("workchain", shard_id.workchain().to_string())]; - metrics::counter!("tycho_do_collate_ext_msgs_expired_count", labels) + metrics::counter!("tycho_do_collate_ext_msgs_expired_count", &labels) .increment(expired_msgs_count); + metrics::gauge!("tycho_collator_ext_msgs_imported_queue_size", &labels) + .decrement(expired_msgs_count as f64); - // skip and remove fully expired anchor + // skip and remove expired anchor let _ = anchors_cache.remove(next_idx); tracing::debug!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS, "anchor with key {} fully skipped due to expiration, removed from anchors cache", key, @@ -703,12 +711,14 @@ impl CollatorStdImpl { ); // get iterator and read messages + let mut msgs_read_from_last_anchor = 0; let mut msgs_collected_from_last_anchor = 0; let iter = anchor.iter_externals(msgs_read_offset_in_last_anchor as usize); for ext_msg in iter { tracing::trace!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS, "read ext_msg dst: {}", ext_msg.info.dst, ); + msgs_read_from_last_anchor += 1; if total_msgs_collected < count { msgs_read_offset_in_last_anchor += 1; } @@ -736,6 +746,10 @@ impl CollatorStdImpl { } } } + + metrics::gauge!("tycho_collator_ext_msgs_imported_queue_size", &labels) + .decrement(msgs_read_from_last_anchor as f64); + tracing::debug!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS, "{} externals collected from anchor {}, msgs_read_offset_in_last_anchor: {}", msgs_collected_from_last_anchor, key, msgs_read_offset_in_last_anchor, diff --git a/collator/src/collator/mod.rs b/collator/src/collator/mod.rs index f07c0c201..358751ddd 100644 --- a/collator/src/collator/mod.rs +++ b/collator/src/collator/mod.rs @@ -513,10 +513,7 @@ impl CollatorStdImpl { ) -> Result<()> { let labels = [("workchain", self.shard_id.workchain().to_string())]; - let histogram = HistogramGuardWithLabels::begin( - "tycho_collator_import_next_anchors_on_init_time", - &labels, - ); + let timer = std::time::Instant::now(); let mut next_anchor = self .mpool_adapter @@ -524,9 +521,12 @@ impl CollatorStdImpl { .await? .unwrap(); + #[allow(dead_code)] #[derive(Debug)] struct AnchorInfo { - externals_count: usize, + id: MempoolAnchorId, + ct: u64, + externals: usize, } let mut anchors: Vec = vec![]; @@ -545,7 +545,11 @@ impl CollatorStdImpl { has_externals, })); - anchors.push(AnchorInfo { externals_count }); + anchors.push(AnchorInfo { + id: next_anchor.id, + ct: next_anchor.chain_time, + externals: externals_count, + }); while last_block_chain_time > next_anchor_chain_time { next_anchor = self.mpool_adapter.get_next_anchor(last_anchor_id).await?; @@ -563,19 +567,26 @@ impl CollatorStdImpl { has_externals, })); - anchors.push(AnchorInfo { externals_count }); + anchors.push(AnchorInfo { + id: next_anchor.id, + ct: next_anchor.chain_time, + externals: externals_count, + }); } + let imported_count: usize = anchors.iter().map(|a| a.externals).sum(); metrics::counter!("tycho_collator_ext_msgs_imported_count", &labels) - .increment(anchors.iter().map(|a| a.externals_count).sum::() as u64); + .increment(imported_count as u64); + metrics::gauge!("tycho_collator_ext_msgs_imported_queue_size", &labels) + .increment(imported_count as f64); self.last_imported_anchor_id = Some(last_anchor_id); self.last_imported_anchor_chain_time = Some(next_anchor_chain_time); self.last_imported_anchor_author = Some(next_anchor.author); tracing::debug!(target: tracing_targets::COLLATOR, - elapsed = histogram.finish().as_millis(), - "Collator (block_id={}): init: imported anchors on init ({:?})", + elapsed = timer.elapsed().as_millis(), + "Collator (block_id={}): init: imported anchors on init: {:?}", self.next_block_id_short, anchors.as_slice() ); diff --git a/collator/src/collator/types.rs b/collator/src/collator/types.rs index c97416971..cc15b0063 100644 --- a/collator/src/collator/types.rs +++ b/collator/src/collator/types.rs @@ -951,6 +951,11 @@ impl MessageGroups { loop { let group_entry = self.groups.entry(offset).or_default(); + if group_entry.is_full { + offset += 1; + continue; + } + let group_len = group_entry.inner.len(); match group_entry.inner.entry(account_id) { Entry::Vacant(entry) => { diff --git a/scripts/gen-dashboard.py b/scripts/gen-dashboard.py index 76ad7f56a..fe42f3ba1 100644 --- a/scripts/gen-dashboard.py +++ b/scripts/gen-dashboard.py @@ -719,11 +719,21 @@ def collator_message_metrics() -> RowPanel: "All executed msgs count", labels_selectors=['workchain=~"$workchain"'], ), + create_gauge_panel( + "tycho_collator_ext_msgs_imported_queue_size", + "Ext msgs imported queue size", + labels=['workchain=~"$workchain"'], + ), create_counter_panel( "tycho_collator_ext_msgs_imported_count", "Imported Ext msgs count from mempool", labels_selectors=['workchain=~"$workchain"'], ), + create_counter_panel( + "tycho_do_collate_ext_msgs_expired_count", + "Ext msgs expired count", + labels_selectors=['workchain=~"$workchain"'], + ), create_counter_panel( "tycho_do_collate_msgs_read_count_ext", "Read Ext msgs count", @@ -739,11 +749,6 @@ def collator_message_metrics() -> RowPanel: "Ext msgs error count", labels_selectors=['workchain=~"$workchain"'], ), - create_counter_panel( - "tycho_do_collate_ext_msgs_expired_count", - "Ext msgs expired count", - labels_selectors=['workchain=~"$workchain"'], - ), create_counter_panel( "tycho_do_collate_msgs_read_count_int", "Read Int msgs count",