Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test/read msgs metrics #176

Merged
merged 2 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 22 additions & 4 deletions collator/src/collator/do_collate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,9 @@ impl CollatorStdImpl {
let read_ext_messages_elapsed = exec_manager.read_ext_messages_total_elapsed();
metrics::histogram!("tycho_do_collate_read_ext_msgs_time", labels)
.record(read_ext_messages_elapsed);
let add_to_message_groups_elapsed = exec_manager.add_to_message_groups_total_elapsed();
metrics::histogram!("tycho_do_collate_add_to_msg_groups_time", labels)
.record(add_to_message_groups_elapsed);

metrics::histogram!("tycho_do_collate_exec_msgs_total_time", labels)
.record(execute_msgs_total_elapsed);
Expand Down Expand Up @@ -526,6 +529,7 @@ impl CollatorStdImpl {
read_existing = %format_duration(read_existing_messages_elapsed),
read_ext = %format_duration(read_ext_messages_elapsed),
read_new = %format_duration(read_new_messages_elapsed),
add_to_groups = %format_duration(add_to_message_groups_elapsed),

exec_msgs_total = %format_duration(execute_msgs_total_elapsed),
process_txs_total = %format_duration(process_txs_total_elapsed),
Expand Down Expand Up @@ -573,6 +577,8 @@ impl CollatorStdImpl {
collation_data: &mut BlockCollationData,
continue_from_read_to: bool,
) -> Result<(Vec<Box<ParsedMessage>>, bool)> {
let labels = [("workchain", shard_id.workchain().to_string())];

tracing::info!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS,
"shard: {}, count: {}", shard_id, count,
);
Expand Down Expand Up @@ -649,7 +655,12 @@ impl CollatorStdImpl {
let next_chain_time =
collation_data.gen_utime as u64 * 1000 + collation_data.gen_utime_ms as u64;
if next_chain_time - anchor.chain_time > expire_timeout {
let iter = anchor.iter_externals(0);
let iter_from = if key == was_read_to.0 {
was_read_to.1 as usize
} else {
0
};
let iter = anchor.iter_externals(iter_from);
let mut expired_msgs_count = 0;
for ext_msg in iter {
tracing::trace!(target: tracing_targets::COLLATOR,
Expand All @@ -658,11 +669,12 @@ impl CollatorStdImpl {
expired_msgs_count += 1;
}

let labels = &[("workchain", shard_id.workchain().to_string())];
metrics::counter!("tycho_do_collate_ext_msgs_expired_count", labels)
metrics::counter!("tycho_do_collate_ext_msgs_expired_count", &labels)
.increment(expired_msgs_count);
metrics::gauge!("tycho_collator_ext_msgs_imported_queue_size", &labels)
.decrement(expired_msgs_count as f64);

// skip and remove fully expired anchor
// skip and remove expired anchor
let _ = anchors_cache.remove(next_idx);
tracing::debug!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS,
"anchor with key {} fully skipped due to expiration, removed from anchors cache", key,
Expand Down Expand Up @@ -699,12 +711,14 @@ impl CollatorStdImpl {
);

// get iterator and read messages
let mut msgs_read_from_last_anchor = 0;
let mut msgs_collected_from_last_anchor = 0;
let iter = anchor.iter_externals(msgs_read_offset_in_last_anchor as usize);
for ext_msg in iter {
tracing::trace!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS,
"read ext_msg dst: {}", ext_msg.info.dst,
);
msgs_read_from_last_anchor += 1;
if total_msgs_collected < count {
msgs_read_offset_in_last_anchor += 1;
}
Expand Down Expand Up @@ -732,6 +746,10 @@ impl CollatorStdImpl {
}
}
}

metrics::gauge!("tycho_collator_ext_msgs_imported_queue_size", &labels)
.decrement(msgs_read_from_last_anchor as f64);

tracing::debug!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS,
"{} externals collected from anchor {}, msgs_read_offset_in_last_anchor: {}",
msgs_collected_from_last_anchor, key, msgs_read_offset_in_last_anchor,
Expand Down
24 changes: 23 additions & 1 deletion collator/src/collator/execution_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ pub(super) struct ExecutionManager {
read_new_messages_total_elapsed: Duration,
/// sum total time of reading external messages
read_ext_messages_total_elapsed: Duration,
/// sum total time of adding messages to groups
add_to_message_groups_total_elapsed: Duration,
}

pub(super) struct MessagesExecutor {
Expand Down Expand Up @@ -92,6 +94,7 @@ impl ExecutionManager {
read_existing_messages_total_elapsed: Duration::ZERO,
read_new_messages_total_elapsed: Duration::ZERO,
read_ext_messages_total_elapsed: Duration::ZERO,
add_to_message_groups_total_elapsed: Duration::ZERO,
}
}

Expand All @@ -106,6 +109,7 @@ impl ExecutionManager {
self.read_existing_messages_total_elapsed = Duration::ZERO;
self.read_new_messages_total_elapsed = Duration::ZERO;
self.read_ext_messages_total_elapsed = Duration::ZERO;
self.add_to_message_groups_total_elapsed = Duration::ZERO;

let current_iterator_positions = std::mem::take(&mut self.current_iterator_positions);
QueueIteratorAdapter::new(
Expand Down Expand Up @@ -137,6 +141,10 @@ impl ExecutionManager {
self.read_ext_messages_total_elapsed
}

pub fn add_to_message_groups_total_elapsed(&self) -> Duration {
self.add_to_message_groups_total_elapsed
}

#[tracing::instrument(skip_all)]
pub async fn get_next_message_group(
&mut self,
Expand Down Expand Up @@ -166,6 +174,7 @@ impl ExecutionManager {
}

let timer = std::time::Instant::now();
let mut add_to_groups_elapsed = Duration::ZERO;

// read messages from iterator and fill messages groups
// until the first group fully loaded
Expand All @@ -176,6 +185,7 @@ impl ExecutionManager {

existing_internals_read_count += 1;

let timer_add_to_groups = std::time::Instant::now();
self.message_groups.add_message(Box::new(ParsedMessage {
info: MsgInfo::Int(int_msg.message_with_source.message.info.clone()),
dst_in_current_shard: true,
Expand All @@ -185,6 +195,7 @@ impl ExecutionManager {
same_shard: int_msg.message_with_source.shard_id == self.shard_id,
}),
}));
add_to_groups_elapsed += timer_add_to_groups.elapsed();

if self.message_groups.messages_count() >= self.messages_buffer_limit {
tracing::debug!(target: tracing_targets::COLLATOR,
Expand Down Expand Up @@ -217,6 +228,8 @@ impl ExecutionManager {
}

self.read_existing_messages_total_elapsed += timer.elapsed();
self.read_existing_messages_total_elapsed -= add_to_groups_elapsed;
self.add_to_message_groups_total_elapsed += add_to_groups_elapsed;

// when message_groups buffer is empty and no more existing internals in current iterator
// then set all read messages as processed
Expand Down Expand Up @@ -257,6 +270,7 @@ impl ExecutionManager {

if group_opt.is_none() && self.process_ext_messages && !self.process_new_messages {
let timer = std::time::Instant::now();
let mut add_to_groups_elapsed = Duration::ZERO;

let mut externals_read_count = 0;
while collator.has_pending_externals {
Expand All @@ -269,9 +283,11 @@ impl ExecutionManager {

externals_read_count += ext_msgs.len() as u64;

let timer_add_to_groups = std::time::Instant::now();
for ext_msg in ext_msgs {
self.message_groups.add_message(ext_msg);
}
add_to_groups_elapsed += timer_add_to_groups.elapsed();

if self.message_groups.messages_count() >= self.messages_buffer_limit {
tracing::debug!(target: tracing_targets::COLLATOR,
Expand Down Expand Up @@ -304,6 +320,8 @@ impl ExecutionManager {
}

self.read_ext_messages_total_elapsed += timer.elapsed();
self.read_ext_messages_total_elapsed -= add_to_groups_elapsed;
self.add_to_message_groups_total_elapsed += add_to_groups_elapsed;

if self.message_groups.is_empty() && !collator.has_pending_externals {
tracing::debug!(target: tracing_targets::COLLATOR,
Expand Down Expand Up @@ -332,21 +350,23 @@ impl ExecutionManager {
.try_update_new_messages_read_to(max_new_message_key_to_current_shard)?;

let timer = std::time::Instant::now();
let mut add_to_groups_elapsed = Duration::ZERO;

let mut new_internals_read_count = 0;
while let Some(int_msg) = mq_iterator_adapter.next_new_message()? {
assert!(int_msg.is_new);

new_internals_read_count += 1;
collation_data.read_new_msgs_from_iterator += 1;

let timer_add_to_groups = std::time::Instant::now();
self.message_groups.add_message(Box::new(ParsedMessage {
info: MsgInfo::Int(int_msg.message_with_source.message.info.clone()),
dst_in_current_shard: true,
cell: int_msg.message_with_source.message.cell.clone(),
special_origin: None,
dequeued: None,
}));
add_to_groups_elapsed += timer_add_to_groups.elapsed();

if self.message_groups.messages_count() >= self.messages_buffer_limit {
tracing::debug!(target: tracing_targets::COLLATOR,
Expand Down Expand Up @@ -382,6 +402,8 @@ impl ExecutionManager {
}

self.read_new_messages_total_elapsed += timer.elapsed();
self.read_new_messages_total_elapsed -= add_to_groups_elapsed;
self.add_to_message_groups_total_elapsed += add_to_groups_elapsed;

// actually, we process all message groups with new messages in one step,
// so we update internals processed_upto each step
Expand Down
31 changes: 21 additions & 10 deletions collator/src/collator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -513,20 +513,20 @@ impl CollatorStdImpl {
) -> Result<()> {
let labels = [("workchain", self.shard_id.workchain().to_string())];

let histogram = HistogramGuardWithLabels::begin(
"tycho_collator_import_next_anchors_on_init_time",
&labels,
);
let timer = std::time::Instant::now();

let mut next_anchor = self
.mpool_adapter
.get_anchor_by_id(processed_upto_anchor_id)
.await?
.unwrap();

#[allow(dead_code)]
#[derive(Debug)]
struct AnchorInfo {
externals_count: usize,
id: MempoolAnchorId,
ct: u64,
externals: usize,
}

let mut anchors: Vec<AnchorInfo> = vec![];
Expand All @@ -545,7 +545,11 @@ impl CollatorStdImpl {
has_externals,
}));

anchors.push(AnchorInfo { externals_count });
anchors.push(AnchorInfo {
id: next_anchor.id,
ct: next_anchor.chain_time,
externals: externals_count,
});
while last_block_chain_time > next_anchor_chain_time {
next_anchor = self.mpool_adapter.get_next_anchor(last_anchor_id).await?;

Expand All @@ -563,19 +567,26 @@ impl CollatorStdImpl {
has_externals,
}));

anchors.push(AnchorInfo { externals_count });
anchors.push(AnchorInfo {
id: next_anchor.id,
ct: next_anchor.chain_time,
externals: externals_count,
});
}

let imported_count: usize = anchors.iter().map(|a| a.externals).sum();
metrics::counter!("tycho_collator_ext_msgs_imported_count", &labels)
.increment(anchors.iter().map(|a| a.externals_count).sum::<usize>() as u64);
.increment(imported_count as u64);
metrics::gauge!("tycho_collator_ext_msgs_imported_queue_size", &labels)
.increment(imported_count as f64);

self.last_imported_anchor_id = Some(last_anchor_id);
self.last_imported_anchor_chain_time = Some(next_anchor_chain_time);
self.last_imported_anchor_author = Some(next_anchor.author);

tracing::debug!(target: tracing_targets::COLLATOR,
elapsed = histogram.finish().as_millis(),
"Collator (block_id={}): init: imported anchors on init ({:?})",
elapsed = timer.elapsed().as_millis(),
"Collator (block_id={}): init: imported anchors on init: {:?}",
self.next_block_id_short,
anchors.as_slice()
);
Expand Down
5 changes: 5 additions & 0 deletions collator/src/collator/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,11 @@ impl MessageGroups {
loop {
let group_entry = self.groups.entry(offset).or_default();

if group_entry.is_full {
offset += 1;
continue;
}

let group_len = group_entry.inner.len();
match group_entry.inner.entry(account_id) {
Entry::Vacant(entry) => {
Expand Down
20 changes: 15 additions & 5 deletions scripts/gen-dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -719,11 +719,21 @@ def collator_message_metrics() -> RowPanel:
"All executed msgs count",
labels_selectors=['workchain=~"$workchain"'],
),
create_gauge_panel(
"tycho_collator_ext_msgs_imported_queue_size",
"Ext msgs imported queue size",
labels=['workchain=~"$workchain"'],
),
create_counter_panel(
"tycho_collator_ext_msgs_imported_count",
"Imported Ext msgs count from mempool",
labels_selectors=['workchain=~"$workchain"'],
),
create_counter_panel(
"tycho_do_collate_ext_msgs_expired_count",
"Ext msgs expired count",
labels_selectors=['workchain=~"$workchain"'],
),
create_counter_panel(
"tycho_do_collate_msgs_read_count_ext",
"Read Ext msgs count",
Expand All @@ -739,11 +749,6 @@ def collator_message_metrics() -> RowPanel:
"Ext msgs error count",
labels_selectors=['workchain=~"$workchain"'],
),
create_counter_panel(
"tycho_do_collate_ext_msgs_expired_count",
"Ext msgs expired count",
labels_selectors=['workchain=~"$workchain"'],
),
create_counter_panel(
"tycho_do_collate_msgs_read_count_int",
"Read Int msgs count",
Expand Down Expand Up @@ -862,6 +867,11 @@ def collator_core_operations_metrics() -> RowPanel:
"Execution time: incl Fill messages: read new",
labels=['workchain=~"$workchain"'],
),
create_heatmap_panel(
"tycho_do_collate_add_to_msg_groups_time",
"Execution time: incl Fill messages: add to msg groups",
labels=['workchain=~"$workchain"'],
),
create_heatmap_panel(
"tycho_do_collate_exec_msgs_total_time",
"Execution time: incl Execute messages",
Expand Down
Loading