Skip to content

Commit

Permalink
feat(collator): ext msgs queue size metric
Browse files Browse the repository at this point in the history
  • Loading branch information
SmaGMan committed Jul 17, 2024
1 parent 853ea83 commit a88366e
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 19 deletions.
22 changes: 18 additions & 4 deletions collator/src/collator/do_collate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,8 @@ impl CollatorStdImpl {
collation_data: &mut BlockCollationData,
continue_from_read_to: bool,
) -> Result<(Vec<Box<ParsedMessage>>, bool)> {
let labels = [("workchain", shard_id.workchain().to_string())];

tracing::info!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS,
"shard: {}, count: {}", shard_id, count,
);
Expand Down Expand Up @@ -653,7 +655,12 @@ impl CollatorStdImpl {
let next_chain_time =
collation_data.gen_utime as u64 * 1000 + collation_data.gen_utime_ms as u64;
if next_chain_time - anchor.chain_time > expire_timeout {
let iter = anchor.iter_externals(0);
let iter_from = if key == was_read_to.0 {
was_read_to.1 as usize
} else {
0
};
let iter = anchor.iter_externals(iter_from);
let mut expired_msgs_count = 0;
for ext_msg in iter {
tracing::trace!(target: tracing_targets::COLLATOR,
Expand All @@ -662,11 +669,12 @@ impl CollatorStdImpl {
expired_msgs_count += 1;
}

let labels = &[("workchain", shard_id.workchain().to_string())];
metrics::counter!("tycho_do_collate_ext_msgs_expired_count", labels)
metrics::counter!("tycho_do_collate_ext_msgs_expired_count", &labels)
.increment(expired_msgs_count);
metrics::gauge!("tycho_collator_ext_msgs_imported_queue_size", &labels)
.decrement(expired_msgs_count as f64);

// skip and remove fully expired anchor
// skip and remove expired anchor
let _ = anchors_cache.remove(next_idx);
tracing::debug!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS,
"anchor with key {} fully skipped due to expiration, removed from anchors cache", key,
Expand Down Expand Up @@ -703,12 +711,14 @@ impl CollatorStdImpl {
);

// get iterator and read messages
let mut msgs_read_from_last_anchor = 0;
let mut msgs_collected_from_last_anchor = 0;
let iter = anchor.iter_externals(msgs_read_offset_in_last_anchor as usize);
for ext_msg in iter {
tracing::trace!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS,
"read ext_msg dst: {}", ext_msg.info.dst,
);
msgs_read_from_last_anchor += 1;
if total_msgs_collected < count {
msgs_read_offset_in_last_anchor += 1;
}
Expand Down Expand Up @@ -736,6 +746,10 @@ impl CollatorStdImpl {
}
}
}

metrics::gauge!("tycho_collator_ext_msgs_imported_queue_size", &labels)
.decrement(msgs_read_from_last_anchor as f64);

tracing::debug!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS,
"{} externals collected from anchor {}, msgs_read_offset_in_last_anchor: {}",
msgs_collected_from_last_anchor, key, msgs_read_offset_in_last_anchor,
Expand Down
31 changes: 21 additions & 10 deletions collator/src/collator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,20 +513,20 @@ impl CollatorStdImpl {
) -> Result<()> {
let labels = [("workchain", self.shard_id.workchain().to_string())];

let histogram = HistogramGuardWithLabels::begin(
"tycho_collator_import_next_anchors_on_init_time",
&labels,
);
let timer = std::time::Instant::now();

let mut next_anchor = self
.mpool_adapter
.get_anchor_by_id(processed_upto_anchor_id)
.await?
.unwrap();

#[allow(dead_code)]
#[derive(Debug)]
struct AnchorInfo {
externals_count: usize,
id: MempoolAnchorId,
ct: u64,
externals: usize,
}

let mut anchors: Vec<AnchorInfo> = vec![];
Expand All @@ -545,7 +545,11 @@ impl CollatorStdImpl {
has_externals,
}));

anchors.push(AnchorInfo { externals_count });
anchors.push(AnchorInfo {
id: next_anchor.id,
ct: next_anchor.chain_time,
externals: externals_count,
});
while last_block_chain_time > next_anchor_chain_time {
next_anchor = self.mpool_adapter.get_next_anchor(last_anchor_id).await?;

Expand All @@ -563,19 +567,26 @@ impl CollatorStdImpl {
has_externals,
}));

anchors.push(AnchorInfo { externals_count });
anchors.push(AnchorInfo {
id: next_anchor.id,
ct: next_anchor.chain_time,
externals: externals_count,
});
}

let imported_count: usize = anchors.iter().map(|a| a.externals).sum();
metrics::counter!("tycho_collator_ext_msgs_imported_count", &labels)
.increment(anchors.iter().map(|a| a.externals_count).sum::<usize>() as u64);
.increment(imported_count as u64);
metrics::gauge!("tycho_collator_ext_msgs_imported_queue_size", &labels)
.increment(imported_count as f64);

self.last_imported_anchor_id = Some(last_anchor_id);
self.last_imported_anchor_chain_time = Some(next_anchor_chain_time);
self.last_imported_anchor_author = Some(next_anchor.author);

tracing::debug!(target: tracing_targets::COLLATOR,
elapsed = histogram.finish().as_millis(),
"Collator (block_id={}): init: imported anchors on init ({:?})",
elapsed = timer.elapsed().as_millis(),
"Collator (block_id={}): init: imported anchors on init: {:?}",
self.next_block_id_short,
anchors.as_slice()
);
Expand Down
5 changes: 5 additions & 0 deletions collator/src/collator/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,11 @@ impl MessageGroups {
loop {
let group_entry = self.groups.entry(offset).or_default();

if group_entry.is_full {
offset += 1;
continue;
}

let group_len = group_entry.inner.len();
match group_entry.inner.entry(account_id) {
Entry::Vacant(entry) => {
Expand Down
15 changes: 10 additions & 5 deletions scripts/gen-dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -719,11 +719,21 @@ def collator_message_metrics() -> RowPanel:
"All executed msgs count",
labels_selectors=['workchain=~"$workchain"'],
),
create_gauge_panel(
"tycho_collator_ext_msgs_imported_queue_size",
"Ext msgs imported queue size",
labels=['workchain=~"$workchain"'],
),
create_counter_panel(
"tycho_collator_ext_msgs_imported_count",
"Imported Ext msgs count from mempool",
labels_selectors=['workchain=~"$workchain"'],
),
create_counter_panel(
"tycho_do_collate_ext_msgs_expired_count",
"Ext msgs expired count",
labels_selectors=['workchain=~"$workchain"'],
),
create_counter_panel(
"tycho_do_collate_msgs_read_count_ext",
"Read Ext msgs count",
Expand All @@ -739,11 +749,6 @@ def collator_message_metrics() -> RowPanel:
"Ext msgs error count",
labels_selectors=['workchain=~"$workchain"'],
),
create_counter_panel(
"tycho_do_collate_ext_msgs_expired_count",
"Ext msgs expired count",
labels_selectors=['workchain=~"$workchain"'],
),
create_counter_panel(
"tycho_do_collate_msgs_read_count_int",
"Read Int msgs count",
Expand Down

0 comments on commit a88366e

Please sign in to comment.