Skip to content

Commit

Permalink
feature(collator): reload shardchain state on mc state update
Browse files Browse the repository at this point in the history
  • Loading branch information
SmaGMan committed Jun 24, 2024
1 parent aec65e3 commit e10add2
Show file tree
Hide file tree
Showing 9 changed files with 182 additions and 56 deletions.
1 change: 1 addition & 0 deletions cli/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,7 @@ impl Node {
},
},
CollatorStdImplFactory,
self.state_tracker.clone(),
#[cfg(test)]
vec![],
);
Expand Down
14 changes: 12 additions & 2 deletions collator/src/collator/do_collate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -547,10 +547,20 @@ impl CollatorStdImpl {
})
.await?;

// update PrevData in working state
self.update_working_state(new_state_stuff)?;
// update PrevData [and McData] in working state
let new_mc_state_stuff_opt = if self.shard_id.is_masterchain() {
Some(new_state_stuff.clone())
} else {
None
};
self.next_block_id_short.seqno += 1;
self.update_working_state(new_mc_state_stuff_opt, vec![new_state_stuff], false)?;
self.update_working_state_pending_internals(Some(has_pending_internals))?;

tracing::debug!(target: tracing_targets::COLLATOR,
"working state updated from just collated block",
);

handle_block_candidate_elapsed = histogram.finish();
}

