Skip to content

Commit

Permalink
feat(collator): use both last collated and last processed mc block id…
Browse files Browse the repository at this point in the history
… to skip queue diff apply on sync
  • Loading branch information
SmaGMan committed Feb 12, 2025
1 parent 7f49e74 commit 164d06b
Showing 1 changed file with 62 additions and 37 deletions.
99 changes: 62 additions & 37 deletions collator/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -380,10 +380,6 @@ where
}
}

fn get_last_processed_mc_block_id(&self) -> Option<BlockId> {
*self.last_processed_mc_block_id.lock()
}

/// (TODO) Check sync status between mempool and blockchain state
/// and pause collation when we are far behind other nodesб
/// jusct sync blcoks from blockchain
Expand Down Expand Up @@ -592,14 +588,14 @@ where
);

// get info about applied mc blocks in cache
let (last_collated_block_id, applied_range) = self
let (last_collated_mc_block_id, applied_range) = self
.blocks_cache
.get_last_collated_block_and_applied_mc_queue_range();

// run sync if have applied mc blocks
self.sync_to_applied_mc_block_if_exist(
last_collated_block_id.as_ref(),
applied_range.as_ref(),
last_collated_mc_block_id,
applied_range,
)
.await?;
}
Expand Down Expand Up @@ -961,15 +957,15 @@ where

