Skip to content

Commit

Permalink
Replace IndexMap, because of O(n) shift_remove
Browse files Browse the repository at this point in the history
  • Loading branch information
mystenmark committed May 28, 2024
1 parent 60aed80 commit bfee8ca
Showing 1 changed file with 106 additions and 10 deletions.
116 changes: 106 additions & 10 deletions crates/sui-core/src/transaction_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,12 @@
// SPDX-License-Identifier: Apache-2.0

use std::{
cmp::max,
collections::{BTreeSet, HashMap, HashSet},
cmp::{max, Reverse},
collections::{BTreeSet, BinaryHeap, HashMap, HashSet},
sync::Arc,
time::Duration,
};

use indexmap::IndexMap;
use lru::LruCache;
use mysten_metrics::monitored_scope;
use parking_lot::RwLock;
Expand Down Expand Up @@ -227,7 +226,7 @@ struct Inner {
// Stores age info for all transactions depending on each object.
// Used for throttling signing and submitting transactions depending on hot objects.
// An `IndexMap` is used to ensure that the insertion order is preserved.
input_objects: HashMap<ObjectID, IndexMap<TransactionDigest, Instant>>,
input_objects: HashMap<ObjectID, TransactionQueue>,

// Maps object IDs to the highest observed sequence number of the object. When the value is
// None, indicates that the object is immutable, corresponding to an InputKey with no sequence
Expand Down Expand Up @@ -286,12 +285,10 @@ impl Inner {
)
});
for digest in digests.iter() {
let age_opt = input_txns.shift_remove(digest);
// The digest of the transaction must be inside the map.
assert!(age_opt.is_some());
let age_opt = input_txns.remove(digest).expect("digest must be in map");
metrics
.transaction_manager_transaction_queue_age_s
.observe(age_opt.unwrap().elapsed().as_secs_f64());
.observe(age_opt.elapsed().as_secs_f64());
}

if input_txns.is_empty() {
Expand Down Expand Up @@ -786,12 +783,12 @@ impl TransactionManager {
let inner = reconfig_lock.read();
keys.into_iter()
.map(|key| {
let default_map = IndexMap::new();
let default_map = TransactionQueue::default();
let txns = inner.input_objects.get(&key).unwrap_or(&default_map);
(
key,
txns.len(),
txns.first().map(|(_, time)| time.elapsed()),
txns.first().map(|(time, _)| time.elapsed()),
)
})
.collect()
Expand Down Expand Up @@ -939,6 +936,48 @@ where
}
}

#[derive(Default, Debug)]
struct TransactionQueue {
digests: HashMap<TransactionDigest, Instant>,
ages: BinaryHeap<(Reverse<Instant>, TransactionDigest)>,
}

impl TransactionQueue {
fn len(&self) -> usize {
self.digests.len()
}

fn is_empty(&self) -> bool {
self.digests.is_empty()
}

fn insert(&mut self, digest: TransactionDigest, time: Instant) {
if self.digests.insert(digest, time).is_none() {
self.ages.push((Reverse(time), digest));
}
}

fn remove(&mut self, digest: &TransactionDigest) -> Option<Instant> {
let Some(when) = self.digests.remove(digest) else {
return None;
};

while !self.ages.is_empty()
&& !self
.digests
.contains_key(&self.ages.peek().expect("heap cannot be empty").1)
{
self.ages.pop();
}

Some(when)
}

fn first(&self) -> Option<(Instant, TransactionDigest)> {
self.ages.peek().map(|(time, digest)| (time.0, *digest))
}
}

#[cfg(test)]
mod test {
use super::*;
Expand Down Expand Up @@ -1015,4 +1054,61 @@ mod test {
};
assert_eq!(cache.is_object_available(&input_key), Some(true));
}

#[test]
fn test_transaction_queue() {
let mut queue = TransactionQueue::default();

// insert and remove an item
let time = Instant::now();
let digest = TransactionDigest::new([1; 32]);
queue.insert(digest, time);
assert_eq!(queue.first(), Some((time, digest)));
queue.remove(&digest);
assert_eq!(queue.first(), None);

// remove a non-existent item
assert_eq!(queue.remove(&digest), None);
}

#[test]
fn test_transaction_queue_remove_in_order() {
// insert two items, remove them in insertion order
let time1 = Instant::now();
let digest1 = TransactionDigest::new([1; 32]);
let time2 = time1 + Duration::from_secs(1);
let digest2 = TransactionDigest::new([2; 32]);

let mut queue = TransactionQueue::default();
queue.insert(digest1, time1);
queue.insert(digest2, time2);

assert_eq!(queue.first(), Some((time1, digest1)));
assert_eq!(queue.remove(&digest1), Some(time1));
assert_eq!(queue.first(), Some((time2, digest2)));
assert_eq!(queue.remove(&digest2), Some(time2));
assert_eq!(queue.first(), None);
}

#[test]
fn test_transaction_queue_remove_in_reverse_order() {
// insert two items, remove them in reverse order
let time1 = Instant::now();
let digest1 = TransactionDigest::new([1; 32]);
let time2 = time1 + Duration::from_secs(1);
let digest2 = TransactionDigest::new([2; 32]);

let mut queue = TransactionQueue::default();
queue.insert(digest1, time1);
queue.insert(digest2, time2);

assert_eq!(queue.first(), Some((time1, digest1)));
assert_eq!(queue.remove(&digest2), Some(time2));

// after removing digest2, digest1 is still the first item
assert_eq!(queue.first(), Some((time1, digest1)));
assert_eq!(queue.remove(&digest1), Some(time1));

assert_eq!(queue.first(), None);
}
}

0 comments on commit bfee8ca

Please sign in to comment.