Expand Down
88 changes: 50 additions & 38 deletions collator/src/collator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ impl CollatorStdImpl {
let (mc_state, prev_states) = Self::load_init_states(
self.state_node_adapter.clone(),
self.shard_id,
prev_blocks_ids,
&prev_blocks_ids,
mc_state,
)
.await?;
Expand Down Expand Up @@ -333,33 +333,29 @@ impl CollatorStdImpl {
Ok(())
}

/// Update working state from new block and state after block collation
fn update_working_state(&mut self, new_state_stuff: ShardStateStuff) -> Result<()> {
self.next_block_id_short = BlockIdShort {
shard: new_state_stuff.block_id().shard,
seqno: new_state_stuff.block_id().seqno + 1,
};

/// Update working state
fn update_working_state(
&mut self,
mc_state: Option<ShardStateStuff>,
prev_states: Vec<ShardStateStuff>,
prev_states_loaded_from_storage: bool,
) -> Result<()> {
let working_state_mut = self
.working_state
.as_mut()
.expect("should `init` collator before calling `update_working_state`");

if new_state_stuff.block_id().shard.is_masterchain() {
let new_mc_data = McData::build(new_state_stuff.clone())?;
if let Some(new_mc_state) = mc_state {
let new_mc_data = McData::build(new_mc_state)?;
working_state_mut.mc_data = new_mc_data;
}

let prev_states = vec![new_state_stuff];
Self::check_prev_states_and_master(&working_state_mut.mc_data, &prev_states)?;
let (new_prev_shard_data, usage_tree) = PrevData::build(prev_states, &self.state_tracker)?;
working_state_mut.prev_shard_data = new_prev_shard_data;
working_state_mut.prev_states_loaded_from_storage = prev_states_loaded_from_storage;
working_state_mut.usage_tree = usage_tree;

tracing::debug!(target: tracing_targets::COLLATOR,
"working state updated from just collated block",
);

Ok(())
}

Expand All @@ -377,39 +373,56 @@ impl CollatorStdImpl {
Ok(())
}

/// Update McData in working state
#[tracing::instrument(skip_all, fields(block_id = %self.next_block_id_short))]
fn update_mc_data(&mut self, mc_state: ShardStateStuff) -> Result<()> {
/// Update McData and reload prev state from storage to reduce prev state runtime size
#[tracing::instrument(skip_all, fields(
block_id = %self.next_block_id_short,
mc_block_id = %new_mc_state.block_id().as_short_id(),
))]
async fn update_mc_data(&mut self, new_mc_state: ShardStateStuff) -> Result<()> {
let labels = [("workchain", self.shard_id.workchain().to_string())];

let _histogram =
HistogramGuardWithLabels::begin("tycho_collator_update_mc_data_time", &labels);

let mc_state_block_id_short = mc_state.block_id().as_short_id();
let prev_states_loaded_from_storage = self.working_state().prev_states_loaded_from_storage;

let new_mc_data = McData::build(mc_state)?;
if prev_states_loaded_from_storage {
let working_state_mut = self
.working_state
.as_mut()
.expect("should `init` collator before calling `update_mc_data`");

let working_state_mut = self
.working_state
.as_mut()
.expect("should `init` collator before calling `update_mc_data`");
working_state_mut.mc_data = McData::build(new_mc_state)?;

tracing::debug!(target: tracing_targets::COLLATOR,
"only McData updated in working state from new master state",
);
} else {
let prev_blocks_ids = self.working_state().prev_shard_data.blocks_ids();
let (mc_state, prev_states) = Self::load_init_states(
self.state_node_adapter.clone(),
self.shard_id,
prev_blocks_ids,
new_mc_state,
)
.await?;

working_state_mut.mc_data = new_mc_data;
self.update_working_state(Some(mc_state), prev_states, true)?;

tracing::debug!(target: tracing_targets::COLLATOR,
"McData updated in working state from new master state (block_id={})",
mc_state_block_id_short,
);
tracing::debug!(target: tracing_targets::COLLATOR,
"McData updated in working state from new master state and \
shard prev state reloaded from storage",
);
}

Ok(())
}

/// Load required initial states:
/// master state + list of previous shard states
/// Load required previous shard states (for shardchain)
/// or just return master state (for masterchain)
async fn load_init_states(
state_node_adapter: Arc<dyn StateNodeAdapter>,
shard_id: ShardIdent,
prev_blocks_ids: Vec<BlockId>,
prev_blocks_ids: &[BlockId],
mc_state: ShardStateStuff,
) -> Result<(ShardStateStuff, Vec<ShardStateStuff>)> {
// if current shard is a masterchain then can take current master state
Expand All @@ -421,10 +434,10 @@ impl CollatorStdImpl {
let mut prev_states = vec![];
for prev_block_id in prev_blocks_ids {
// request state for prev block and wait for response
let state = state_node_adapter.load_state(&prev_block_id).await?;
let state = state_node_adapter.load_state(prev_block_id).await?;
tracing::info!(
target: tracing_targets::COLLATOR,
"To init working state loaded prev shard state for prev_block_id {}",
"Prev shard state loaded for prev_block_id {}",
prev_block_id.as_short_id(),
);
prev_states.push(state);
Expand All @@ -444,15 +457,14 @@ impl CollatorStdImpl {
prev_states: Vec<ShardStateStuff>,
state_tracker: &MinRefMcStateTracker,
) -> Result<WorkingState> {
// TODO: make real implementation

let mc_data = McData::build(mc_state)?;
Self::check_prev_states_and_master(&mc_data, &prev_states)?;
let (prev_shard_data, usage_tree) = PrevData::build(prev_states, state_tracker)?;

let working_state = WorkingState {
mc_data,
prev_shard_data,
prev_states_loaded_from_storage: true,
usage_tree,
has_pending_internals: None,
};
Expand Down Expand Up @@ -606,7 +618,7 @@ impl CollatorStdImpl {
}

async fn update_mc_data_and_try_collate(&mut self, mc_state: ShardStateStuff) -> Result<()> {
self.update_mc_data(mc_state)?;
self.update_mc_data(mc_state).await?;
self.update_working_state_pending_internals(None)?;
self.try_collate_next_shard_block_impl().await
}
Expand Down
1 change: 1 addition & 0 deletions collator/src/collator/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ use crate::types::ProofFunds;
pub(super) struct WorkingState {
pub mc_data: McData,
pub prev_shard_data: PrevData,
pub prev_states_loaded_from_storage: bool,
pub usage_tree: UsageTree,
pub has_pending_internals: Option<bool>,
}
Expand Down
50 changes: 45 additions & 5 deletions collator/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@ use anyhow::{anyhow, bail, Result};
use async_trait::async_trait;
use everscale_crypto::ed25519::KeyPair;
use everscale_types::models::{BlockId, BlockIdShort, ShardIdent};
use humantime::format_duration;
use tycho_block_util::block::ValidatorSubsetInfo;
use tycho_block_util::state::{MinRefMcStateTracker, ShardStateStuff};
use tycho_util::metrics::HistogramGuard;
use tycho_util::FastHashMap;
use types::BlockCandidateEntry;

use self::types::{
BlockCacheKey, BlockCandidateContainer, BlockCandidateToSend, BlocksCache,
Expand Down Expand Up @@ -219,6 +221,7 @@ where
mpool_adapter_factory: MPF,
validator_factory: VF,
collator_factory: CF,
state_tracker: MinRefMcStateTracker,
#[cfg(any(test, feature = "test"))] test_validators_keypairs: Vec<Arc<KeyPair>>,
) -> RunningCollationManager<CF, V>
where
Expand Down Expand Up @@ -258,7 +261,7 @@ where
mq_adapter: mq_adapter.clone(),
collator_factory,
validator,
state_tracker: MinRefMcStateTracker::default(),
state_tracker,
active_collation_sessions: FastHashMap::default(),
collation_sessions_to_finish: FastHashMap::default(),
active_collators: FastHashMap::default(),
Expand Down Expand Up @@ -878,8 +881,14 @@ where
"Start processing block candidate",
);

let _histogram =
HistogramGuard::begin("tycho_collator_process_collated_block_candidate_time");
let candidate_chain_time = collation_result.candidate.chain_time;
let candidate_id = collation_result.candidate.block_id;

let labels = &[("workchain", candidate_id.shard.workchain().to_string())];
let _histogram = HistogramGuard::begin_with_labels(
"tycho_collator_process_collated_block_candidate_time",
labels,
);

// find session related to this block by shard
let session_info = self
Expand All @@ -890,8 +899,39 @@ where
))?
.clone();

let candidate_chain_time = collation_result.candidate.chain_time;
let candidate_id = collation_result.candidate.block_id;
// pre-accept shard block to store new shard state
if !candidate_id.shard.is_masterchain() {
let histogram = HistogramGuard::begin_with_labels(
"tycho_collator_process_collated_block_candidate_pre_accept_total_time",
labels,
);

let histogram_build_block_stuff = HistogramGuard::begin_with_labels(
"tycho_collator_process_collated_block_candidate_pre_accept_build_block_stuff_time",
labels,
);
let block_for_pre_accept = build_block_stuff_for_sync(&BlockCandidateEntry {
key: candidate_id.as_short_id(),
candidate: collation_result.candidate.clone(),
signatures: Default::default(),
})?;
let build_block_stuff_elapsed = histogram_build_block_stuff.finish();

let histogram_build_block_stuff = HistogramGuard::begin_with_labels(
"tycho_collator_process_collated_block_candidate_pre_accept_time",
labels,
);
self.state_node_adapter
.pre_accept_block(block_for_pre_accept, &collation_result.new_state_stuff)
.await?;

tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
total_time = %format_duration(histogram.finish()),
build_block_stuff_time = %format_duration(build_block_stuff_elapsed),
pre_accept_time = %format_duration(histogram_build_block_stuff.finish()),
"Block pre-accepted",
);
}

tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
"Saving block candidate to cache...",
Expand Down
42 changes: 42 additions & 0 deletions collator/src/state_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ pub trait StateNodeAdapter: Send + Sync + 'static {
async fn load_block(&self, block_id: &BlockId) -> Result<Option<BlockStuff>>;
/// Return block handle by it's id from node local state
async fn load_block_handle(&self, block_id: &BlockId) -> Result<Option<BlockHandle>>;

/// Just store updated state
async fn pre_accept_block(
&self,
block: BlockStuffForSync,
new_state: &ShardStateStuff,
) -> Result<()>;

/// Accept block:
/// 1. (TODO) Broadcast block to blockchain network
/// 2. Provide block to the block strider
Expand Down Expand Up @@ -125,6 +133,40 @@ impl StateNodeAdapter for StateNodeAdapterStdImpl {
Ok(self.storage.block_handle_storage().load_handle(block_id))
}

async fn pre_accept_block(
&self,
block: BlockStuffForSync,
new_state: &ShardStateStuff,
) -> Result<()> {
tracing::debug!(target: tracing_targets::STATE_NODE_ADAPTER, "Pre-accepting block {}", block.block_id.as_short_id());

// Store block and load handle
let block_storage = self.storage.block_storage();
let block_stuff = &block.block_stuff_aug.data;
let archive_data = &block.block_stuff_aug.archive_data;
let info = block_stuff.load_info()?;
let mc_ref_seqno = info.min_ref_mc_seqno + 1;
let store_res = block_storage
.store_block_data(block_stuff, archive_data, BlockMetaData {
is_key_block: info.key_block,
gen_utime: info.gen_utime,
mc_ref_seqno,
})
.await?;
let handle = store_res.handle;

// Save state
let state_storage = self.storage.shard_state_storage();
state_storage
.store_state(&handle, new_state)
.await
.context("Failed to store new state")?;

tracing::debug!(target: tracing_targets::STATE_NODE_ADAPTER, "Block pre-accepted {}", block.block_id.as_short_id());

Ok(())
}

async fn accept_block(&self, block: BlockStuffForSync) -> Result<()> {
tracing::debug!(target: tracing_targets::STATE_NODE_ADAPTER, "Block accepted: {}", block.block_id.as_short_id());

Expand Down
9 changes: 4 additions & 5 deletions collator/tests/collation_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,8 @@ async fn test_collation_process_on_stubs() {
let queue = queue_factory.create();
let message_queue_adapter = MessageQueueAdapterStdImpl::new(queue);

let state_tracker = MinRefMcStateTracker::default();

let manager = CollationManager::start(
node_1_keypair.clone(),
config,
Expand All @@ -147,6 +149,7 @@ async fn test_collation_process_on_stubs() {
config: validator_config,
},
CollatorStdImplFactory,
state_tracker.clone(),
#[cfg(feature = "test")]
vec![
node_1_keypair,
Expand All @@ -164,11 +167,7 @@ async fn test_collation_process_on_stubs() {
zerostate_id,
storage.clone(),
))
.with_state_subscriber(
MinRefMcStateTracker::default(),
storage.clone(),
state_node_adapter,
)
.with_state_subscriber(state_tracker.clone(), storage.clone(), state_node_adapter)
.build();

let strider_handle = block_strider.run();
Expand Down
Loading

0 comments on commit e10add2

Please sign in to comment.