Skip to content

Commit

Permalink
fix(consensus): dedup external msgs by last round
Browse files Browse the repository at this point in the history
allows to replenish cache on sync
  • Loading branch information
Mododo committed Jun 7, 2024
1 parent 3214c5c commit 6387d12
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 35 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion collator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ base64 = { workspace = true }
bytes = { workspace = true }
bytesize = { workspace = true }
futures-util = { workspace = true }
indexmap = { workspace = true }
metrics = { workspace = true }
parking_lot = { workspace = true }
rand = { workspace = true }
Expand Down
75 changes: 45 additions & 30 deletions collator/src/mempool/external_message_cache.rs
Original file line number Diff line number Diff line change
@@ -1,50 +1,48 @@
use std::collections::BTreeMap;
use std::sync::Arc;

use everscale_types::cell::HashBytes;
use indexmap::map::Entry;
use indexmap::IndexMap;

use crate::mempool::types::ExternalMessage;

pub struct ExternalMessageCache {
external_message_cache: IndexMap<HashBytes, u32>,
external_message_cache: BTreeMap<HashBytes, u32>,
round_window_size: u16,
}

impl ExternalMessageCache {
pub fn new(round_window_size: u16) -> Self {
Self {
external_message_cache: IndexMap::new(),
external_message_cache: BTreeMap::new(),
round_window_size,
}
}
pub fn add_dedup_messages(
&mut self,
round_id: u32,
mut messages: Vec<Arc<ExternalMessage>>,
) -> Vec<Arc<ExternalMessage>> {
messages.retain(
|message| match self.external_message_cache.entry(*message.hash()) {
Entry::Vacant(e) => {
e.insert(round_id);
true
messages: &mut Vec<Arc<ExternalMessage>>,
messages_info_len: &mut Vec<usize>, // msg size with same pos as in messages vec
removed_messages_info_len: &mut Vec<usize>,
) {
let mut cur_pos: usize = 0;
messages.retain(|message| {
let is_new = self
.external_message_cache
.insert(*message.hash(), round_id)
.is_none();
if tracing::enabled!(tracing::Level::INFO) {
if is_new {
cur_pos += 1;
} else {
removed_messages_info_len.push(messages_info_len.remove(cur_pos));
}
Entry::Occupied(_) => false,
},
);

let mut split_index: usize = 0;

for (index, (_, value)) in self.external_message_cache.iter().enumerate() {
split_index = index;
if *value > round_id.saturating_sub(self.round_window_size as u32) {
break;
}
}

self.external_message_cache = self.external_message_cache.split_off(split_index);
is_new
});

messages
let bottom_round = round_id.saturating_sub(self.round_window_size as u32);
self.external_message_cache
.retain(|_, last_round| *last_round >= bottom_round);
}
}

Expand All @@ -59,12 +57,14 @@ mod tests {
use crate::mempool::types::ExternalMessage;

#[test]
pub fn cache_test() {
pub fn dedup_external_msgs() {
tycho_util::test::init_logger("dedup_external_msgs", "info");
let mut cache = ExternalMessageCache::new(100);
let executed_rounds = 1000;
for i in 0..executed_rounds {
for round_id in 0..1000 {
let mut messages = Vec::new();
if i < 100 {
let mut messages_info_len = Vec::new();
let mut removed_messages_info_len = Vec::new();
if round_id < 100 {
let message = Arc::new(ExternalMessage::new(
everscale_types::cell::Cell::empty_cell(),
ExtInMsgInfo::default(),
Expand All @@ -74,7 +74,9 @@ mod tests {
ExtInMsgInfo::default(),
));
messages.push(message);
messages_info_len.push(1); // orig 1
messages.push(message2);
messages_info_len.push(2); // dup of 1
}

let mut builder = everscale_types::cell::CellBuilder::new();
Expand All @@ -85,16 +87,29 @@ mod tests {
ExtInMsgInfo::default(),
));
messages.push(message);
messages_info_len.push(3); // orig 3

let message2 = Arc::new(ExternalMessage::new(
everscale_types::cell::Cell::empty_cell(),
ExtInMsgInfo::default(),
));
messages.push(message2);
messages_info_len.push(4); // dup of 1

if !messages.is_empty() {
let _ = cache.add_dedup_messages(i, messages);
let _ = cache.add_dedup_messages(
round_id as u32,
&mut messages,
&mut messages_info_len,
&mut removed_messages_info_len,
);
}
tracing::info!(
iter = round_id,
added = debug(messages_info_len),
removed = debug(removed_messages_info_len),
"dedup"
);
}

assert_eq!(cache.external_message_cache.len(), 2);
Expand Down
26 changes: 24 additions & 2 deletions collator/src/mempool/mempool_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ pub async fn handle_anchors(
let mut repr_hashes = FastHashSet::default();
let mut messages = Vec::new();

let mut messages_info_len = Vec::new();
let mut removed_messages_info_len = Vec::new();

for point in points {
'message: for message in &point.body.payload {
let cell = match Boc::decode(message) {
Expand Down Expand Up @@ -184,16 +187,35 @@ pub async fn handle_anchors(

if repr_hashes.insert(*cell.repr_hash()) {
messages.push(Arc::new(ExternalMessage::new(cell.clone(), ext_in_message)));
if tracing::enabled!(tracing::Level::INFO) {
messages_info_len.push(message.len());
}
} else if tracing::enabled!(tracing::Level::INFO) {
removed_messages_info_len.push(message.len());
}
}
}

let added_messages = cache.add_dedup_messages(anchor.body.location.round.0, messages);
cache.add_dedup_messages(
anchor.body.location.round.0,
&mut messages,
&mut messages_info_len,
&mut removed_messages_info_len,
);

tracing::info!(
target: tracing_targets::MEMPOOL_ADAPTER,
round = anchor.body.location.round.0,
time = anchor.body.time.as_u64(),
msgs_bytes_len = debug(messages_info_len),
removed_msgs_bytes_len = debug(removed_messages_info_len),
"got anchor"
);

let anchor = Arc::new(MempoolAnchor::new(
anchor.body.location.round.0,
anchor.body.time.as_u64(),
added_messages,
messages,
));

adapter.add_anchor(anchor);
Expand Down
6 changes: 5 additions & 1 deletion consensus/src/engine/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,9 +270,13 @@ impl Engine {
{
tracing::info!(
parent: round_effects.span(),
digest = display(own_point.digest.alt()),
payload_ki_bytes = own_point
.body.payload.iter().map(|bytes| bytes.len()).sum::<usize>() / 1024,
digest = display(own_point.digest.alt()),
payload_msg_count = own_point.body.payload.iter().count(),
payload_len_bytes = debug(
own_point.body.payload.iter().map(|bytes| bytes.len()).collect::<Vec<_>>()
),
"produced point"
);
let state = current_dag_round.insert_exact_sign(
Expand Down

0 comments on commit 6387d12

Please sign in to comment.