Skip to content

Commit

Permalink
refactor(collator): read messages detailed metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
SmaGMan committed Jul 16, 2024
1 parent 498b146 commit d2fe285
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 12 deletions.
23 changes: 19 additions & 4 deletions collator/src/collator/do_collate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,12 +294,23 @@ impl CollatorStdImpl {
.set(executed_groups_count as f64);
}

metrics::histogram!("tycho_do_collate_fill_msgs_total_time", labels)
.record(fill_msgs_total_elapsed);

let init_iterator_elapsed = mq_iterator_adapter.init_iterator_total_elapsed();
metrics::histogram!("tycho_do_collate_init_iterator_time", labels)
.record(init_iterator_elapsed);
let read_existing_messages_elapsed =
mq_iterator_adapter.read_existing_messages_total_elapsed();
metrics::histogram!("tycho_do_collate_read_int_msgs_time", labels)
.record(read_existing_messages_elapsed);
let read_new_messages_elapsed = mq_iterator_adapter.read_new_messages_total_elapsed();
metrics::histogram!("tycho_do_collate_read_new_msgs_time", labels)
.record(read_new_messages_elapsed);
let read_ext_messages_elapsed = exec_manager.read_ext_messages_total_elapsed();
metrics::histogram!("tycho_do_collate_read_ext_msgs_time", labels)
.record(read_ext_messages_elapsed);

