Skip to content

Commit

Permalink
chore(sequencing): replace BTreeMap with HashMap in StreamHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
guy-starkware committed Dec 8, 2024
1 parent bb57c46 commit 60d4672
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 18 deletions.
17 changes: 8 additions & 9 deletions crates/sequencing/papyrus_consensus/src/stream_handler.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
//! Stream handler, see StreamManager struct.
use std::cmp::Ordering;
use std::collections::btree_map::Entry as BTreeEntry;
use std::collections::hash_map::Entry as HashMapEntry;
use std::collections::{BTreeMap, HashMap};
use std::collections::hash_map::Entry::{Occupied, Vacant};
use std::collections::HashMap;

use futures::channel::mpsc;
use futures::StreamExt;
Expand Down Expand Up @@ -39,7 +38,7 @@ struct StreamData<
max_message_id_received: MessageId,
sender: mpsc::Sender<T>,
// A buffer for messages that were received out of order.
message_buffer: BTreeMap<MessageId, StreamMessage<T>>,
message_buffer: HashMap<MessageId, StreamMessage<T>>,
}

impl<T: Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError>> StreamData<T> {
Expand All @@ -49,7 +48,7 @@ impl<T: Clone + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversionError
fin_message_id: None,
max_message_id_received: 0,
sender,
message_buffer: BTreeMap::new(),
message_buffer: HashMap::new(),
}
}
}
Expand Down Expand Up @@ -232,8 +231,8 @@ impl<T: Clone + Send + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversi
let message_id = message.message_id;

let data = match self.inbound_stream_data.entry(key.clone()) {
HashMapEntry::Occupied(entry) => entry.into_mut(),
HashMapEntry::Vacant(e) => {
Occupied(entry) => entry.into_mut(),
Vacant(e) => {
// If we received a message for a stream that we have not seen before,
// we need to create a new receiver for it.
let (sender, receiver) = mpsc::channel(CHANNEL_BUFFER_LENGTH);
Expand Down Expand Up @@ -309,10 +308,10 @@ impl<T: Clone + Send + Into<Vec<u8>> + TryFrom<Vec<u8>, Error = ProtobufConversi
let message_id = message.message_id;

match data.message_buffer.entry(message_id) {
BTreeEntry::Vacant(e) => {
Vacant(e) => {
e.insert(message);
}
BTreeEntry::Occupied(_) => {
Occupied(_) => {
// TODO(guyn): replace warnings with more graceful error handling
warn!(
"Two messages with the same message_id in buffer! key: {:?}, message_id: {}",
Expand Down
25 changes: 16 additions & 9 deletions crates/sequencing/papyrus_consensus/src/stream_handler_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,17 @@ mod tests {
StreamMessage { message: content, stream_id, message_id }
}

// Check if two vectors are the same:
fn do_vecs_match<T: PartialEq>(a: &[T], b: &[T]) -> bool {
let matching = a.iter().zip(b.iter()).filter(|&(a, b)| a == b).count();
matching == a.len() && matching == b.len()
// Check if two vectors are the same, regardless of ordering
fn do_vecs_match_unordered<T: PartialEq>(a: &[T], b: &[T]) -> bool
where
T: std::hash::Hash + Eq,
{
if a.len() != b.len() {
return false;
}
let set_a: std::collections::HashSet<_> = a.iter().collect();
let set_b: std::collections::HashSet<_> = b.iter().collect();
set_a == set_b
}

async fn send(
Expand Down Expand Up @@ -183,7 +190,7 @@ mod tests {
.message_buffer
.into_keys()
.collect();
assert!(do_vecs_match(&keys, &range));
assert!(do_vecs_match_unordered(&keys, &range));

// Now send the last message:
send(&mut network_sender, &inbound_metadata, make_test_message(stream_id, 0, false)).await;
Expand Down Expand Up @@ -258,7 +265,7 @@ mod tests {
);

// We have all message from 1 to 9 buffered.
assert!(do_vecs_match(
assert!(do_vecs_match_unordered(
&stream_handler.inbound_stream_data[&(peer_id.clone(), stream_id1)]
.message_buffer
.clone()
Expand All @@ -268,7 +275,7 @@ mod tests {
));

// We have all message from 1 to 5 buffered.
assert!(do_vecs_match(
assert!(do_vecs_match_unordered(
&stream_handler.inbound_stream_data[&(peer_id.clone(), stream_id2)]
.message_buffer
.clone()
Expand All @@ -278,7 +285,7 @@ mod tests {
));

// We have all message from 1 to 5 buffered.
assert!(do_vecs_match(
assert!(do_vecs_match_unordered(
&stream_handler.inbound_stream_data[&(peer_id.clone(), stream_id3)]
.message_buffer
.clone()
Expand Down Expand Up @@ -486,7 +493,7 @@ mod tests {
vec1.sort();
let mut vec2 = vec![&stream_id1, &stream_id2];
vec2.sort();
do_vecs_match(&vec1, &vec2);
do_vecs_match_unordered(&vec1, &vec2);
assert_eq!(stream_handler.outbound_stream_number[&stream_id2], 1);

// Close the first channel.
Expand Down

0 comments on commit 60d4672

Please sign in to comment.