Skip to content

Commit

Permalink
feat(collator): impl finalize block in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
serejkaaa512 authored and Rexagon committed Jul 3, 2024
1 parent 6dec9bb commit 57a31c3
Show file tree
Hide file tree
Showing 2 changed files with 159 additions and 119 deletions.
14 changes: 7 additions & 7 deletions collator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,27 @@ backon = { workspace = true }
base64 = { workspace = true }
bytes = { workspace = true }
bytesize = { workspace = true }
everscale-crypto = { workspace = true }
everscale-types = { workspace = true }
futures-util = { workspace = true }
humantime = { workspace = true }
indexmap = { workspace = true }
metrics = { workspace = true }
parking_lot = { workspace = true }
rand = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true, features = ["preserve_order"] }
sha2 = { workspace = true }
tl-proto = { workspace = true }
tempfile = { workspace = true }
tl-proto = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt", "signal"] }
tokio-util = { workspace = true }
ton_executor = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
ton_executor = { workspace = true }
trait-variant = { workspace = true }
weedb = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true, features = ["preserve_order"] }
rayon = { workspace = true }
everscale-types = { workspace = true }
everscale-crypto = { workspace = true }

# local deps
tycho-block-util = { workspace = true }
Expand Down
264 changes: 152 additions & 112 deletions collator/src/collator/build_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use tycho_util::metrics::HistogramGuard;

use super::execution_manager::ExecutionManager;
use super::CollatorStdImpl;
use crate::collator::types::BlockCollationData;
use crate::collator::types::{BlockCollationData, PreparedInMsg, PreparedOutMsg, PrevData};
use crate::tracing_targets;
use crate::types::BlockCandidate;