metrics::histogram!("tycho_do_collate_fill_msgs_total_time", labels)
.record(fill_msgs_total_elapsed);
metrics::histogram!("tycho_do_collate_exec_msgs_total_time", labels)
.record(execute_msgs_total_elapsed);
metrics::histogram!("tycho_do_collate_process_txs_total_time", labels)
Expand Down Expand Up @@ -476,7 +487,7 @@ impl CollatorStdImpl {

tracing::info!(target: tracing_targets::COLLATOR,
"Created and sent block candidate: time_diff={}, \
collation_time={}, elapsed_from_prev_block={}, overhead = {}, \
collation_time={}, elapsed_from_prev_block={}, overhead={}, \
start_lt={}, end_lt={}, exec_count={}, \
exec_ext={}, exec_int={}, exec_new_int={}, \
enqueue_count={}, dequeue_count={}, \
Expand Down Expand Up @@ -507,12 +518,16 @@ impl CollatorStdImpl {
overhead = %format_duration(collation_mngmnt_overhead),
total = %format_duration(total_elapsed),
prepare = %format_duration(prepare_elapsed),
init_iterator = %format_duration(init_iterator_elapsed),
execute_tick = %format_duration(execute_tick_elapsed),
execute_tock = %format_duration(execute_tock_elapsed),
execute_total = %format_duration(execute_elapsed),

fill_msgs_total = %format_duration(fill_msgs_total_elapsed),
init_iterator = %format_duration(init_iterator_elapsed),
read_existing = %format_duration(read_existing_messages_elapsed),
read_ext = %format_duration(read_ext_messages_elapsed),
read_new = %format_duration(read_new_messages_elapsed),

exec_msgs_total = %format_duration(execute_msgs_total_elapsed),
process_txs_total = %format_duration(process_txs_total_elapsed),

Expand Down
14 changes: 13 additions & 1 deletion collator/src/collator/execution_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ pub(super) struct ExecutionManager {
/// current read positions of internals mq iterator
/// when it is not finished
current_iterator_positions: FastHashMap<ShardIdent, InternalMessageKey>,

/// sum total time of reading external messages
read_ext_messages_total_elapsed: Duration,
}

pub(super) struct MessagesExecutor {
Expand Down Expand Up @@ -82,6 +85,7 @@ impl ExecutionManager {
process_new_messages: false,
mq_adapter,
current_iterator_positions: Default::default(),
read_ext_messages_total_elapsed: Duration::ZERO,
}
}

Expand All @@ -93,6 +97,8 @@ impl ExecutionManager {
self.process_ext_messages = false;
self.process_new_messages = false;

self.read_ext_messages_total_elapsed = Duration::ZERO;

let current_iterator_positions = std::mem::take(&mut self.current_iterator_positions);
QueueIteratorAdapter::new(
self.shard_id,
Expand All @@ -111,6 +117,10 @@ impl ExecutionManager {
Ok(has_pending_internals)
}

pub fn read_ext_messages_total_elapsed(&self) -> Duration {
self.read_ext_messages_total_elapsed
}

#[tracing::instrument(skip_all)]
pub async fn get_next_message_group(
&mut self,
Expand Down Expand Up @@ -228,11 +238,13 @@ impl ExecutionManager {
if group_opt.is_none() && self.process_ext_messages && !self.process_new_messages {
let mut externals_read_count = 0;
while collator.has_pending_externals {
let timer = std::time::Instant::now();
let ext_msgs = collator.read_next_externals(
1,
3,
collation_data,
self.ext_messages_reader_started,
)?;
self.read_ext_messages_total_elapsed += timer.elapsed();
self.ext_messages_reader_started = true;

externals_read_count += ext_msgs.len() as u64;
Expand Down
15 changes: 15 additions & 0 deletions collator/src/collator/mq_iterator_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,13 @@ pub(super) struct QueueIteratorAdapter {
new_messages_read_to: InternalMessageKey,
/// current read position of internals mq iterator
current_positions: FastHashMap<ShardIdent, InternalMessageKey>,

/// sum total iterators initialization time
init_iterator_total_elapsed: Duration,
/// sum total time of reading existing internal messages
read_existing_messages_total_elapsed: Duration,
/// sum total time of reading new internal messages
read_new_messages_total_elapsed: Duration,
}

impl QueueIteratorAdapter {
Expand All @@ -46,6 +51,8 @@ impl QueueIteratorAdapter {
new_messages_read_to: InternalMessageKey::default(),
current_positions: current_positions_opt.unwrap_or_default(),
init_iterator_total_elapsed: Duration::ZERO,
read_existing_messages_total_elapsed: Duration::ZERO,
read_new_messages_total_elapsed: Duration::ZERO,
}
}

Expand Down Expand Up @@ -238,6 +245,10 @@ impl QueueIteratorAdapter {
Ok(res)
}

pub fn read_existing_messages_total_elapsed(&self) -> Duration {
self.read_existing_messages_total_elapsed
}

pub fn next_existing_message(&mut self) -> Result<Option<IterItem>> {
if self.no_pending_existing_internals {
Ok(None)
Expand Down Expand Up @@ -279,6 +290,10 @@ impl QueueIteratorAdapter {
}
}

pub fn read_new_messages_total_elapsed(&self) -> Duration {
self.read_new_messages_total_elapsed
}

pub fn next_new_message(&mut self) -> Result<Option<IterItem>> {
// fill messages groups from iterator until the first group filled
// or current new messages read window finished
Expand Down
29 changes: 22 additions & 7 deletions scripts/gen-dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -832,29 +832,44 @@ def collator_core_operations_metrics() -> RowPanel:
"Collation prepare time",
labels=['workchain=~"$workchain"'],
),
create_heatmap_panel(
"tycho_do_collate_execute_time",
"Execution time",
labels=['workchain=~"$workchain"'],
),
create_heatmap_panel(
"tycho_do_collate_fill_msgs_total_time",
"Execution time: incl Fill messages",
labels=['workchain=~"$workchain"'],
),
create_heatmap_panel(
"tycho_do_collate_init_iterator_time",
"Init iterator time",
"Execution time: incl Fill messages: init iterator",
labels=['workchain=~"$workchain"'],
),
create_heatmap_panel(
"tycho_do_collate_execute_time",
"Execution time",
"tycho_do_collate_read_int_msgs_time",
"Execution time: incl Fill messages: read existing",
labels=['workchain=~"$workchain"'],
),
create_heatmap_panel(
"tycho_do_collate_fill_msgs_total_time",
"Execution time: incl Fill messages time",
"tycho_do_collate_read_ext_msgs_time",
"Execution time: incl Fill messages: read externals",
labels=['workchain=~"$workchain"'],
),
create_heatmap_panel(
"tycho_do_collate_read_new_msgs_time",
"Execution time: incl Fill messages: read new",
labels=['workchain=~"$workchain"'],
),
create_heatmap_panel(
"tycho_do_collate_exec_msgs_total_time",
"Execution time: incl Execute messages time",
"Execution time: incl Execute messages",
labels=['workchain=~"$workchain"'],
),
create_heatmap_panel(
"tycho_do_collate_process_txs_total_time",
"Execution time: incl Process transactions time",
"Execution time: incl Process transactions",
labels=['workchain=~"$workchain"'],
),
create_heatmap_panel(
Expand Down

0 comments on commit d2fe285

Please sign in to comment.