Skip to content

Commit

Permalink
feat(collator): store shard state after collation (#153)
Browse files Browse the repository at this point in the history
  • Loading branch information
Rexagon authored Jul 3, 2024
2 parents f2bf869 + c5525a1 commit 69ff05b
Show file tree
Hide file tree
Showing 16 changed files with 269 additions and 155 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

42 changes: 41 additions & 1 deletion block-util/src/dict.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use everscale_types::dict::{
aug_dict_insert, aug_dict_remove_owned, AugDictExtra, DictKey, SetMode,
aug_dict_insert, aug_dict_remove_owned, build_aug_dict_from_sorted_iter, AugDictExtra, DictKey,
SetMode,
};
use everscale_types::error::Error;
use everscale_types::models::Lazy;
Expand Down Expand Up @@ -44,6 +45,45 @@ where
K: Store + DictKey,
for<'a> A: AugDictExtra + Store + Load<'a>,
{
pub fn try_from_sorted_iter_lazy<'a, I>(iter: I) -> Result<Self, Error>
where
I: IntoIterator<Item = (&'a K, &'a A, &'a Lazy<V>)>,
K: Ord + 'a,
A: 'a,
V: 'a,
{
Ok(Self {
dict_root: build_aug_dict_from_sorted_iter(
iter.into_iter().map(|(k, a, v)| {
// SAFETY: We know that this cell is not a library cell.
let value = unsafe { v.inner().as_slice_unchecked() };
(k, a, value)
}),
K::BITS,
A::comp_add,
&mut Cell::empty_context(),
)?,
_marker: std::marker::PhantomData,
})
}

pub fn try_from_sorted_iter_any<'a, I>(iter: I) -> Result<Self, Error>
where
I: IntoIterator<Item = (&'a K, &'a A, &'a dyn Store)>,
K: Ord + 'a,
A: 'a,
{
Ok(Self {
dict_root: build_aug_dict_from_sorted_iter(
iter,
K::BITS,
A::comp_add,
&mut Cell::empty_context(),
)?,
_marker: std::marker::PhantomData,
})
}

pub fn set_as_lazy(&mut self, key: &K, extra: &A, value: &Lazy<V>) -> Result<bool, Error> {
self.set_any(key, extra, &value.inner().as_slice()?)
}
Expand Down
27 changes: 14 additions & 13 deletions cli/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ use tycho_collator::validator::config::ValidatorConfig;
use tycho_collator::validator::validator::ValidatorStdImplFactory;
use tycho_core::block_strider::{
BlockProvider, BlockStrider, BlockchainBlockProvider, BlockchainBlockProviderConfig,
MetricsSubscriber, OptionalBlockStuff, PersistentBlockStriderState, StateSubscriber,
StateSubscriberContext, StorageBlockProvider,
MetricsSubscriber, OptionalBlockStuff, PersistentBlockStriderState, ShardStateApplier,
StateSubscriber, StateSubscriberContext, StorageBlockProvider,
};
use tycho_core::blockchain_rpc::{
BlockchainRpcClient, BlockchainRpcService, BlockchainRpcServiceConfig, BroadcastListener,
Expand Down Expand Up @@ -481,12 +481,10 @@ impl Node {
let state_storage = self.storage.shard_state_storage();

for state in to_import {
let (handle, status) =
handle_storage.create_or_load_handle(state.block_id(), BlockMetaData {
is_key_block: state.block_id().is_masterchain(),
gen_utime,
mc_ref_seqno: 0,
});
let (handle, status) = handle_storage.create_or_load_handle(
state.block_id(),
BlockMetaData::zero_state(gen_utime, state.block_id().is_masterchain()),
);

let stored = state_storage
.store_state(&handle, &state)
Expand Down Expand Up @@ -674,11 +672,14 @@ impl Node {
collator_block_provider,
))
.with_state(strider_state)
.with_state_subscriber(
self.state_tracker.clone(),
self.storage.clone(),
((collator_state_subscriber, rpc_state), MetricsSubscriber),
)
.with_block_subscriber((
ShardStateApplier::new(
self.state_tracker.clone(),
self.storage.clone(),
(collator_state_subscriber, rpc_state),
),
MetricsSubscriber,
))
.build();

// Run block strider
Expand Down
66 changes: 28 additions & 38 deletions collator/src/collator/build_block.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::collections::BTreeMap;
use std::time::Duration;

use anyhow::{bail, Result};
Expand All @@ -8,7 +9,6 @@ use humantime::format_duration;
use tokio::time::Instant;
use tycho_block_util::config::BlockchainConfigExt;
use tycho_block_util::dict::RelaxedAugDict;
use tycho_block_util::state::ShardStateStuff;
use tycho_util::metrics::HistogramGuard;

use super::execution_manager::ExecutionManager;
Expand All @@ -22,7 +22,7 @@ impl CollatorStdImpl {
&mut self,
collation_data: &mut BlockCollationData,
exec_manager: ExecutionManager,
) -> Result<(Box<BlockCandidate>, ShardStateStuff)> {
) -> Result<(Box<BlockCandidate>, Cell)> {
tracing::debug!(target: tracing_targets::COLLATOR, "finalize_block()");

let labels = &[("workchain", self.shard_id.workchain().to_string())];
Expand All @@ -46,7 +46,7 @@ impl CollatorStdImpl {
labels,
);

let mut account_blocks = RelaxedAugDict::new();
let mut account_blocks = BTreeMap::new();
let mut shard_accounts =
RelaxedAugDict::from_full(prev_shard_data.observable_accounts());

Expand Down Expand Up @@ -89,17 +89,21 @@ impl CollatorStdImpl {
let account_block = AccountBlock {
state_update: updated_account.build_hash_update(), // TODO: fix state update
account: updated_account.account_addr,
transactions: updated_account.transactions.build()?,
transactions: AugDict::try_from_btree(&updated_account.transactions)?,
};

account_blocks.set_any(
&updated_account.account_addr,
account_block.transactions.root_extra(),
&account_block,
)?;
account_blocks.insert(updated_account.account_addr, account_block);
}

build_account_blocks_elapsed = histogram.finish();

// TODO: Somehow consume accounts inside an iterator
let account_blocks = RelaxedAugDict::try_from_sorted_iter_any(
account_blocks
.iter()
.map(|(k, v)| (k, v.transactions.root_extra(), v as &dyn Store)),
)?;

(account_blocks.build()?, shard_accounts.build()?)
};

Expand All @@ -115,11 +119,12 @@ impl CollatorStdImpl {
labels,
);

let mut in_msgs = RelaxedAugDict::new();
// TODO: use more effective algorithm than iter and set
for (msg_id, msg) in collation_data.in_msgs.iter() {
in_msgs.set_as_lazy(msg_id, &msg.import_fees, &msg.in_msg)?;
}
let in_msgs = RelaxedAugDict::try_from_sorted_iter_lazy(
collation_data
.in_msgs
.iter()
.map(|(msg_id, msg)| (msg_id, &msg.import_fees, &msg.in_msg)),
)?;

build_in_msgs_elapsed = histogram.finish();
in_msgs.build()?
Expand All @@ -133,11 +138,12 @@ impl CollatorStdImpl {
labels,
);

let mut out_msgs = RelaxedAugDict::new();
// TODO: use more effective algorithm than iter and set
for (msg_id, msg) in collation_data.out_msgs.iter() {
out_msgs.set_as_lazy(msg_id, &msg.exported_value, &msg.out_msg)?;
}
let out_msgs = RelaxedAugDict::try_from_sorted_iter_lazy(
collation_data
.out_msgs
.iter()
.map(|(msg_id, msg)| (msg_id, &msg.exported_value, &msg.out_msg)),
)?;

build_out_msgs_elapsed = histogram.finish();
out_msgs.build()?
Expand Down Expand Up @@ -217,6 +223,7 @@ impl CollatorStdImpl {
}

let build_state_update_elapsed;
let new_state_root;
let state_update = {
let histogram = HistogramGuard::begin_with_labels(
"tycho_collator_finalize_build_state_update_time",
Expand Down Expand Up @@ -261,7 +268,7 @@ impl CollatorStdImpl {

// TODO: update smc on hard fork

let new_state_root = CellBuilder::build_from(&new_observable_state)?;
new_state_root = CellBuilder::build_from(&new_observable_state)?;

// calc merkle update
let merkle_update = create_merkle_update(
Expand Down Expand Up @@ -364,22 +371,6 @@ impl CollatorStdImpl {
// build new shard state using merkle update
// to get updated state without UsageTree

let build_new_state_elapsed;
let new_state_stuff = {
let histogram = HistogramGuard::begin_with_labels(
"tycho_collator_finalize_build_new_state_time",
labels,
);

let pure_prev_state_root = prev_shard_data.pure_state_root();
let new_state_root = state_update.apply(pure_prev_state_root)?;
let new_state_stuff =
ShardStateStuff::from_root(&new_block_id, new_state_root, &self.state_tracker)?;

build_new_state_elapsed = histogram.finish();
new_state_stuff
};

let total_elapsed = histogram.finish();

tracing::debug!(
Expand All @@ -391,11 +382,10 @@ impl CollatorStdImpl {
build_mc_state_extra = %format_duration(build_mc_state_extra_elapsed),
build_state_update = %format_duration(build_state_update_elapsed),
build_block = %format_duration(build_block_elapsed),
build_new_state = %format_duration(build_new_state_elapsed),
"finalize block timings"
);

Ok((block_candidate, new_state_stuff))
Ok((block_candidate, new_state_root))
}

fn create_mc_state_extra(
Expand Down
21 changes: 20 additions & 1 deletion collator/src/collator/do_collate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use everscale_types::prelude::*;
use humantime::format_duration;
use sha2::Digest;
use ton_executor::{ExecuteParams, ExecutorOutput, PreloadedBlockchainConfig};
use tycho_storage::BlockMetaData;
use tycho_util::futures::JoinTask;
use tycho_util::metrics::HistogramGuard;
use tycho_util::time::now_millis;
use tycho_util::FastHashMap;
Expand Down Expand Up @@ -545,14 +547,29 @@ impl CollatorStdImpl {
let finalize_block_timer = std::time::Instant::now();
// TODO: Move into rayon
tokio::task::yield_now().await;
let (candidate, new_state_stuff) =
let (candidate, new_state_root) =
tokio::task::block_in_place(|| self.finalize_block(&mut collation_data, exec_manager))?;
tokio::task::yield_now().await;
let finalize_block_elapsed = finalize_block_timer.elapsed();

metrics::counter!("tycho_do_collate_blocks_count", labels).increment(1);
metrics::gauge!("tycho_do_collate_block_seqno", labels).set(self.next_block_id_short.seqno);

let new_state_stuff = JoinTask::new({
let block_id = candidate.block_id;
let meta = BlockMetaData {
is_key_block: false, // TODO: set from collation data
gen_utime: collation_data.gen_utime,
mc_ref_seqno: None,
};
let adapter = self.state_node_adapter.clone();
async move {
adapter
.store_state_root(&block_id, meta, new_state_root)
.await
}
});

let apply_queue_diff_elapsed;
{
let histogram =
Expand All @@ -570,6 +587,8 @@ impl CollatorStdImpl {
labels,
);

let new_state_stuff = new_state_stuff.await?;

// return collation result
self.listener
.on_block_candidate(BlockCollationResult {
Expand Down
22 changes: 10 additions & 12 deletions collator/src/collator/execution_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,6 @@ impl ExecutionManager {
cmp::max(self.min_next_lt, executor_output.account_last_trans_lt);

items.push(ExecutedTickItem {
account_addr: executed.account_state.account_addr,
in_message: tx.in_message,
executor_output,
});
Expand Down Expand Up @@ -374,7 +373,6 @@ impl ExecutedTick {
}

pub struct ExecutedTickItem {
pub account_addr: AccountId,
pub in_message: Box<ParsedMessage>,
pub executor_output: ExecutorOutput,
}
Expand Down Expand Up @@ -416,16 +414,16 @@ fn execute_ordinary_transaction_impl(
config,
);

if let Ok((total_fees, executor_output)) = &result {
// TODO replace with batch set
let tx_lt = shard_account.last_trans_lt;
account_stuff.add_transaction(tx_lt, total_fees, &executor_output.transaction)?;
}
let result = match result {
Ok((total_fees, executor_output)) => {
let tx_lt = shard_account.last_trans_lt;
account_stuff.add_transaction(tx_lt, total_fees, executor_output.transaction.clone());
Ok(executor_output)
}
Err(e) => Err(e),
};

Ok(ExecutedOrdinaryTransaction {
result: result.map(|(_, exec_out)| exec_out),
in_message,
})
Ok(ExecutedOrdinaryTransaction { result, in_message })
}

fn execute_ticktock_transaction(
Expand All @@ -452,7 +450,7 @@ fn execute_ticktock_transaction(

// TODO replace with batch set
let tx_lt = shard_account.last_trans_lt;
account_stuff.add_transaction(tx_lt, &total_fees, &executor_output.transaction)?;
account_stuff.add_transaction(tx_lt, total_fees, executor_output.transaction.clone());

Ok(executor_output)
}
Expand Down
Loading

0 comments on commit 69ff05b

Please sign in to comment.