// run sync or process just collated block
if should_sync_to_last_applied_mc_block {
// INFO: last collated mc block subgraph is already committed here

let last_collated_block_id = store_res.last_collated_mc_block_id.as_ref();
let applied_range = store_res.applied_mc_queue_range.as_ref();
// NOTE: last collated mc block subgraph is already committed here

if block_id.is_masterchain() {
// run sync if have applied mc blocks
self.sync_to_applied_mc_block_if_exist(last_collated_block_id, applied_range)
.await?;
self.sync_to_applied_mc_block_if_exist(
store_res.last_collated_mc_block_id,
store_res.applied_mc_queue_range,
)
.await?;
} else {
// cancel further collation of blocks in the current shard because we need to sync
tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
Expand All @@ -990,8 +986,11 @@ where
);

// run sync if have applied mc blocks
self.sync_to_applied_mc_block_if_exist(last_collated_block_id, applied_range)
.await?;
self.sync_to_applied_mc_block_if_exist(
store_res.last_collated_mc_block_id,
store_res.applied_mc_queue_range,
)
.await?;
}
}
} else if block_id.is_masterchain() {
Expand Down Expand Up @@ -1280,8 +1279,8 @@ where
if should_sync_to_last_applied_mc_block {
// run sync if have applied mc blocks
self.sync_to_applied_mc_block_if_exist(
store_res.last_collated_mc_block_id.as_ref(),
store_res.applied_mc_queue_range.as_ref(),
store_res.last_collated_mc_block_id,
store_res.applied_mc_queue_range,
)
.await?;
} else {
Expand Down Expand Up @@ -1313,20 +1312,19 @@ where

async fn sync_to_applied_mc_block_if_exist(
&self,
last_collated_block_id: Option<&BlockId>,
applied_range: Option<&(BlockSeqno, BlockSeqno)>,
last_collated_mc_block_id: Option<BlockId>,
applied_range: Option<(BlockSeqno, BlockSeqno)>,
) -> Result<()> {
if let Some(applied_range) = applied_range {
if !self
.sync_to_applied_mc_block(applied_range, last_collated_block_id)
.sync_to_applied_mc_block(applied_range, last_collated_mc_block_id)
.await?
{
let last_applied_mc_block_id_short = BlockIdShort {
shard: ShardIdent::MASTERCHAIN,
seqno: applied_range.1,
};
tracing::info!(target: tracing_targets::COLLATION_MANAGER,
last_applied_mc_block_id = %last_applied_mc_block_id_short,
last_applied_mc_block_id = %BlockIdShort {
shard: ShardIdent::MASTERCHAIN,
seqno: applied_range.1,
},
"sync_to_applied_mc_block: unable to sync to last applied mc block, \
need to receive next blocks from bc",
);
Expand All @@ -1349,8 +1347,8 @@ where
#[tracing::instrument(skip_all, fields(applied_range = ?applied_range))]
async fn sync_to_applied_mc_block(
&self,
applied_range: &(BlockSeqno, BlockSeqno),
last_collated_block_id: Option<&BlockId>,
applied_range: (BlockSeqno, BlockSeqno),
last_collated_mc_block_id: Option<BlockId>,
) -> Result<bool> {
tracing::info!(target: tracing_targets::COLLATION_MANAGER,
"start sync to applied mc block",
Expand Down Expand Up @@ -1442,10 +1440,11 @@ where
}).collect::<Vec<_>>().as_slice(),
);

// collect top collated blocks. Blocks below these blocks should be skipped for inclusion in the diffs list
let top_collated_blocks = if let Some(last_collated_block_id) = last_collated_block_id {
self.collect_top_collated_blocks(last_collated_block_id)
.await?
// collect top blocks queue diffs already applied to
let queue_diffs_applied_to_top_blocks = if let Some(applied_to_mc_block_id) =
self.get_queue_diffs_applied_to_mc_block_id(last_collated_mc_block_id)
{
self.get_top_blocks_seqno(&applied_to_mc_block_id).await?
} else {
FastHashMap::default()
};
Expand All @@ -1463,7 +1462,7 @@ where
continue;
}

if let Some(border) = top_collated_blocks.get(shard_id) {
if let Some(border) = queue_diffs_applied_to_top_blocks.get(shard_id) {
if prev_block_id.seqno <= *border {
continue;
}
Expand Down Expand Up @@ -1577,7 +1576,9 @@ where
.chain(subgraph.shard_blocks.iter())
{
// do not apply diff if block was collated
if let Some(border) = top_collated_blocks.get(&block_entry.block_id.shard) {
if let Some(border) =
queue_diffs_applied_to_top_blocks.get(&block_entry.block_id.shard)
{
if block_entry.block_id.seqno <= *border {
continue;
}
Expand Down Expand Up @@ -1697,6 +1698,29 @@ where
Ok(result)
}

// Returns top master block id upto which all queue diffs applied
fn get_queue_diffs_applied_to_mc_block_id(
&self,
last_collated_mc_block_id: Option<BlockId>,
) -> Option<BlockId> {
let last_processed_mc_block_id = *self.last_processed_mc_block_id.lock();
match (last_processed_mc_block_id, last_collated_mc_block_id) {
(Some(last_processed), Some(last_collated)) => {
if last_processed.seqno > last_collated.seqno {
Some(last_processed)
} else {
Some(last_collated)
}
}
(Some(mc_block_id), _) | (_, Some(mc_block_id)) => Some(mc_block_id),
_ => None,
}
}

fn get_last_processed_mc_block_id(&self) -> Option<BlockId> {
*self.last_processed_mc_block_id.lock()
}

fn check_should_process_and_update_last_processed_mc_block(&self, block_id: &BlockId) -> bool {
let mut guard = self.last_processed_mc_block_id.lock();
let last_processed_mc_block_id_opt = guard.as_ref();
Expand Down Expand Up @@ -1807,7 +1831,7 @@ where
) -> Result<()> {
tracing::debug!(
target: tracing_targets::COLLATION_MANAGER,
"Trying to refresh collation sessions by mc state ({})...",
"Start refresh collation sessions by mc state ({})...",
mc_data.block_id.as_short_id(),
);

Expand Down Expand Up @@ -2711,9 +2735,10 @@ where
}
}
}
/// Collect top collated blocks from all shards by mc block id
/// Mc block included
async fn collect_top_collated_blocks(

/// Collect top blocks seqno from all shards by master block id.
/// Master block seqno included
async fn get_top_blocks_seqno(
&self,
mc_block_id: &BlockId,
) -> Result<FastHashMap<ShardIdent, BlockSeqno>> {
Expand Down

0 comments on commit 164d06b

Please sign in to comment.