Skip to content

Commit

Permalink
feat(int-queue): improve skip (#157)
Browse files Browse the repository at this point in the history
  • Loading branch information
drmick authored Jul 5, 2024
2 parents 318417b + 5dd0712 commit c179d38
Showing 1 changed file with 83 additions and 52 deletions.
135 changes: 83 additions & 52 deletions collator/src/internal_queue/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,21 @@ use crate::internal_queue::error::QueueError;
use crate::internal_queue::state::state_iterator::{IterRange, MessageWithSource, ShardRange};
use crate::internal_queue::state::states_iterators_manager::StatesIteratorsManager;
use crate::internal_queue::types::{EnqueuedMessage, InternalMessageKey, QueueDiff};
use crate::tracing_targets;
use crate::types::ShardIdentExt;

pub trait QueueIterator: Send {
/// Get next message
fn next(&mut self, with_new: bool) -> Result<Option<IterItem>>; // Function to update the committed position
fn update_committed_position(&mut self, next_message: &Arc<MessageWithSource>); // Function to process the new messages
fn next(&mut self, with_new: bool) -> Result<Option<IterItem>>;
fn update_last_read_message(
&mut self,
source_shard: ShardIdent,
message_key: &InternalMessageKey,
);
fn process_new_messages(&mut self) -> Result<Option<IterItem>>;
/// Take diff from iterator
/// Move current position to commited position
/// Create new transaction
fn take_diff(&mut self) -> QueueDiff;
fn take_diff(&self) -> QueueDiff;
/// Commit processed messages
/// It's getting last message position for each shard and save
fn commit(&mut self, messages: Vec<(ShardIdent, InternalMessageKey)>) -> Result<()>;
Expand All @@ -35,6 +38,9 @@ pub struct QueueIteratorImpl {
messages_for_current_shard: BinaryHeap<Reverse<Arc<MessageWithSource>>>,
new_messages: FastHashMap<InternalMessageKey, Arc<EnqueuedMessage>>,
snapshot_manager: StatesIteratorsManager,
last_processed_message: FastHashMap<ShardIdent, InternalMessageKey>,
last_read_message_for_current_shard: FastHashMap<ShardIdent, InternalMessageKey>,
read_position: BTreeMap<ShardIdent, InternalMessageKey>,
}

impl QueueIteratorImpl {
Expand All @@ -50,6 +56,9 @@ impl QueueIteratorImpl {
new_messages: Default::default(),
commited_current_position: Default::default(),
snapshot_manager,
last_processed_message: Default::default(),
last_read_message_for_current_shard: Default::default(),
read_position: Default::default(),
})
}
}
Expand All @@ -74,16 +83,21 @@ impl QueueIterator for QueueIteratorImpl {
fn next(&mut self, with_new: bool) -> Result<Option<IterItem>> {
// Process the next message from the snapshot manager
while let Some(next_message) = self.snapshot_manager.next()? {
self.update_last_read_message(next_message.shard_id, &next_message.message.key());

if self
.for_shard
.contains_address(&next_message.message.info.dst)
{
self.last_read_message_for_current_shard
.insert(next_message.shard_id, next_message.message.key());

return Ok(Some(IterItem {
message_with_source: next_message.clone(),
is_new: false,
}));
} else {
self.update_committed_position(&next_message);
continue;
}
}

Expand All @@ -96,15 +110,19 @@ impl QueueIterator for QueueIteratorImpl {
}

// Function to update the committed position
fn update_committed_position(&mut self, next_message: &Arc<MessageWithSource>) {
self.commited_current_position
.entry(next_message.shard_id)
fn update_last_read_message(
&mut self,
source_shard: ShardIdent,
message_key: &InternalMessageKey,
) {
self.read_position
.entry(source_shard)
.and_modify(|e| {
if next_message.message.key() > *e {
*e = next_message.message.key().clone();
if message_key > e {
*e = message_key.clone();
}
})
.or_insert(next_message.message.key().clone());
.or_insert(message_key.clone());
}

// Function to process the new messages
Expand All @@ -127,65 +145,78 @@ impl QueueIterator for QueueIteratorImpl {
Ok(None)
}

fn take_diff(&mut self) -> QueueDiff {
fn take_diff(&self) -> QueueDiff {
tracing::trace!(
target: crate::tracing_targets::MQ,
"Taking diff from iterator. New messages count: {}",
self.new_messages.len());

let mut diff = QueueDiff::default();

for (shard_id, lt) in self.commited_current_position.iter() {
diff.processed_upto.insert(*shard_id, lt.clone());
}

let current_shard_processed_upto = self
.commited_current_position
.get(&self.for_shard)
.cloned()
.unwrap_or_default();

let amount_before = self.new_messages.len();
let mut read_position = self.read_position.clone();

let mut inserted_new_messages = 0;
// tracing::debug!(target: "local_debug", "Current shard processed upto: {:?}",current_shard_processed_upto);
// tracing::debug!(target: "local_debug", "Commited position: {:?} {:?}", self.commited_current_position, self.for_shard);
for processed_last_message in self.last_processed_message.iter() {
if !read_position.contains_key(&processed_last_message.0) {
read_position.insert(
processed_last_message.0.clone(),
processed_last_message.1.clone(),
);
}
}

for message in self.new_messages.values() {
if self.for_shard.contains_address(&message.info.dst) {
if message.key() > current_shard_processed_upto {
diff.messages.insert(message.key(), message.clone());
inserted_new_messages += 1;
for (shard_id, last_read_key) in read_position.iter() {
let last_read_message_for_current_shard = self
.last_read_message_for_current_shard
.get(&shard_id)
.cloned();
let processed_last_message = self.last_processed_message.get(&shard_id).cloned();

match (last_read_message_for_current_shard, processed_last_message) {
(Some(read_last_message), Some(processed_last_message)) => {
if read_last_message == processed_last_message {
// processed all read messages
diff.processed_upto
.insert(*shard_id, read_last_message.clone());
} else {
// processed greater than read or lower
diff.processed_upto
.insert(*shard_id, processed_last_message);
}
}
(Some(_read_last_message), None) => {
// read last message but no one processed. try again next time
// diff.processed_upto
// .insert(*shard_id, read_last_message.clone());
}
(None, Some(processed_last_message)) => {
// no old messages read, but some new processed
diff.processed_upto
.insert(*shard_id, processed_last_message.clone());
}
(None, None) => {
// no old messages read, no new processed
diff.processed_upto.insert(*shard_id, last_read_key.clone());
}
} else {
diff.messages.insert(message.key(), message.clone());
inserted_new_messages += 1;
}
}

tracing::trace!(
target: crate::tracing_targets::MQ,
"Inserted {} messages out of {} to diff",
inserted_new_messages,
amount_before);
for message in self.new_messages.values() {
diff.messages.insert(message.key(), message.clone());
}

diff
}

fn commit(&mut self, messages: Vec<(ShardIdent, InternalMessageKey)>) -> Result<()> {
tracing::info!(
target: tracing_targets::MQ,
"Committing messages to the iterator. Messages count: {}",
messages.len());

for message in messages {
if let Some(current_key) = self.commited_current_position.get_mut(&message.0) {
if message.1 > *current_key {
current_key.clone_from(&message.1);
}
} else {
self.commited_current_position.insert(message.0, message.1);
}
for (source_shard, message_key) in messages {
self.last_processed_message
.entry(source_shard)
.and_modify(|e| {
if message_key > *e {
*e = message_key.clone();
}
})
.or_insert(message_key.clone());
}
Ok(())
}
Expand Down

0 comments on commit c179d38

Please sign in to comment.