Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(collator): clear session queue #111

Merged
merged 1 commit into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions collator/src/internal_queue/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,13 @@ where
let session_state_lock = self.session_state.lock().await;
session_state_lock.remove_diff(diff_id).await?
};
if let Some(diff) = &diff {
let persistent_state_lock = self.persistent_state.write().await;
persistent_state_lock
.add_messages(*diff_id, diff.messages.clone())
.await?;
}

// if let Some(diff) = &diff {
// let persistent_state_lock = self.persistent_state.write().await;
// persistent_state_lock
// .add_messages(*diff_id, diff.messages.clone())
// .await?;
// }
Ok(diff)
}
}
29 changes: 28 additions & 1 deletion collator/src/internal_queue/shard.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::collections::BTreeMap;
use std::sync::Arc;

use everscale_types::models::BlockIdShort;
use everscale_types::models::{BlockIdShort, ShardIdent};

use crate::internal_queue::types::{EnqueuedMessage, InternalMessageKey, QueueDiff};
use crate::tracing_targets;
Expand Down Expand Up @@ -33,4 +33,31 @@ impl Shard {
}
None
}

// stub
pub fn clear_processed_messages(
&mut self,
diff_shard: ShardIdent,
key: &InternalMessageKey,
) -> anyhow::Result<()> {
let mut keys_to_remove = vec![];

for (k, v) in self.outgoing_messages.iter() {
if k > key {
break;
}

let (chain, acc) = v.destination()?;
// if we proccessed message then delete it
if chain as i32 == diff_shard.workchain() && diff_shard.contains_account(&acc) {
keys_to_remove.push(k.clone());
}
}

for k in keys_to_remove {
self.outgoing_messages.remove(&k);
}

Ok(())
}
}
37 changes: 36 additions & 1 deletion collator/src/internal_queue/state/session/session_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::internal_queue::shard::Shard;
use crate::internal_queue::state::session::session_state_iterator::SessionStateIterator;
use crate::internal_queue::state::state_iterator::{ShardRange, StateIterator};
use crate::internal_queue::types::QueueDiff;
use crate::tracing_targets;

// FACTORY

Expand Down Expand Up @@ -98,11 +99,27 @@ impl SessionState for SessionStateStdImpl {
for_shard_id: ShardIdent,
) -> Box<dyn StateIterator> {
let shards_flat_read = self.shards_flat.read().await;
let mut total_messages = 0;

let mut flat_shards = FastHashMap::default();
for (shard_ident, shard_lock) in shards_flat_read.iter() {
let shard = shard_lock.read().await;
flat_shards.insert(*shard_ident, shard.clone());
total_messages += shard.outgoing_messages.len();
}

tracing::info!(
target: tracing_targets::MQ,
"Creating iterator for shard {} with {} messages",
for_shard_id,
total_messages
);

let labels = [("workchain", for_shard_id.workchain().to_string())];

metrics::histogram!("tycho_session_iterator_messages_all", &labels)
.record(total_messages as f64);

Box::new(SessionStateIterator::new(
flat_shards,
ranges,
Expand Down Expand Up @@ -228,7 +245,25 @@ impl SessionState for SessionStateStdImpl {
.ok_or(QueueError::ShardNotFound(diff_id.shard))?
.clone()
};
let diff = shard_arc.write().await.remove_diff(diff_id);

// TODO clean session queue instead cleaning persistent queue
let diff = shard_arc.write().await.diffs.get(diff_id).cloned();
let processed_uptos = diff.as_ref().map(|d| &d.processed_upto);

if let Some(processed_uptos) = processed_uptos {
let diff_shard = diff_id.shard;
let flat_shards = self.shards_flat.write().await;

for processed_upto in processed_uptos {
let shard = flat_shards
.get(processed_upto.0)
.ok_or(QueueError::ShardNotFound(*processed_upto.0))?;

let mut shard_guard = shard.write().await;
shard_guard.clear_processed_messages(diff_shard, processed_upto.1)?;
}
}

Ok(diff)
}
}
Expand Down
4 changes: 2 additions & 2 deletions collator/src/queue_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ impl MessageQueueAdapter for MessageQueueAdapterStdImpl {
"Committing diff to the queue"
);
// HACK: do not commit diff to avoid incorrect msgs set reading for collation
// let diff = self.queue.commit_diff(diff_id).await?;
let diff = None;
let diff = self.queue.commit_diff(diff_id).await?;
// let diff = None;
Ok(diff)
}

Expand Down
44 changes: 22 additions & 22 deletions collator/tests/internal_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,28 +144,28 @@ async fn intershard_message_delivery_test() -> anyhow::Result<()> {
adapter.commit_diff(&block_id).await?;

// peek message from persistent state
// let mut iterator = adapter
// .create_iterator(shard_id_2, from_ranges.clone(), to_ranges.clone())
// .await
// .unwrap();
//
// let peek_message = iterator.peek(false)?;
// assert!(peek_message.is_some());
//
// let next_message = iterator.next(false)?;
//
// assert!(next_message.is_some());
// // println!("{:?}", peek_message.unwrap().message_with_source.message.hash);
// let next_message = next_message.unwrap();
// println!("{:?}", next_message.message_with_source.message.info.dst);
// assert_eq!(
// next_message.message_with_source.message.info.dst,
// int_message.dst
// );
// assert_eq!(next_message.message_with_source.shard_id, shard_id_1);
//
// let next_message = iterator.next(false)?;
// assert!(next_message.is_none());
let mut iterator = adapter
.create_iterator(shard_id_2, from_ranges.clone(), to_ranges.clone())
.await
.unwrap();

let peek_message = iterator.peek(false)?;
assert!(peek_message.is_some());

let next_message = iterator.next(false)?;

assert!(next_message.is_some());
// println!("{:?}", peek_message.unwrap().message_with_source.message.hash);
let next_message = next_message.unwrap();
println!("{:?}", next_message.message_with_source.message.info.dst);
assert_eq!(
next_message.message_with_source.message.info.dst,
int_message.dst
);
assert_eq!(next_message.message_with_source.shard_id, shard_id_1);

let next_message = iterator.next(false)?;
assert!(next_message.is_none());

Ok(())
}
Loading