Skip to content

Commit

Permalink
feature(collator): added more execution metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
SmaGMan authored and 0xdeafbeef committed Jun 20, 2024
1 parent 4bf3b4a commit 323bd75
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 55 deletions.
74 changes: 51 additions & 23 deletions collator/src/collator/do_collate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,17 @@ impl CollatorStdImpl {
let (max_messages_per_set, min_externals_per_set, group_limit, group_vert_size) =
self.get_msgs_execution_params();

metrics::gauge!("tycho_do_collate_msgs_exec_params_set_size")
.set(max_messages_per_set as f64);
metrics::gauge!("tycho_do_collate_msgs_exec_params_min_exts_per_set")
.set(min_externals_per_set as f64);
metrics::gauge!("tycho_do_collate_msgs_exec_params_group_limit").set(group_limit as f64);
metrics::gauge!("tycho_do_collate_msgs_exec_params_group_vert_size")
.set(group_vert_size as f64);

// init execution manager
let mut exec_manager = ExecutionManager::new(
self.shard_id,
collation_data.next_lt,
Arc::new(PreloadedBlockchainConfig::with_config(
mc_data.config().clone(),
Expand Down Expand Up @@ -191,6 +200,7 @@ impl CollatorStdImpl {
let mut fill_msgs_total_elapsed = Duration::ZERO;
let mut execute_msgs_total_elapsed = Duration::ZERO;
let mut process_txs_total_elapsed = Duration::ZERO;
let mut exec_msgs_sets_count = 0;
loop {
let mut timer = Instant::now();

Expand All @@ -205,7 +215,7 @@ impl CollatorStdImpl {
} else {
vec![]
};
collation_data.read_ext_msgs += ext_msgs.len() as u32;
collation_data.read_ext_msgs += ext_msgs.len() as u64;

// 2. Then iterate through existing internals and try to fill the set
let mut remaining_capacity = max_messages_per_set - ext_msgs.len();
Expand Down Expand Up @@ -256,7 +266,7 @@ impl CollatorStdImpl {
ext_count = ext_msgs.len(),
"read additional externals",
);
collation_data.read_ext_msgs += ext_msgs.len() as u32;
collation_data.read_ext_msgs += ext_msgs.len() as u64;
msgs_set.append(&mut ext_msgs);
}

Expand Down Expand Up @@ -317,8 +327,11 @@ impl CollatorStdImpl {
break;
}

exec_msgs_sets_count += 1;

let msgs_set_len = msgs_set.len() as u32;
let mut msgs_set_executed_count = 0;
let mut exec_ticks_count = 0;
exec_manager.set_msgs_for_execution(msgs_set);
let mut msgs_set_offset = collation_data.processed_upto.processed_offset;
let mut msgs_set_full_processed = false;
Expand All @@ -327,7 +340,8 @@ impl CollatorStdImpl {

// execute msgs processing by groups
while !msgs_set_full_processed {
// Process messages
// Exec messages
exec_ticks_count += 1;
timer = std::time::Instant::now();
let tick = exec_manager.tick(msgs_set_offset).await?;
execute_msgs_total_elapsed += timer.elapsed();
Expand Down Expand Up @@ -357,7 +371,7 @@ impl CollatorStdImpl {
item.in_message,
)?;

collation_data.new_msgs_created += new_messages.len() as u32;
collation_data.new_msgs_created += new_messages.len() as u64;

for new_message in &new_messages {
let MsgInfo::Int(int_msg_info) = &new_message.info else {
Expand All @@ -384,7 +398,8 @@ impl CollatorStdImpl {
// currently we simply check only the transactions count,
// but a proper implementation needs to be made further on
msgs_set_executed_count += one_tick_executed_count;
collation_data.tx_count += one_tick_executed_count as u32;
collation_data.tx_count += one_tick_executed_count as u64;
collation_data.ext_msgs_error_count += tick.ext_msgs_error_count;
tracing::debug!(target: tracing_targets::COLLATOR,
"processed messages from set {}/{}, total {}, offset = {}",
msgs_set_executed_count, msgs_set_len,
Expand All @@ -396,10 +411,13 @@ impl CollatorStdImpl {
}
}

metrics::gauge!("tycho_do_collate_exec_ticks_per_msgs_set", labels)
.set(exec_ticks_count as f64);

timer = std::time::Instant::now();

// HACK: temporarily always fully process the msgs set and check block limits afterwards
if collation_data.tx_count >= self.config.block_txs_limit {
if collation_data.tx_count >= self.config.block_txs_limit as u64 {
tracing::debug!(target: tracing_targets::COLLATOR,
"STUB: block limit reached: {}/{}",
collation_data.tx_count, self.config.block_txs_limit,
Expand Down Expand Up @@ -433,9 +451,15 @@ impl CollatorStdImpl {

if block_limits_reached {
// block is full - exit loop
metrics::counter!("tycho_do_collate_blocks_with_limits_reached_count", labels)
.increment(1);
break;
}
}

metrics::gauge!("tycho_do_collate_exec_msgs_sets_per_block", labels)
.set(exec_msgs_sets_count as f64);

metrics::histogram!("tycho_do_collate_fill_msgs_total_time", labels)
.record(fill_msgs_total_elapsed);
metrics::histogram!("tycho_do_collate_exec_msgs_total_time", labels)
Expand Down Expand Up @@ -523,40 +547,44 @@ impl CollatorStdImpl {
}

// metrics
metrics::counter!("tycho_do_collate_tx_total", labels)
.increment(collation_data.tx_count as _);
metrics::counter!("tycho_do_collate_tx_total", labels).increment(collation_data.tx_count);

metrics::counter!("tycho_do_collate_int_enqueue_count")
.increment(collation_data.int_enqueue_count as _);
.increment(collation_data.int_enqueue_count);
metrics::counter!("tycho_do_collate_int_dequeue_count")
.increment(collation_data.int_dequeue_count as _);
metrics::gauge!("tycho_do_collate_int_msgs_queue_calc").increment(
.increment(collation_data.int_dequeue_count);
metrics::gauge!("tycho_do_collate_int_msgs_queue_calc").set(
(collation_data.int_enqueue_count as i64 - collation_data.int_dequeue_count as i64)
as f64,
);

metrics::counter!("tycho_do_collate_msgs_exec_count_all", labels)
.increment(collation_data.execute_count_all as _);
.increment(collation_data.execute_count_all);

metrics::counter!("tycho_do_collate_msgs_read_count_ext", labels)
.increment(collation_data.read_ext_msgs as _);
.increment(collation_data.read_ext_msgs);
metrics::counter!("tycho_do_collate_msgs_exec_count_ext", labels)
.increment(collation_data.execute_count_ext as _);
.increment(collation_data.execute_count_ext);
metrics::counter!("tycho_do_collate_msgs_error_count_ext", labels)
.increment(collation_data.ext_msgs_error_count);

metrics::counter!("tycho_do_collate_msgs_read_count_int", labels)
.increment(collation_data.read_int_msgs_from_iterator as _);
.increment(collation_data.read_int_msgs_from_iterator);
metrics::counter!("tycho_do_collate_msgs_exec_count_int", labels)
.increment(collation_data.execute_count_int as _);
.increment(collation_data.execute_count_int);

// new internals messages
metrics::counter!("tycho_do_collate_new_msgs_created_count")
.increment(collation_data.new_msgs_created as _);
metrics::counter!("tycho_do_collate_new_msgs_inserted_to_iterator_count")
.increment(collation_data.inserted_new_msgs_to_iterator as _);
metrics::counter!("tycho_do_collate_msgs_read_count_new_int")
.increment(collation_data.read_new_msgs_from_iterator as _);
metrics::counter!("tycho_do_collate_new_msgs_created_count", labels)
.increment(collation_data.new_msgs_created);
metrics::counter!(
"tycho_do_collate_new_msgs_inserted_to_iterator_count",
labels
)
.increment(collation_data.inserted_new_msgs_to_iterator);
metrics::counter!("tycho_do_collate_msgs_read_count_new_int", labels)
.increment(collation_data.read_new_msgs_from_iterator);
metrics::counter!("tycho_do_collate_msgs_exec_count_new_int", labels)
.increment(collation_data.execute_count_new_int as _);
.increment(collation_data.execute_count_new_int);

collation_data.total_execute_msgs_time_mc = execute_msgs_total_elapsed.as_micros();
self.update_stats(&collation_data);
Expand Down
57 changes: 53 additions & 4 deletions collator/src/collator/execution_manager.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::cmp;
use std::collections::hash_map::Entry;
use std::sync::Arc;
use std::time::Duration;

use anyhow::Result;
use everscale_types::models::*;
use everscale_types::prelude::*;
use futures_util::stream::FuturesUnordered;
use futures_util::{Future, StreamExt};
use humantime::format_duration;
use ton_executor::{
ExecuteParams, ExecutorOutput, OrdinaryTransactionExecutor, PreloadedBlockchainConfig,
TickTockTransactionExecutor, TransactionExecutor,
Expand All @@ -21,6 +23,7 @@ use crate::tracing_targets;

/// Execution manager
pub(super) struct ExecutionManager {
shard_id: ShardIdent,
// this time is used if account's lt is smaller
min_next_lt: u64,
/// blockchain config
Expand All @@ -42,6 +45,7 @@ type MessageGroup = FastHashMap<HashBytes, Vec<Box<ParsedMessage>>>;
impl ExecutionManager {
/// constructor
pub fn new(
shard_id: ShardIdent,
min_next_lt: u64,
config: Arc<PreloadedBlockchainConfig>,
params: Arc<ExecuteParams>,
Expand All @@ -50,6 +54,7 @@ impl ExecutionManager {
shard_accounts: ShardAccounts,
) -> Self {
Self {
shard_id,
min_next_lt,
config,
params,
Expand Down Expand Up @@ -106,35 +111,47 @@ impl ExecutionManager {
pub async fn tick(&mut self, offset: u32) -> Result<ExecutedTick> {
tracing::trace!(target: tracing_targets::EXEC_MANAGER, offset, "messages set execution tick");

let labels = &[("workchain", self.shard_id.workchain().to_string())];

let (new_offset, group) = match self.message_groups.remove(&offset) {
Some(group) => (offset + 1, group),
None => return Ok(ExecutedTick::new_finished(offset)),
};
let finished = !self.message_groups.contains_key(&new_offset);

let group_len = group.len();
tracing::debug!(target: tracing_targets::EXEC_MANAGER, offset, group_len);
let group_size = group.len();
let mut group_max_vert_size = 0;

// TODO: is a check needed for externals addressed to accounts that do not exist?
let mut futures = FuturesUnordered::new();
for (account_id, msgs) in group {
group_max_vert_size = cmp::max(group_max_vert_size, msgs.len());
let shard_account_stuff = self.accounts_cache.create_account_stuff(&account_id)?;
futures.push(self.execute_messages(shard_account_stuff, msgs));
}

let mut items = Vec::with_capacity(group_len);
let mut items = Vec::with_capacity(group_size);
let mut ext_msgs_error_count = 0;

let mut max_account_msgs_exec_time = Duration::ZERO;
let mut total_exec_time = Duration::ZERO;

while let Some(executed_msgs_result) = futures.next().await {
let executed = executed_msgs_result?;

max_account_msgs_exec_time = max_account_msgs_exec_time.max(executed.exec_time);
total_exec_time += executed.exec_time;

for tx in executed.transactions {
if matches!(&tx.in_message.info, MsgInfo::ExtIn(_)) {
if let Err(e) = &tx.result {
tracing::error!(
tracing::warn!(
target: tracing_targets::EXEC_MANAGER,
account_addr = %executed.account_state.account_addr,
message_hash = %tx.in_message.cell.repr_hash(),
"failed to execute external message: {e:?}",
);
ext_msgs_error_count += 1;
continue;
}
}
Expand All @@ -155,10 +172,36 @@ impl ExecutionManager {
.add_account_stuff(executed.account_state);
}

let mean_account_msgs_exec_time = total_exec_time
.checked_div(group_size as u32)
.unwrap_or_default();

tracing::info!(target: tracing_targets::EXEC_MANAGER,
offset, group_size, group_max_vert_size,
total_exec_time = %format_duration(total_exec_time),
mean_account_msgs_exec_time = %format_duration(mean_account_msgs_exec_time),
max_account_msgs_exec_time = %format_duration(max_account_msgs_exec_time),
);

metrics::gauge!("tycho_do_collate_one_tick_group_size", labels).set(group_size as f64);
metrics::gauge!("tycho_do_collate_one_tick_group_max_vert_size", labels)
.set(group_max_vert_size as f64);
metrics::histogram!(
"tycho_do_collate_one_tick_account_msgs_exec_mean_time",
labels
)
.record(mean_account_msgs_exec_time);
metrics::histogram!(
"tycho_do_collate_one_tick_account_msgs_exec_max_time",
labels
)
.record(max_account_msgs_exec_time);

Ok(ExecutedTick {
new_offset,
finished,
items,
ext_msgs_error_count,
})
}

Expand All @@ -172,6 +215,8 @@ impl ExecutionManager {
let params = self.params.clone();

rayon_run_fifo(move || {
let timer = std::time::Instant::now();

let mut transactions = Vec::with_capacity(msgs.len());

for msg in msgs {
Expand All @@ -187,6 +232,7 @@ impl ExecutionManager {
Ok(ExecutedTransactions {
account_state,
transactions,
exec_time: timer.elapsed(),
})
})
}
Expand Down Expand Up @@ -313,6 +359,7 @@ pub struct ExecutedTick {
pub new_offset: u32,
pub finished: bool,
pub items: Vec<ExecutedTickItem>,
pub ext_msgs_error_count: u64,
}

impl ExecutedTick {
Expand All @@ -321,6 +368,7 @@ impl ExecutedTick {
new_offset,
finished: true,
items: Vec::new(),
ext_msgs_error_count: 0,
}
}
}
Expand All @@ -334,6 +382,7 @@ pub struct ExecutedTickItem {
/// Result of running one batch of messages inside the worker closure.
///
/// `tick` consumes one of these per finished future: it folds `exec_time`
/// into its max/total timing, walks `transactions`, and hands
/// `account_state` to `add_account_stuff`.
pub struct ExecutedTransactions {
/// Account state returned alongside the executed transactions.
pub account_state: Box<ShardAccountStuff>,
/// Transactions produced for the batch; `tick` inspects each one's
/// `in_message` and `result`.
pub transactions: Vec<ExecutedOrdinaryTransaction>,
/// Elapsed time measured by an `Instant` started at the top of the worker
/// closure and read when this value is constructed.
pub exec_time: Duration,
}

pub struct ExecutedOrdinaryTransaction {
Expand Down
Loading

0 comments on commit 323bd75

Please sign in to comment.