Skip to content

Commit

Permalink
fix(collator): disable commit to internal queue
Browse files Browse the repository at this point in the history
  • Loading branch information
drmick authored and Rexagon committed Jun 8, 2024
1 parent e9a8016 commit 748057d
Show file tree
Hide file tree
Showing 12 changed files with 94 additions and 117 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions collator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ base64 = { workspace = true }
bytes = { workspace = true }
bytesize = { workspace = true }
futures-util = { workspace = true }
humantime = { workspace = true }
indexmap = { workspace = true }
metrics = { workspace = true }
parking_lot = { workspace = true }
Expand Down
13 changes: 13 additions & 0 deletions collator/src/collator/do_collate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,12 @@ impl CollatorStdImpl {
.finalize_block(&mut collation_data, exec_manager)
.await?;

tracing::info!(target: tracing_targets::COLLATOR,
"created block candidate: start_lt={}, end_lt={}, txs={}, new_msgs={}, in_msgs={}, out_msgs={}",
collation_data.start_lt, collation_data.next_lt, block_transactions_count,
diff.messages.len(), collation_data.in_msgs.len(), collation_data.out_msgs.len(),
);

self.mq_adapter
.apply_diff(diff.clone(), candidate.block_id.as_short_id())
.await?;
Expand All @@ -439,6 +445,13 @@ impl CollatorStdImpl {
new_state_stuff: new_state_stuff.clone(),
has_pending_internals,
};

// log collation_result
tracing::info!(target: tracing_targets::COLLATOR,
"Created block candidate: collated_file_hash={}, block_id={}",
collation_result.candidate.collated_file_hash, collation_result.candidate.block_id
);

self.listener.on_block_candidate(collation_result).await?;

tracing::info!(target: tracing_targets::COLLATOR,
Expand Down
13 changes: 0 additions & 13 deletions collator/src/internal_queue/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,9 @@ impl QueueIterator for QueueIteratorImpl {

if with_new {
if let Some(next_message) = self.messages_for_current_shard.pop() {
tracing::trace!(
target: crate::tracing_targets::MQ,
"Popping message from current shard messages: {:?}",
next_message.0);
let message_key = next_message.0.message.key();

if self.new_messages.remove(&message_key).is_some() {
tracing::trace!(
target: crate::tracing_targets::MQ,
"Message deleted new messages and in current shard messages: {:?}",
message_key);
return Ok(Some(IterItem {
message_with_source: next_message.0.clone(),
is_new: true,
Expand Down Expand Up @@ -190,11 +182,6 @@ impl QueueIterator for QueueIteratorImpl {
{
let message_with_source = MessageWithSource::new(self.for_shard, message.clone());

tracing::trace!(
target: crate::tracing_targets::MQ,
"Adding messages directly because it's for current shard: {:?}",
message_with_source);

self.messages_for_current_shard
.push(Reverse(Arc::new(message_with_source)));
};
Expand Down

This file was deleted.

11 changes: 6 additions & 5 deletions collator/src/internal_queue/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@ pub struct QueueConfig {
}

pub trait QueueFactory {
type Queue: LocalQueue;
type Queue: Queue;

fn create(&self) -> Self::Queue;
}

impl<F, R> QueueFactory for F
where
F: Fn() -> R,
R: LocalQueue,
R: Queue,
{
type Queue = R;

Expand Down Expand Up @@ -109,10 +109,11 @@ where
session_state_lock.iterator(ranges, for_shard_id).await
};

let persistent_state_lock = self.persistent_state.read().await;
let persistent_iter = persistent_state_lock.iterator(for_shard_id);
// let persistent_state_lock = self.persistent_state.read().await;
// let persistent_iter = persistent_state_lock.iterator(for_shard_id);

vec![session_iter, persistent_iter]
// vec![session_iter, persistent_iter]
vec![session_iter]
}

async fn split_shard(&self, shard_id: &ShardIdent) -> Result<(), QueueError> {
Expand Down
3 changes: 3 additions & 0 deletions collator/src/internal_queue/shard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::sync::Arc;
use everscale_types::models::BlockIdShort;

use crate::internal_queue::types::{EnqueuedMessage, InternalMessageKey, QueueDiff};
use crate::tracing_targets;

#[derive(Clone, Default)]
pub struct Shard {
Expand All @@ -27,6 +28,8 @@ impl Shard {
self.outgoing_messages.remove(&message.key());
}
return Some(diff);
} else {
tracing::warn!(target: tracing_targets::MQ, "Diff not found: {:?}", diff_id);
}
None
}
Expand Down
55 changes: 37 additions & 18 deletions collator/src/internal_queue/state/session/session_state_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

use anyhow::Result;
use everscale_types::cell::HashBytes;
use everscale_types::models::ShardIdent;

use crate::internal_queue::shard::Shard;
use crate::internal_queue::state::state_iterator::{MessageWithSource, ShardRange, StateIterator};
use crate::internal_queue::types::InternalMessageKey;
use crate::tracing_targets;

pub struct SessionStateIterator {
message_queue: VecDeque<Arc<MessageWithSource>>,
Expand All @@ -19,7 +22,6 @@ impl SessionStateIterator {
) -> Self {
let queue = Self::prepare_queue(flat_shards, shard_ranges, shard_id);
Self {
// flat_shards,
message_queue: queue,
}
}
Expand All @@ -30,32 +32,49 @@ impl SessionStateIterator {
shard_id: &ShardIdent,
) -> VecDeque<Arc<MessageWithSource>> {
let mut message_queue = VecDeque::new();
if message_queue.is_empty() {
for (shard_ident, shard) in &flat_shards {
for (message_key, message) in &shard.outgoing_messages {

for (shard_ident, shard) in flat_shards.iter() {
if let Some(shard_range) = shard_ranges.get(shard_ident) {
let from_lt = match shard_range.from_lt {
None => 0,
Some(from_lt) => from_lt + 1,
};
let range_start = InternalMessageKey {
lt: from_lt,
hash: HashBytes::ZERO,
};
let range_end = InternalMessageKey {
lt: shard_range.to_lt.unwrap_or(u64::MAX),
hash: HashBytes([255; 32]),
};

let shard_size = shard.outgoing_messages.len();

tracing::trace!(
target: tracing_targets::MQ,
"Shard {} has {} messages",
shard_ident,
shard_size
);

// Perform a range query on the BTreeMap
let range = shard.outgoing_messages.range(range_start..=range_end);

for (_, message) in range {
if let Ok((workchain, account_hash)) = message.destination() {
if shard_id.contains_account(&account_hash)
&& shard_id.workchain() == workchain as i32
{
if let Some(shard_range) = shard_ranges.get(shard_ident) {
if (shard_range.from_lt.is_none()
|| message_key.lt
> shard_range.from_lt.unwrap_or(message_key.lt))
&& (shard_range.to_lt.is_none()
|| message_key.lt
<= shard_range.to_lt.unwrap_or(message_key.lt))
{
message_queue.push_back(Arc::new(MessageWithSource::new(
*shard_ident,
message.clone(),
)));
}
}
message_queue.push_back(Arc::new(MessageWithSource::new(
*shard_ident,
message.clone(),
)));
}
}
}
}
}

message_queue
}
}
Expand Down
17 changes: 9 additions & 8 deletions collator/src/queue_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,23 +95,24 @@ impl MessageQueueAdapter for MessageQueueAdapterStdImpl {
shards_from: FastHashMap<ShardIdent, u64>,
shards_to: FastHashMap<ShardIdent, u64>,
) -> Result<Box<dyn QueueIterator>> {
tracing::debug!(
target: tracing_targets::MQ_ADAPTER,
"Creating iterator"
);

let time_start = std::time::Instant::now();
let ranges = QueueIteratorExt::collect_ranges(shards_from, shards_to);

let states_iterators = self.queue.iterator(&ranges, for_shard_id).await;

let states_iterators_manager = StatesIteratorsManager::new(states_iterators);

let iterator = QueueIteratorImpl::new(states_iterators_manager, for_shard_id)?;
tracing::trace!(
target: tracing_targets::MQ_ADAPTER,
elapsed = %humantime::format_duration(time_start.elapsed()),
"Iterator created"
);
Ok(Box::new(iterator))
}

async fn apply_diff(&self, diff: Arc<QueueDiff>, block_id_short: BlockIdShort) -> Result<()> {
tracing::info!(
tracing::trace!(
target: tracing_targets::MQ_ADAPTER,
id = ?block_id_short,
new_messages_len = diff.messages.len(),
Expand All @@ -123,7 +124,7 @@ impl MessageQueueAdapter for MessageQueueAdapterStdImpl {

#[instrument(skip(self), fields(%diff_id))]
async fn commit_diff(&self, diff_id: &BlockIdShort) -> Result<Option<Arc<QueueDiff>>> {
tracing::info!(
tracing::trace!(
target: tracing_targets::MQ_ADAPTER,
"Committing diff to the queue"
);
Expand Down Expand Up @@ -159,7 +160,7 @@ impl MessageQueueAdapter for MessageQueueAdapterStdImpl {
iterator: &mut Box<dyn QueueIterator>,
messages: Vec<(ShardIdent, InternalMessageKey)>,
) -> Result<()> {
tracing::debug!(
tracing::trace!(
target: tracing_targets::MQ_ADAPTER,
messages_len = messages.len(),
"Committing messages to iterator"
Expand Down
1 change: 1 addition & 0 deletions collator/src/validator/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ impl SessionInfo {
root_hash=?root_hash,
file_hash=?file_hash,
"Invalid signature");
panic!("Invalid signature");
}
}

Expand Down
2 changes: 1 addition & 1 deletion collator/src/validator/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,13 +371,13 @@ pub async fn process_new_signatures(

Ok((validation_status, NotificationStatus::NotNotified))
} else {
tracing::debug!(target: tracing_targets::VALIDATOR, "Caching signatures for block");
if block_id_short.seqno > 0 {
let previous_block =
BlockIdShort::from((block_id_short.shard, block_id_short.seqno - 1));
let previous_block = session.get_block(&previous_block);

if previous_block.is_some() {
tracing::trace!(target: tracing_targets::VALIDATOR, "Caching signatures for block");
session.cache_signatures(&block_id_short, signatures);
}
}
Expand Down
44 changes: 22 additions & 22 deletions collator/tests/internal_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,28 +144,28 @@ async fn intershard_message_delivery_test() -> anyhow::Result<()> {
adapter.commit_diff(&block_id).await?;

// peek message from persistent state
let mut iterator = adapter
.create_iterator(shard_id_2, from_ranges.clone(), to_ranges.clone())
.await
.unwrap();

let peek_message = iterator.peek(false)?;
assert!(peek_message.is_some());

let next_message = iterator.next(false)?;

assert!(next_message.is_some());
// println!("{:?}", peek_message.unwrap().message_with_source.message.hash);
let next_message = next_message.unwrap();
println!("{:?}", next_message.message_with_source.message.info.dst);
assert_eq!(
next_message.message_with_source.message.info.dst,
int_message.dst
);
assert_eq!(next_message.message_with_source.shard_id, shard_id_1);

let next_message = iterator.next(false)?;
assert!(next_message.is_none());
// let mut iterator = adapter
// .create_iterator(shard_id_2, from_ranges.clone(), to_ranges.clone())
// .await
// .unwrap();
//
// let peek_message = iterator.peek(false)?;
// assert!(peek_message.is_some());
//
// let next_message = iterator.next(false)?;
//
// assert!(next_message.is_some());
// // println!("{:?}", peek_message.unwrap().message_with_source.message.hash);
// let next_message = next_message.unwrap();
// println!("{:?}", next_message.message_with_source.message.info.dst);
// assert_eq!(
// next_message.message_with_source.message.info.dst,
// int_message.dst
// );
// assert_eq!(next_message.message_with_source.shard_id, shard_id_1);
//
// let next_message = iterator.next(false)?;
// assert!(next_message.is_none());

Ok(())
}

0 comments on commit 748057d

Please sign in to comment.