Skip to content

Commit

Permalink
fix(collator): internal messages diff
Browse files Browse the repository at this point in the history
  • Loading branch information
drmick committed Jun 12, 2024
1 parent f4b5066 commit e444fb8
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 43 deletions.
122 changes: 90 additions & 32 deletions collator/src/collator/do_collate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::collator::execution_manager::ExecutionManager;
use crate::collator::types::{
AsyncMessage, BlockCollationData, McData, PrevData, ShardDescriptionExt,
};
use crate::internal_queue::types::InternalMessageKey;
use crate::mempool::MempoolAnchorId;
use crate::tracing_targets;
use crate::types::{BlockCollationResult, ShardIdentExt, TopBlockDescription};
Expand Down Expand Up @@ -165,8 +166,8 @@ impl CollatorStdImpl {
loop {
// let mut one_set_timer = std::time::Instant::now();

let mut internal_messages_in_set = vec![];

let mut executed_internal_messages = vec![];
let mut internal_messages_sources = FastHashMap::default();
// build messages set
let mut msgs_set: Vec<AsyncMessage> = vec![];

Expand Down Expand Up @@ -198,10 +199,10 @@ impl CollatorStdImpl {

msgs_set.push(async_message);

internal_messages_in_set.push((
message_with_source.shard_id,
internal_messages_sources.insert(
message_with_source.message.key(),
));
message_with_source.shard_id,
);

remaining_capacity -= 1;
}
Expand All @@ -212,8 +213,8 @@ impl CollatorStdImpl {
}

tracing::debug!(target: tracing_targets::COLLATOR,
"read {} externals and {} internals",
ext_msgs.len(), internal_messages_in_set.len(),
"read {} externals",
ext_msgs.len(),
);

// 3. Join existing internals and externals
Expand Down Expand Up @@ -251,13 +252,21 @@ impl CollatorStdImpl {
MsgInfo::Int(message_with_source.message.info.clone());
let cell = message_with_source.message.cell.clone();

let async_message = AsyncMessage::NewInt(int_msg_info, cell);
let async_message = if int_msg.is_new {
collation_data.read_new_msgs_from_iterator += 1;
AsyncMessage::NewInt(int_msg_info, cell)
} else {
let is_current_shard =
message_with_source.shard_id == self.shard_id;
AsyncMessage::Int(int_msg_info, cell, is_current_shard)
};

msgs_set.push(async_message);
internal_messages_in_set.push((
message_with_source.shard_id,

internal_messages_sources.insert(
message_with_source.message.key(),
));
message_with_source.shard_id,
);
new_internal_messages_in_set += 1;

remaining_capacity -= 1;
Expand Down Expand Up @@ -293,6 +302,9 @@ impl CollatorStdImpl {
do_collate_fill_msgs_set_elapsed += timer.elapsed();
timer = std::time::Instant::now();

let mut messages_inserted_to_iterator = 0;

let mut executed_messages = 0;
// execute msgs processing by groups
while !msgs_set_full_processed {
let one_tick_executed_count = if STUB_SKIP_EXECUTION {
Expand All @@ -319,6 +331,35 @@ impl CollatorStdImpl {
timer = std::time::Instant::now();

for (_account_id, msg_info, transaction) in group {
match msg_info {
AsyncMessage::Int(ref msg_info, ref cell, _)
| AsyncMessage::NewInt(ref msg_info, ref cell) => {
if let MsgInfo::Int(int_msg_info) = msg_info {
let hash = cell.repr_hash();
let key = InternalMessageKey {
lt: int_msg_info.created_lt,
hash: *hash,
};

match internal_messages_sources.get(&key).cloned() {
Some(shard_ident) => {
executed_internal_messages.push((shard_ident, key));
}
None => {
return Err(anyhow!(
"Internal message source \
shard not found for key: {:?}",
key
));
}
}
}
}
_ => {}
}

executed_messages += 1;

let new_internal_messages = new_transaction(
&mut collation_data,
&self.shard_id,
Expand All @@ -329,6 +370,9 @@ impl CollatorStdImpl {
collation_data.new_msgs_created += new_internal_messages.len() as u32;

if !new_internal_messages.is_empty() {
collation_data.inserted_new_msgs_to_iterator +=
new_internal_messages.len() as u32;
messages_inserted_to_iterator += new_internal_messages.len();
self.mq_adapter.add_messages_to_iterator(
&mut internal_messages_iterator,
new_internal_messages,
Expand Down Expand Up @@ -358,25 +402,37 @@ impl CollatorStdImpl {
msgs_set_executed_count, msgs_set_len,
block_transactions_count, msgs_set_offset,
);
if block_transactions_count >= 10000 {
tracing::debug!(target: tracing_targets::COLLATOR,
"STUB: block limit reached: {}/10000",
block_transactions_count,
);
block_limits_reached = true;
break;
}

if msgs_set_offset == msgs_set_len {
msgs_set_full_processed = true;
}
}

// TODO STUB: block limit checking outside
if block_transactions_count >= 10000 {
tracing::debug!(target: tracing_targets::COLLATOR,
"STUB: block limit reached: {}/10000",
block_transactions_count,
);
block_limits_reached = true;
break;
}

tracing::debug!(target: tracing_targets::COLLATOR,
"Inserted message to iterator last set: {}",
messages_inserted_to_iterator,
);

tracing::debug!(target: tracing_targets::COLLATOR,
"Executed messages last set: {}",
executed_messages,
);

// commit messages to iterator only if set was fully processed
if msgs_set_full_processed {
self.mq_adapter.commit_messages_to_iterator(
&mut internal_messages_iterator,
internal_messages_in_set,
executed_internal_messages,
)?;
}

Expand All @@ -387,8 +443,9 @@ impl CollatorStdImpl {
msgs_set_offset,
);

tracing::debug!(target: tracing_targets::COLLATOR, "updated processed_upto.externals = {:?}",
tracing::debug!(target: tracing_targets::COLLATOR, "updated processed_upto.externals = {:?}. offset = {}",
collation_data.processed_upto.externals,
collation_data.processed_upto.processed_offset,
);

has_pending_internals = internal_messages_iterator.peek(true)?.is_some();
Expand Down Expand Up @@ -456,25 +513,27 @@ impl CollatorStdImpl {
has_pending_internals,
};

// log collation_result
tracing::info!(target: tracing_targets::COLLATOR,
"Created block candidate: collated_file_hash={}, block_id={}",
collation_result.candidate.collated_file_hash, collation_result.candidate.block_id
);

self.listener.on_block_candidate(collation_result).await?;

tracing::info!(target: tracing_targets::COLLATOR,
"Created and sent block candidate: start_lt={}, end_lt={}, exec_count={}, \
exec_ext={}, exec_int={}, exec_new_int={}, \
enqueue_count={}, dequeue_count={}, \
new_msgs_created={}, new_msgs_added={}, \
in_msgs={}, out_msgs={}",
new_msgs_added_to_diff={}, \
in_msgs={}, out_msgs={} read_new_msgs_from_iterator={} inserted_new_msgs_to_iterator={} txs={}",
collation_data.start_lt, collation_data.next_lt, collation_data.execute_count_all,
collation_data.execute_count_ext, collation_data.execute_count_int, collation_data.execute_count_new_int,
collation_data.enqueue_count, collation_data.dequeue_count,
collation_data.new_msgs_created, diff.messages.len(),
collation_data.in_msgs.len(), collation_data.out_msgs.len(),
diff.messages.len(),
collation_data.in_msgs.len(),
collation_data.out_msgs.len(),
collation_data.read_new_msgs_from_iterator,
collation_data.inserted_new_msgs_to_iterator, block_transactions_count
);

assert_eq!(
collation_data.enqueue_count,
collation_data.inserted_new_msgs_to_iterator - collation_data.execute_count_new_int
);

self.update_stats(&collation_data);
Expand Down Expand Up @@ -1269,7 +1328,6 @@ fn new_transaction(
.out_msgs
.insert(*in_msg_cell.repr_hash(), out_msg.clone());
collation_data.enqueue_count -= 1;

(in_msg, in_msg_cell)
}
AsyncMessage::Mint(in_msg_cell) | AsyncMessage::Recover(in_msg_cell) => {
Expand Down
2 changes: 1 addition & 1 deletion collator/src/collator/execution_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ fn execute_ticktock_message(
}

/// calculate group
pub fn calculate_group(
pub fn _calculate_group(
messages_set: &[AsyncMessage],
group_limit: u32,
offset: u32,
Expand Down
3 changes: 3 additions & 0 deletions collator/src/collator/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,9 @@ pub(super) struct BlockCollationData {
pub execute_count_new_int: u32,
pub enqueue_count: u32,
pub dequeue_count: u32,
pub inserted_new_msgs_to_iterator: u32,
pub read_new_msgs_from_iterator: u32,

pub tx_count: u32,

pub new_msgs_created: u32,
Expand Down
44 changes: 36 additions & 8 deletions collator/src/internal_queue/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl QueueIterator for QueueIteratorImpl {
if let Some(next_message) = self.messages_for_current_shard.pop() {
let message_key = next_message.0.message.key();

if self.new_messages.remove(&message_key).is_some() {
if self.new_messages.contains_key(&message_key) {
return Ok(Some(IterItem {
message_with_source: next_message.0.clone(),
is_new: true,
Expand Down Expand Up @@ -132,19 +132,50 @@ impl QueueIterator for QueueIteratorImpl {
}

fn take_diff(&mut self) -> QueueDiff {
tracing::debug!(
tracing::trace!(
target: crate::tracing_targets::MQ,
"Taking diff from iterator. New messages count: {}",
self.new_messages.len());

let mut diff = QueueDiff::default();

for (shard_id, lt) in self.commited_current_position.iter() {
diff.processed_upto.insert(*shard_id, lt.clone());
}

let current_shard_processed_upto = self
.commited_current_position
.get(&self.for_shard)
.cloned()
.unwrap_or_default();

let amount_before = self.new_messages.len();

let mut inserted_new_messages = 0;

tracing::debug!(target: crate::tracing_targets::COLLATOR, "Current shard processed upto: {:?}",current_shard_processed_upto);

for message in self.new_messages.values() {
diff.messages.push(message.clone());
let (dest_workchain, dest_account) = message.destination().unwrap();
if self.for_shard.contains_account(&dest_account)
&& self.for_shard.workchain() == dest_workchain as i32
{
if message.key() > current_shard_processed_upto {
diff.messages.push(message.clone());
inserted_new_messages += 1;
}
} else {
diff.messages.push(message.clone());
inserted_new_messages += 1;
}
}

tracing::trace!(
target: crate::tracing_targets::MQ,
"Inserted {} messages out of {} to diff",
inserted_new_messages,
amount_before);

self.current_position
.clone_from(&self.commited_current_position);
self.commited_current_position.clear();
Expand All @@ -161,9 +192,9 @@ impl QueueIterator for QueueIteratorImpl {

for message in messages {
// insert only if key greater then current
if let Some(current_key) = self.commited_current_position.get(&message.0) {
if let Some(current_key) = self.commited_current_position.get_mut(&message.0) {
if message.1 > *current_key {
self.commited_current_position.insert(message.0, message.1);
current_key.clone_from(&message.1);
}
} else {
self.commited_current_position.insert(message.0, message.1);
Expand All @@ -174,14 +205,11 @@ impl QueueIterator for QueueIteratorImpl {

fn add_message(&mut self, message: Arc<EnqueuedMessage>) -> Result<()> {
self.new_messages.insert(message.key(), message.clone());

let (dest_workchain, dest_account) = message.destination()?;

if self.for_shard.contains_account(&dest_account)
&& self.for_shard.workchain() == dest_workchain as i32
{
let message_with_source = MessageWithSource::new(self.for_shard, message.clone());

self.messages_for_current_shard
.push(Reverse(Arc::new(message_with_source)));
};
Expand Down
2 changes: 1 addition & 1 deletion collator/src/internal_queue/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl Ord for EnqueuedMessage {
}
}

#[derive(Debug, Ord, Eq, PartialEq, PartialOrd, Hash, Clone)]
#[derive(Default, Debug, Ord, Eq, PartialEq, PartialOrd, Hash, Clone)]
pub struct InternalMessageKey {
pub lt: Lt,
pub hash: HashBytes,
Expand Down
2 changes: 1 addition & 1 deletion collator/src/mempool/mempool_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ impl MempoolAdapterStdImpl {
guard.insert(anchor.id(), anchor);
}

self.anchor_added.notify_waiters()
self.anchor_added.notify_waiters();
}
}

Expand Down

0 comments on commit e444fb8

Please sign in to comment.