Skip to content

Commit

Permalink
perf(execution_manager): batch collations inside single spawn
Browse files Browse the repository at this point in the history
Add `tycho_message_execution_time` metric
  • Loading branch information
0xdeafbeef committed Jun 12, 2024
1 parent ef30c52 commit 4340896
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 48 deletions.
102 changes: 54 additions & 48 deletions collator/src/collator/execution_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -215,22 +215,32 @@ impl ExecutionManager {
mut shard_account_stuff: ShardAccountStuff,
) -> Result<(Vec<ExecutedMessage>, ShardAccountStuff)> {
let mut results = vec![];
for msg in msgs {
let (executed_msg, updated_shard_account_stuff) = self
.execute_one_message(account_id, msg, shard_account_stuff)
.await?;
results.push(executed_msg);
shard_account_stuff = updated_shard_account_stuff;
}
Ok((results, shard_account_stuff))
let (config, params) = self.get_execute_params()?;

rayon_run(move || {
for msg in msgs {
let (executed_msg, updated_shard_account_stuff) = Self::execute_one_message(
account_id,
msg,
shard_account_stuff,
&config,
&params,
)?;
results.push(executed_msg);
shard_account_stuff = updated_shard_account_stuff;
}
Ok((results, shard_account_stuff))
})
.await
}

/// execute message
pub async fn execute_one_message(
&self,
pub fn execute_one_message(
account_id: AccountId,
new_msg: AsyncMessage,
mut shard_account_stuff: ShardAccountStuff,
config: &PreloadedBlockchainConfig,
params: &ExecuteParams,
) -> Result<(ExecutedMessage, ShardAccountStuff)> {
tracing::trace!(
target: tracing_targets::EXEC_MANAGER,
Expand All @@ -247,42 +257,36 @@ impl ExecutionManager {
account_id,
);
let now = std::time::Instant::now();
let (config, params) = self.get_execute_params()?;
let (transaction_result, in_message, updated_shard_account_stuff) = rayon_run(move || {
let shard_account = &mut shard_account_stuff.shard_account;
let mut transaction = match &new_msg {
AsyncMessage::Recover(new_msg_cell)
| AsyncMessage::Mint(new_msg_cell)
| AsyncMessage::Ext(_, new_msg_cell)
| AsyncMessage::Int(_, new_msg_cell, _)
| AsyncMessage::NewInt(_, new_msg_cell) => {
execute_ordinary_message(new_msg_cell, shard_account, params, &config)
}
AsyncMessage::TickTock(ticktock_) => {
execute_ticktock_message(*ticktock_, shard_account, params, &config)
}
};

if let Ok(transaction) = transaction.as_mut() {
// TODO replace with batch set
shard_account_stuff.add_transaction(&transaction.0, transaction.1.clone())?;
let shard_account = &mut shard_account_stuff.shard_account;
let mut transaction = match &new_msg {
AsyncMessage::Recover(new_msg_cell)
| AsyncMessage::Mint(new_msg_cell)
| AsyncMessage::Ext(_, new_msg_cell)
| AsyncMessage::Int(_, new_msg_cell, _)
| AsyncMessage::NewInt(_, new_msg_cell) => {
execute_ordinary_message(new_msg_cell, shard_account, params, config)
}
Ok((transaction, new_msg, shard_account_stuff))
as Result<(
Result<Box<(CurrencyCollection, Lazy<Transaction>)>>,
AsyncMessage,
ShardAccountStuff,
)>
})
.await?;
let transaction_duration = now.elapsed().as_millis() as u64;
AsyncMessage::TickTock(ticktock_) => {
execute_ticktock_message(*ticktock_, shard_account, params, config)
}
};

if let Ok(transaction) = transaction.as_mut() {
// TODO replace with batch set
shard_account_stuff.add_transaction(&transaction.0, transaction.1.clone())?;
}

let elapsed = now.elapsed();
metrics::histogram!("tycho_message_execution_time").record(elapsed);

let transaction_duration = elapsed.as_millis() as u64;
Ok((
ExecutedMessage {
transaction_result,
in_message,
transaction_result: transaction,
in_message: new_msg,
transaction_duration,
},
updated_shard_account_stuff,
shard_account_stuff,
))
}

Expand Down Expand Up @@ -316,16 +320,18 @@ impl ExecutionManager {
shard_account_stuff: ShardAccountStuff,
) -> Result<(AsyncMessage, Box<(CurrencyCollection, Lazy<Transaction>)>)> {
tracing::trace!(target: tracing_targets::EXEC_MANAGER, "execute_special_transaction()");
let (config, params) = self.get_execute_params()?;
let (
ExecutedMessage {
transaction_result,
in_message,
..
},
updated_shard_account_stuff,
) = self
.execute_one_message(account_id, msg, shard_account_stuff)
.await?;
) = rayon_run(move || {
Self::execute_one_message(account_id, msg, shard_account_stuff, &config, &params)
})
.await?;
self.min_next_lt = cmp::max(
self.min_next_lt,
updated_shard_account_stuff.shard_account.last_trans_lt + 1,
Expand Down Expand Up @@ -359,25 +365,25 @@ impl ExecutionManager {
fn execute_ordinary_message(
new_msg_cell: &Cell,
shard_account: &mut ShardAccount,
params: ExecuteParams,
params: &ExecuteParams,
config: &PreloadedBlockchainConfig,
) -> Result<Box<(CurrencyCollection, Lazy<Transaction>)>> {
let executor = OrdinaryTransactionExecutor::new();
executor
.execute_with_libs_and_params(Some(new_msg_cell), shard_account, &params, config)
.execute_with_libs_and_params(Some(new_msg_cell), shard_account, params, config)
.map(Box::new)
}

/// execute tick tock message
fn execute_ticktock_message(
tick_tock: TickTock,
shard_account: &mut ShardAccount,
params: ExecuteParams,
params: &ExecuteParams,
config: &PreloadedBlockchainConfig,
) -> Result<Box<(CurrencyCollection, Lazy<Transaction>)>> {
let executor = TickTockTransactionExecutor::new(tick_tock);
executor
.execute_with_libs_and_params(None, shard_account, &params, config)
.execute_with_libs_and_params(None, shard_account, params, config)
.map(Box::new)
}

Expand Down
11 changes: 11 additions & 0 deletions scripts/gen-dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,17 @@ def collator_do_collate() -> RowPanel:
return create_row("Collator Do Collate", metrics)


def collator_execution_manager() -> RowPanel:
metrics = [
create_heatmap_panel(
"tycho_message_execution_time",
"Message execution time",
yaxis(UNITS.SECONDS),
),
]
return create_row("Collator Execution Manager", metrics)


def templates() -> Templating:
return Templating(
list=[
Expand Down

0 comments on commit 4340896

Please sign in to comment.