Expand All @@ -33,124 +33,62 @@ impl CollatorStdImpl {
let prev_shard_data = &self.working_state().prev_shard_data;

// update shard accounts tree and prepare accounts blocks
let mut new_config_params = None::<BlockchainConfigParams>;
let mut global_libraries = exec_manager.executor_params().state_libs.clone();

let is_masterchain = collation_data.block_id_short.shard.is_masterchain();
let config_address = &self.working_state().mc_data.config().address;

let build_account_blocks_elapsed;
let (account_blocks, shard_accounts) = {
let histogram = HistogramGuard::begin_with_labels(
"tycho_collator_finalize_build_account_blocks_time",
labels,
);

let mut account_blocks = BTreeMap::new();
let mut shard_accounts =
RelaxedAugDict::from_full(prev_shard_data.observable_accounts());

for updated_account in exec_manager.into_changed_accounts() {
if updated_account.transactions.is_empty() {
continue;
}

let loaded_account = updated_account.shard_account.load_account()?;
match &loaded_account {
Some(account) => {
if is_masterchain && &updated_account.account_addr == config_address {
if let AccountState::Active(StateInit { data, .. }) = &account.state {
if let Some(data) = data {
new_config_params =
Some(data.parse::<BlockchainConfigParams>()?);
}
}
}

shard_accounts.set_any(
&updated_account.account_addr,
&DepthBalanceInfo {
split_depth: 0, // TODO: fix
balance: account.balance.clone(),
},
&updated_account.shard_account,
)?;
}
None => {
shard_accounts.remove(&updated_account.account_addr)?;
}
}

if is_masterchain {
updated_account
.update_public_libraries(&loaded_account, &mut global_libraries)?;
}

let account_block = AccountBlock {
state_update: updated_account.build_hash_update(), // TODO: fix state update
account: updated_account.account_addr,
transactions: AugDict::try_from_btree(&updated_account.transactions)?,
};

account_blocks.insert(updated_account.account_addr, account_block);
}
let mut processed_accounts_res = Ok(Default::default());
let mut build_account_blocks_elapsed = Duration::ZERO;
let mut in_msgs_res = Ok(Default::default());
let mut build_in_msgs_elapsed = Duration::ZERO;
let mut out_msgs_res = Ok(Default::default());
let mut build_out_msgs_elapsed = Duration::ZERO;
rayon::scope(|s| {
s.spawn(|_| {
let histogram = HistogramGuard::begin_with_labels(
"tycho_collator_finalize_build_account_blocks_time",
labels,
);

build_account_blocks_elapsed = histogram.finish();
processed_accounts_res = self.build_accounts(
exec_manager,
prev_shard_data,
is_masterchain,
config_address,
&mut global_libraries,
);
build_account_blocks_elapsed = histogram.finish();
});
s.spawn(|_| {
let histogram = HistogramGuard::begin_with_labels(
"tycho_collator_finalize_build_in_msgs_time",
labels,
);
in_msgs_res = self.build_in_msgs(&collation_data.in_msgs);
build_in_msgs_elapsed = histogram.finish();
});
s.spawn(|_| {
let histogram = HistogramGuard::begin_with_labels(
"tycho_collator_finalize_build_out_msgs_time",
labels,
);
out_msgs_res = self.build_out_msgs(&collation_data.out_msgs);
build_out_msgs_elapsed = histogram.finish();
});
});

// TODO: Somehow consume accounts inside an iterator
let account_blocks = RelaxedAugDict::try_from_sorted_iter_any(
account_blocks
.iter()
.map(|(k, v)| (k, v.transactions.root_extra(), v as &dyn Store)),
)?;

(account_blocks.build()?, shard_accounts.build()?)
};
let processed_accounts = processed_accounts_res?;
let in_msgs = in_msgs_res?;
let out_msgs = out_msgs_res?;

// TODO: update new_config_opt from hard fork

// calc value flow
let mut value_flow = collation_data.value_flow.clone();

let build_in_msgs_elapsed;
let in_msgs = {
let histogram = HistogramGuard::begin_with_labels(
"tycho_collator_finalize_build_in_msgs_time",
labels,
);

let in_msgs = RelaxedAugDict::try_from_sorted_iter_lazy(
collation_data
.in_msgs
.iter()
.map(|(msg_id, msg)| (msg_id, &msg.import_fees, &msg.in_msg)),
)?;

build_in_msgs_elapsed = histogram.finish();
in_msgs.build()?
};
value_flow.imported = in_msgs.root_extra().value_imported.clone();

let build_out_msgs_elapsed;
let out_msgs = {
let histogram = HistogramGuard::begin_with_labels(
"tycho_collator_finalize_build_out_msgs_time",
labels,
);

let out_msgs = RelaxedAugDict::try_from_sorted_iter_lazy(
collation_data
.out_msgs
.iter()
.map(|(msg_id, msg)| (msg_id, &msg.exported_value, &msg.out_msg)),
)?;

build_out_msgs_elapsed = histogram.finish();
out_msgs.build()?
};

value_flow.exported = out_msgs.root_extra().clone();
value_flow.fees_collected = account_blocks.root_extra().clone();
value_flow.fees_collected = processed_accounts.account_blocks.root_extra().clone();
value_flow
.fees_collected
.try_add_assign_tokens(in_msgs.root_extra().fees_collected)?;
Expand All @@ -160,7 +98,11 @@ impl CollatorStdImpl {
value_flow
.fees_collected
.try_add_assign(&value_flow.created)?;
value_flow.to_next_block = shard_accounts.root_extra().balance.clone();
value_flow.to_next_block = processed_accounts
.shard_accounts
.root_extra()
.balance
.clone();

// build master state extra or get a ref to last applied master block
// TODO: extract min_ref_mc_seqno from processed_upto info when we have many shards
Expand All @@ -174,10 +116,12 @@ impl CollatorStdImpl {

let (extra, min_ref_mc_seqno) = self.create_mc_state_extra(
collation_data,
new_config_params.map(|params| BlockchainConfig {
address: *config_address,
params,
}),
processed_accounts
.new_config_params
.map(|params| BlockchainConfig {
address: *config_address,
params,
}),
)?;
collation_data.update_ref_min_mc_seqno(min_ref_mc_seqno);

Expand Down Expand Up @@ -242,7 +186,7 @@ impl CollatorStdImpl {
min_ref_mc_seqno: new_block_info.min_ref_mc_seqno,
processed_upto: Lazy::new(&collation_data.processed_upto)?,
before_split: new_block_info.before_split,
accounts: Lazy::new(&shard_accounts)?,
accounts: Lazy::new(&processed_accounts.shard_accounts)?,
overload_history: prev_shard_data.gas_used_from_last_anchor()
+ collation_data.block_limit.gas_used as u64,
underload_history: 0,
Expand Down Expand Up @@ -294,7 +238,7 @@ impl CollatorStdImpl {
let mut new_block_extra = BlockExtra {
in_msg_description: Lazy::new(&in_msgs)?,
out_msg_description: Lazy::new(&out_msgs)?,
account_blocks: Lazy::new(&account_blocks)?,
account_blocks: Lazy::new(&processed_accounts.account_blocks)?,
rand_seed: collation_data.rand_seed,
created_by: collation_data.created_by,
..Default::default()
Expand Down Expand Up @@ -531,6 +475,95 @@ impl CollatorStdImpl {
Ok(min_ref_mc_seqno)
}

fn build_accounts(
&self,
exec_manager: ExecutionManager,
prev_shard_data: &PrevData,
is_masterchain: bool,
config_address: &HashBytes,
global_libraries: &mut Dict<HashBytes, LibDescr>,
) -> Result<ProcessedAccounts> {
let mut account_blocks = BTreeMap::new();
let mut shard_accounts = RelaxedAugDict::from_full(prev_shard_data.observable_accounts());
let mut new_config_params = None;

for updated_account in exec_manager.into_changed_accounts() {
if updated_account.transactions.is_empty() {
continue;
}

let loaded_account = updated_account.shard_account.load_account()?;
match &loaded_account {
Some(account) => {
if is_masterchain && &updated_account.account_addr == config_address {
if let AccountState::Active(StateInit { data, .. }) = &account.state {
if let Some(data) = data {
new_config_params = Some(data.parse::<BlockchainConfigParams>()?);
}
}
}

shard_accounts.set_any(
&updated_account.account_addr,
&DepthBalanceInfo {
split_depth: 0, // TODO: fix
balance: account.balance.clone(),
},
&updated_account.shard_account,
)?;
}
None => {
shard_accounts.remove(&updated_account.account_addr)?;
}
}

if is_masterchain {
updated_account.update_public_libraries(&loaded_account, global_libraries)?;
}

let account_block = AccountBlock {
state_update: updated_account.build_hash_update(), // TODO: fix state update
account: updated_account.account_addr,
transactions: AugDict::try_from_btree(&updated_account.transactions)?,
};

account_blocks.insert(updated_account.account_addr, account_block);
}

// TODO: Somehow consume accounts inside an iterator
let account_blocks = RelaxedAugDict::try_from_sorted_iter_any(
account_blocks
.iter()
.map(|(k, v)| (k, v.transactions.root_extra(), v as &dyn Store)),
)?;

Ok(ProcessedAccounts {
account_blocks: account_blocks.build()?,
shard_accounts: shard_accounts.build()?,
new_config_params,
})
}

fn build_in_msgs(&self, items: &BTreeMap<HashBytes, PreparedInMsg>) -> Result<InMsgDescr> {
RelaxedAugDict::try_from_sorted_iter_lazy(
items
.iter()
.map(|(msg_id, msg)| (msg_id, &msg.import_fees, &msg.in_msg)),
)?
.build()
.map_err(Into::into)
}

fn build_out_msgs(&self, items: &BTreeMap<HashBytes, PreparedOutMsg>) -> Result<OutMsgDescr> {
RelaxedAugDict::try_from_sorted_iter_lazy(
items
.iter()
.map(|(msg_id, msg)| (msg_id, &msg.exported_value, &msg.out_msg)),
)?
.build()
.map_err(Into::into)
}

#[cfg(feature = "block-creator-stats")]
fn update_block_creator_stats(
collation_data: &BlockCollationData,
Expand Down Expand Up @@ -598,6 +631,13 @@ impl CollatorStdImpl {
}
}

#[derive(Default)]
struct ProcessedAccounts {
account_blocks: AccountBlocks,
shard_accounts: ShardAccounts,
new_config_params: Option<BlockchainConfigParams>,
}

fn create_merkle_update(
old_state_root: &Cell,
new_state_root: &Cell,
Expand Down

0 comments on commit 57a31c3

Please sign in to comment.