diff --git a/Cargo.lock b/Cargo.lock index 98e32c967..df5dcc9e2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3250,7 +3250,6 @@ dependencies = [ "everscale-crypto", "everscale-types", "futures-util", - "indexmap", "metrics", "parking_lot", "rand", diff --git a/collator/Cargo.toml b/collator/Cargo.toml index 07bde525a..e873e2e0a 100644 --- a/collator/Cargo.toml +++ b/collator/Cargo.toml @@ -17,7 +17,6 @@ base64 = { workspace = true } bytes = { workspace = true } bytesize = { workspace = true } futures-util = { workspace = true } -indexmap = { workspace = true } metrics = { workspace = true } parking_lot = { workspace = true } rand = { workspace = true } diff --git a/collator/src/mempool/external_message_cache.rs b/collator/src/mempool/external_message_cache.rs index f043591dc..ca030d234 100644 --- a/collator/src/mempool/external_message_cache.rs +++ b/collator/src/mempool/external_message_cache.rs @@ -1,50 +1,48 @@ +use std::collections::BTreeMap; use std::sync::Arc; use everscale_types::cell::HashBytes; -use indexmap::map::Entry; -use indexmap::IndexMap; use crate::mempool::types::ExternalMessage; pub struct ExternalMessageCache { - external_message_cache: IndexMap, + external_message_cache: BTreeMap, round_window_size: u16, } impl ExternalMessageCache { pub fn new(round_window_size: u16) -> Self { Self { - external_message_cache: IndexMap::new(), + external_message_cache: BTreeMap::new(), round_window_size, } } pub fn add_dedup_messages( &mut self, round_id: u32, - mut messages: Vec>, - ) -> Vec> { - messages.retain( - |message| match self.external_message_cache.entry(*message.hash()) { - Entry::Vacant(e) => { - e.insert(round_id); - true + messages: &mut Vec>, + messages_info_len: &mut Vec, // msg size with same pos as in messages vec + removed_messages_info_len: &mut Vec, + ) { + let mut cur_pos: usize = 0; + messages.retain(|message| { + let is_new = self + .external_message_cache + .insert(*message.hash(), round_id) + .is_none(); + if tracing::enabled!(tracing::Level::INFO) { + if is_new { + cur_pos += 1; + } else { + removed_messages_info_len.push(messages_info_len.remove(cur_pos)); } - Entry::Occupied(_) => false, - }, - ); - - let mut split_index: usize = 0; - - for (index, (_, value)) in self.external_message_cache.iter().enumerate() { - split_index = index; - if *value > round_id.saturating_sub(self.round_window_size as u32) { - break; } - } - - self.external_message_cache = self.external_message_cache.split_off(split_index); + is_new + }); - messages + let bottom_round = round_id.saturating_sub(self.round_window_size as u32); + self.external_message_cache + .retain(|_, last_round| *last_round >= bottom_round); } } @@ -59,12 +57,14 @@ mod tests { use crate::mempool::types::ExternalMessage; #[test] - pub fn cache_test() { + pub fn dedup_external_msgs() { + tycho_util::test::init_logger("dedup_external_msgs", "info"); let mut cache = ExternalMessageCache::new(100); - let executed_rounds = 1000; - for i in 0..executed_rounds { + for round_id in 0..1000 { let mut messages = Vec::new(); - if i < 100 { + let mut messages_info_len = Vec::new(); + let mut removed_messages_info_len = Vec::new(); + if round_id < 100 { let message = Arc::new(ExternalMessage::new( everscale_types::cell::Cell::empty_cell(), ExtInMsgInfo::default(), @@ -74,7 +74,9 @@ mod tests { ExtInMsgInfo::default(), )); messages.push(message); + messages_info_len.push(1); // orig 1 messages.push(message2); + messages_info_len.push(2); // dup of 1 } let mut builder = everscale_types::cell::CellBuilder::new(); @@ -85,16 +87,29 @@ mod tests { ExtInMsgInfo::default(), )); messages.push(message); + messages_info_len.push(3); // orig 3 let message2 = Arc::new(ExternalMessage::new( everscale_types::cell::Cell::empty_cell(), ExtInMsgInfo::default(), )); messages.push(message2); + messages_info_len.push(4); // dup of 1 if !messages.is_empty() { - let _ = cache.add_dedup_messages(i, messages); + let _ = cache.add_dedup_messages( + round_id as u32, + &mut messages, + &mut messages_info_len, + &mut removed_messages_info_len, + ); } + tracing::info!( + iter = round_id, + added = debug(messages_info_len), + removed = debug(removed_messages_info_len), + "dedup" + ); } assert_eq!(cache.external_message_cache.len(), 2); diff --git a/collator/src/mempool/mempool_adapter.rs b/collator/src/mempool/mempool_adapter.rs index 1107dad0b..113f8b1e5 100644 --- a/collator/src/mempool/mempool_adapter.rs +++ b/collator/src/mempool/mempool_adapter.rs @@ -152,6 +152,9 @@ pub async fn handle_anchors( let mut repr_hashes = FastHashSet::default(); let mut messages = Vec::new(); + let mut messages_info_len = Vec::new(); + let mut removed_messages_info_len = Vec::new(); + for point in points { 'message: for message in &point.body.payload { let cell = match Boc::decode(message) { @@ -184,16 +187,35 @@ pub async fn handle_anchors( if repr_hashes.insert(*cell.repr_hash()) { messages.push(Arc::new(ExternalMessage::new(cell.clone(), ext_in_message))); + if tracing::enabled!(tracing::Level::INFO) { + messages_info_len.push(message.len()); + } + } else if tracing::enabled!(tracing::Level::INFO) { + removed_messages_info_len.push(message.len()); } } } - let added_messages = cache.add_dedup_messages(anchor.body.location.round.0, messages); + cache.add_dedup_messages( + anchor.body.location.round.0, + &mut messages, + &mut messages_info_len, + &mut removed_messages_info_len, + ); + + tracing::info!( + target: tracing_targets::MEMPOOL_ADAPTER, + round = anchor.body.location.round.0, + time = anchor.body.time.as_u64(), + msgs_bytes_len = debug(messages_info_len), + removed_msgs_bytes_len = debug(removed_messages_info_len), + "got anchor" + ); let anchor = Arc::new(MempoolAnchor::new( anchor.body.location.round.0, anchor.body.time.as_u64(), - added_messages, + messages, )); adapter.add_anchor(anchor); diff --git a/consensus/src/engine/engine.rs b/consensus/src/engine/engine.rs index cbfcf5dc7..e06da09be 100644 --- a/consensus/src/engine/engine.rs +++ b/consensus/src/engine/engine.rs @@ -270,9 +270,13 @@ impl Engine { { tracing::info!( parent: round_effects.span(), + digest = display(own_point.digest.alt()), payload_ki_bytes = own_point .body.payload.iter().map(|bytes| bytes.len()).sum::() / 1024, - digest = display(own_point.digest.alt()), + payload_msg_count = own_point.body.payload.iter().count(), + payload_len_bytes = debug( + own_point.body.payload.iter().map(|bytes| bytes.len()).collect::>() + ), "produced point" ); let state = current_dag_round.insert_exact_sign(