diff --git a/collator/src/internal_queue/queue.rs b/collator/src/internal_queue/queue.rs index 03a88b1b7..c6bd39442 100644 --- a/collator/src/internal_queue/queue.rs +++ b/collator/src/internal_queue/queue.rs @@ -154,12 +154,13 @@ where let session_state_lock = self.session_state.lock().await; session_state_lock.remove_diff(diff_id).await? }; - if let Some(diff) = &diff { - let persistent_state_lock = self.persistent_state.write().await; - persistent_state_lock - .add_messages(*diff_id, diff.messages.clone()) - .await?; - } + + // if let Some(diff) = &diff { + // let persistent_state_lock = self.persistent_state.write().await; + // persistent_state_lock + // .add_messages(*diff_id, diff.messages.clone()) + // .await?; + // } Ok(diff) } } diff --git a/collator/src/internal_queue/shard.rs b/collator/src/internal_queue/shard.rs index 48e3e0dbb..8712bd4bd 100644 --- a/collator/src/internal_queue/shard.rs +++ b/collator/src/internal_queue/shard.rs @@ -1,7 +1,7 @@ use std::collections::BTreeMap; use std::sync::Arc; -use everscale_types::models::BlockIdShort; +use everscale_types::models::{BlockIdShort, ShardIdent}; use crate::internal_queue::types::{EnqueuedMessage, InternalMessageKey, QueueDiff}; use crate::tracing_targets; @@ -33,4 +33,31 @@ impl Shard { } None } + + // stub + pub fn clear_processed_messages( + &mut self, + diff_shard: ShardIdent, + key: &InternalMessageKey, + ) -> anyhow::Result<()> { + let mut keys_to_remove = vec![]; + + for (k, v) in self.outgoing_messages.iter() { + if k > key { + break; + } + + let (chain, acc) = v.destination()?; + // if we proccessed message then delete it + if chain as i32 == diff_shard.workchain() && diff_shard.contains_account(&acc) { + keys_to_remove.push(k.clone()); + } + } + + for k in keys_to_remove { + self.outgoing_messages.remove(&k); + } + + Ok(()) + } } diff --git a/collator/src/internal_queue/state/session/session_state.rs b/collator/src/internal_queue/state/session/session_state.rs index 9c70b85b2..64fbce87b 100644 --- a/collator/src/internal_queue/state/session/session_state.rs +++ b/collator/src/internal_queue/state/session/session_state.rs @@ -9,6 +9,7 @@ use crate::internal_queue::shard::Shard; use crate::internal_queue::state::session::session_state_iterator::SessionStateIterator; use crate::internal_queue::state::state_iterator::{ShardRange, StateIterator}; use crate::internal_queue::types::QueueDiff; +use crate::tracing_targets; // FACTORY @@ -98,11 +99,27 @@ impl SessionState for SessionStateStdImpl { for_shard_id: ShardIdent, ) -> Box { let shards_flat_read = self.shards_flat.read().await; + let mut total_messages = 0; + let mut flat_shards = FastHashMap::default(); for (shard_ident, shard_lock) in shards_flat_read.iter() { let shard = shard_lock.read().await; flat_shards.insert(*shard_ident, shard.clone()); + total_messages += shard.outgoing_messages.len(); } + + tracing::info!( + target: tracing_targets::MQ, + "Creating iterator for shard {} with {} messages", + for_shard_id, + total_messages + ); + + let labels = [("workchain", for_shard_id.workchain().to_string())]; + + metrics::histogram!("tycho_session_iterator_messages_all", &labels) + .record(total_messages as f64); + Box::new(SessionStateIterator::new( flat_shards, ranges, @@ -228,7 +245,25 @@ impl SessionState for SessionStateStdImpl { .ok_or(QueueError::ShardNotFound(diff_id.shard))? .clone() }; - let diff = shard_arc.write().await.remove_diff(diff_id); + + // TODO clean session queue instead cleaning persistent queue + let diff = shard_arc.write().await.diffs.get(diff_id).cloned(); + let processed_uptos = diff.as_ref().map(|d| &d.processed_upto); + + if let Some(processed_uptos) = processed_uptos { + let diff_shard = diff_id.shard; + let flat_shards = self.shards_flat.write().await; + + for processed_upto in processed_uptos { + let shard = flat_shards + .get(processed_upto.0) + .ok_or(QueueError::ShardNotFound(*processed_upto.0))?; + + let mut shard_guard = shard.write().await; + shard_guard.clear_processed_messages(diff_shard, processed_upto.1)?; + } + } + Ok(diff) } } diff --git a/collator/src/queue_adapter.rs b/collator/src/queue_adapter.rs index 8d9a882e9..df2182ad7 100644 --- a/collator/src/queue_adapter.rs +++ b/collator/src/queue_adapter.rs @@ -129,8 +129,8 @@ impl MessageQueueAdapter for MessageQueueAdapterStdImpl { "Committing diff to the queue" ); // HACK: do not commit diff to avoid incorrect msgs set reading for collation - // let diff = self.queue.commit_diff(diff_id).await?; - let diff = None; + let diff = self.queue.commit_diff(diff_id).await?; + // let diff = None; Ok(diff) } diff --git a/collator/tests/internal_queue.rs b/collator/tests/internal_queue.rs index a8a89563f..8e71439af 100644 --- a/collator/tests/internal_queue.rs +++ b/collator/tests/internal_queue.rs @@ -144,28 +144,28 @@ async fn intershard_message_delivery_test() -> anyhow::Result<()> { adapter.commit_diff(&block_id).await?; // peek message from persistent state - // let mut iterator = adapter - // .create_iterator(shard_id_2, from_ranges.clone(), to_ranges.clone()) - // .await - // .unwrap(); - // - // let peek_message = iterator.peek(false)?; - // assert!(peek_message.is_some()); - // - // let next_message = iterator.next(false)?; - // - // assert!(next_message.is_some()); - // // println!("{:?}", peek_message.unwrap().message_with_source.message.hash); - // let next_message = next_message.unwrap(); - // println!("{:?}", next_message.message_with_source.message.info.dst); - // assert_eq!( - // next_message.message_with_source.message.info.dst, - // int_message.dst - // ); - // assert_eq!(next_message.message_with_source.shard_id, shard_id_1); - // - // let next_message = iterator.next(false)?; - // assert!(next_message.is_none()); + let mut iterator = adapter + .create_iterator(shard_id_2, from_ranges.clone(), to_ranges.clone()) + .await + .unwrap(); + + let peek_message = iterator.peek(false)?; + assert!(peek_message.is_some()); + + let next_message = iterator.next(false)?; + + assert!(next_message.is_some()); + // println!("{:?}", peek_message.unwrap().message_with_source.message.hash); + let next_message = next_message.unwrap(); + println!("{:?}", next_message.message_with_source.message.info.dst); + assert_eq!( + next_message.message_with_source.message.info.dst, + int_message.dst + ); + assert_eq!(next_message.message_with_source.shard_id, shard_id_1); + + let next_message = iterator.next(false)?; + assert!(next_message.is_none()); Ok(()) }