Skip to content

Commit

Permalink
fix(collator): use VecDeque for anchors_cache
Browse files Browse the repository at this point in the history
  • Loading branch information
SmaGMan committed Jun 7, 2024
1 parent 92878a2 commit 360106e
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 50 deletions.
29 changes: 13 additions & 16 deletions collator/src/collator/do_collate.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::collections::{HashMap, VecDeque};
use std::ops::Deref;
use std::sync::Arc;

Expand Down Expand Up @@ -544,7 +543,7 @@ impl CollatorStdImpl {

fn read_next_externals_impl(
shard_id: &ShardIdent,
anchors_cache: &mut BTreeMap<MempoolAnchorId, CachedMempoolAnchor>,
anchors_cache: &mut VecDeque<(MempoolAnchorId, CachedMempoolAnchor)>,
count: usize,
collation_data: &mut BlockCollationData,
) -> Result<(Vec<(MsgInfo, Cell)>, bool)> {
Expand Down Expand Up @@ -582,33 +581,32 @@ impl CollatorStdImpl {
let mut ext_messages = vec![];
let mut has_pending_externals_in_last_read_anchor = false;

let mut next_key = anchors_cache.first_key_value().map_or(0, |(key, _)| *key);
let mut next_idx = 0;
loop {
tracing::debug!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS,
"try read next anchor next_key: {}", next_key,
"try read next anchor from cache",
);
// try read next anchor
let next_entry = anchors_cache.entry(next_key);
let next_entry = anchors_cache.get(next_idx);
let entry = match next_entry {
Entry::Occupied(entry) => entry,
Some(entry) => entry,
// stop reading if there is no next anchor
Entry::Vacant(_) => {
None => {
tracing::debug!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS,
"no entry in anchors cache by key {}", next_key,
"no next entry in anchors cache",
);
break;
}
};

let key = *entry.key();
let key = entry.0;
if key < was_read_upto.0 {
// skip and remove already processed anchor from cache
let _ = entry.remove();
let _ = anchors_cache.remove(next_idx);
tracing::debug!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS,
"anchor with key {} already processed, removed from anchors cache", key,
);
// try read next anchor
next_key += 1;
continue;
} else {
if read_from_anchor_opt.is_none() {
Expand All @@ -622,16 +620,15 @@ impl CollatorStdImpl {
"last_read_anchor: {}", key,
);

let anchor = &entry.get().anchor;
let anchor = &entry.1.anchor;

if key == was_read_upto.0 && anchor.externals_count() == was_read_upto.1 as usize {
// skip and remove fully processed anchor
let _ = entry.remove();
let _ = anchors_cache.remove(next_idx);
tracing::debug!(target: tracing_targets::COLLATOR_READ_NEXT_EXTS,
"anchor with key {} fully processed, removed from anchors cache", key,
);
// try read next anchor
next_key += 1;
continue;
}

Expand Down Expand Up @@ -688,7 +685,7 @@ impl CollatorStdImpl {
break;
}
// try read next anchor
next_key += 1;
next_idx += 1;
}
}

Expand Down
10 changes: 5 additions & 5 deletions collator/src/collator/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::BTreeMap;
use std::collections::VecDeque;
use std::sync::Arc;

use anyhow::Result;
Expand Down Expand Up @@ -173,7 +173,7 @@ pub struct CollatorStdImpl {

/// The cache of imported from mempool anchors that were not processed yet.
/// Anchor is removed from the cache when all its externals are processed.
anchors_cache: BTreeMap<MempoolAnchorId, CachedMempoolAnchor>,
anchors_cache: VecDeque<(MempoolAnchorId, CachedMempoolAnchor)>,

last_imported_anchor_id: Option<MempoolAnchorId>,
last_imported_anchor_chain_time: Option<u64>,
Expand Down Expand Up @@ -230,7 +230,7 @@ impl CollatorStdImpl {
shard_id,
working_state: None,

anchors_cache: BTreeMap::new(),
anchors_cache: VecDeque::new(),
last_imported_anchor_id: None,
last_imported_anchor_chain_time: None,

Expand Down Expand Up @@ -500,10 +500,10 @@ impl CollatorStdImpl {
self.last_imported_anchor_id = Some(next_anchor.id());
self.last_imported_anchor_chain_time = Some(next_anchor.chain_time());
self.anchors_cache
.insert(next_anchor.id(), CachedMempoolAnchor {
.push_back((next_anchor.id(), CachedMempoolAnchor {
anchor: next_anchor.clone(),
has_externals,
});
}));

Ok((next_anchor, has_externals))
}
Expand Down
61 changes: 32 additions & 29 deletions collator/src/collator/tests/do_collate_tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::BTreeMap;
use std::collections::VecDeque;

use everscale_types::models::{ExternalsProcessedUpto, ShardIdent};

Expand All @@ -11,11 +11,14 @@ use crate::test_utils::try_init_test_tracing;
fn test_read_next_externals() {
try_init_test_tracing(tracing_subscriber::filter::LevelFilter::TRACE);

let mut anchors_cache = BTreeMap::new();
let mut anchors_cache = VecDeque::new();

let shard_id = ShardIdent::new_full(0);

for anchor_id in 1..=10 {
for anchor_id in 1..=40 {
if anchor_id % 4 != 0 {
continue;
}
let anchor = _stub_create_random_anchor_with_stub_externals(anchor_id);
let has_externals = anchor.check_has_externals_for(&shard_id);
tracing::trace!(
Expand All @@ -31,10 +34,10 @@ fn test_read_next_externals() {
.collect::<Vec<_>>()
.as_slice(),
);
anchors_cache.insert(anchor_id, CachedMempoolAnchor {
anchors_cache.push_back((anchor_id, CachedMempoolAnchor {
anchor,
has_externals,
});
}));
}

let mut collation_data = BlockCollationData::default();
Expand All @@ -50,15 +53,15 @@ fn test_read_next_externals() {
assert_eq!(externals.len(), 3);
assert!(has_pending_externals);
let ext_processed_upto = collation_data.processed_upto.externals.as_ref().unwrap();
assert_eq!(ext_processed_upto.processed_to, (1, 0));
assert_eq!(ext_processed_upto.read_to, (1, 3));
let kv = anchors_cache.first_key_value().unwrap();
assert_eq!(*kv.0, 1);
assert_eq!(ext_processed_upto.processed_to, (4, 0));
assert_eq!(ext_processed_upto.read_to, (4, 3));
let kv = anchors_cache.front().unwrap();
assert_eq!(kv.0, 4);

collation_data.externals_reading_started = false;
collation_data.processed_upto.externals = Some(ExternalsProcessedUpto {
processed_to: (1, 3),
read_to: (2, 1),
processed_to: (4, 3),
read_to: (8, 1),
});

let (externals, has_pending_externals) = CollatorStdImpl::read_next_externals_impl(
Expand All @@ -72,10 +75,10 @@ fn test_read_next_externals() {
assert_eq!(externals.len(), 3);
assert!(has_pending_externals);
let ext_processed_upto = collation_data.processed_upto.externals.as_ref().unwrap();
assert_eq!(ext_processed_upto.processed_to, (1, 3));
assert_eq!(ext_processed_upto.read_to, (2, 1));
let kv = anchors_cache.first_key_value().unwrap();
assert_eq!(*kv.0, 1);
assert_eq!(ext_processed_upto.processed_to, (4, 3));
assert_eq!(ext_processed_upto.read_to, (8, 3));
let kv = anchors_cache.front().unwrap();
assert_eq!(kv.0, 4);

let (externals, has_pending_externals) = CollatorStdImpl::read_next_externals_impl(
&shard_id,
Expand All @@ -88,10 +91,10 @@ fn test_read_next_externals() {
assert_eq!(externals.len(), 10);
assert!(has_pending_externals);
let ext_processed_upto = collation_data.processed_upto.externals.as_ref().unwrap();
assert_eq!(ext_processed_upto.processed_to, (1, 3));
assert_eq!(ext_processed_upto.read_to, (4, 3));
let kv = anchors_cache.first_key_value().unwrap();
assert_eq!(*kv.0, 2);
assert_eq!(ext_processed_upto.processed_to, (4, 3));
assert_eq!(ext_processed_upto.read_to, (16, 6));
let kv = anchors_cache.front().unwrap();
assert_eq!(kv.0, 8);

let (externals, has_pending_externals) = CollatorStdImpl::read_next_externals_impl(
&shard_id,
Expand All @@ -104,10 +107,10 @@ fn test_read_next_externals() {
assert_eq!(externals.len(), 16);
assert!(!has_pending_externals);
let ext_processed_upto = collation_data.processed_upto.externals.as_ref().unwrap();
assert_eq!(ext_processed_upto.processed_to, (1, 3));
assert_eq!(ext_processed_upto.read_to, (10, 0));
let kv = anchors_cache.first_key_value().unwrap();
assert_eq!(*kv.0, 4);
assert_eq!(ext_processed_upto.processed_to, (4, 3));
assert_eq!(ext_processed_upto.read_to, (40, 0));
let kv = anchors_cache.front().unwrap();
assert_eq!(kv.0, 20);

let (externals, has_pending_externals) = CollatorStdImpl::read_next_externals_impl(
&shard_id,
Expand All @@ -120,9 +123,9 @@ fn test_read_next_externals() {
assert_eq!(externals.len(), 0);
assert!(!has_pending_externals);
let ext_processed_upto = collation_data.processed_upto.externals.as_ref().unwrap();
assert_eq!(ext_processed_upto.processed_to, (1, 3));
assert_eq!(ext_processed_upto.read_to, (10, 0));
let kv = anchors_cache.first_key_value();
assert_eq!(ext_processed_upto.processed_to, (4, 3));
assert_eq!(ext_processed_upto.read_to, (40, 0));
let kv = anchors_cache.front();
assert!(kv.is_none());

// all anchors removed from cache, should not fail on empty cache
Expand All @@ -137,8 +140,8 @@ fn test_read_next_externals() {
assert_eq!(externals.len(), 0);
assert!(!has_pending_externals);
let ext_processed_upto = collation_data.processed_upto.externals.as_ref().unwrap();
assert_eq!(ext_processed_upto.processed_to, (1, 3));
assert_eq!(ext_processed_upto.read_to, (10, 0));
let kv = anchors_cache.first_key_value();
assert_eq!(ext_processed_upto.processed_to, (4, 3));
assert_eq!(ext_processed_upto.read_to, (40, 0));
let kv = anchors_cache.front();
assert!(kv.is_none());
}

0 comments on commit 360106e

Please sign in to comment.