From e444fb8bb2e1c8a9f517c3318bd6659a3815c1d9 Mon Sep 17 00:00:00 2001 From: Maksim Greshnyakov Date: Wed, 12 Jun 2024 15:10:27 +0500 Subject: [PATCH] fix(collator): internal messages diff --- collator/src/collator/do_collate.rs | 122 +++++++++++++++------ collator/src/collator/execution_manager.rs | 2 +- collator/src/collator/types.rs | 3 + collator/src/internal_queue/iterator.rs | 44 ++++++-- collator/src/internal_queue/types.rs | 2 +- collator/src/mempool/mempool_adapter.rs | 2 +- 6 files changed, 132 insertions(+), 43 deletions(-) diff --git a/collator/src/collator/do_collate.rs b/collator/src/collator/do_collate.rs index 531ab0b58..84d8ab206 100644 --- a/collator/src/collator/do_collate.rs +++ b/collator/src/collator/do_collate.rs @@ -17,6 +17,7 @@ use crate::collator::execution_manager::ExecutionManager; use crate::collator::types::{ AsyncMessage, BlockCollationData, McData, PrevData, ShardDescriptionExt, }; +use crate::internal_queue::types::InternalMessageKey; use crate::mempool::MempoolAnchorId; use crate::tracing_targets; use crate::types::{BlockCollationResult, ShardIdentExt, TopBlockDescription}; @@ -165,8 +166,8 @@ impl CollatorStdImpl { loop { // let mut one_set_timer = std::time::Instant::now(); - let mut internal_messages_in_set = vec![]; - + let mut executed_internal_messages = vec![]; + let mut internal_messages_sources = FastHashMap::default(); // build messages set let mut msgs_set: Vec = vec![]; @@ -198,10 +199,10 @@ impl CollatorStdImpl { msgs_set.push(async_message); - internal_messages_in_set.push(( - message_with_source.shard_id, + internal_messages_sources.insert( message_with_source.message.key(), - )); + message_with_source.shard_id, + ); remaining_capacity -= 1; } @@ -212,8 +213,8 @@ impl CollatorStdImpl { } tracing::debug!(target: tracing_targets::COLLATOR, - "read {} externals and {} internals", - ext_msgs.len(), internal_messages_in_set.len(), + "read {} externals", + ext_msgs.len(), ); // 3. Join existing internals and externals @@ -251,13 +252,21 @@ impl CollatorStdImpl { MsgInfo::Int(message_with_source.message.info.clone()); let cell = message_with_source.message.cell.clone(); - let async_message = AsyncMessage::NewInt(int_msg_info, cell); + let async_message = if int_msg.is_new { + collation_data.read_new_msgs_from_iterator += 1; + AsyncMessage::NewInt(int_msg_info, cell) + } else { + let is_current_shard = + message_with_source.shard_id == self.shard_id; + AsyncMessage::Int(int_msg_info, cell, is_current_shard) + }; msgs_set.push(async_message); - internal_messages_in_set.push(( - message_with_source.shard_id, + + internal_messages_sources.insert( message_with_source.message.key(), - )); + message_with_source.shard_id, + ); new_internal_messages_in_set += 1; remaining_capacity -= 1; @@ -293,6 +302,9 @@ impl CollatorStdImpl { do_collate_fill_msgs_set_elapsed += timer.elapsed(); timer = std::time::Instant::now(); + let mut messages_inserted_to_iterator = 0; + + let mut executed_messages = 0; // execute msgs processing by groups while !msgs_set_full_processed { let one_tick_executed_count = if STUB_SKIP_EXECUTION { @@ -319,6 +331,35 @@ impl CollatorStdImpl { timer = std::time::Instant::now(); for (_account_id, msg_info, transaction) in group { + match msg_info { + AsyncMessage::Int(ref msg_info, ref cell, _) + | AsyncMessage::NewInt(ref msg_info, ref cell) => { + if let MsgInfo::Int(int_msg_info) = msg_info { + let hash = cell.repr_hash(); + let key = InternalMessageKey { + lt: int_msg_info.created_lt, + hash: *hash, + }; + + match internal_messages_sources.get(&key).cloned() { + Some(shard_ident) => { + executed_internal_messages.push((shard_ident, key)); + } + None => { + return Err(anyhow!( + "Internal message source \ + shard not found for key: {:?}", + key + )); + } + } + } + } + _ => {} + } + + executed_messages += 1; + let new_internal_messages = new_transaction( &mut collation_data, &self.shard_id, @@ -329,6 +370,9 @@ impl CollatorStdImpl { collation_data.new_msgs_created += new_internal_messages.len() as u32; if !new_internal_messages.is_empty() { + collation_data.inserted_new_msgs_to_iterator += + new_internal_messages.len() as u32; + messages_inserted_to_iterator += new_internal_messages.len(); self.mq_adapter.add_messages_to_iterator( &mut internal_messages_iterator, new_internal_messages, @@ -358,25 +402,37 @@ impl CollatorStdImpl { msgs_set_executed_count, msgs_set_len, block_transactions_count, msgs_set_offset, ); - if block_transactions_count >= 10000 { - tracing::debug!(target: tracing_targets::COLLATOR, - "STUB: block limit reached: {}/10000", - block_transactions_count, - ); - block_limits_reached = true; - break; - } if msgs_set_offset == msgs_set_len { msgs_set_full_processed = true; } } + // TODO STUB: block limit checking outside + if block_transactions_count >= 10000 { + tracing::debug!(target: tracing_targets::COLLATOR, + "STUB: block limit reached: {}/10000", + block_transactions_count, + ); + block_limits_reached = true; + break; + } + + tracing::debug!(target: tracing_targets::COLLATOR, + "Inserted message to iterator last set: {}", + messages_inserted_to_iterator, + ); + + tracing::debug!(target: tracing_targets::COLLATOR, + "Executed messages last set: {}", + executed_messages, + ); + // commit messages to iterator only if set was fully processed if msgs_set_full_processed { self.mq_adapter.commit_messages_to_iterator( &mut internal_messages_iterator, - internal_messages_in_set, + executed_internal_messages, )?; } @@ -387,8 +443,9 @@ impl CollatorStdImpl { msgs_set_offset, ); - tracing::debug!(target: tracing_targets::COLLATOR, "updated processed_upto.externals = {:?}", + tracing::debug!(target: tracing_targets::COLLATOR, "updated processed_upto.externals = {:?}. offset = {}", collation_data.processed_upto.externals, + collation_data.processed_upto.processed_offset, ); has_pending_internals = internal_messages_iterator.peek(true)?.is_some(); @@ -456,25 +513,27 @@ impl CollatorStdImpl { has_pending_internals, }; - // log collation_result - tracing::info!(target: tracing_targets::COLLATOR, - "Created block candidate: collated_file_hash={}, block_id={}", - collation_result.candidate.collated_file_hash, collation_result.candidate.block_id - ); - self.listener.on_block_candidate(collation_result).await?; tracing::info!(target: tracing_targets::COLLATOR, "Created and sent block candidate: start_lt={}, end_lt={}, exec_count={}, \ exec_ext={}, exec_int={}, exec_new_int={}, \ enqueue_count={}, dequeue_count={}, \ - new_msgs_created={}, new_msgs_added={}, \ - in_msgs={}, out_msgs={}", + new_msgs_added_to_diff={}, \ + in_msgs={}, out_msgs={} read_new_msgs_from_iterator={} inserted_new_msgs_to_iterator={} txs={}", collation_data.start_lt, collation_data.next_lt, collation_data.execute_count_all, collation_data.execute_count_ext, collation_data.execute_count_int, collation_data.execute_count_new_int, collation_data.enqueue_count, collation_data.dequeue_count, - collation_data.new_msgs_created, diff.messages.len(), - collation_data.in_msgs.len(), collation_data.out_msgs.len(), + diff.messages.len(), + collation_data.in_msgs.len(), + collation_data.out_msgs.len(), + collation_data.read_new_msgs_from_iterator, + collation_data.inserted_new_msgs_to_iterator, block_transactions_count + ); + + assert_eq!( + collation_data.enqueue_count, + collation_data.inserted_new_msgs_to_iterator - collation_data.execute_count_new_int ); self.update_stats(&collation_data); @@ -1269,7 +1328,6 @@ fn new_transaction( .out_msgs .insert(*in_msg_cell.repr_hash(), out_msg.clone()); collation_data.enqueue_count -= 1; - (in_msg, in_msg_cell) } AsyncMessage::Mint(in_msg_cell) | AsyncMessage::Recover(in_msg_cell) => { diff --git a/collator/src/collator/execution_manager.rs b/collator/src/collator/execution_manager.rs index d51763b63..87072bb03 100644 --- a/collator/src/collator/execution_manager.rs +++ b/collator/src/collator/execution_manager.rs @@ -395,7 +395,7 @@ fn execute_ticktock_message( } /// calculate group -pub fn calculate_group( +pub fn _calculate_group( messages_set: &[AsyncMessage], group_limit: u32, offset: u32, diff --git a/collator/src/collator/types.rs b/collator/src/collator/types.rs index 387067483..1bb24d7fb 100644 --- a/collator/src/collator/types.rs +++ b/collator/src/collator/types.rs @@ -316,6 +316,9 @@ pub(super) struct BlockCollationData { pub execute_count_new_int: u32, pub enqueue_count: u32, pub dequeue_count: u32, + pub inserted_new_msgs_to_iterator: u32, + pub read_new_msgs_from_iterator: u32, + pub tx_count: u32, pub new_msgs_created: u32, diff --git a/collator/src/internal_queue/iterator.rs b/collator/src/internal_queue/iterator.rs index 81e5341c3..76494ec68 100644 --- a/collator/src/internal_queue/iterator.rs +++ b/collator/src/internal_queue/iterator.rs @@ -86,7 +86,7 @@ impl QueueIterator for QueueIteratorImpl { if let Some(next_message) = self.messages_for_current_shard.pop() { let message_key = next_message.0.message.key(); - if self.new_messages.remove(&message_key).is_some() { + if self.new_messages.contains_key(&message_key) { return Ok(Some(IterItem { message_with_source: next_message.0.clone(), is_new: true, @@ -132,19 +132,50 @@ impl QueueIterator for QueueIteratorImpl { } fn take_diff(&mut self) -> QueueDiff { - tracing::debug!( + tracing::trace!( target: crate::tracing_targets::MQ, "Taking diff from iterator. New messages count: {}", self.new_messages.len()); let mut diff = QueueDiff::default(); + for (shard_id, lt) in self.commited_current_position.iter() { diff.processed_upto.insert(*shard_id, lt.clone()); } + + let current_shard_processed_upto = self + .commited_current_position + .get(&self.for_shard) + .cloned() + .unwrap_or_default(); + + let amount_before = self.new_messages.len(); + + let mut inserted_new_messages = 0; + + tracing::debug!(target: crate::tracing_targets::COLLATOR, "Current shard processed upto: {:?}",current_shard_processed_upto); + for message in self.new_messages.values() { - diff.messages.push(message.clone()); + let (dest_workchain, dest_account) = message.destination().unwrap(); + if self.for_shard.contains_account(&dest_account) + && self.for_shard.workchain() == dest_workchain as i32 + { + if message.key() > current_shard_processed_upto { + diff.messages.push(message.clone()); + inserted_new_messages += 1; + } + } else { + diff.messages.push(message.clone()); + inserted_new_messages += 1; + } } + tracing::trace!( + target: crate::tracing_targets::MQ, + "Inserted {} messages out of {} to diff", + inserted_new_messages, + amount_before); + self.current_position .clone_from(&self.commited_current_position); self.commited_current_position.clear(); @@ -161,9 +192,9 @@ impl QueueIterator for QueueIteratorImpl { for message in messages { // insert only if key greater then current - if let Some(current_key) = self.commited_current_position.get(&message.0) { + if let Some(current_key) = self.commited_current_position.get_mut(&message.0) { if message.1 > *current_key { - self.commited_current_position.insert(message.0, message.1); + current_key.clone_from(&message.1); } } else { self.commited_current_position.insert(message.0, message.1); @@ -174,14 +205,11 @@ impl QueueIterator for QueueIteratorImpl { fn add_message(&mut self, message: Arc) -> Result<()> { self.new_messages.insert(message.key(), message.clone()); - let (dest_workchain, dest_account) = message.destination()?; - if self.for_shard.contains_account(&dest_account) && self.for_shard.workchain() == dest_workchain as i32 { let message_with_source = MessageWithSource::new(self.for_shard, message.clone()); - self.messages_for_current_shard .push(Reverse(Arc::new(message_with_source))); }; diff --git a/collator/src/internal_queue/types.rs b/collator/src/internal_queue/types.rs index 45ac339e0..7d084bcd1 100644 --- a/collator/src/internal_queue/types.rs +++ b/collator/src/internal_queue/types.rs @@ -59,7 +59,7 @@ impl Ord for EnqueuedMessage { } } -#[derive(Debug, Ord, Eq, PartialEq, PartialOrd, Hash, Clone)] +#[derive(Default, Debug, Ord, Eq, PartialEq, PartialOrd, Hash, Clone)] pub struct InternalMessageKey { pub lt: Lt, pub hash: HashBytes, diff --git a/collator/src/mempool/mempool_adapter.rs b/collator/src/mempool/mempool_adapter.rs index ae6f67b0e..cfced4a0c 100644 --- a/collator/src/mempool/mempool_adapter.rs +++ b/collator/src/mempool/mempool_adapter.rs @@ -136,7 +136,7 @@ impl MempoolAdapterStdImpl { guard.insert(anchor.id(), anchor); } - self.anchor_added.notify_waiters() + self.anchor_added.notify_waiters(); } }