Skip to content

Commit

Permalink
feat(collator): impl finalize block in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
serejkaaa512 committed Jun 29, 2024
1 parent 8f59f34 commit 1de201f
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 100 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions collator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ indexmap = { workspace = true }
metrics = { workspace = true }
parking_lot = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }
sha2 = { workspace = true }
tl-proto = { workspace = true }
tempfile = { workspace = true }
Expand Down
232 changes: 132 additions & 100 deletions collator/src/collator/build_block.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::BTreeMap;
use std::time::Duration;

use anyhow::{bail, Result};
Expand All @@ -13,7 +14,7 @@ use tycho_util::metrics::HistogramGuard;

use super::execution_manager::ExecutionManager;
use super::CollatorStdImpl;
use crate::collator::types::BlockCollationData;
use crate::collator::types::{BlockCollationData, PreparedInMsg, PreparedOutMsg, PrevData};
use crate::tracing_targets;
use crate::types::BlockCandidate;

Expand All @@ -33,116 +34,60 @@ impl CollatorStdImpl {
let prev_shard_data = &self.working_state().prev_shard_data;

// update shard accounts tree and prepare accounts blocks
let mut new_config_params = None::<BlockchainConfigParams>;
let mut global_libraries = exec_manager.executor_params().state_libs.clone();

let is_masterchain = collation_data.block_id_short.shard.is_masterchain();
let config_address = &self.working_state().mc_data.config().address;

let build_account_blocks_elapsed;
let (account_blocks, shard_accounts) = {
let histogram = HistogramGuard::begin_with_labels(
"tycho_collator_finalize_build_account_blocks_time",
labels,
);

let mut account_blocks = RelaxedAugDict::new();
let mut shard_accounts =
RelaxedAugDict::from_full(prev_shard_data.observable_accounts());

for updated_account in exec_manager.into_changed_accounts() {
if updated_account.transactions.is_empty() {
continue;
}

let loaded_account = updated_account.shard_account.load_account()?;
match &loaded_account {
Some(account) => {
if is_masterchain && &updated_account.account_addr == config_address {
if let AccountState::Active(StateInit { data, .. }) = &account.state {
if let Some(data) = data {
new_config_params =
Some(data.parse::<BlockchainConfigParams>()?);
}
}
}

shard_accounts.set_any(
&updated_account.account_addr,
&DepthBalanceInfo {
split_depth: 0, // TODO: fix
balance: account.balance.clone(),
},
&updated_account.shard_account,
)?;
}
None => {
shard_accounts.remove(&updated_account.account_addr)?;
}
}

if is_masterchain {
updated_account
.update_public_libraries(&loaded_account, &mut global_libraries)?;
}

let account_block = AccountBlock {
state_update: updated_account.build_hash_update(), // TODO: fix state update
account: updated_account.account_addr,
transactions: updated_account.transactions.build()?,
};

account_blocks.set_any(
&updated_account.account_addr,
account_block.transactions.root_extra(),
&account_block,
)?;
}
let mut build_account_blocks_elapsed = Default::default();
let mut shards_info_res = Ok((Default::default(), Default::default(), None));
let mut in_msgs_res = Ok(Default::default());
let mut out_msgs_res = Ok(Default::default());
let mut build_in_msgs_elapsed = Default::default();
let mut build_out_msgs_elapsed = Default::default();
let in_msgs_iter = &mut collation_data.in_msgs;
let out_msgs_iter = &mut collation_data.out_msgs;
rayon::scope(|s| {
s.spawn(|_| {
let histogram = HistogramGuard::begin_with_labels(
"tycho_collator_finalize_build_account_blocks_time",
labels,
);
shards_info_res = self.calculate_shards_info(
exec_manager,
prev_shard_data,
is_masterchain,
*config_address,
&mut global_libraries,
);
build_account_blocks_elapsed = histogram.finish();
});
s.spawn(|_| {
let histogram = HistogramGuard::begin_with_labels(
"tycho_collator_finalize_build_in_msgs_time",
labels,
);
in_msgs_res = self.calculate_in_msgs(in_msgs_iter);
build_in_msgs_elapsed = histogram.finish();
});
s.spawn(|_| {
let histogram = HistogramGuard::begin_with_labels(
"tycho_collator_finalize_build_out_msgs_time",
labels,
);
out_msgs_res = self.calculate_out_msgs(out_msgs_iter);
build_out_msgs_elapsed = histogram.finish();
});
});

build_account_blocks_elapsed = histogram.finish();
(account_blocks.build()?, shard_accounts.build()?)
};
let (account_blocks, shard_accounts, new_config_params) = shards_info_res?;
let in_msgs = in_msgs_res?;
let out_msgs = out_msgs_res?;

// TODO: update new_config_opt from hard fork

// calc value flow
let mut value_flow = collation_data.value_flow.clone();

let build_in_msgs_elapsed;
let in_msgs = {
let histogram = HistogramGuard::begin_with_labels(
"tycho_collator_finalize_build_in_msgs_time",
labels,
);

let mut in_msgs = RelaxedAugDict::new();
// TODO: use more effective algorithm than iter and set
for (msg_id, msg) in collation_data.in_msgs.iter() {
in_msgs.set_as_lazy(msg_id, &msg.import_fees, &msg.in_msg)?;
}

build_in_msgs_elapsed = histogram.finish();
in_msgs.build()?
};
value_flow.imported = in_msgs.root_extra().value_imported.clone();

let build_out_msgs_elapsed;
let out_msgs = {
let histogram = HistogramGuard::begin_with_labels(
"tycho_collator_finalize_build_out_msgs_time",
labels,
);

let mut out_msgs = RelaxedAugDict::new();
// TODO: use more effective algorithm than iter and set
for (msg_id, msg) in collation_data.out_msgs.iter() {
out_msgs.set_as_lazy(msg_id, &msg.exported_value, &msg.out_msg)?;
}

build_out_msgs_elapsed = histogram.finish();
out_msgs.build()?
};

value_flow.exported = out_msgs.root_extra().clone();
value_flow.fees_collected = account_blocks.root_extra().clone();
value_flow
Expand Down Expand Up @@ -541,6 +486,93 @@ impl CollatorStdImpl {
Ok(min_ref_mc_seqno)
}

fn calculate_shards_info(
&self,
exec_manager: ExecutionManager,
prev_shard_data: &PrevData,
is_masterchain: bool,
config_address: HashBytes,
global_libraries: &mut Dict<HashBytes, LibDescr>,
) -> Result<(AccountBlocks, ShardAccounts, Option<BlockchainConfigParams>)> {
let mut account_blocks = RelaxedAugDict::new();
let mut shard_accounts = RelaxedAugDict::from_full(prev_shard_data.observable_accounts());
let mut new_config_params = None;

for updated_account in exec_manager.into_changed_accounts() {
if updated_account.transactions.is_empty() {
continue;
}

let loaded_account = updated_account.shard_account.load_account()?;
match &loaded_account {
Some(account) => {
if is_masterchain && updated_account.account_addr == config_address {
if let AccountState::Active(StateInit { data, .. }) = &account.state {
if let Some(data) = data {
new_config_params = Some(data.parse::<BlockchainConfigParams>()?);
}
}
}

shard_accounts.set_any(
&updated_account.account_addr,
&DepthBalanceInfo {
split_depth: 0, // TODO: fix
balance: account.balance.clone(),
},
&updated_account.shard_account,
)?;
}
None => {
shard_accounts.remove(&updated_account.account_addr)?;
}
}

if is_masterchain {
updated_account.update_public_libraries(&loaded_account, global_libraries)?;
}

let account_block = AccountBlock {
state_update: updated_account.build_hash_update(), // TODO: fix state update
account: updated_account.account_addr,
transactions: updated_account.transactions.build()?,
};

account_blocks.set_any(
&updated_account.account_addr,
account_block.transactions.root_extra(),
&account_block,
)?;
}
Ok((
account_blocks.build()?,
shard_accounts.build()?,
new_config_params,
))
}
fn calculate_in_msgs(
&self,
in_msgs_map: &mut BTreeMap<HashBytes, PreparedInMsg>,
) -> Result<InMsgDescr> {
let mut in_msgs = RelaxedAugDict::new();
// TODO: use more effective algorithm than iter and set
for (msg_id, msg) in in_msgs_map.iter() {
in_msgs.set_as_lazy(msg_id, &msg.import_fees, &msg.in_msg)?;
}
Ok(in_msgs.build()?)
}
fn calculate_out_msgs(
&self,
out_msgs_map: &mut BTreeMap<HashBytes, PreparedOutMsg>,
) -> Result<OutMsgDescr> {
let mut out_msgs = RelaxedAugDict::new();
// TODO: use more effective algorithm than iter and set
for (msg_id, msg) in out_msgs_map.iter() {
out_msgs.set_as_lazy(msg_id, &msg.exported_value, &msg.out_msg)?;
}
Ok(out_msgs.build()?)
}

#[cfg(feature = "block-creator-stats")]
fn update_block_creator_stats(
collation_data: &BlockCollationData,
Expand Down

0 comments on commit 1de201f

Please sign in to comment.