diff --git a/Cargo.lock b/Cargo.lock index cf3e3bfc8..ee91412c7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3347,6 +3347,7 @@ dependencies = [ "metrics", "parking_lot", "rand", + "rayon", "serde", "serde_json", "sha2", diff --git a/collator/Cargo.toml b/collator/Cargo.toml index d9f89a290..30048f99b 100644 --- a/collator/Cargo.toml +++ b/collator/Cargo.toml @@ -22,6 +22,7 @@ indexmap = { workspace = true } metrics = { workspace = true } parking_lot = { workspace = true } rand = { workspace = true } +rayon = { workspace = true } sha2 = { workspace = true } tl-proto = { workspace = true } tempfile = { workspace = true } diff --git a/collator/src/collator/build_block.rs b/collator/src/collator/build_block.rs index 8e1e2f379..4eb43d4a2 100644 --- a/collator/src/collator/build_block.rs +++ b/collator/src/collator/build_block.rs @@ -1,3 +1,4 @@ +use std::collections::BTreeMap; use std::time::Duration; use anyhow::{bail, Result}; @@ -13,7 +14,7 @@ use tycho_util::metrics::HistogramGuard; use super::execution_manager::ExecutionManager; use super::CollatorStdImpl; -use crate::collator::types::BlockCollationData; +use crate::collator::types::{BlockCollationData, PreparedInMsg, PreparedOutMsg, PrevData}; use crate::tracing_targets; use crate::types::BlockCandidate; @@ -33,116 +34,60 @@ impl CollatorStdImpl { let prev_shard_data = &self.working_state().prev_shard_data; // update shard accounts tree and prepare accounts blocks - let mut new_config_params = None::; let mut global_libraries = exec_manager.executor_params().state_libs.clone(); let is_masterchain = collation_data.block_id_short.shard.is_masterchain(); let config_address = &self.working_state().mc_data.config().address; - let build_account_blocks_elapsed; - let (account_blocks, shard_accounts) = { - let histogram = HistogramGuard::begin_with_labels( - "tycho_collator_finalize_build_account_blocks_time", - labels, - ); - - let mut account_blocks = RelaxedAugDict::new(); - let mut shard_accounts = - RelaxedAugDict::from_full(prev_shard_data.observable_accounts()); - - for updated_account in exec_manager.into_changed_accounts() { - if updated_account.transactions.is_empty() { - continue; - } - - let loaded_account = updated_account.shard_account.load_account()?; - match &loaded_account { - Some(account) => { - if is_masterchain && &updated_account.account_addr == config_address { - if let AccountState::Active(StateInit { data, .. }) = &account.state { - if let Some(data) = data { - new_config_params = - Some(data.parse::()?); - } - } - } - - shard_accounts.set_any( - &updated_account.account_addr, - &DepthBalanceInfo { - split_depth: 0, // TODO: fix - balance: account.balance.clone(), - }, - &updated_account.shard_account, - )?; - } - None => { - shard_accounts.remove(&updated_account.account_addr)?; - } - } - - if is_masterchain { - updated_account - .update_public_libraries(&loaded_account, &mut global_libraries)?; - } - - let account_block = AccountBlock { - state_update: updated_account.build_hash_update(), // TODO: fix state update - account: updated_account.account_addr, - transactions: updated_account.transactions.build()?, - }; - - account_blocks.set_any( - &updated_account.account_addr, - account_block.transactions.root_extra(), - &account_block, - )?; - } + let mut build_account_blocks_elapsed = Default::default(); + let mut shards_info_res = Ok((Default::default(), Default::default(), None)); + let mut in_msgs_res = Ok(Default::default()); + let mut out_msgs_res = Ok(Default::default()); + let mut build_in_msgs_elapsed = Default::default(); + let mut build_out_msgs_elapsed = Default::default(); + let in_msgs_iter = &mut collation_data.in_msgs; + let out_msgs_iter = &mut collation_data.out_msgs; + rayon::scope(|s| { + s.spawn(|_| { + let histogram = HistogramGuard::begin_with_labels( + "tycho_collator_finalize_build_account_blocks_time", + labels, + ); + shards_info_res = self.calculate_shards_info( + exec_manager, + prev_shard_data, + is_masterchain, + *config_address, + &mut global_libraries, + ); + build_account_blocks_elapsed = histogram.finish(); + }); + s.spawn(|_| { + let histogram = HistogramGuard::begin_with_labels( + "tycho_collator_finalize_build_in_msgs_time", + labels, + ); + in_msgs_res = self.calculate_in_msgs(in_msgs_iter); + build_in_msgs_elapsed = histogram.finish(); + }); + s.spawn(|_| { + let histogram = HistogramGuard::begin_with_labels( + "tycho_collator_finalize_build_out_msgs_time", + labels, + ); + out_msgs_res = self.calculate_out_msgs(out_msgs_iter); + build_out_msgs_elapsed = histogram.finish(); + }); + }); - build_account_blocks_elapsed = histogram.finish(); - (account_blocks.build()?, shard_accounts.build()?) - }; + let (account_blocks, shard_accounts, new_config_params) = shards_info_res?; + let in_msgs = in_msgs_res?; + let out_msgs = out_msgs_res?; // TODO: update new_config_opt from hard fork - // calc value flow let mut value_flow = collation_data.value_flow.clone(); - - let build_in_msgs_elapsed; - let in_msgs = { - let histogram = HistogramGuard::begin_with_labels( - "tycho_collator_finalize_build_in_msgs_time", - labels, - ); - - let mut in_msgs = RelaxedAugDict::new(); - // TODO: use more effective algorithm than iter and set - for (msg_id, msg) in collation_data.in_msgs.iter() { - in_msgs.set_as_lazy(msg_id, &msg.import_fees, &msg.in_msg)?; - } - - build_in_msgs_elapsed = histogram.finish(); - in_msgs.build()? - }; value_flow.imported = in_msgs.root_extra().value_imported.clone(); - - let build_out_msgs_elapsed; - let out_msgs = { - let histogram = HistogramGuard::begin_with_labels( - "tycho_collator_finalize_build_out_msgs_time", - labels, - ); - - let mut out_msgs = RelaxedAugDict::new(); - // TODO: use more effective algorithm than iter and set - for (msg_id, msg) in collation_data.out_msgs.iter() { - out_msgs.set_as_lazy(msg_id, &msg.exported_value, &msg.out_msg)?; - } - - build_out_msgs_elapsed = histogram.finish(); - out_msgs.build()? - }; - value_flow.exported = out_msgs.root_extra().clone(); value_flow.fees_collected = account_blocks.root_extra().clone(); value_flow @@ -541,6 +486,93 @@ impl CollatorStdImpl { Ok(min_ref_mc_seqno) } + fn calculate_shards_info( + &self, + exec_manager: ExecutionManager, + prev_shard_data: &PrevData, + is_masterchain: bool, + config_address: HashBytes, + global_libraries: &mut Dict, + ) -> Result<(AccountBlocks, ShardAccounts, Option)> { + let mut account_blocks = RelaxedAugDict::new(); + let mut shard_accounts = RelaxedAugDict::from_full(prev_shard_data.observable_accounts()); + let mut new_config_params = None; + + for updated_account in exec_manager.into_changed_accounts() { + if updated_account.transactions.is_empty() { + continue; + } + + let loaded_account = updated_account.shard_account.load_account()?; + match &loaded_account { + Some(account) => { + if is_masterchain && updated_account.account_addr == config_address { + if let AccountState::Active(StateInit { data, .. }) = &account.state { + if let Some(data) = data { + new_config_params = Some(data.parse::()?); + } + } + } + + shard_accounts.set_any( + &updated_account.account_addr, + &DepthBalanceInfo { + split_depth: 0, // TODO: fix + balance: account.balance.clone(), + }, + &updated_account.shard_account, + )?; + } + None => { + shard_accounts.remove(&updated_account.account_addr)?; + } + } + + if is_masterchain { + updated_account.update_public_libraries(&loaded_account, global_libraries)?; + } + + let account_block = AccountBlock { + state_update: updated_account.build_hash_update(), // TODO: fix state update + account: updated_account.account_addr, + transactions: updated_account.transactions.build()?, + }; + + account_blocks.set_any( + &updated_account.account_addr, + account_block.transactions.root_extra(), + &account_block, + )?; + } + Ok(( + account_blocks.build()?, + shard_accounts.build()?, + new_config_params, + )) + } + fn calculate_in_msgs( + &self, + in_msgs_map: &mut BTreeMap, + ) -> Result { + let mut in_msgs = RelaxedAugDict::new(); + // TODO: use more effective algorithm than iter and set + for (msg_id, msg) in in_msgs_map.iter() { + in_msgs.set_as_lazy(msg_id, &msg.import_fees, &msg.in_msg)?; + } + Ok(in_msgs.build()?) + } + fn calculate_out_msgs( + &self, + out_msgs_map: &mut BTreeMap, + ) -> Result { + let mut out_msgs = RelaxedAugDict::new(); + // TODO: use more effective algorithm than iter and set + for (msg_id, msg) in out_msgs_map.iter() { + out_msgs.set_as_lazy(msg_id, &msg.exported_value, &msg.out_msg)?; + } + Ok(out_msgs.build()?) + } + #[cfg(feature = "block-creator-stats")] fn update_block_creator_stats( collation_data: &BlockCollationData,