From 52f38bd7904b65a03193f8f867de244e5c14969a Mon Sep 17 00:00:00 2001 From: Maksim Greshnyakov Date: Wed, 12 Feb 2025 16:22:01 +0100 Subject: [PATCH 1/4] feature(collator): int queue version --- collator/src/internal_queue/queue.rs | 45 +-- .../internal_queue/state/commited_state.rs | 11 +- .../internal_queue/state/uncommitted_state.rs | 11 +- collator/src/manager/mod.rs | 16 +- collator/src/queue_adapter.rs | 12 +- collator/tests/internal_queue.rs | 145 ++++++++-- core/src/block_strider/starter/cold_boot.rs | 2 +- storage/src/db/kv_db/mod.rs | 1 + storage/src/db/kv_db/tables.rs | 15 + storage/src/store/internal_queue/mod.rs | 256 ++++++++++-------- 10 files changed, 348 insertions(+), 166 deletions(-) diff --git a/collator/src/internal_queue/queue.rs b/collator/src/internal_queue/queue.rs index 4bc2ce986..4966ddcb5 100644 --- a/collator/src/internal_queue/queue.rs +++ b/collator/src/internal_queue/queue.rs @@ -3,9 +3,9 @@ use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; -use anyhow::{bail, Result}; +use anyhow::{anyhow, bail, Result}; use everscale_types::cell::HashBytes; -use everscale_types::models::{BlockIdShort, ShardIdent}; +use everscale_types::models::{BlockId, BlockIdShort, ShardIdent}; use serde::{Deserialize, Serialize}; use tycho_block_util::queue::{QueueKey, QueuePartitionIdx}; use tycho_util::metrics::HistogramGuard; @@ -91,7 +91,7 @@ where max_message: QueueKey, ) -> Result<()>; /// Move messages from uncommitted state to committed state and update gc ranges - fn commit_diff(&self, mc_top_blocks: &[(BlockIdShort, bool)]) -> Result<()>; + fn commit_diff(&self, mc_top_blocks: &[(BlockId, bool)]) -> Result<()>; /// remove all data in uncommitted state storage fn clear_uncommitted_state(&self) -> Result<()>; /// Returns the number of diffs in cache for the given shard @@ -110,6 +110,8 @@ where fn get_diff(&self, shard_ident: ShardIdent, seqno: u32) -> Option; /// Check if diff exists in the cache fn is_diff_exists(&self, 
block_id_short: &BlockIdShort) -> bool; + /// Get last applied mc block id from committed state + fn get_last_applied_mc_block_id(&self) -> Result>; } // IMPLEMENTATION @@ -309,29 +311,33 @@ where Ok(()) } - fn commit_diff(&self, mc_top_blocks: &[(BlockIdShort, bool)]) -> Result<()> { + fn commit_diff(&self, mc_top_blocks: &[(BlockId, bool)]) -> Result<()> { + let mc_block_id = mc_top_blocks + .iter() + .find(|(block_id, _)| block_id.is_masterchain()) + .map(|(block_id, _)| block_id) + .ok_or_else(|| anyhow!("Masterchain block not found in commit_diff"))?; + let mut partitions = FastHashSet::default(); // insert default partition because we doesn't store it in router partitions.insert(QueuePartitionIdx::default()); let mut shards_to_commit = FastHashMap::default(); let mut gc_ranges = FastHashMap::default(); - for (block_id_short, top_shard_block_changed) in mc_top_blocks { + for (block_id, top_shard_block_changed) in mc_top_blocks { let mut diffs_to_commit = vec![]; // find all uncommited diffs for the given shard top block - let prev_shard_uncommitted_diffs = - self.uncommitted_diffs.get_mut(&block_id_short.shard); + let prev_shard_uncommitted_diffs = self.uncommitted_diffs.get_mut(&block_id.shard); if let Some(mut shard_uncommitted_diffs) = prev_shard_uncommitted_diffs { // iterate over all uncommitted diffs for the given shard until the top block seqno - shard_uncommitted_diffs - .range(..=block_id_short.seqno) - .for_each(|(block_seqno, shard_diff)| { + shard_uncommitted_diffs.range(..=block_id.seqno).for_each( + |(block_seqno, shard_diff)| { diffs_to_commit.push(*block_seqno); let current_last_key = shards_to_commit - .entry(block_id_short.shard) + .entry(block_id.shard) .or_insert_with(|| *shard_diff.max_message()); // Add all partitions from the router to the partitions set @@ -346,7 +352,7 @@ where } // find min processed_to for each shard for GC - if *block_seqno == block_id_short.seqno && *top_shard_block_changed { + if *block_seqno == block_id.seqno && 
*top_shard_block_changed { for (shard_ident, processed_to_key) in shard_diff.processed_to().iter() { let last_key = gc_ranges @@ -358,16 +364,15 @@ where } } } - }); + }, + ); // remove all diffs from uncommitted state that are going to be committed for seqno in diffs_to_commit { if let Some(diff) = shard_uncommitted_diffs.remove(&seqno) { // Move the diff to committed_diffs - let mut shard_committed_diffs = self - .committed_diffs - .entry(block_id_short.shard) - .or_default(); + let mut shard_committed_diffs = + self.committed_diffs.entry(block_id.shard).or_default(); shard_committed_diffs.insert(seqno, diff); } } @@ -385,7 +390,7 @@ where // move all uncommitted diffs messages to committed state self.uncommitted_state - .commit(partitions.clone(), &commit_ranges)?; + .commit(partitions.clone(), &commit_ranges, mc_block_id)?; let uncommitted_diffs_count: usize = self.uncommitted_diffs.iter().map(|r| r.value().len()).sum(); @@ -515,4 +520,8 @@ where .get(&block_id_short.shard) .is_some_and(|diffs| diffs.contains_key(&block_id_short.seqno)) } + + fn get_last_applied_mc_block_id(&self) -> Result> { + self.committed_state.get_last_applied_mc_block_id() + } } diff --git a/collator/src/internal_queue/state/commited_state.rs b/collator/src/internal_queue/state/commited_state.rs index 6aad7ced4..1d065d055 100644 --- a/collator/src/internal_queue/state/commited_state.rs +++ b/collator/src/internal_queue/state/commited_state.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use everscale_types::models::{IntAddr, ShardIdent}; +use everscale_types::models::{BlockId, IntAddr, ShardIdent}; use tycho_block_util::queue::QueuePartitionIdx; use tycho_storage::model::ShardsInternalMessagesKey; use tycho_storage::{InternalQueueSnapshot, Storage}; @@ -80,6 +80,9 @@ pub trait CommittedState: Send + Sync { partition: QueuePartitionIdx, range: &[QueueShardRange], ) -> Result<()>; + + /// Get last applied mc block id + fn get_last_applied_mc_block_id(&self) -> Result>; } // IMPLEMENTATION @@ 
-158,4 +161,10 @@ impl CommittedState for CommittedStateStdImpl { Ok(()) } + + fn get_last_applied_mc_block_id(&self) -> Result> { + self.storage + .internal_queue_storage() + .get_last_applied_mc_block_id() + } } diff --git a/collator/src/internal_queue/state/uncommitted_state.rs b/collator/src/internal_queue/state/uncommitted_state.rs index 126a0d9ed..c2aa6d553 100644 --- a/collator/src/internal_queue/state/uncommitted_state.rs +++ b/collator/src/internal_queue/state/uncommitted_state.rs @@ -2,7 +2,7 @@ use std::collections::BTreeMap; use std::sync::Arc; use anyhow::Result; -use everscale_types::models::{IntAddr, ShardIdent}; +use everscale_types::models::{BlockId, IntAddr, ShardIdent}; use tycho_block_util::queue::{QueueKey, QueuePartitionIdx, RouterAddr}; use tycho_storage::model::{QueueRange, ShardsInternalMessagesKey, StatKey}; use tycho_storage::{InternalQueueSnapshot, InternalQueueTransaction, Storage}; @@ -77,6 +77,7 @@ pub trait LocalUncommittedState { &self, partitions: FastHashSet, ranges: &[QueueShardRange], + mc_block_id: &BlockId, ) -> Result<()>; /// Delete all uncommitted messages and statistics @@ -143,6 +144,7 @@ impl UncommittedState for UncommittedStateStdImpl { &self, partitions: FastHashSet, ranges: &[QueueShardRange], + mc_block_id: &BlockId, ) -> Result<()> { let ranges = partitions.iter().flat_map(|&partition| { ranges.iter().map(move |range| QueueRange { @@ -152,8 +154,13 @@ impl UncommittedState for UncommittedStateStdImpl { to: range.to, }) }); + let snapshot = self.storage.internal_queue_storage().make_snapshot(); + let mut tx = self.storage.internal_queue_storage().begin_transaction(); + + tx.commit_messages(&snapshot, ranges)?; + tx.set_last_applied_mc_block_id(mc_block_id); - self.storage.internal_queue_storage().commit(ranges) + tx.write() } fn truncate(&self) -> Result<()> { diff --git a/collator/src/manager/mod.rs b/collator/src/manager/mod.rs index 4aa3acaae..96b10ceb2 100644 --- a/collator/src/manager/mod.rs +++ 
b/collator/src/manager/mod.rs @@ -473,9 +473,9 @@ where let mut top_blocks: Vec<_> = top_shard_blocks_info .iter() - .map(|(id, updated)| (id.as_short_id(), *updated)) + .map(|(id, updated)| (*id, *updated)) .collect(); - top_blocks.push((block_id.as_short_id(), true)); + top_blocks.push((*block_id, true)); if let Err(err) = self.mq_adapter.commit_diff(top_blocks) { bail!( @@ -1442,7 +1442,7 @@ where // collect top blocks queue diffs already applied to let queue_diffs_applied_to_top_blocks = if let Some(applied_to_mc_block_id) = - self.get_queue_diffs_applied_to_mc_block_id(last_collated_mc_block_id) + self.get_queue_diffs_applied_to_mc_block_id(last_collated_mc_block_id)? { self.get_top_blocks_seqno(&applied_to_mc_block_id).await? } else { @@ -1702,18 +1702,18 @@ where fn get_queue_diffs_applied_to_mc_block_id( &self, last_collated_mc_block_id: Option, - ) -> Option { + ) -> Result> { let last_processed_mc_block_id = *self.last_processed_mc_block_id.lock(); match (last_processed_mc_block_id, last_collated_mc_block_id) { (Some(last_processed), Some(last_collated)) => { if last_processed.seqno > last_collated.seqno { - Some(last_processed) + Ok(Some(last_processed)) } else { - Some(last_collated) + Ok(Some(last_collated)) } } - (Some(mc_block_id), _) | (_, Some(mc_block_id)) => Some(mc_block_id), - _ => None, + (Some(mc_block_id), _) | (_, Some(mc_block_id)) => Ok(Some(mc_block_id)), + _ => self.mq_adapter.get_last_applied_mc_block_id(), } } diff --git a/collator/src/queue_adapter.rs b/collator/src/queue_adapter.rs index 20a74e232..664e1727f 100644 --- a/collator/src/queue_adapter.rs +++ b/collator/src/queue_adapter.rs @@ -1,6 +1,6 @@ use anyhow::Result; use everscale_types::cell::HashBytes; -use everscale_types::models::{BlockIdShort, ShardIdent}; +use everscale_types::models::{BlockId, BlockIdShort, ShardIdent}; use tracing::instrument; use tycho_block_util::queue::{QueueKey, QueuePartitionIdx}; use tycho_util::metrics::HistogramGuard; @@ -53,7 +53,7 @@ where /// 
Commit previously applied diff, saving changes to committed state (waiting for the operation to complete). /// Return `None` if specified diff does not exist. - fn commit_diff(&self, mc_top_blocks: Vec<(BlockIdShort, bool)>) -> Result<()>; + fn commit_diff(&self, mc_top_blocks: Vec<(BlockId, bool)>) -> Result<()>; /// Add new messages to the iterator fn add_message_to_iterator( @@ -80,6 +80,8 @@ where fn get_diffs_count_by_shard(&self, shard_ident: &ShardIdent) -> usize; /// Check if diff exists in the cache fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> bool; + /// Get last applied mc block id from committed state + fn get_last_applied_mc_block_id(&self) -> Result>; } impl MessageQueueAdapterStdImpl { @@ -157,7 +159,7 @@ impl MessageQueueAdapter for MessageQueueAdapterStdI Ok(()) } - fn commit_diff(&self, mc_top_blocks: Vec<(BlockIdShort, bool)>) -> Result<()> { + fn commit_diff(&self, mc_top_blocks: Vec<(BlockId, bool)>) -> Result<()> { let time = std::time::Instant::now(); self.queue.commit_diff(&mc_top_blocks)?; @@ -223,4 +225,8 @@ impl MessageQueueAdapter for MessageQueueAdapterStdI fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> bool { self.queue.is_diff_exists(block_id_short) } + + fn get_last_applied_mc_block_id(&self) -> Result> { + self.queue.get_last_applied_mc_block_id() + } } diff --git a/collator/tests/internal_queue.rs b/collator/tests/internal_queue.rs index 608fd7e8a..52f3b6b23 100644 --- a/collator/tests/internal_queue.rs +++ b/collator/tests/internal_queue.rs @@ -222,10 +222,19 @@ async fn test_queue() -> anyhow::Result<()> { let queue: QueueImpl = queue_factory.create(); + let mc_block = BlockId { + shard: ShardIdent::MASTERCHAIN, + seqno: 10, + root_hash: Default::default(), + file_hash: Default::default(), + }; + // create first block with queue diff - let block1 = BlockIdShort { + let block1 = BlockId { shard: ShardIdent::new_full(0), seqno: 0, + root_hash: Default::default(), + file_hash: Default::default(), }; let mut 
diff = QueueDiffWithMessages::new(); @@ -320,7 +329,7 @@ async fn test_queue() -> anyhow::Result<()> { queue.apply_diff( diff_with_messages, - block1, + block1.as_short_id(), &HashBytes::from([1; 32]), diff_statistics, max_message, @@ -328,9 +337,11 @@ async fn test_queue() -> anyhow::Result<()> { // end block 1 diff // create second block with queue diff - let block2 = BlockIdShort { + let block2 = BlockId { shard: ShardIdent::new_full(0), seqno: 1, + root_hash: Default::default(), + file_hash: Default::default(), }; let mut diff = QueueDiffWithMessages::new(); @@ -425,7 +436,7 @@ async fn test_queue() -> anyhow::Result<()> { queue.apply_diff( diff_with_messages, - block2, + block2.as_short_id(), &HashBytes::from([1; 32]), diff_statistics, max_message, @@ -440,7 +451,7 @@ async fn test_queue() -> anyhow::Result<()> { dest_3_normal_priority, )?; - queue.commit_diff(&[(block1, true)])?; + queue.commit_diff(&[(mc_block, true), (block1, true)])?; test_statistics_check_statistics( &queue, dest_1_low_priority, @@ -466,7 +477,7 @@ async fn test_queue() -> anyhow::Result<()> { ranges.push(queue_range); - let iterators = queue.iterator(1, &ranges, ShardIdent::new_full(-1))?; + let iterators = queue.iterator(1, &ranges, ShardIdent::MASTERCHAIN)?; let mut iterator_manager = StatesIteratorsManager::new(iterators); let mut read_count = 0; @@ -505,7 +516,7 @@ async fn test_queue() -> anyhow::Result<()> { ranges.push(queue_range); - let iterators = queue.iterator(1, &ranges, ShardIdent::new_full(-1))?; + let iterators = queue.iterator(1, &ranges, ShardIdent::MASTERCHAIN)?; let mut iterator_manager = StatesIteratorsManager::new(iterators); let mut read_count = 0; @@ -528,7 +539,7 @@ async fn test_queue() -> anyhow::Result<()> { assert_eq!(read_count, 2000); // test commit all diffs and check statistics - queue.commit_diff(&[(block2, true)])?; + queue.commit_diff(&[(mc_block, true), (block2, true)])?; test_statistics_check_statistics( &queue, dest_1_low_priority, @@ -619,7 +630,7 
@@ async fn test_iteration_from_two_shards() -> anyhow::Result<()> { // create second block with queue diff let block2 = BlockIdShort { - shard: ShardIdent::new_full(-1), + shard: ShardIdent::MASTERCHAIN, seqno: 0, }; let mut diff = QueueDiffWithMessages::new(); @@ -678,7 +689,7 @@ async fn test_iteration_from_two_shards() -> anyhow::Result<()> { }; let queue_range2 = QueueShardRange { - shard_ident: ShardIdent::new_full(-1), + shard_ident: ShardIdent::MASTERCHAIN, from: QueueKey { lt: 10000, hash: HashBytes::default(), @@ -705,7 +716,7 @@ async fn test_iteration_from_two_shards() -> anyhow::Result<()> { }; let stat_range2 = QueueShardRange { - shard_ident: ShardIdent::new_full(-1), + shard_ident: ShardIdent::MASTERCHAIN, from: QueueKey { lt: 1, hash: HashBytes::default(), @@ -725,7 +736,7 @@ async fn test_iteration_from_two_shards() -> anyhow::Result<()> { .unwrap_or_default(); assert_eq!(stat, 30000); - let iterators = queue.iterator(1, &ranges, ShardIdent::new_full(-1))?; + let iterators = queue.iterator(1, &ranges, ShardIdent::MASTERCHAIN)?; let mut iterator_manager = StatesIteratorsManager::new(iterators); let mut read_count = 0; @@ -1053,14 +1064,19 @@ async fn test_queue_tail() -> anyhow::Result<()> { let queue: QueueImpl = queue_factory.create(); - let block_mc1 = BlockIdShort { - shard: ShardIdent::new_full(-1), + + let block_mc1 = BlockId { + shard: ShardIdent::MASTERCHAIN, seqno: 0, + root_hash: Default::default(), + file_hash: Default::default(), }; - let block_mc2 = BlockIdShort { - shard: ShardIdent::new_full(-1), + let block_mc2 = BlockId { + shard: ShardIdent::MASTERCHAIN, seqno: 1, + root_hash: Default::default(), + file_hash: Default::default(), }; let mut diff_mc1 = QueueDiffWithMessages::new(); let mut diff_mc2 = QueueDiffWithMessages::new(); @@ -1106,7 +1122,7 @@ async fn test_queue_tail() -> anyhow::Result<()> { // apply two diffs queue.apply_diff( diff_mc1, - block_mc1, + block_mc1.as_short_id(), &HashBytes::from([1; 32]), statistics_mc1, 
max_message, @@ -1115,31 +1131,114 @@ async fn test_queue_tail() -> anyhow::Result<()> { let max_message = *diff_mc2.messages.keys().last().unwrap(); queue.apply_diff( diff_mc2, - block_mc2, + block_mc2.as_short_id(), &HashBytes::from([2; 32]), statistics_mc2, max_message, )?; - let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::new_full(-1)); + let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::MASTERCHAIN); assert_eq!(diff_len_mc, 2); // commit first diff queue.commit_diff(&[(block_mc1, true)])?; - let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::new_full(-1)); + let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::MASTERCHAIN); assert_eq!(diff_len_mc, 2); // trim first diff - queue.trim_diffs(&ShardIdent::new_full(-1), &end_key_mc1)?; - let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::new_full(-1)); + queue.trim_diffs(&ShardIdent::MASTERCHAIN, &end_key_mc1)?; + let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::MASTERCHAIN); assert_eq!(diff_len_mc, 1); // clear uncommitted state with second diff queue.clear_uncommitted_state()?; - let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::new_full(-1)); + let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::MASTERCHAIN); assert_eq!(diff_len_mc, 0); Ok(()) } + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_version() -> anyhow::Result<()> { + let (storage, _tmp_dir) = Storage::new_temp().await?; + + let queue_factory = QueueFactoryStdImpl { + uncommitted_state_factory: UncommittedStateImplFactory { + storage: storage.clone(), + }, + committed_state_factory: CommittedStateImplFactory { storage }, + config: QueueConfig { + gc_interval: Duration::from_secs(1), + }, + }; + + let queue: QueueImpl = + queue_factory.create(); + + let block_mc1 = BlockId { + shard: ShardIdent::MASTERCHAIN, + seqno: 0, + root_hash: HashBytes::from([11; 32]), + file_hash: HashBytes::from([12; 32]), + }; + + let block_mc2 = BlockId { 
+ shard: ShardIdent::MASTERCHAIN, + seqno: 1, + + root_hash: HashBytes::from([1; 32]), + file_hash: HashBytes::from([2; 32]), + }; + + let mut diff_mc1 = QueueDiffWithMessages::new(); + let mut diff_mc2 = QueueDiffWithMessages::new(); + + let stored_objects = vec![create_stored_object(1, RouterAddr { + workchain: -1, + account: HashBytes::from([1; 32]), + })?]; + + if let Some(stored_object) = stored_objects.first() { + diff_mc1 + .messages + .insert(stored_object.key(), stored_object.clone()); + } + + for stored_object in &stored_objects { + diff_mc2 + .messages + .insert(stored_object.key(), stored_object.clone()); + } + + let statistics_mc1 = (&diff_mc1, block_mc1.shard).into(); + + let max_message = *diff_mc1.messages.keys().last().unwrap(); + + let version = queue.get_last_applied_mc_block_id()?; + assert_eq!(version, None); + + queue.apply_diff( + diff_mc1, + block_mc1.as_short_id(), + &HashBytes::from([1; 32]), + statistics_mc1, + max_message, + )?; + + let version = queue.get_last_applied_mc_block_id()?; + assert_eq!(version, None); + + queue.commit_diff(&[(block_mc1, true)])?; + + let version = queue.get_last_applied_mc_block_id()?; + assert_eq!(version, Some(block_mc1)); + + queue.commit_diff(&[(block_mc2, true)])?; + + let version = queue.get_last_applied_mc_block_id()?; + assert_eq!(version, Some(block_mc2)); + + Ok(()) +} diff --git a/core/src/block_strider/starter/cold_boot.rs b/core/src/block_strider/starter/cold_boot.rs index ecfc9ee56..57ebb734d 100644 --- a/core/src/block_strider/starter/cold_boot.rs +++ b/core/src/block_strider/starter/cold_boot.rs @@ -780,7 +780,7 @@ impl StarterInner { }; internal_queue - .import_from_file(block_id.shard, top_update, file) + .import_from_file(top_update, file, *block_id) .await?; remove_state_file.await; diff --git a/storage/src/db/kv_db/mod.rs b/storage/src/db/kv_db/mod.rs index acfc74a30..27eecfd16 100644 --- a/storage/src/db/kv_db/mod.rs +++ b/storage/src/db/kv_db/mod.rs @@ -136,6 +136,7 @@ weedb::tables! 
{ pub shard_internal_messages_uncommitted: tables::ShardInternalMessagesUncommited, pub internal_message_stats: tables::InternalMessageStats, pub internal_message_stats_uncommitted: tables::InternalMessageStatsUncommited, + pub internal_message_var: tables::InternalMessageVar, } } diff --git a/storage/src/db/kv_db/tables.rs b/storage/src/db/kv_db/tables.rs index 64e6f3363..5fe3d5bf4 100644 --- a/storage/src/db/kv_db/tables.rs +++ b/storage/src/db/kv_db/tables.rs @@ -503,6 +503,21 @@ impl ColumnFamilyOptions for InternalMessageStatsUncommited { } } +pub struct InternalMessageVar; +impl ColumnFamily for InternalMessageVar { + const NAME: &'static str = "int_msg_var"; + + fn read_options(opts: &mut ReadOptions) { + opts.set_verify_checksums(true); + } +} + +impl ColumnFamilyOptions for InternalMessageVar { + fn options(opts: &mut Options, caches: &mut Caches) { + zstd_block_based_table_factory(opts, caches); + } +} + fn archive_data_merge( _: &[u8], current_value: Option<&[u8]>, diff --git a/storage/src/store/internal_queue/mod.rs b/storage/src/store/internal_queue/mod.rs index 13cee4188..c9621be32 100644 --- a/storage/src/store/internal_queue/mod.rs +++ b/storage/src/store/internal_queue/mod.rs @@ -1,7 +1,7 @@ use std::fs::File; use anyhow::Result; -use everscale_types::models::{IntAddr, Message, MsgInfo, OutMsgQueueUpdates, ShardIdent}; +use everscale_types::models::{BlockId, IntAddr, Message, MsgInfo, OutMsgQueueUpdates, ShardIdent}; use tycho_block_util::queue::{QueueKey, QueuePartitionIdx, RouterAddr, RouterPartitions}; use tycho_util::FastHashMap; use weedb::rocksdb::{DBRawIterator, WriteBatch}; @@ -18,6 +18,8 @@ pub mod model; pub struct InternalQueueStorage { db: BaseDb, } +// Constant for the last applied mc block id key +const INT_QUEUE_LAST_APPLIED_MC_BLOCK_ID_KEY: &[u8] = b"last_applied_mc_block_id"; impl InternalQueueStorage { pub fn new(db: BaseDb) -> Self { @@ -41,9 +43,9 @@ impl InternalQueueStorage { pub async fn import_from_file( &self, - 
shard_ident: ShardIdent, top_update: &OutMsgQueueUpdates, file: File, + block_id: BlockId, ) -> Result<()> { use everscale_types::boc::ser::BocHeader; @@ -69,6 +71,7 @@ impl InternalQueueStorage { let messages_cf = this.db.shard_internal_messages.cf(); let stats_cf = this.db.internal_message_stats.cf(); + let var_cf = this.db.internal_message_var.cf(); let mut batch = weedb::rocksdb::WriteBatch::default(); @@ -108,7 +111,7 @@ impl InternalQueueStorage { let key = ShardsInternalMessagesKey { partition, - shard_ident, + shard_ident: block_id.shard, internal_message_key: QueueKey { lt: int_msg_info.created_lt, hash: *msg_hash, @@ -126,10 +129,17 @@ impl InternalQueueStorage { } let queue_diff = part.queue_diff(); + + batch.put_cf( + &var_cf, + INT_QUEUE_LAST_APPLIED_MC_BLOCK_ID_KEY, + block_id.to_vec(), + ); + for (partition, statistics) in statistics.drain() { for (dest, count) in statistics.iter() { let key = StatKey { - shard_ident, + shard_ident: block_id.shard, partition, min_message: queue_diff.min_message, max_message: queue_diff.max_message, @@ -149,112 +159,6 @@ impl InternalQueueStorage { .await? 
} - pub fn commit>(&self, ranges: I) -> Result<()> { - let snapshot = self.db.rocksdb().snapshot(); - - let db = self.db.rocksdb().as_ref(); - let mut batch = WriteBatch::default(); - - let mut commit_range = |source_iter: &mut DBRawIterator<'_>, - from_key: &[u8], - to_key: &[u8], - source_cf: &BoundedCfHandle<'_>, - target_cf: &BoundedCfHandle<'_>| { - source_iter.seek(from_key); - - loop { - let (key, value) = match source_iter.item() { - Some(item) => item, - None => return source_iter.status(), - }; - - if key > to_key { - break; - } - - batch.delete_cf(source_cf, key); - batch.put_cf(target_cf, key, value); - - source_iter.next(); - } - - Ok(()) - }; - - let messages = &self.db.shard_internal_messages; - let messages_cf = &messages.cf(); - - let uncommited_messages = &self.db.shard_internal_messages_uncommitted; - let uncommited_messages_cf = &uncommited_messages.cf(); - - let mut uncommited_messages_iter = { - let mut readopts = uncommited_messages.new_read_config(); - readopts.set_snapshot(&snapshot); - db.raw_iterator_cf_opt(uncommited_messages_cf, readopts) - }; - - let stats = &self.db.internal_message_stats; - let stats_cf = &stats.cf(); - - let uncommited_stats = &self.db.internal_message_stats_uncommitted; - let uncommited_stats_cf = &uncommited_stats.cf(); - - let mut uncommited_stats_iter = { - let mut readopts = uncommited_stats.new_read_config(); - readopts.set_snapshot(&snapshot); - db.raw_iterator_cf_opt(uncommited_stats_cf, readopts) - }; - - for range in ranges { - // Commit messages for one range - let from_message_key = ShardsInternalMessagesKey { - partition: range.partition, - shard_ident: range.shard_ident, - internal_message_key: range.from, - }; - let to_message_key = ShardsInternalMessagesKey { - partition: range.partition, - shard_ident: range.shard_ident, - internal_message_key: range.to, - }; - - commit_range( - &mut uncommited_messages_iter, - &from_message_key.to_vec(), - &to_message_key.to_vec(), - uncommited_messages_cf, - 
messages_cf, - )?; - - // Commit stats for one range - let from_stat_key = StatKey { - shard_ident: range.shard_ident, - partition: range.partition, - min_message: range.from, - max_message: QueueKey::MIN, - dest: RouterAddr::MIN, - }; - let to_stat_key = StatKey { - shard_ident: range.shard_ident, - partition: range.partition, - min_message: range.to, - max_message: QueueKey::MAX, - dest: RouterAddr::MAX, - }; - - commit_range( - &mut uncommited_stats_iter, - &from_stat_key.to_vec(), - &to_stat_key.to_vec(), - uncommited_stats_cf, - stats_cf, - )?; - } - - // Apply batch - self.db.rocksdb().write(batch).map_err(Into::into) - } - pub fn delete>(&self, ranges: I) -> Result<()> { fn delete_range<'a>( batch: &mut WriteBatch, @@ -368,6 +272,20 @@ impl InternalQueueStorage { db.compact_range_cf(stats_cf, None::<[u8; 0]>, None::<[u8; 0]>); Ok(()) } + + /// Retrieves the last applied masterchain block id stored in the `internal_message_var` column family under the `last_applied_mc_block_id` key, or `None` if it was never written + pub fn get_last_applied_mc_block_id(&self) -> Result> { + let cf = self.db.internal_message_var.cf(); + let data = self + .db + .rocksdb() + .get_cf(&cf, INT_QUEUE_LAST_APPLIED_MC_BLOCK_ID_KEY)?; + if let Some(bytes) = data { + return Ok(Some(BlockId::from_slice(&bytes))); + } + + Ok(None) + } } pub struct InternalQueueTransaction { @@ -406,6 +324,124 @@ impl InternalQueueTransaction { self.batch.put_cf(&cf, key.to_vec(), self.buffer.as_slice()); } + + pub fn commit_messages>( + &mut self, + snapshot: &InternalQueueSnapshot, + ranges: I, + ) -> Result<()> { + let db = self.db.rocksdb().as_ref(); + + let mut commit_range = |source_iter: &mut DBRawIterator<'_>, + from_key: &[u8], + to_key: &[u8], + source_cf: &BoundedCfHandle<'_>, + target_cf: &BoundedCfHandle<'_>| { + source_iter.seek(from_key); + + loop { + let (key, value) = match source_iter.item() { + Some(item) => item, + None => return source_iter.status(), + }; + + if key > to_key { + break; + } + + self.batch.delete_cf(source_cf, key); +
self.batch.put_cf(target_cf, key, value); + + source_iter.next(); + } + + Ok(()) + }; + + let messages = &self.db.shard_internal_messages; + let messages_cf = &messages.cf(); + + let uncommited_messages = &self.db.shard_internal_messages_uncommitted; + let uncommited_messages_cf = &uncommited_messages.cf(); + + let mut uncommited_messages_iter = { + let mut readopts = uncommited_messages.new_read_config(); + readopts.set_snapshot(&snapshot.snapshot); + db.raw_iterator_cf_opt(uncommited_messages_cf, readopts) + }; + + let stats = &self.db.internal_message_stats; + let stats_cf = &stats.cf(); + + let uncommited_stats = &self.db.internal_message_stats_uncommitted; + let uncommited_stats_cf = &uncommited_stats.cf(); + + let mut uncommited_stats_iter = { + let mut readopts = uncommited_stats.new_read_config(); + readopts.set_snapshot(&snapshot.snapshot); + db.raw_iterator_cf_opt(uncommited_stats_cf, readopts) + }; + + for range in ranges { + // Commit messages for one range + let from_message_key = ShardsInternalMessagesKey { + partition: range.partition, + shard_ident: range.shard_ident, + internal_message_key: range.from, + }; + let to_message_key = ShardsInternalMessagesKey { + partition: range.partition, + shard_ident: range.shard_ident, + internal_message_key: range.to, + }; + + commit_range( + &mut uncommited_messages_iter, + &from_message_key.to_vec(), + &to_message_key.to_vec(), + uncommited_messages_cf, + messages_cf, + )?; + + // Commit stats for one range + let from_stat_key = StatKey { + shard_ident: range.shard_ident, + partition: range.partition, + min_message: range.from, + max_message: QueueKey::MIN, + dest: RouterAddr::MIN, + }; + let to_stat_key = StatKey { + shard_ident: range.shard_ident, + partition: range.partition, + min_message: range.to, + max_message: QueueKey::MAX, + dest: RouterAddr::MAX, + }; + + commit_range( + &mut uncommited_stats_iter, + &from_stat_key.to_vec(), + &to_stat_key.to_vec(), + uncommited_stats_cf, + stats_cf, + )?; + } + + 
Ok(()) + } + + /// Stores the last applied masterchain block id in the `internal_message_var` column family under the `last_applied_mc_block_id` key + pub fn set_last_applied_mc_block_id(&mut self, mc_block_id: &BlockId) { + // Retrieve the handle of the `internal_message_var` column family + let cf = self.db.internal_message_var.cf(); + // Serialize the block id to bytes and queue the write in this transaction's batch + self.batch.put_cf( + &cf, + INT_QUEUE_LAST_APPLIED_MC_BLOCK_ID_KEY, + mc_block_id.to_vec(), + ); + } } pub struct InternalQueueSnapshot { From aa50d989d75a47668487a4caba76433d55a3aa6b Mon Sep 17 00:00:00 2001 From: Vitaly Terekhov Date: Wed, 12 Feb 2025 22:06:34 +0000 Subject: [PATCH 2/4] fix(collator): remove all collated block ids from mismatched one --- collator/src/manager/blocks_cache.rs | 37 +++++++++++++++++++--------- 1 file changed, 26 insertions(+), 11 deletions(-) diff --git a/collator/src/manager/blocks_cache.rs b/collator/src/manager/blocks_cache.rs index 438360ad7..f8d117a2e 100644 --- a/collator/src/manager/blocks_cache.rs +++ b/collator/src/manager/blocks_cache.rs @@ -366,17 +366,19 @@ impl BlocksCache { block_mismatch = res.block_mismatch; received_and_collated = res.received_and_collated; - // if collated block mismatched remove it from last collated blocks ids + // if collated block mismatched remove it and all next from last collated blocks ids if block_mismatch { masters_guard .data - .remove_last_collated_block_id(&block_id.seqno); + .remove_last_collated_block_ids_from(&block_id.seqno); } last_collated_mc_block_id = masters_guard.data.get_last_collated_block_id().cloned(); applied_mc_queue_range = masters_guard.data.applied_mc_queue_range; } else { + let ref_by_mc_seqno = ctx.ref_by_mc_seqno; + let res = { let mut g = self.shards.entry(block_id.shard).or_default(); if let Some(last_known_synced) = @@ -390,6 +392,14 @@ impl BlocksCache { received_and_collated = res.received_and_collated; block_mismatch = res.block_mismatch; + // if collated block mismatched remove its
master and all next from last collated blocks ids + if block_mismatch { + let mut masters_guard = self.masters.lock(); + masters_guard + .data + .remove_last_collated_block_ids_from(&ref_by_mc_seqno); + } + (last_collated_mc_block_id, applied_mc_queue_range) = self.get_last_collated_block_and_applied_mc_queue_range(); }; @@ -492,9 +502,9 @@ impl BlocksCache { } else { let mc_block_entry = occupied_entry.remove(); // clean previous last collated blocks ids - guard.data.remove_prev_from_last_collated_block_ids( - &mc_block_entry.block_id.seqno, - ); + guard + .data + .remove_last_collated_block_ids_before(&mc_block_entry.block_id.seqno); // update range of received blocks guard.data.move_range_start(mc_block_entry.block_id.seqno); extracted_mc_block_entry = Some(mc_block_entry); @@ -535,7 +545,7 @@ impl BlocksCache { // clean previous last collated blocks ids guard .data - .remove_prev_from_last_collated_block_ids(&block_id.seqno); + .remove_last_collated_block_ids_before(&block_id.seqno); occupied_entry }; @@ -648,7 +658,9 @@ impl BlocksCache { }); // remove from last collated blocks ids for removed_seqno in removed_seqno_list { - guard.data.remove_last_collated_block_id(&removed_seqno); + guard + .data + .remove_last_collated_block_ids_from(&removed_seqno); } } } @@ -705,7 +717,9 @@ impl BlocksCache { }); // remove from last collated blocks ids for removed_seqno in removed_seqno_list { - guard.data.remove_last_collated_block_id(&removed_seqno); + guard + .data + .remove_last_collated_block_ids_from(&removed_seqno); } } else if let Some(mut shard_cache) = self.shards.get_mut(&block_key.shard) { shard_cache.blocks.retain(|key, value| { @@ -916,11 +930,12 @@ impl MasterBlocksCacheData { .insert(block_id.seqno, *block_id); } - fn remove_last_collated_block_id(&mut self, block_seqno: &BlockSeqno) -> Option { - self.last_collated_mc_block_ids.remove(block_seqno) + fn remove_last_collated_block_ids_from(&mut self, from_block_seqno: &BlockSeqno) { + 
self.last_collated_mc_block_ids + .retain(|seqno, _| seqno <= from_block_seqno); } - fn remove_prev_from_last_collated_block_ids(&mut self, before_block_seqno: &BlockSeqno) { + fn remove_last_collated_block_ids_before(&mut self, before_block_seqno: &BlockSeqno) { self.last_collated_mc_block_ids .retain(|seqno, _| seqno >= before_block_seqno); } From 1bc4ef4069c6a50f8cc65506d74d89157bc31dad Mon Sep 17 00:00:00 2001 From: Maksim Greshnyakov Date: Thu, 20 Feb 2025 13:37:26 +0700 Subject: [PATCH 3/4] feature(collator): change diffs tail calculation --- collator/src/collator/do_collate/mod.rs | 9 +- collator/src/internal_queue/queue.rs | 56 ++++----- .../internal_queue/state/commited_state.rs | 13 ++- .../internal_queue/state/uncommitted_state.rs | 38 +++++- collator/src/manager/blocks_cache.rs | 2 +- collator/src/manager/mod.rs | 5 - collator/src/queue_adapter.rs | 24 +--- collator/tests/internal_queue.rs | 22 ++-- storage/src/db/kv_db/mod.rs | 2 + storage/src/db/kv_db/tables.rs | 30 +++++ storage/src/store/internal_queue/mod.rs | 109 +++++++++++++++++- storage/src/store/internal_queue/model.rs | 42 +++++++ 12 files changed, 276 insertions(+), 76 deletions(-) diff --git a/collator/src/collator/do_collate/mod.rs b/collator/src/collator/do_collate/mod.rs index 624fda9c3..a6bbfb3c1 100644 --- a/collator/src/collator/do_collate/mod.rs +++ b/collator/src/collator/do_collate/mod.rs @@ -346,11 +346,10 @@ impl CollatorStdImpl { &mc_data.shards_processed_to, ); - // trim outdated diffs and calc queue diffs tail lenght - if let Some(value) = min_processed_to { - mq_adapter.trim_diffs(&shard_id, &value)?; - }; - let diff_tail_len = mq_adapter.get_diffs_count_by_shard(&shard_id) as u32 + 1; + let diff_tail_len = mq_adapter.get_diffs_tail_len( + &shard_id, + &min_processed_to.unwrap_or_default().next_value(), + ) + 1; let span = tracing::Span::current(); let (finalize_phase_result, update_queue_task_result) = rayon::join( diff --git a/collator/src/internal_queue/queue.rs 
b/collator/src/internal_queue/queue.rs index 4966ddcb5..7195ebbc1 100644 --- a/collator/src/internal_queue/queue.rs +++ b/collator/src/internal_queue/queue.rs @@ -94,10 +94,8 @@ where fn commit_diff(&self, mc_top_blocks: &[(BlockId, bool)]) -> Result<()>; /// remove all data in uncommitted state storage fn clear_uncommitted_state(&self) -> Result<()>; - /// Returns the number of diffs in cache for the given shard - fn get_diffs_count_by_shard(&self, shard_ident: &ShardIdent) -> usize; - /// Removes all diffs from the cache that are less than `inclusive_until` which source shard is `source_shard` - fn trim_diffs(&self, source_shard: &ShardIdent, inclusive_until: &QueueKey) -> Result<()>; + /// Returns the diffs tail len for the given shard + fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32; /// Load statistics for the given range by accounts fn load_statistics( &self, @@ -290,10 +288,11 @@ where // Add messages to uncommitted_state if there are any if !diff.messages.is_empty() { self.uncommitted_state.add_messages_with_statistics( - block_id_short.shard, + &block_id_short, &diff.partition_router, &diff.messages, &statistics, + &max_message, )?; } @@ -423,33 +422,6 @@ where Ok(()) } - fn get_diffs_count_by_shard(&self, shard_ident: &ShardIdent) -> usize { - let uncommitted_count = self - .uncommitted_diffs - .get(shard_ident) - .map_or(0, |diffs| diffs.len()); - let committed_count = self - .committed_diffs - .get(shard_ident) - .map_or(0, |diffs| diffs.len()); - - uncommitted_count + committed_count - } - - fn trim_diffs(&self, source_shard: &ShardIdent, inclusive_until: &QueueKey) -> Result<()> { - if let Some(mut shard_diffs) = self.uncommitted_diffs.get_mut(source_shard) { - shard_diffs - .value_mut() - .retain(|_, diff| diff.max_message() > inclusive_until); - } - if let Some(mut shard_diffs) = self.committed_diffs.get_mut(source_shard) { - shard_diffs - .value_mut() - .retain(|_, diff| diff.max_message() > inclusive_until); - } - 
Ok(()) - } - fn load_statistics( &self, partition: QueuePartitionIdx, @@ -511,6 +483,26 @@ where None } + fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, max_message_from: &QueueKey) -> u32 { + let uncommitted_tail_len = self + .uncommitted_state + .get_diffs_tail_len(shard_ident, max_message_from); + + let committed_tail_len = self + .committed_state + .get_diffs_tail_len(shard_ident, max_message_from); + + tracing::info!( + target: tracing_targets::MQ, + shard_ident = ?shard_ident, + uncommitted_tail_len, + committed_tail_len, + "Get diffs tail len", + ); + + uncommitted_tail_len + committed_tail_len + } + fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> bool { self.uncommitted_diffs .get(&block_id_short.shard) diff --git a/collator/src/internal_queue/state/commited_state.rs b/collator/src/internal_queue/state/commited_state.rs index 1d065d055..b4552f435 100644 --- a/collator/src/internal_queue/state/commited_state.rs +++ b/collator/src/internal_queue/state/commited_state.rs @@ -1,7 +1,7 @@ use anyhow::Result; use everscale_types::models::{BlockId, IntAddr, ShardIdent}; -use tycho_block_util::queue::QueuePartitionIdx; -use tycho_storage::model::ShardsInternalMessagesKey; +use tycho_block_util::queue::{QueueKey, QueuePartitionIdx}; +use tycho_storage::model::{DiffTailKey, ShardsInternalMessagesKey}; use tycho_storage::{InternalQueueSnapshot, Storage}; use tycho_util::metrics::HistogramGuard; use tycho_util::FastHashMap; @@ -83,6 +83,7 @@ pub trait CommittedState: Send + Sync { /// Get last applied mc block id fn get_last_applied_mc_block_id(&self) -> Result>; + fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32; } // IMPLEMENTATION @@ -167,4 +168,12 @@ impl CommittedState for CommittedStateStdImpl { .internal_queue_storage() .get_last_applied_mc_block_id() } + + fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32 { + let snapshot = self.storage.internal_queue_storage().make_snapshot(); + 
snapshot.calc_diffs_tail_committed(&DiffTailKey { + shard_ident: *shard_ident, + max_message: *from, + }) + } } diff --git a/collator/src/internal_queue/state/uncommitted_state.rs b/collator/src/internal_queue/state/uncommitted_state.rs index c2aa6d553..7c2a0def2 100644 --- a/collator/src/internal_queue/state/uncommitted_state.rs +++ b/collator/src/internal_queue/state/uncommitted_state.rs @@ -2,9 +2,9 @@ use std::collections::BTreeMap; use std::sync::Arc; use anyhow::Result; -use everscale_types::models::{BlockId, IntAddr, ShardIdent}; +use everscale_types::models::{BlockId, BlockIdShort, IntAddr, ShardIdent}; use tycho_block_util::queue::{QueueKey, QueuePartitionIdx, RouterAddr}; -use tycho_storage::model::{QueueRange, ShardsInternalMessagesKey, StatKey}; +use tycho_storage::model::{DiffTailKey, QueueRange, ShardsInternalMessagesKey, StatKey}; use tycho_storage::{InternalQueueSnapshot, InternalQueueTransaction, Storage}; use tycho_util::metrics::HistogramGuard; use tycho_util::{FastHashMap, FastHashSet}; @@ -85,10 +85,11 @@ pub trait LocalUncommittedState { fn add_messages_with_statistics( &self, - source: ShardIdent, + block_id_short: &BlockIdShort, partition_router: &PartitionRouter, messages: &BTreeMap>, statistics: &DiffStatistics, + max_message: &QueueKey, ) -> Result<()>; /// Load statistics for given partition and ranges @@ -99,6 +100,9 @@ pub trait LocalUncommittedState { partition: QueuePartitionIdx, ranges: &[QueueShardRange], ) -> Result<()>; + + /// Get diffs tail length + fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32; } // IMPLEMENTATION @@ -169,15 +173,17 @@ impl UncommittedState for UncommittedStateStdImpl { fn add_messages_with_statistics( &self, - source: ShardIdent, + block_id_short: &BlockIdShort, partition_router: &PartitionRouter, messages: &BTreeMap>, statistics: &DiffStatistics, + max_message: &QueueKey, ) -> Result<()> { let mut tx = self.storage.internal_queue_storage().begin_transaction(); - 
Self::add_messages(&mut tx, source, partition_router, messages)?; + Self::add_messages(&mut tx, block_id_short.shard, partition_router, messages)?; Self::add_statistics(&mut tx, statistics)?; + Self::add_diff_tail(&mut tx, block_id_short, max_message); let _histogram = HistogramGuard::begin("tycho_internal_queue_add_messages_with_statistics_write_time"); @@ -207,6 +213,14 @@ impl UncommittedState for UncommittedStateStdImpl { Ok(()) } + + fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32 { + let snapshot = self.storage.internal_queue_storage().make_snapshot(); + snapshot.calc_diffs_tail_uncommitted(&DiffTailKey { + shard_ident: *shard_ident, + max_message: *from, + }) + } } impl UncommittedStateStdImpl { @@ -276,4 +290,18 @@ impl UncommittedStateStdImpl { Ok(()) } + + fn add_diff_tail( + internal_queue_tx: &mut InternalQueueTransaction, + block_id_short: &BlockIdShort, + max_message: &QueueKey, + ) { + internal_queue_tx.insert_diff_tail_uncommitted( + &DiffTailKey { + shard_ident: block_id_short.shard, + max_message: *max_message, + }, + block_id_short.seqno.to_le_bytes().as_slice(), + ); + } } diff --git a/collator/src/manager/blocks_cache.rs b/collator/src/manager/blocks_cache.rs index f8d117a2e..c87436c6b 100644 --- a/collator/src/manager/blocks_cache.rs +++ b/collator/src/manager/blocks_cache.rs @@ -932,7 +932,7 @@ impl MasterBlocksCacheData { fn remove_last_collated_block_ids_from(&mut self, from_block_seqno: &BlockSeqno) { self.last_collated_mc_block_ids - .retain(|seqno, _| seqno <= from_block_seqno); + .retain(|seqno, _| seqno < from_block_seqno); } fn remove_last_collated_block_ids_before(&mut self, before_block_seqno: &BlockSeqno) { diff --git a/collator/src/manager/mod.rs b/collator/src/manager/mod.rs index 96b10ceb2..ce54c75f2 100644 --- a/collator/src/manager/mod.rs +++ b/collator/src/manager/mod.rs @@ -1543,11 +1543,6 @@ where )?; } - // trim diffs tails for all shards - for (shard_id, min_processed_to) in 
&min_processed_to_by_shards { - self.mq_adapter.trim_diffs(shard_id, min_processed_to)?; - } - // sync all applied blocks // and refresh collation session by the last one // with re-init of collators state diff --git a/collator/src/queue_adapter.rs b/collator/src/queue_adapter.rs index 664e1727f..968cd3934 100644 --- a/collator/src/queue_adapter.rs +++ b/collator/src/queue_adapter.rs @@ -70,18 +70,16 @@ where ) -> Result<()>; fn clear_uncommitted_state(&self) -> Result<()>; - /// Removes all diffs from the cache that are less than `inclusive_until` which source shard is `source_shard` - fn trim_diffs(&self, source_shard: &ShardIdent, inclusive_until: &QueueKey) -> Result<()>; /// Get diffs for the given blocks from committed and uncommitted state fn get_diffs(&self, blocks: FastHashMap) -> Vec<(ShardIdent, ShortQueueDiff)>; /// Get diff for the given block from committed and uncommitted state fn get_diff(&self, shard_ident: ShardIdent, seqno: u32) -> Option; - /// Returns the number of diffs in cache for the given shard - fn get_diffs_count_by_shard(&self, shard_ident: &ShardIdent) -> usize; /// Check if diff exists in the cache fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> bool; /// Get last applied mc block id from committed state fn get_last_applied_mc_block_id(&self) -> Result>; + /// Get diffs tail len from uncommitted state and committed state + fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, max_message_from: &QueueKey) -> u32; } impl MessageQueueAdapterStdImpl { @@ -200,16 +198,6 @@ impl MessageQueueAdapter for MessageQueueAdapterStdI self.queue.clear_uncommitted_state() } - fn trim_diffs(&self, source_shard: &ShardIdent, inclusive_until: &QueueKey) -> Result<()> { - tracing::info!( - target: tracing_targets::MQ_ADAPTER, - source_shard = ?source_shard, - inclusive_until = ?inclusive_until, - "Trimming diffs" - ); - self.queue.trim_diffs(source_shard, inclusive_until) - } - fn get_diffs(&self, blocks: FastHashMap) -> Vec<(ShardIdent, 
ShortQueueDiff)> { self.queue.get_diffs(blocks) } @@ -218,10 +206,6 @@ impl MessageQueueAdapter for MessageQueueAdapterStdI self.queue.get_diff(shard_ident, seqno) } - fn get_diffs_count_by_shard(&self, shard_ident: &ShardIdent) -> usize { - self.queue.get_diffs_count_by_shard(shard_ident) - } - fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> bool { self.queue.is_diff_exists(block_id_short) } @@ -229,4 +213,8 @@ impl MessageQueueAdapter for MessageQueueAdapterStdI fn get_last_applied_mc_block_id(&self) -> Result> { self.queue.get_last_applied_mc_block_id() } + + fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, max_message_from: &QueueKey) -> u32 { + self.queue.get_diffs_tail_len(shard_ident, max_message_from) + } } diff --git a/collator/tests/internal_queue.rs b/collator/tests/internal_queue.rs index 52f3b6b23..a4cdca648 100644 --- a/collator/tests/internal_queue.rs +++ b/collator/tests/internal_queue.rs @@ -1137,24 +1137,32 @@ async fn test_queue_tail() -> anyhow::Result<()> { max_message, )?; - let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::MASTERCHAIN); + let diff_len_mc = queue.get_diffs_tail_len(&ShardIdent::MASTERCHAIN, &QueueKey::MIN); + // length 2 in uncommitted state assert_eq!(diff_len_mc, 2); // commit first diff queue.commit_diff(&[(block_mc1, true)])?; - let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::MASTERCHAIN); - + let diff_len_mc = queue.get_diffs_tail_len(&ShardIdent::MASTERCHAIN, &QueueKey::MIN); + // one diff moved to committed state. 
one diff left in uncommitted state + // uncommitted: 1; committed: 1 assert_eq!(diff_len_mc, 2); - // trim first diff - queue.trim_diffs(&ShardIdent::MASTERCHAIN, &end_key_mc1)?; - let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::MASTERCHAIN); + // exclude committed diff by range + let diff_len_mc = queue.get_diffs_tail_len(&ShardIdent::MASTERCHAIN, &end_key_mc1.next_value()); + // uncommitted: 1; committed: 0 (1) assert_eq!(diff_len_mc, 1); // clear uncommitted state with second diff queue.clear_uncommitted_state()?; - let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::MASTERCHAIN); + let diff_len_mc = queue.get_diffs_tail_len(&ShardIdent::MASTERCHAIN, &QueueKey::MIN); + // uncommitted: 0; committed: 1 + assert_eq!(diff_len_mc, 1); + + // exclude committed diff by range + let diff_len_mc = queue.get_diffs_tail_len(&ShardIdent::MASTERCHAIN, &end_key_mc1.next_value()); + // uncommitted: 0; committed: 0 (1) assert_eq!(diff_len_mc, 0); Ok(()) diff --git a/storage/src/db/kv_db/mod.rs b/storage/src/db/kv_db/mod.rs index 27eecfd16..4ad369866 100644 --- a/storage/src/db/kv_db/mod.rs +++ b/storage/src/db/kv_db/mod.rs @@ -137,6 +137,8 @@ weedb::tables! 
{ pub internal_message_stats: tables::InternalMessageStats, pub internal_message_stats_uncommitted: tables::InternalMessageStatsUncommited, pub internal_message_var: tables::InternalMessageVar, + pub internal_message_diffs_tail: tables::InternalMessageDiffsTail, + pub internal_message_diffs_tail_uncommitted: tables::InternalMessageDiffsTailUncommitted, } } diff --git a/storage/src/db/kv_db/tables.rs b/storage/src/db/kv_db/tables.rs index 5fe3d5bf4..b6683293d 100644 --- a/storage/src/db/kv_db/tables.rs +++ b/storage/src/db/kv_db/tables.rs @@ -518,6 +518,36 @@ impl ColumnFamilyOptions for InternalMessageVar { } } +pub struct InternalMessageDiffsTailUncommitted; +impl ColumnFamily for InternalMessageDiffsTailUncommitted { + const NAME: &'static str = "int_msg_diffs_tail_uncommitted"; + + fn read_options(opts: &mut ReadOptions) { + opts.set_verify_checksums(true); + } +} + +impl ColumnFamilyOptions for InternalMessageDiffsTailUncommitted { + fn options(opts: &mut Options, caches: &mut Caches) { + zstd_block_based_table_factory(opts, caches); + } +} + +pub struct InternalMessageDiffsTail; +impl ColumnFamily for InternalMessageDiffsTail { + const NAME: &'static str = "int_msg_diffs_tail"; + + fn read_options(opts: &mut ReadOptions) { + opts.set_verify_checksums(true); + } +} + +impl ColumnFamilyOptions for InternalMessageDiffsTail { + fn options(opts: &mut Options, caches: &mut Caches) { + zstd_block_based_table_factory(opts, caches); + } +} + fn archive_data_merge( _: &[u8], current_value: Option<&[u8]>, diff --git a/storage/src/store/internal_queue/mod.rs b/storage/src/store/internal_queue/mod.rs index c9621be32..a57b8e12a 100644 --- a/storage/src/store/internal_queue/mod.rs +++ b/storage/src/store/internal_queue/mod.rs @@ -8,7 +8,7 @@ use weedb::rocksdb::{DBRawIterator, WriteBatch}; use weedb::{BoundedCfHandle, ColumnFamily, OwnedRawIterator, OwnedSnapshot, Table}; use crate::db::*; -use crate::model::{QueueRange, ShardsInternalMessagesKey, StatKey}; +use 
crate::model::{DiffTailKey, QueueRange, ShardsInternalMessagesKey, StatKey}; use crate::util::StoredValue; use crate::QueueStateReader; @@ -72,6 +72,7 @@ impl InternalQueueStorage { let messages_cf = this.db.shard_internal_messages.cf(); let stats_cf = this.db.internal_message_stats.cf(); let var_cf = this.db.internal_message_var.cf(); + let diffs_tail_cf = this.db.internal_message_diffs_tail.cf(); let mut batch = weedb::rocksdb::WriteBatch::default(); @@ -130,6 +131,19 @@ impl InternalQueueStorage { let queue_diff = part.queue_diff(); + // insert diff tail + let diff_tail_key = DiffTailKey { + shard_ident: block_id.shard, + max_message: queue_diff.max_message, + }; + + batch.put_cf( + &diffs_tail_cf, + diff_tail_key.to_vec(), + block_id.seqno.to_le_bytes().as_slice(), + ); + + // insert last applied diff batch.put_cf( &var_cf, INT_QUEUE_LAST_APPLIED_MC_BLOCK_ID_KEY, @@ -182,9 +196,11 @@ impl InternalQueueStorage { let mut msgs_to_compact = Vec::new(); let mut stats_to_compact = Vec::new(); + let mut diffs_tail_to_compact = Vec::new(); let messages_cf = &self.db.shard_internal_messages.cf(); let stats_cf = &self.db.internal_message_stats.cf(); + let diffs_tail_cf = &self.db.internal_message_diffs_tail.cf(); for range in ranges { // Delete messages in one range @@ -228,6 +244,26 @@ impl InternalQueueStorage { &bump, &mut stats_to_compact, ); + + // Delete diffs tail in one range + let start_diff_tail_key = DiffTailKey { + shard_ident: range.shard_ident, + max_message: range.from, + }; + + let end_diff_tail_key = DiffTailKey { + shard_ident: range.shard_ident, + max_message: range.to, + }; + + delete_range( + &mut batch, + diffs_tail_cf, + &start_diff_tail_key.to_vec(), + &end_diff_tail_key.to_vec(), + &bump, + &mut diffs_tail_to_compact, + ); } let db = self.db.rocksdb().as_ref(); @@ -239,6 +275,9 @@ impl InternalQueueStorage { for (start_key, end_key) in stats_to_compact { db.compact_range_cf(stats_cf, Some(start_key), Some(end_key)); } + for (start_key, end_key) 
in diffs_tail_to_compact { + db.compact_range_cf(diffs_tail_cf, Some(start_key), Some(end_key)); + } Ok(()) } @@ -265,11 +304,19 @@ impl InternalQueueStorage { &[0xff; StatKey::SIZE_HINT], ); + let diffs_tail_cf = &self.db.internal_message_diffs_tail_uncommitted.cf(); + clear_table( + diffs_tail_cf, + &[0x00; StatKey::SIZE_HINT], + &[0xff; StatKey::SIZE_HINT], + ); + let db = self.db.rocksdb().as_ref(); db.write(batch)?; db.compact_range_cf(messages_cf, None::<[u8; 0]>, None::<[u8; 0]>); db.compact_range_cf(stats_cf, None::<[u8; 0]>, None::<[u8; 0]>); + db.compact_range_cf(diffs_tail_cf, None::<[u8; 0]>, None::<[u8; 0]>); Ok(()) } @@ -307,6 +354,11 @@ impl InternalQueueTransaction { self.batch.put_cf(&cf, key.to_vec(), count.to_le_bytes()); } + pub fn insert_diff_tail_uncommitted(&mut self, key: &DiffTailKey, value: &[u8]) { + let cf = self.db.internal_message_diffs_tail_uncommitted.cf(); + self.batch.put_cf(&cf, key.to_vec(), value); + } + pub fn insert_message_uncommitted( &mut self, key: &ShardsInternalMessagesKey, @@ -376,12 +428,21 @@ impl InternalQueueTransaction { let uncommited_stats = &self.db.internal_message_stats_uncommitted; let uncommited_stats_cf = &uncommited_stats.cf(); + let diff_tail_committed_cf = &self.db.internal_message_diffs_tail.cf(); + let diff_tail_uncommitted_cf = &self.db.internal_message_diffs_tail_uncommitted.cf(); + let mut uncommited_stats_iter = { let mut readopts = uncommited_stats.new_read_config(); readopts.set_snapshot(&snapshot.snapshot); db.raw_iterator_cf_opt(uncommited_stats_cf, readopts) }; + let mut uncommited_diff_tail_iter = { + let mut readopts = uncommited_stats.new_read_config(); + readopts.set_snapshot(&snapshot.snapshot); + db.raw_iterator_cf_opt(diff_tail_uncommitted_cf, readopts) + }; + for range in ranges { // Commit messages for one range let from_message_key = ShardsInternalMessagesKey { @@ -426,6 +487,25 @@ impl InternalQueueTransaction { uncommited_stats_cf, stats_cf, )?; + + // Collect diffs tails range + 
let from_diff_tail_key = DiffTailKey { + shard_ident: range.shard_ident, + max_message: range.from, + }; + + let to_diff_tail_key = DiffTailKey { + shard_ident: range.shard_ident, + max_message: range.to, + }; + + commit_range( + &mut uncommited_diff_tail_iter, + &from_diff_tail_key.to_vec(), + &to_diff_tail_key.to_vec(), + diff_tail_uncommitted_cf, + diff_tail_committed_cf, + )?; } Ok(()) @@ -466,6 +546,14 @@ impl InternalQueueSnapshot { self.iter_messages(&self.db.shard_internal_messages_uncommitted, from, to) } + pub fn calc_diffs_tail_committed(&self, from: &DiffTailKey) -> u32 { + self.calc_diffs_tail(&self.db.internal_message_diffs_tail, from) + } + + pub fn calc_diffs_tail_uncommitted(&self, from: &DiffTailKey) -> u32 { + self.calc_diffs_tail(&self.db.internal_message_diffs_tail_uncommitted, from) + } + fn iter_messages( &self, table: &Table, @@ -488,6 +576,25 @@ impl InternalQueueSnapshot { } } + fn calc_diffs_tail(&self, table: &Table, from: &DiffTailKey) -> u32 { + let mut read_config = table.new_read_config(); + read_config.set_snapshot(&self.snapshot); + + let cf = table.cf(); + let mut iter = self.db.rocksdb().raw_iterator_cf_opt(&cf, read_config); + + let from_key = from.to_vec(); + iter.seek(&from_key); + + let mut count = 0; + while let Some((_, _)) = iter.item() { + count += 1; + iter.next(); + } + + count + } + pub fn collect_committed_stats_in_range( &self, shard_ident: ShardIdent, diff --git a/storage/src/store/internal_queue/model.rs b/storage/src/store/internal_queue/model.rs index 215702fcd..76f3c9438 100644 --- a/storage/src/store/internal_queue/model.rs +++ b/storage/src/store/internal_queue/model.rs @@ -144,6 +144,21 @@ impl StatKey { } } +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub struct DiffTailKey { + pub shard_ident: ShardIdent, + pub max_message: QueueKey, +} + +impl DiffTailKey { + pub fn new(shard_ident: ShardIdent, max_message: QueueKey) -> Self { + Self { + shard_ident, + max_message, + } + } +} + impl 
StoredValue for StatKey { const SIZE_HINT: usize = ShardIdent::SIZE_HINT + QueuePartitionIdx::SIZE_HINT @@ -181,6 +196,33 @@ impl StoredValue for StatKey { } } +impl StoredValue for DiffTailKey { + const SIZE_HINT: usize = ShardIdent::SIZE_HINT + QueueKey::SIZE_HINT; + type OnStackSlice = [u8; Self::SIZE_HINT]; + + fn serialize(&self, buffer: &mut T) { + self.shard_ident.serialize(buffer); + self.max_message.serialize(buffer); + } + + fn deserialize(reader: &mut &[u8]) -> Self + where + Self: Sized, + { + if reader.len() < Self::SIZE_HINT { + panic!("Insufficient data for deserialization"); + } + + let shard_ident = ShardIdent::deserialize(reader); + let max_message = QueueKey::deserialize(reader); + + Self { + shard_ident, + max_message, + } + } +} + pub struct QueueRange { pub shard_ident: ShardIdent, pub partition: QueuePartitionIdx, From 210c7332b8fcc718e555645cd666ceb55c5e03ac Mon Sep 17 00:00:00 2001 From: Maksim Greshnyakov Date: Thu, 20 Feb 2025 16:35:52 +0700 Subject: [PATCH 4/4] feature(collator): persistent cache for diffs --- 11.log | 0 block-util/src/queue/proto.rs | 2 +- collator/src/collator/do_collate/finalize.rs | 43 +- collator/src/collator/do_collate/mod.rs | 93 +++- collator/src/collator/do_collate/phase.rs | 3 + .../messages_reader/internals_reader.rs | 22 +- collator/src/collator/messages_reader/mod.rs | 20 +- collator/src/collator/mod.rs | 3 +- collator/src/internal_queue/queue.rs | 86 ++-- .../internal_queue/state/commited_state.rs | 24 +- .../internal_queue/state/uncommitted_state.rs | 60 ++- collator/src/internal_queue/types.rs | 73 +++ collator/src/manager/mod.rs | 2 +- collator/src/queue_adapter.rs | 18 +- collator/tests/internal_queue.rs | 48 +- storage/src/db/kv_db/mod.rs | 2 + storage/src/db/kv_db/tables.rs | 30 ++ storage/src/store/internal_queue/mod.rs | 451 +++++++++++++++--- storage/src/store/internal_queue/model.rs | 120 +++++ 19 files changed, 905 insertions(+), 195 deletions(-) create mode 100644 11.log diff --git a/11.log 
b/11.log new file mode 100644 index 000000000..e69de29bb diff --git a/block-util/src/queue/proto.rs b/block-util/src/queue/proto.rs index ede7a86e6..8e8812f5c 100644 --- a/block-util/src/queue/proto.rs +++ b/block-util/src/queue/proto.rs @@ -165,7 +165,7 @@ pub struct QueueKey { } impl QueueKey { - const SIZE_HINT: usize = 8 + 32; + pub const SIZE_HINT: usize = 8 + 32; pub const MIN: Self = Self { lt: 0, diff --git a/collator/src/collator/do_collate/finalize.rs b/collator/src/collator/do_collate/finalize.rs index 9bc59fb92..17f676fb6 100644 --- a/collator/src/collator/do_collate/finalize.rs +++ b/collator/src/collator/do_collate/finalize.rs @@ -26,11 +26,16 @@ use crate::collator::types::{ BlockCollationData, ExecuteResult, FinalizeBlockResult, FinalizeMessagesReaderResult, PreparedInMsg, PreparedOutMsg, }; -use crate::internal_queue::types::EnqueuedMessage; +use crate::internal_queue::types::{ + EnqueuedMessage +}; use crate::queue_adapter::MessageQueueAdapter; use crate::tracing_targets; use crate::types::processed_upto::{ProcessedUptoInfoExtension, ProcessedUptoInfoStuff}; -use crate::types::{BlockCandidate, CollationSessionInfo, CollatorConfig, McData, ShardHashesExt}; +use crate::types::{ + BlockCandidate, CollationSessionInfo, CollatorConfig, McData, + ShardHashesExt, +}; use crate::utils::block::detect_top_processed_to_anchor; pub struct FinalizeState { @@ -69,36 +74,6 @@ impl Phase { .cloned() .unwrap_or_default(); - // getting top shard blocks - let top_shard_blocks = if self.state.collation_data.block_id_short.is_masterchain() { - self.state - .collation_data - .top_shard_blocks - .iter() - .map(|b| (b.block_id.shard, b.block_id.seqno)) - .collect() - } else { - let mut top_blocks: FastHashMap = self - .state - .mc_data - .shards - .iter() - .filter(|(shard, descr)| { - descr.top_sc_block_updated && shard != &self.state.shard_id - }) - .map(|(shard_ident, descr)| (*shard_ident, descr.seqno)) - .collect(); - - top_blocks.insert( - 
self.state.mc_data.block_id.shard, - self.state.mc_data.block_id.seqno, - ); - - top_blocks - }; - - let diffs = mq_adapter.get_diffs(top_shard_blocks); - // get queue diff and check for pending internals let create_queue_diff_elapsed; let FinalizedMessagesReader { @@ -111,8 +86,8 @@ impl Phase { "tycho_do_collate_create_queue_diff_time", &labels, ); - let finalize_message_reader_res = - messages_reader.finalize(self.extra.executor.min_next_lt(), diffs)?; + let finalize_message_reader_res = messages_reader + .finalize(self.extra.executor.min_next_lt(), &self.state.diffs_info)?; create_queue_diff_elapsed = histogram_create_queue_diff.finish(); finalize_message_reader_res }; diff --git a/collator/src/collator/do_collate/mod.rs b/collator/src/collator/do_collate/mod.rs index a6bbfb3c1..064f6522b 100644 --- a/collator/src/collator/do_collate/mod.rs +++ b/collator/src/collator/do_collate/mod.rs @@ -9,7 +9,7 @@ use humantime::format_duration; use phase::{ActualState, Phase}; use prepare::PrepareState; use tycho_block_util::config::{apply_price_factor, compute_gas_price_factor}; -use tycho_block_util::queue::QueueKey; +use tycho_block_util::queue::{QueueDiffStuff, QueueKey}; use tycho_block_util::state::MinRefMcStateTracker; use tycho_storage::{NewBlockMeta, StoreStateHint}; use tycho_util::futures::JoinTask; @@ -26,13 +26,16 @@ use super::types::{ use super::{CollatorStdImpl, ForceMasterCollation}; use crate::collator::do_collate::finalize::FinalizeBlockContext; use crate::collator::types::RandSeed; -use crate::internal_queue::types::EnqueuedMessage; +use crate::internal_queue::types::{ + DiffStatistics, EnqueuedMessage, PartitionRouter, QueueShardRange, +}; use crate::queue_adapter::MessageQueueAdapter; use crate::tracing_targets; use crate::types::{ BlockCollationResult, BlockIdExt, CollationSessionInfo, CollatorConfig, - DisplayBlockIdsIntoIter, DisplayBlockIdsIter, McData, ProcessedTo, ShardDescriptionShort, - TopBlockDescription, TopShardBlockInfo, + 
DisplayBlockIdsIntoIter, DisplayBlockIdsIter, McData, ProcessedTo, + ShardDescriptionExt as OtherShardDescriptionExt, ShardDescriptionShort, TopBlockDescription, + TopShardBlockInfo, }; #[cfg(test)] @@ -110,12 +113,82 @@ impl CollatorStdImpl { )?; let anchors_cache = std::mem::take(&mut self.anchors_cache); + + let mq_adapter = self.mq_adapter.clone(); + let state_node_adapter = self.state_node_adapter.clone(); + + // getting top shard blocks + let top_shard_blocks = if collation_data.block_id_short.is_masterchain() { + collation_data + .top_shard_blocks + .iter() + .map(|b| b.block_id) + .collect() + } else { + let mut top_blocks: Vec = mc_data + .shards + .iter() + .filter(|(shard, descr)| descr.top_sc_block_updated && shard != &self.shard_id) + .map(|(shard_ident, descr)| descr.get_block_id(*shard_ident)) + .collect(); + + top_blocks.push(mc_data.block_id); + + top_blocks + }; + + let mut diffs_info = FastHashMap::default(); + + for top_shard_block in top_shard_blocks.iter() { + if top_shard_block.seqno == 0 { + continue; + } + let diff: QueueDiffStuff = state_node_adapter + .load_diff(&top_shard_block) + .await? + .ok_or_else(|| anyhow!("Top shard block diff not found. 
block{top_shard_block:?}"))?; + let partition_router = PartitionRouter::with_partitions( + &diff.diff().router_partitions_src, + &diff.diff().router_partitions_dst, + ); + + // TODO use dynamic shards + let range_mc = QueueShardRange { + shard_ident: ShardIdent::MASTERCHAIN, + from: diff.diff().min_message, + to: diff.diff().max_message, + }; + + let range_shard = QueueShardRange { + shard_ident: ShardIdent::new_full(0), + from: diff.diff().min_message, + to: diff.diff().max_message, + }; + + let ranges = vec![range_mc, range_shard]; + let queue_statistic = mq_adapter.get_statistics(0, &ranges)?; + + let mut diff_statistics = FastHashMap::default(); + diff_statistics.insert(0, queue_statistic.statistics().clone()); + + let diff_statistic = DiffStatistics::new( + top_shard_block.shard, + diff.diff().min_message, + diff.diff().max_message, + diff_statistics, + queue_statistic.shard_messages_count(), + ); + + diffs_info.insert(top_shard_block.shard, (partition_router, diff_statistic)); + } + let state = Box::new(ActualState { collation_config, collation_data, mc_data, prev_shard_data, shard_id: self.shard_id, + diffs_info, }); let CollationResult { @@ -127,7 +200,7 @@ impl CollatorStdImpl { } = tycho_util::sync::rayon_run_fifo({ let collation_session = self.collation_session.clone(); let config = self.config.clone(); - let mq_adapter = self.mq_adapter.clone(); + let span = tracing::Span::current(); move || { let _span = span.enter(); @@ -238,6 +311,7 @@ impl CollatorStdImpl { let labels = [("workchain", shard_id.workchain().to_string())]; let mc_data = state.mc_data.clone(); + let block_id = state.collation_data.block_id_short.clone(); // prepare execution let histogram_prepare = HistogramGuard::begin_with_labels("tycho_do_collate_prepare_time", &labels); @@ -351,6 +425,15 @@ impl CollatorStdImpl { &min_processed_to.unwrap_or_default().next_value(), ) + 1; + tracing::info!(target: "local_debug", + "block tail len {:?} {} {:?}", + block_id, + diff_tail_len, + 
min_processed_to + ); + + // let diff_tail_len = 1; + let span = tracing::Span::current(); let (finalize_phase_result, update_queue_task_result) = rayon::join( || { diff --git a/collator/src/collator/do_collate/phase.rs b/collator/src/collator/do_collate/phase.rs index 1e4d5dcff..dfe7fc697 100644 --- a/collator/src/collator/do_collate/phase.rs +++ b/collator/src/collator/do_collate/phase.rs @@ -1,8 +1,10 @@ use std::sync::Arc; use everscale_types::models::{CollationConfig, ShardIdent}; +use tycho_util::FastHashMap; use super::{BlockCollationData, PrevData}; +use crate::internal_queue::types::{DiffStatistics, PartitionRouter}; use crate::types::McData; pub struct Phase { @@ -18,4 +20,5 @@ pub struct ActualState { pub mc_data: Arc, pub prev_shard_data: PrevData, pub shard_id: ShardIdent, + pub diffs_info: FastHashMap, } diff --git a/collator/src/collator/messages_reader/internals_reader.rs b/collator/src/collator/messages_reader/internals_reader.rs index 135d90d56..5aec3eb90 100644 --- a/collator/src/collator/messages_reader/internals_reader.rs +++ b/collator/src/collator/messages_reader/internals_reader.rs @@ -416,7 +416,7 @@ impl InternalsPartitionReader { while current_block_seqno < self.block_seqno { let diff = self .mq_adapter - .get_diff(self.for_shard_id, current_block_seqno) + .get_diff(&self.for_shard_id, current_block_seqno)? 
.ok_or_else(|| { anyhow!( "cannot get diff for block {}:{}", @@ -425,9 +425,7 @@ impl InternalsPartitionReader { ) })?; - messages_count += diff - .statistics() - .get_messages_count_by_shard(&self.for_shard_id); + messages_count += diff.get_messages_count_by_shard(&self.for_shard_id); if messages_count > max_messages as u64 { break; @@ -446,14 +444,14 @@ impl InternalsPartitionReader { let shard_range_to = if shard_id == self.for_shard_id { if range_seqno != self.block_seqno { - let diff = - self.mq_adapter - .get_diff(shard_id, range_seqno) - .ok_or_else(|| { - anyhow!("cannot get diff for block {shard_id}:{range_seqno}") - })?; - - *diff.max_message() + let diff = self + .mq_adapter + .get_diff(&shard_id, range_seqno)? + .ok_or_else(|| { + anyhow!("cannot get diff for block {shard_id}:{range_seqno}") + })?; + + diff.max_message } else { QueueKey::max_for_lt(self.prev_state_gen_lt) } diff --git a/collator/src/collator/messages_reader/mod.rs b/collator/src/collator/messages_reader/mod.rs index 1b4c1aa88..d0998b748 100644 --- a/collator/src/collator/messages_reader/mod.rs +++ b/collator/src/collator/messages_reader/mod.rs @@ -6,7 +6,7 @@ use anyhow::{Context, Result}; use everscale_types::cell::HashBytes; use everscale_types::models::{MsgsExecutionParams, ShardIdent}; use tycho_block_util::queue::{QueueKey, QueuePartitionIdx}; -use tycho_util::FastHashSet; +use tycho_util::{FastHashMap, FastHashSet}; use self::externals_reader::*; use self::internals_reader::*; @@ -15,9 +15,8 @@ pub(super) use self::reader_state::*; use super::messages_buffer::{DisplayMessageGroup, MessageGroup, MessagesBufferLimits}; use super::types::{AnchorsCache, MsgsExecutionParamsExtension}; use crate::collator::messages_buffer::DebugMessageGroup; -use crate::internal_queue::queue::ShortQueueDiff; use crate::internal_queue::types::{ - EnqueuedMessage, PartitionRouter, QueueDiffWithMessages, QueueStatistics, + DiffStatistics, EnqueuedMessage, PartitionRouter, QueueDiffWithMessages, 
QueueStatistics, }; use crate::queue_adapter::MessageQueueAdapter; use crate::tracing_targets; @@ -367,7 +366,7 @@ impl MessagesReader { pub fn finalize( mut self, current_next_lt: u64, - diffs: Vec<(ShardIdent, ShortQueueDiff)>, + diffs_info: &FastHashMap, ) -> Result { let mut has_unprocessed_messages = self.has_messages_in_buffers() || self.has_pending_new_messages() @@ -435,7 +434,7 @@ impl MessagesReader { &mut queue_diff_with_msgs.partition_router, aggregated_stats, self.for_shard_id, - diffs, + diffs_info, )?; // metrics: accounts count in isolated partitions @@ -498,7 +497,7 @@ impl MessagesReader { partition_router: &mut PartitionRouter, aggregated_stats: QueueStatistics, for_shard_id: ShardIdent, - top_block_diffs: Vec<(ShardIdent, ShortQueueDiff)>, + diffs_info: &FastHashMap, ) -> Result> { let par_0_msgs_count_limit = msgs_exec_params.par_0_int_msgs_count_limit as u64; let mut moved_from_par_0_accounts = FastHashSet::default(); @@ -531,7 +530,7 @@ impl MessagesReader { dest_int_address, ); // if we have account for another shard then take info from that shard - let acc_shard_diff_info = top_block_diffs + let acc_shard_diff_info = diffs_info .iter() .find(|(shard_id, _)| shard_id.contains_address(&dest_int_address)) .map(|(_, diff)| diff); @@ -546,14 +545,13 @@ impl MessagesReader { ); msgs_count } - Some(diff) => { + Some((router, statistics)) => { tracing::trace!(target: tracing_targets::COLLATOR, "use diff for address {} because we have diff", dest_int_address, ); // getting remote shard partition from diff - let remote_shard_partition = - diff.router().get_partition(None, &dest_int_address); + let remote_shard_partition = router.get_partition(None, &dest_int_address); tracing::trace!(target: tracing_targets::COLLATOR, "remote shard partition for address {} is {}", @@ -571,7 +569,7 @@ impl MessagesReader { } // if remote partition == 0 then we need to check statistics - let remote_msgs_count = match diff.statistics().partition(0) { + let 
remote_msgs_count = match statistics.partition(0) { None => { tracing::trace!(target: tracing_targets::COLLATOR, "use aggregated stats for address {} because we do not have partition 0 stats in diff", diff --git a/collator/src/collator/mod.rs b/collator/src/collator/mod.rs index 784e3decc..70a6f93c5 100644 --- a/collator/src/collator/mod.rs +++ b/collator/src/collator/mod.rs @@ -45,6 +45,7 @@ mod messages_reader; mod types; pub use error::CollationCancelReason; +use tycho_util::FastHashMap; pub use types::ForceMasterCollation; #[cfg(test)] @@ -1315,7 +1316,7 @@ impl CollatorStdImpl { mut reader_state, .. } = messages_reader.finalize( 0, // can pass 0 because new messages reader was not initialized in this case - vec![], + &FastHashMap::default(), )?; std::mem::swap(&mut working_state.reader_state, &mut reader_state); diff --git a/collator/src/internal_queue/queue.rs b/collator/src/internal_queue/queue.rs index 7195ebbc1..50c8e0a8d 100644 --- a/collator/src/internal_queue/queue.rs +++ b/collator/src/internal_queue/queue.rs @@ -1,3 +1,4 @@ +use std::cmp::max; use std::collections::BTreeMap; use std::marker::PhantomData; use std::sync::Arc; @@ -8,6 +9,7 @@ use everscale_types::cell::HashBytes; use everscale_types::models::{BlockId, BlockIdShort, ShardIdent}; use serde::{Deserialize, Serialize}; use tycho_block_util::queue::{QueueKey, QueuePartitionIdx}; +use tycho_storage::model::DiffInfo; use tycho_util::metrics::HistogramGuard; use tycho_util::{serde_helpers, FastDashMap, FastHashMap, FastHashSet}; @@ -23,8 +25,8 @@ use crate::internal_queue::types::{ DiffStatistics, InternalMessageValue, PartitionRouter, QueueDiffWithMessages, QueueShardRange, QueueStatistics, }; -use crate::tracing_targets; use crate::types::ProcessedTo; +use crate::{internal_queue, tracing_targets}; // FACTORY @@ -105,9 +107,9 @@ where /// Get diffs for the given blocks from committed and uncommitted state fn get_diffs(&self, blocks: FastHashMap) -> Vec<(ShardIdent, ShortQueueDiff)>; /// Get diff 
for the given blocks from committed and uncommitted state - fn get_diff(&self, shard_ident: ShardIdent, seqno: u32) -> Option; + fn get_diff(&self, shard_ident: &ShardIdent, seqno: u32) -> Result>; /// Check if diff exists in the cache - fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> bool; + fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> Result; /// Get last applied mc block id from committed state fn get_last_applied_mc_block_id(&self) -> Result>; } @@ -252,22 +254,36 @@ where .or_default(); // Check for duplicate diffs based on the block_id_short.seqno and hash - let shard_diff = shard_diffs.get(&block_id_short.seqno); + // let shard_diff = shard_diffs.get(&block_id_short.seqno); + + let shard_diff = internal_queue::queue::Queue::get_diff( + self, + &block_id_short.shard, + block_id_short.seqno, + )?; // Check if the diff is already applied // return if hash is the same if let Some(shard_diff) = shard_diff { // Check if the diff is already applied with different hash - if shard_diff.hash() != hash { + if shard_diff.hash != *hash { bail!( "Duplicate diff with different hash: block_id={}, existing diff_hash={}, new diff_hash={}", - block_id_short, shard_diff.hash(), hash, + block_id_short, shard_diff.hash, hash, ) } return Ok(()); } - let last_applied_seqno = shard_diffs.keys().last().cloned(); + let last_applied_seqno_uncommitted = self + .uncommitted_state + .get_last_applied_block_seqno(&block_id_short.shard)?; + + let last_applied_seqno_committed = self + .committed_state + .get_last_applied_block_seqno(&block_id_short.shard)?; + + let last_applied_seqno = max(last_applied_seqno_uncommitted, last_applied_seqno_committed); if let Some(last_applied_seqno) = last_applied_seqno { // Check if the diff is already applied @@ -286,15 +302,14 @@ where } // Add messages to uncommitted_state if there are any - if !diff.messages.is_empty() { - self.uncommitted_state.add_messages_with_statistics( - &block_id_short, - &diff.partition_router, - 
&diff.messages, - &statistics, - &max_message, - )?; - } + self.uncommitted_state.add_messages_with_statistics( + &block_id_short, + &diff.partition_router, + &diff.messages, + &statistics, + &max_message, + *hash, + )?; let short_diff = ShortQueueDiff::new( diff.processed_to, @@ -409,14 +424,9 @@ where fn clear_uncommitted_state(&self) -> Result<()> { self.uncommitted_state.truncate()?; - let diffs_before_clear: usize = - self.uncommitted_diffs.iter().map(|r| r.value().len()).sum(); self.uncommitted_diffs.clear(); - let diffs_after_clear: usize = self.uncommitted_diffs.iter().map(|r| r.value().len()).sum(); tracing::info!( target: tracing_targets::MQ, - diffs_before_clear, - diffs_after_clear, "Cleared uncommitted diffs.", ); Ok(()) @@ -467,20 +477,16 @@ where result } - fn get_diff(&self, shard_ident: ShardIdent, seqno: u32) -> Option { - if let Some(shard_diffs) = self.uncommitted_diffs.get(&shard_ident) { - if let Some(diff) = shard_diffs.get(&seqno) { - return Some(diff.clone()); - } + fn get_diff(&self, shard_ident: &ShardIdent, seqno: u32) -> Result> { + if let Some(diff) = self.uncommitted_state.get_diff_info(&shard_ident, seqno)? { + return Ok(Some(diff)); } - if let Some(shard_diffs) = self.committed_diffs.get(&shard_ident) { - if let Some(diff) = shard_diffs.get(&seqno) { - return Some(diff.clone()); - } + if let Some(diff) = self.committed_state.get_diff_info(&shard_ident, seqno)? 
{ + return Ok(Some(diff)); } - None + Ok(None) } fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, max_message_from: &QueueKey) -> u32 { @@ -493,7 +499,8 @@ where .get_diffs_tail_len(shard_ident, max_message_from); tracing::info!( - target: tracing_targets::MQ, + // target: tracing_targets::MQ, + target: "local_debug", shard_ident = ?shard_ident, uncommitted_tail_len, committed_tail_len, @@ -503,14 +510,13 @@ where uncommitted_tail_len + committed_tail_len } - fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> bool { - self.uncommitted_diffs - .get(&block_id_short.shard) - .is_some_and(|diffs| diffs.contains_key(&block_id_short.seqno)) - || self - .committed_diffs - .get(&block_id_short.shard) - .is_some_and(|diffs| diffs.contains_key(&block_id_short.seqno)) + fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> Result { + Ok(internal_queue::queue::Queue::get_diff( + self, + &block_id_short.shard, + block_id_short.seqno, + )? + .is_some()) } fn get_last_applied_mc_block_id(&self) -> Result> { diff --git a/collator/src/internal_queue/state/commited_state.rs b/collator/src/internal_queue/state/commited_state.rs index b4552f435..0ae232e21 100644 --- a/collator/src/internal_queue/state/commited_state.rs +++ b/collator/src/internal_queue/state/commited_state.rs @@ -1,7 +1,7 @@ use anyhow::Result; use everscale_types::models::{BlockId, IntAddr, ShardIdent}; use tycho_block_util::queue::{QueueKey, QueuePartitionIdx}; -use tycho_storage::model::{DiffTailKey, ShardsInternalMessagesKey}; +use tycho_storage::model::{DiffInfo, DiffInfoKey, DiffTailKey, ShardsInternalMessagesKey}; use tycho_storage::{InternalQueueSnapshot, Storage}; use tycho_util::metrics::HistogramGuard; use tycho_util::FastHashMap; @@ -84,6 +84,8 @@ pub trait CommittedState: Send + Sync { /// Get last applied mc block id fn get_last_applied_mc_block_id(&self) -> Result>; fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32; + fn get_diff_info(&self, shard_ident: 
&ShardIdent, seqno: u32) -> Result>; + fn get_last_applied_block_seqno(&self, shard_ident: &ShardIdent) -> Result>; } // IMPLEMENTATION @@ -176,4 +178,24 @@ impl CommittedState for CommittedStateStdImpl { max_message: *from, }) } + + fn get_diff_info(&self, shard_ident: &ShardIdent, seqno: u32) -> Result> { + let snapshot = self.storage.internal_queue_storage().make_snapshot(); + let diff_info_bytes = snapshot.get_diff_info_committed(&DiffInfoKey { + shard_ident: *shard_ident, + seqno, + })?; + + if let Some(diff_info_bytes) = diff_info_bytes { + let diff_info = tl_proto::deserialize(&diff_info_bytes)?; + Ok(Some(diff_info)) + } else { + Ok(None) + } + } + + fn get_last_applied_block_seqno(&self, shard_ident: &ShardIdent) -> Result> { + let snapshot = self.storage.internal_queue_storage().make_snapshot(); + snapshot.get_last_applied_block_seqno_committed(shard_ident) + } } diff --git a/collator/src/internal_queue/state/uncommitted_state.rs b/collator/src/internal_queue/state/uncommitted_state.rs index 7c2a0def2..7a8381613 100644 --- a/collator/src/internal_queue/state/uncommitted_state.rs +++ b/collator/src/internal_queue/state/uncommitted_state.rs @@ -3,8 +3,11 @@ use std::sync::Arc; use anyhow::Result; use everscale_types::models::{BlockId, BlockIdShort, IntAddr, ShardIdent}; +use everscale_types::prelude::HashBytes; use tycho_block_util::queue::{QueueKey, QueuePartitionIdx, RouterAddr}; -use tycho_storage::model::{DiffTailKey, QueueRange, ShardsInternalMessagesKey, StatKey}; +use tycho_storage::model::{ + DiffInfo, DiffInfoKey, DiffTailKey, QueueRange, ShardsInternalMessagesKey, StatKey, +}; use tycho_storage::{InternalQueueSnapshot, InternalQueueTransaction, Storage}; use tycho_util::metrics::HistogramGuard; use tycho_util::{FastHashMap, FastHashSet}; @@ -90,6 +93,7 @@ pub trait LocalUncommittedState { messages: &BTreeMap>, statistics: &DiffStatistics, max_message: &QueueKey, + diff_hash: HashBytes, ) -> Result<()>; /// Load statistics for given partition and 
ranges @@ -103,6 +107,12 @@ pub trait LocalUncommittedState { /// Get diffs tail length fn get_diffs_tail_len(&self, shard_ident: &ShardIdent, from: &QueueKey) -> u32; + + /// Get diff info + fn get_diff_info(&self, shard_ident: &ShardIdent, seqno: u32) -> Result>; + + /// Get last applied block seqno by shard ident + fn get_last_applied_block_seqno(&self, shard_ident: &ShardIdent) -> Result>; } // IMPLEMENTATION @@ -178,15 +188,18 @@ impl UncommittedState for UncommittedStateStdImpl { messages: &BTreeMap>, statistics: &DiffStatistics, max_message: &QueueKey, + diff_hash: HashBytes, ) -> Result<()> { let mut tx = self.storage.internal_queue_storage().begin_transaction(); Self::add_messages(&mut tx, block_id_short.shard, partition_router, messages)?; Self::add_statistics(&mut tx, statistics)?; Self::add_diff_tail(&mut tx, block_id_short, max_message); + Self::add_diff_info(&mut tx, block_id_short, statistics, max_message, diff_hash); let _histogram = HistogramGuard::begin("tycho_internal_queue_add_messages_with_statistics_write_time"); + tracing::info!(target: "local_debug", "WRITE {:?}", block_id_short); tx.write() } @@ -221,6 +234,26 @@ impl UncommittedState for UncommittedStateStdImpl { max_message: *from, }) } + + fn get_diff_info(&self, shard_ident: &ShardIdent, seqno: u32) -> Result> { + let snapshot = self.storage.internal_queue_storage().make_snapshot(); + let diff_info_bytes = snapshot.get_diff_info_uncommitted(&DiffInfoKey { + shard_ident: *shard_ident, + seqno, + })?; + + if let Some(diff_info_bytes) = diff_info_bytes { + let diff_info = tl_proto::deserialize(&diff_info_bytes)?; + Ok(Some(diff_info)) + } else { + Ok(None) + } + } + + fn get_last_applied_block_seqno(&self, shard_ident: &ShardIdent) -> Result> { + let snapshot = self.storage.internal_queue_storage().make_snapshot(); + snapshot.get_last_applied_block_seqno_uncommitted(shard_ident) + } } impl UncommittedStateStdImpl { @@ -304,4 +337,29 @@ impl UncommittedStateStdImpl { 
block_id_short.seqno.to_le_bytes().as_slice(), ); } + + fn add_diff_info( + internal_queue_tx: &mut InternalQueueTransaction, + block_id_short: &BlockIdShort, + diff_statistics: &DiffStatistics, + max_message: &QueueKey, + hash: HashBytes, + ) { + let shard_messages_count = diff_statistics.shards_messages_count(); + + let key = DiffInfoKey { + shard_ident: block_id_short.shard, + seqno: block_id_short.seqno, + }; + + let diff_info = DiffInfo { + max_message: *max_message, + shards_messages_count: shard_messages_count.clone(), + hash, + }; + + let serialized_diff_info = tl_proto::serialize(diff_info); + + internal_queue_tx.insert_diff_info_uncommitted(&key, &serialized_diff_info); + } } diff --git a/collator/src/internal_queue/types.rs b/collator/src/internal_queue/types.rs index 54df9d91f..4be982b56 100644 --- a/collator/src/internal_queue/types.rs +++ b/collator/src/internal_queue/types.rs @@ -350,6 +350,27 @@ impl QueueStatistics { } } } + + pub fn shard_messages_count(&self) -> FastHashMap { + let mut shards_messages_count = FastHashMap::default(); + + for stat in self.statistics.iter() { + let (addr, msg_count) = stat; + // TODO after split/merge implementation we should use detailed counter for 256 shards + let dest_shard = if addr.is_masterchain() { + ShardIdent::MASTERCHAIN + } else { + ShardIdent::new_full(0) + }; + + shards_messages_count + .entry(dest_shard) + .and_modify(|count: &mut u64| *count += *msg_count) + .or_insert(*msg_count); + } + + shards_messages_count + } } impl PartialEq for QueueStatistics { @@ -402,6 +423,10 @@ impl DiffStatistics { .copied() .unwrap_or_default() } + + pub fn shards_messages_count(&self) -> &FastHashMap { + &self.inner.shards_messages_count + } } #[derive(Debug, Clone)] struct DiffStatisticsInner { @@ -412,6 +437,26 @@ struct DiffStatisticsInner { shards_messages_count: FastHashMap, } +impl DiffStatistics { + pub fn new( + shard_ident: ShardIdent, + min_message: QueueKey, + max_message: QueueKey, + statistics: 
FastHashMap>, + shards_messages_count: FastHashMap, + ) -> Self { + Self { + inner: Arc::new(DiffStatisticsInner { + shard_ident, + min_message, + max_message, + statistics, + shards_messages_count, + }), + } + } +} + impl From<(&QueueDiffWithMessages, ShardIdent)> for DiffStatistics { fn from(value: (&QueueDiffWithMessages, ShardIdent)) -> Self { let (diff, shard_ident) = value; @@ -463,6 +508,7 @@ impl From<(&QueueDiffWithMessages, ShardIdent)> for mod tests { use std::collections::{BTreeMap, BTreeSet}; + use tycho_storage::model::DiffInfo; use tycho_util::FastHashSet; use super::*; @@ -516,4 +562,31 @@ mod tests { assert_eq!(*src_router.get(&addr4).unwrap(), 10); } } + + #[test] + fn test_diff_info_value_serialization() { + // 1) Create example data + let mut map = FastHashMap::default(); + map.insert(ShardIdent::MASTERCHAIN, 123); + map.insert(ShardIdent::BASECHAIN, 999); + + let original = DiffInfo { + max_message: QueueKey { + lt: 42, + hash: HashBytes::from([0xAB; 32]), + }, + shards_messages_count: map, + hash: Default::default(), + }; + + // 2) Serialize + let serialized = tl_proto::serialize(&original); + + // 3) Deserialize + let deserialized = tl_proto::deserialize::(&serialized) + .expect("Failed to deserialize DiffInfoValue"); + + // 4) Compare original and deserialized + assert_eq!(original, deserialized); + } } diff --git a/collator/src/manager/mod.rs b/collator/src/manager/mod.rs index ce54c75f2..e8e2f14b1 100644 --- a/collator/src/manager/mod.rs +++ b/collator/src/manager/mod.rs @@ -502,7 +502,7 @@ where } // skip already applied diff - if mq_adapter.is_diff_exists(&block_entry.block_id.as_short_id()) { + if mq_adapter.is_diff_exists(&block_entry.block_id.as_short_id())? 
{ return Ok(()); } diff --git a/collator/src/queue_adapter.rs b/collator/src/queue_adapter.rs index 968cd3934..dccfcf898 100644 --- a/collator/src/queue_adapter.rs +++ b/collator/src/queue_adapter.rs @@ -3,6 +3,7 @@ use everscale_types::cell::HashBytes; use everscale_types::models::{BlockId, BlockIdShort, ShardIdent}; use tracing::instrument; use tycho_block_util::queue::{QueueKey, QueuePartitionIdx}; +use tycho_storage::model::DiffInfo; use tycho_util::metrics::HistogramGuard; use tycho_util::FastHashMap; @@ -73,9 +74,9 @@ where /// Get diffs for the given blocks from committed and uncommitted state fn get_diffs(&self, blocks: FastHashMap) -> Vec<(ShardIdent, ShortQueueDiff)>; /// Get diff for the given block from committed and uncommitted state - fn get_diff(&self, shard_ident: ShardIdent, seqno: u32) -> Option; + fn get_diff(&self, shard_ident: &ShardIdent, seqno: u32) -> Result>; /// Check if diff exists in the cache - fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> bool; + fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> Result; /// Get last applied mc block id from committed state fn get_last_applied_mc_block_id(&self) -> Result>; /// Get diffs tail len from uncommitted state and committed state @@ -148,7 +149,14 @@ impl MessageQueueAdapter for MessageQueueAdapterStdI self.queue .apply_diff(diff, block_id_short, diff_hash, statistics, max_message)?; - tracing::info!(target: tracing_targets::MQ_ADAPTER, + // tracing::info!(target: tracing_targets::MQ_ADAPTER, + // new_messages_len = len, + // elapsed = ?time.elapsed(), + // processed_to = ?processed_to, + // "Diff applied", + // ); + + tracing::info!(target: "local_debug", new_messages_len = len, elapsed = ?time.elapsed(), processed_to = ?processed_to, @@ -202,11 +210,11 @@ impl MessageQueueAdapter for MessageQueueAdapterStdI self.queue.get_diffs(blocks) } - fn get_diff(&self, shard_ident: ShardIdent, seqno: u32) -> Option { + fn get_diff(&self, shard_ident: &ShardIdent, seqno: u32) -> 
Result> { self.queue.get_diff(shard_ident, seqno) } - fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> bool { + fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> Result { self.queue.is_diff_exists(block_id_short) } diff --git a/collator/tests/internal_queue.rs b/collator/tests/internal_queue.rs index a4cdca648..3a264506e 100644 --- a/collator/tests/internal_queue.rs +++ b/collator/tests/internal_queue.rs @@ -1049,7 +1049,7 @@ fn create_dump_msg_envelope(message: Lazy) -> Lazy { .unwrap() } #[tokio::test(flavor = "multi_thread", worker_threads = 4)] -async fn test_queue_tail() -> anyhow::Result<()> { +async fn test_queue_tail_and_diff_info() -> anyhow::Result<()> { let (storage, _tmp_dir) = Storage::new_temp().await?; let queue_factory = QueueFactoryStdImpl { @@ -1067,14 +1067,14 @@ async fn test_queue_tail() -> anyhow::Result<()> { let block_mc1 = BlockId { shard: ShardIdent::MASTERCHAIN, - seqno: 0, + seqno: 1, root_hash: Default::default(), file_hash: Default::default(), }; let block_mc2 = BlockId { shard: ShardIdent::MASTERCHAIN, - seqno: 1, + seqno: 2, root_hash: Default::default(), file_hash: Default::default(), }; @@ -1137,29 +1137,69 @@ async fn test_queue_tail() -> anyhow::Result<()> { max_message, )?; + // -- test case 1 let diff_len_mc = queue.get_diffs_tail_len(&ShardIdent::MASTERCHAIN, &QueueKey::MIN); - // length 2 in uncommitted state assert_eq!(diff_len_mc, 2); + // first diff has only one message with lt=1 + let diff_info_mc1 = queue + .get_diff(&ShardIdent::MASTERCHAIN, block_mc1.seqno)? + .unwrap(); + assert_eq!(diff_info_mc1.max_message, QueueKey::min_for_lt(1)); + + // second diff has three messages with lt=2,3,4 + let diff_info_mc2 = queue + .get_diff(&ShardIdent::MASTERCHAIN, block_mc2.seqno)? 
+ .unwrap(); + assert_eq!(diff_info_mc2.max_message, QueueKey::min_for_lt(4)); + + // -- test case 2 // commit first diff queue.commit_diff(&[(block_mc1, true)])?; + let diff_len_mc = queue.get_diffs_tail_len(&ShardIdent::MASTERCHAIN, &QueueKey::MIN); // one diff moved to committed state. one diff left in uncommitted state // uncommitted: 1; committed: 1 assert_eq!(diff_len_mc, 2); + // first diff has only one message with lt=1 + let diff_info_mc1 = queue + .get_diff(&ShardIdent::MASTERCHAIN, block_mc1.seqno)? + .unwrap(); + + assert_eq!(diff_info_mc1.max_message, QueueKey::min_for_lt(1)); + + // second diff has three messages with lt=2,3,4 + let diff_info_mc2 = queue + .get_diff(&ShardIdent::MASTERCHAIN, block_mc2.seqno)? + .unwrap(); + assert_eq!(diff_info_mc2.max_message, QueueKey::min_for_lt(4)); + + // -- test case 3 // exclude committed diff by range let diff_len_mc = queue.get_diffs_tail_len(&ShardIdent::MASTERCHAIN, &end_key_mc1.next_value()); // uncommitted: 1; committed: 0 (1) assert_eq!(diff_len_mc, 1); + // -- test case 4 // clear uncommitted state with second diff queue.clear_uncommitted_state()?; let diff_len_mc = queue.get_diffs_tail_len(&ShardIdent::MASTERCHAIN, &QueueKey::MIN); // uncommitted: 0; committed: 1 assert_eq!(diff_len_mc, 1); + // first diff has only one message with lt=1 + let diff_info_mc1 = queue + .get_diff(&ShardIdent::MASTERCHAIN, block_mc1.seqno)? 
+ .unwrap(); + assert_eq!(diff_info_mc1.max_message, QueueKey::min_for_lt(1)); + + // second diff removed because it was located in uncommitted state + let diff_info_mc2 = queue.get_diff(&ShardIdent::MASTERCHAIN, block_mc2.seqno)?; + assert!(diff_info_mc2.is_none()); + + // -- test case 5 // exclude committed diff by range let diff_len_mc = queue.get_diffs_tail_len(&ShardIdent::MASTERCHAIN, &end_key_mc1.next_value()); // uncommitted: 0; committed: 0 (1) diff --git a/storage/src/db/kv_db/mod.rs b/storage/src/db/kv_db/mod.rs index 4ad369866..84d261897 100644 --- a/storage/src/db/kv_db/mod.rs +++ b/storage/src/db/kv_db/mod.rs @@ -139,6 +139,8 @@ weedb::tables! { pub internal_message_var: tables::InternalMessageVar, pub internal_message_diffs_tail: tables::InternalMessageDiffsTail, pub internal_message_diffs_tail_uncommitted: tables::InternalMessageDiffsTailUncommitted, + pub internal_message_diff_info: tables::InternalMessageDiffInfo, + pub internal_message_diff_info_uncommitted: tables::InternalMessageDiffInfoUncommitted, } } diff --git a/storage/src/db/kv_db/tables.rs b/storage/src/db/kv_db/tables.rs index b6683293d..c7c3e5188 100644 --- a/storage/src/db/kv_db/tables.rs +++ b/storage/src/db/kv_db/tables.rs @@ -548,6 +548,36 @@ impl ColumnFamilyOptions for InternalMessageDiffsTail { } } +pub struct InternalMessageDiffInfo; +impl ColumnFamily for InternalMessageDiffInfo { + const NAME: &'static str = "int_msg_diff_info"; + + fn read_options(opts: &mut ReadOptions) { + opts.set_verify_checksums(true); + } +} + +impl ColumnFamilyOptions for InternalMessageDiffInfo { + fn options(opts: &mut Options, caches: &mut Caches) { + zstd_block_based_table_factory(opts, caches); + } +} + +pub struct InternalMessageDiffInfoUncommitted; +impl ColumnFamily for InternalMessageDiffInfoUncommitted { + const NAME: &'static str = "int_msg_diff_info_uncommitted"; + + fn read_options(opts: &mut ReadOptions) { + opts.set_verify_checksums(true); + } +} + +impl ColumnFamilyOptions for 
InternalMessageDiffInfoUncommitted { + fn options(opts: &mut Options, caches: &mut Caches) { + zstd_block_based_table_factory(opts, caches); + } +} + fn archive_data_merge( _: &[u8], current_value: Option<&[u8]>, diff --git a/storage/src/store/internal_queue/mod.rs b/storage/src/store/internal_queue/mod.rs index a57b8e12a..d869e02a4 100644 --- a/storage/src/store/internal_queue/mod.rs +++ b/storage/src/store/internal_queue/mod.rs @@ -5,10 +5,12 @@ use everscale_types::models::{BlockId, IntAddr, Message, MsgInfo, OutMsgQueueUpd use tycho_block_util::queue::{QueueKey, QueuePartitionIdx, RouterAddr, RouterPartitions}; use tycho_util::FastHashMap; use weedb::rocksdb::{DBRawIterator, WriteBatch}; -use weedb::{BoundedCfHandle, ColumnFamily, OwnedRawIterator, OwnedSnapshot, Table}; +use weedb::{rocksdb, BoundedCfHandle, ColumnFamily, OwnedRawIterator, OwnedSnapshot, Table}; use crate::db::*; -use crate::model::{DiffTailKey, QueueRange, ShardsInternalMessagesKey, StatKey}; +use crate::model::{ + DiffInfo, DiffInfoKey, DiffTailKey, QueueRange, ShardsInternalMessagesKey, StatKey, +}; use crate::util::StoredValue; use crate::QueueStateReader; @@ -73,6 +75,7 @@ impl InternalQueueStorage { let stats_cf = this.db.internal_message_stats.cf(); let var_cf = this.db.internal_message_var.cf(); let diffs_tail_cf = this.db.internal_message_diffs_tail.cf(); + let diff_infos_cf = this.db.internal_message_diff_info.cf(); let mut batch = weedb::rocksdb::WriteBatch::default(); @@ -80,6 +83,8 @@ impl InternalQueueStorage { let mut statistics: FastHashMap> = FastHashMap::default(); while let Some(mut part) = reader.read_next_queue_diff()? { + let mut shards_messages_count = FastHashMap::default(); + while let Some(cell) = part.read_next_message()? 
{ let msg_hash = cell.repr_hash(); let msg = cell.parse::>()?; @@ -105,6 +110,14 @@ impl InternalQueueStorage { account: dest.address, }; + // TODO after split/merge implementation we should use detailed counter for 256 shards + let dest_shard = ShardIdent::new_full(dest_addr.workchain as i32); + + shards_messages_count + .entry(dest_shard) + .and_modify(|count| *count += 1) + .or_insert(1); + let queue_diff = part.queue_diff(); let partition = get_partition(&queue_diff.router_partitions_dst, &dest_addr) .or_else(|| get_partition(&queue_diff.router_partitions_src, &src_addr)) @@ -133,27 +146,38 @@ impl InternalQueueStorage { // insert diff tail let diff_tail_key = DiffTailKey { - shard_ident: block_id.shard, + shard_ident: queue_diff.shard_ident, max_message: queue_diff.max_message, }; batch.put_cf( &diffs_tail_cf, diff_tail_key.to_vec(), - block_id.seqno.to_le_bytes().as_slice(), + queue_diff.seqno.to_le_bytes(), ); - // insert last applied diff + // insert diff info + let diff_info_key = DiffInfoKey { + shard_ident: queue_diff.shard_ident, + seqno: queue_diff.seqno, + }; + + let diff_info = DiffInfo { + max_message: queue_diff.max_message, + shards_messages_count, + hash: queue_diff.hash, + }; + batch.put_cf( - &var_cf, - INT_QUEUE_LAST_APPLIED_MC_BLOCK_ID_KEY, - block_id.to_vec(), + &diff_infos_cf, + diff_info_key.to_vec(), + tl_proto::serialize(diff_info), ); for (partition, statistics) in statistics.drain() { for (dest, count) in statistics.iter() { let key = StatKey { - shard_ident: block_id.shard, + shard_ident: queue_diff.shard_ident, partition, min_message: queue_diff.min_message, max_message: queue_diff.max_message, @@ -165,6 +189,13 @@ impl InternalQueueStorage { } } + // insert last applied diff + batch.put_cf( + &var_cf, + INT_QUEUE_LAST_APPLIED_MC_BLOCK_ID_KEY, + block_id.to_vec(), + ); + reader.finish()?; this.db.rocksdb().write(batch)?; @@ -197,10 +228,12 @@ impl InternalQueueStorage { let mut msgs_to_compact = Vec::new(); let mut stats_to_compact = 
Vec::new(); let mut diffs_tail_to_compact = Vec::new(); + let mut diff_info_to_compact = Vec::new(); let messages_cf = &self.db.shard_internal_messages.cf(); let stats_cf = &self.db.internal_message_stats.cf(); let diffs_tail_cf = &self.db.internal_message_diffs_tail.cf(); + let diff_info_cf = &self.db.internal_message_diff_info.cf(); for range in ranges { // Delete messages in one range @@ -245,7 +278,7 @@ impl InternalQueueStorage { &mut stats_to_compact, ); - // Delete diffs tail in one range + // delete tail and info let start_diff_tail_key = DiffTailKey { shard_ident: range.shard_ident, max_message: range.from, @@ -256,14 +289,45 @@ impl InternalQueueStorage { max_message: range.to, }; - delete_range( + let from_diff_tail_bytes = start_diff_tail_key.to_vec(); + let to_diff_tail_bytes = end_diff_tail_key.to_vec(); + + let (min_seqno, max_seqno) = delete_diff_tails_and_collect_seqno( &mut batch, - diffs_tail_cf, - &start_diff_tail_key.to_vec(), - &end_diff_tail_key.to_vec(), - &bump, - &mut diffs_tail_to_compact, - ); + self.db.rocksdb().as_ref(), + &self.db.internal_message_diffs_tail, + range.shard_ident, + &from_diff_tail_bytes, + &to_diff_tail_bytes, + )?; + + diffs_tail_to_compact.push(( + bump.alloc_slice_copy(&from_diff_tail_bytes), + bump.alloc_slice_copy(&to_diff_tail_bytes), + )); + + // if we found some valid seqnos, also delete the [min_seqno .. 
max_seqno] from diff_info + if min_seqno != u32::MAX && max_seqno != 0 { + let from_diff_info = DiffInfoKey { + shard_ident: range.shard_ident, + seqno: min_seqno, + } + .to_vec(); + let to_diff_info = DiffInfoKey { + shard_ident: range.shard_ident, + seqno: max_seqno, + } + .to_vec(); + + // Range-delete for diff_info + batch.delete_range_cf(diff_info_cf, &from_diff_info, &to_diff_info); + batch.delete_cf(diff_info_cf, &to_diff_info); + + diff_info_to_compact.push(( + bump.alloc_slice_copy(&from_diff_info), + bump.alloc_slice_copy(&to_diff_info), + )); + } } let db = self.db.rocksdb().as_ref(); @@ -278,6 +342,9 @@ impl InternalQueueStorage { for (start_key, end_key) in diffs_tail_to_compact { db.compact_range_cf(diffs_tail_cf, Some(start_key), Some(end_key)); } + for (start_key, end_key) in diff_info_to_compact { + db.compact_range_cf(diff_info_cf, Some(start_key), Some(end_key)); + } Ok(()) } @@ -311,12 +378,20 @@ impl InternalQueueStorage { &[0xff; StatKey::SIZE_HINT], ); + let diffs_info_cf = &self.db.internal_message_diff_info_uncommitted.cf(); + clear_table( + diffs_info_cf, + &[0x00; StatKey::SIZE_HINT], + &[0xff; StatKey::SIZE_HINT], + ); + let db = self.db.rocksdb().as_ref(); db.write(batch)?; db.compact_range_cf(messages_cf, None::<[u8; 0]>, None::<[u8; 0]>); db.compact_range_cf(stats_cf, None::<[u8; 0]>, None::<[u8; 0]>); db.compact_range_cf(diffs_tail_cf, None::<[u8; 0]>, None::<[u8; 0]>); + db.compact_range_cf(diffs_info_cf, None::<[u8; 0]>, None::<[u8; 0]>); Ok(()) } @@ -359,6 +434,11 @@ impl InternalQueueTransaction { self.batch.put_cf(&cf, key.to_vec(), value); } + pub fn insert_diff_info_uncommitted(&mut self, key: &DiffInfoKey, value: &[u8]) { + let cf = self.db.internal_message_diff_info_uncommitted.cf(); + self.batch.put_cf(&cf, key.to_vec(), value); + } + pub fn insert_message_uncommitted( &mut self, key: &ShardsInternalMessagesKey, @@ -377,6 +457,36 @@ impl InternalQueueTransaction { self.batch.put_cf(&cf, key.to_vec(), 
self.buffer.as_slice()); } + fn commit_range( + batch: &mut WriteBatch, + source_iter: &mut DBRawIterator<'_>, + from_key: &[u8], + to_key: &[u8], + source_cf: &BoundedCfHandle<'_>, + target_cf: &BoundedCfHandle<'_>, + ) -> Result<()> { + source_iter.seek(from_key); + + loop { + let (key, value) = match source_iter.item() { + Some(item) => item, + None => return source_iter.status().map_err(Into::into), + }; + + if key > to_key { + break; + } + + // Move from uncommitted => committed + batch.delete_cf(source_cf, key); + batch.put_cf(target_cf, key, value); + + source_iter.next(); + } + + Ok(()) + } + pub fn commit_messages>( &mut self, snapshot: &InternalQueueSnapshot, @@ -384,67 +494,56 @@ impl InternalQueueTransaction { ) -> Result<()> { let db = self.db.rocksdb().as_ref(); - let mut commit_range = |source_iter: &mut DBRawIterator<'_>, - from_key: &[u8], - to_key: &[u8], - source_cf: &BoundedCfHandle<'_>, - target_cf: &BoundedCfHandle<'_>| { - source_iter.seek(from_key); + // -- Prepare CF handles -- + let messages_cf = self.db.shard_internal_messages.cf(); + let uncommited_messages_cf = self.db.shard_internal_messages_uncommitted.cf(); - loop { - let (key, value) = match source_iter.item() { - Some(item) => item, - None => return source_iter.status(), - }; - - if key > to_key { - break; - } - - self.batch.delete_cf(source_cf, key); - self.batch.put_cf(target_cf, key, value); + let stats_cf = self.db.internal_message_stats.cf(); + let uncommited_stats_cf = self.db.internal_message_stats_uncommitted.cf(); - source_iter.next(); - } - - Ok(()) - }; + let diff_tail_committed_cf = self.db.internal_message_diffs_tail.cf(); + let diff_tail_uncommitted_cf = self.db.internal_message_diffs_tail_uncommitted.cf(); - let messages = &self.db.shard_internal_messages; - let messages_cf = &messages.cf(); - - let uncommited_messages = &self.db.shard_internal_messages_uncommitted; - let uncommited_messages_cf = &uncommited_messages.cf(); + let diff_info_committed_cf = 
self.db.internal_message_diff_info.cf(); + let diff_info_uncommitted_cf = self.db.internal_message_diff_info_uncommitted.cf(); + // -- Prepare iterators -- let mut uncommited_messages_iter = { - let mut readopts = uncommited_messages.new_read_config(); + let mut readopts = self + .db + .shard_internal_messages_uncommitted + .new_read_config(); readopts.set_snapshot(&snapshot.snapshot); - db.raw_iterator_cf_opt(uncommited_messages_cf, readopts) + db.raw_iterator_cf_opt(&uncommited_messages_cf, readopts) }; - let stats = &self.db.internal_message_stats; - let stats_cf = &stats.cf(); - - let uncommited_stats = &self.db.internal_message_stats_uncommitted; - let uncommited_stats_cf = &uncommited_stats.cf(); - - let diff_tail_committed_cf = &self.db.internal_message_diffs_tail.cf(); - let diff_tail_uncommitted_cf = &self.db.internal_message_diffs_tail_uncommitted.cf(); - let mut uncommited_stats_iter = { - let mut readopts = uncommited_stats.new_read_config(); + let mut readopts = self.db.internal_message_stats_uncommitted.new_read_config(); readopts.set_snapshot(&snapshot.snapshot); - db.raw_iterator_cf_opt(uncommited_stats_cf, readopts) + db.raw_iterator_cf_opt(&uncommited_stats_cf, readopts) }; let mut uncommited_diff_tail_iter = { - let mut readopts = uncommited_stats.new_read_config(); + let mut readopts = self + .db + .internal_message_diff_info_uncommitted + .new_read_config(); readopts.set_snapshot(&snapshot.snapshot); - db.raw_iterator_cf_opt(diff_tail_uncommitted_cf, readopts) + db.raw_iterator_cf_opt(&diff_tail_uncommitted_cf, readopts) }; + let mut uncommited_diff_info_iter = { + let mut readopts = self + .db + .internal_message_diff_info_uncommitted + .new_read_config(); + readopts.set_snapshot(&snapshot.snapshot); + db.raw_iterator_cf_opt(&diff_info_uncommitted_cf, readopts) + }; + + // -- Process each range -- for range in ranges { - // Commit messages for one range + // 1) Commit messages in [from..to] let from_message_key = ShardsInternalMessagesKey { 
partition: range.partition, shard_ident: range.shard_ident, @@ -456,15 +555,16 @@ impl InternalQueueTransaction { internal_message_key: range.to, }; - commit_range( + Self::commit_range( + &mut self.batch, &mut uncommited_messages_iter, &from_message_key.to_vec(), &to_message_key.to_vec(), - uncommited_messages_cf, - messages_cf, + &uncommited_messages_cf, + &messages_cf, )?; - // Commit stats for one range + // 2) Commit stats in [from..to] let from_stat_key = StatKey { shard_ident: range.shard_ident, partition: range.partition, @@ -480,32 +580,89 @@ impl InternalQueueTransaction { dest: RouterAddr::MAX, }; - commit_range( + Self::commit_range( + &mut self.batch, &mut uncommited_stats_iter, &from_stat_key.to_vec(), &to_stat_key.to_vec(), - uncommited_stats_cf, - stats_cf, + &uncommited_stats_cf, + &stats_cf, )?; - // Collect diffs tails range + // 3) Collect diff tails in [from..to] let from_diff_tail_key = DiffTailKey { shard_ident: range.shard_ident, max_message: range.from, }; - let to_diff_tail_key = DiffTailKey { shard_ident: range.shard_ident, max_message: range.to, }; - commit_range( - &mut uncommited_diff_tail_iter, - &from_diff_tail_key.to_vec(), - &to_diff_tail_key.to_vec(), - diff_tail_uncommitted_cf, - diff_tail_committed_cf, - )?; + let from_diff_tail_bytes = from_diff_tail_key.to_vec(); + let to_diff_tail_bytes = to_diff_tail_key.to_vec(); + + // Track min/max seqno encountered + uncommited_diff_tail_iter.seek(&from_diff_tail_bytes); + + let mut min_seqno = u32::MAX; + let mut max_seqno = 0; + + loop { + let Some((raw_key, raw_value)) = uncommited_diff_tail_iter.item() else { + match uncommited_diff_tail_iter.status() { + Ok(()) => break, + Err(e) => return Err(e.into()), + } + }; + + if raw_key > &to_diff_tail_bytes[..] 
{ + break; + } + + // block_seqno is stored in the first 4 bytes + if raw_value.len() < 4 { + return Err(anyhow::anyhow!("Invalid diff tail value length")); + } + let block_seqno = u32::from_le_bytes(raw_value[..4].try_into()?); + + if block_seqno < min_seqno { + min_seqno = block_seqno; + } + if block_seqno > max_seqno { + max_seqno = block_seqno; + } + + self.batch.delete_cf(&diff_tail_uncommitted_cf, raw_key); + self.batch + .put_cf(&diff_tail_committed_cf, raw_key, raw_value); + + uncommited_diff_tail_iter.next(); + } + + // 4) Commit diff info [min_seqno..max_seqno] + if min_seqno != u32::MAX && max_seqno != 0 { + let from_diff_info_key = DiffInfoKey { + shard_ident: range.shard_ident, + seqno: min_seqno, + } + .to_vec(); + let to_diff_info_key = DiffInfoKey { + shard_ident: range.shard_ident, + seqno: max_seqno, + } + .to_vec(); + + // Move the diff info + Self::commit_range( + &mut self.batch, + &mut uncommited_diff_info_iter, + &from_diff_info_key, + &to_diff_info_key, + &diff_info_uncommitted_cf, + &diff_info_committed_cf, + )?; + } } Ok(()) @@ -554,6 +711,63 @@ impl InternalQueueSnapshot { self.calc_diffs_tail(&self.db.internal_message_diffs_tail_uncommitted, from) } + pub fn get_diff_info_committed(&self, key: &DiffInfoKey) -> Result>> { + self.get_diff_info(&self.db.internal_message_diff_info, key) + } + + pub fn get_diff_info_uncommitted(&self, key: &DiffInfoKey) -> Result>> { + self.get_diff_info(&self.db.internal_message_diff_info_uncommitted, key) + } + + pub fn get_last_applied_diff_info( + &self, + table: &Table, + shard_ident: &ShardIdent, + ) -> Result> { + let mut read_config = table.new_read_config(); + read_config.set_snapshot(&self.snapshot); + + // Set the range to iterate over all the keys in the table for the given shard + let from = DiffInfoKey::new(*shard_ident, 0); + read_config.set_iterate_lower_bound(from.to_vec().to_vec()); + + let to = DiffInfoKey::new(*shard_ident, u32::MAX); + 
read_config.set_iterate_upper_bound(to.to_vec().to_vec()); + + let cf = table.cf(); + let mut iter = self.db.rocksdb().raw_iterator_cf_opt(&cf, read_config); + + let key = DiffInfoKey::new(*shard_ident, u32::MAX); + + iter.seek_for_prev(key.to_vec().as_slice()); + + let value = match iter.key() { + Some(mut value) => { + let key = DiffInfoKey::deserialize(&mut value); + key.seqno + } + None => return Ok(None), + }; + + Ok(Some(value)) + } + + pub fn get_last_applied_block_seqno_uncommitted( + &self, + shard_ident: &ShardIdent, + ) -> Result> { + let table = &self.db.internal_message_diff_info_uncommitted; + self.get_last_applied_diff_info(table, shard_ident) + } + + pub fn get_last_applied_block_seqno_committed( + &self, + shard_ident: &ShardIdent, + ) -> Result> { + let table = &self.db.internal_message_diff_info; + self.get_last_applied_diff_info(table, shard_ident) + } + fn iter_messages( &self, table: &Table, @@ -580,11 +794,18 @@ impl InternalQueueSnapshot { let mut read_config = table.new_read_config(); read_config.set_snapshot(&self.snapshot); + let from_bytes = from.to_vec(); + read_config.set_iterate_lower_bound(from_bytes.as_slice()); + let to = DiffTailKey { + shard_ident: from.shard_ident, + max_message: QueueKey::MAX, + }; + read_config.set_iterate_upper_bound(to.to_vec().to_vec()); + let cf = table.cf(); let mut iter = self.db.rocksdb().raw_iterator_cf_opt(&cf, read_config); - let from_key = from.to_vec(); - iter.seek(&from_key); + iter.seek(&from_bytes); let mut count = 0; while let Some((_, _)) = iter.item() { @@ -595,6 +816,20 @@ impl InternalQueueSnapshot { count } + pub fn get_diff_info( + &self, + table: &Table, + key: &DiffInfoKey, + ) -> Result>> { + let mut read_config = table.new_read_config(); + read_config.set_snapshot(&self.snapshot); + + let cf = table.cf(); + let data = self.db.rocksdb().get_cf(&cf, key.to_vec().as_slice())?; + + Ok(data) + } + pub fn collect_committed_stats_in_range( &self, shard_ident: ShardIdent, @@ -721,3 +956,61 @@ 
pub struct InternalQueueMessage<'a> { pub prefix: u64, pub message_boc: &'a [u8], } + +fn delete_diff_tails_and_collect_seqno( + batch: &mut WriteBatch, + db: &rocksdb::DB, + diffs_tail_table: &Table, + shard_ident: ShardIdent, + from_key: &[u8], + to_key: &[u8], +) -> Result<(u32, u32)> { + let read_opts = diffs_tail_table.new_read_config(); + let mut iter = db.raw_iterator_cf_opt(&diffs_tail_table.cf(), read_opts); + + iter.seek(from_key); + + let mut min_seqno = u32::MAX; + let mut max_seqno = 0; + + loop { + let Some((raw_key, raw_value)) = iter.item() else { + match iter.status() { + Ok(()) => break, + Err(e) => return Err(e.into()), + } + }; + + if raw_key > to_key { + break; + } + + let current_tail_key = DiffTailKey::from_slice(raw_key); + if current_tail_key.shard_ident != shard_ident { + break; + } + + if raw_value.len() < 4 { + return Err(anyhow::anyhow!( + "Invalid diff tail value length: {} < 4", + raw_value.len() + )); + } + let block_seqno = u32::from_le_bytes(raw_value[..4].try_into()?); + + if block_seqno < min_seqno { + min_seqno = block_seqno; + } + if block_seqno > max_seqno { + max_seqno = block_seqno; + } + + // Delete this tail + batch.delete_cf(&diffs_tail_table.cf(), raw_key); + + // Move to next + iter.next(); + } + + Ok((min_seqno, max_seqno)) +} diff --git a/storage/src/store/internal_queue/model.rs b/storage/src/store/internal_queue/model.rs index 76f3c9438..51129aad0 100644 --- a/storage/src/store/internal_queue/model.rs +++ b/storage/src/store/internal_queue/model.rs @@ -1,6 +1,9 @@ use everscale_types::cell::HashBytes; use everscale_types::models::ShardIdent; +use tl_proto::{TlError, TlPacket, TlRead, TlResult, TlWrite}; use tycho_block_util::queue::{QueueKey, QueuePartitionIdx, RouterAddr}; +use tycho_block_util::tl; +use tycho_util::FastHashMap; use crate::util::{StoredValue, StoredValueBuffer}; #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] @@ -159,6 +162,18 @@ impl DiffTailKey { } } +#[derive(Debug, Clone, PartialEq, Eq, 
PartialOrd, Ord)] +pub struct DiffInfoKey { + pub shard_ident: ShardIdent, + pub seqno: u32, +} + +impl DiffInfoKey { + pub fn new(shard_ident: ShardIdent, seqno: u32) -> Self { + Self { shard_ident, seqno } + } +} + impl StoredValue for StatKey { const SIZE_HINT: usize = ShardIdent::SIZE_HINT + QueuePartitionIdx::SIZE_HINT @@ -223,6 +238,33 @@ impl StoredValue for DiffTailKey { } } +impl StoredValue for DiffInfoKey { + const SIZE_HINT: usize = ShardIdent::SIZE_HINT + 4; + type OnStackSlice = [u8; Self::SIZE_HINT]; + + fn serialize(&self, buffer: &mut T) { + self.shard_ident.serialize(buffer); + buffer.write_raw_slice(&self.seqno.to_be_bytes()); + } + + fn deserialize(reader: &mut &[u8]) -> Self + where + Self: Sized, + { + if reader.len() < Self::SIZE_HINT { + panic!("Insufficient data for deserialization"); + } + + let shard_ident = ShardIdent::deserialize(reader); + let mut seqno_bytes = [0u8; 4]; + seqno_bytes.copy_from_slice(&reader[..4]); + let seqno = u32::from_be_bytes(seqno_bytes); + *reader = &reader[4..]; + + Self { shard_ident, seqno } + } +} + pub struct QueueRange { pub shard_ident: ShardIdent, pub partition: QueuePartitionIdx, @@ -252,3 +294,81 @@ impl StoredValue for QueuePartitionIdx { partition } } + +#[test] +fn diff_info_key_serialization() { + let key = DiffInfoKey::new(ShardIdent::MASTERCHAIN, 10); + let mut buffer = Vec::with_capacity(DiffInfoKey::SIZE_HINT); + key.serialize(&mut buffer); + let key2 = DiffInfoKey::deserialize(&mut buffer.as_slice()); + assert_eq!(key, key2); +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct DiffInfo { + pub max_message: QueueKey, + pub shards_messages_count: FastHashMap, + pub hash: HashBytes, +} + +impl DiffInfo { + pub fn get_messages_count_by_shard(&self, shard_ident: &ShardIdent) -> u64 { + self.shards_messages_count + .get(shard_ident) + .copied() + .unwrap_or_default() + } +} + +impl TlWrite for DiffInfo { + type Repr = tl_proto::Boxed; + + fn max_size_hint(&self) -> usize { + QueueKey::SIZE_HINT 
+ + 4 + + self.shards_messages_count.len() * (tl::shard_ident::SIZE_HINT + 8) + + tl::hash_bytes::SIZE_HINT + } + + fn write_to(&self, packet: &mut P) { + self.max_message.write_to(packet); + packet.write_u32(self.shards_messages_count.len() as u32); + + for (shard_ident, count) in &self.shards_messages_count { + tl::shard_ident::write(shard_ident, packet); + packet.write_u64(*count); + } + + tl::hash_bytes::write(&self.hash, packet); + } +} + +impl<'tl> TlRead<'tl> for DiffInfo { + type Repr = tl_proto::Boxed; + + fn read_from(data: &mut &'tl [u8]) -> TlResult { + let max_message = QueueKey::read_from(data)?; + + let len = u32::read_from(data)? as usize; + if len > 10_000_000 { + return Err(TlError::InvalidData); + } + + let mut shards_messages_count = + FastHashMap::with_capacity_and_hasher(len, Default::default()); + + for _ in 0..len { + let shard_ident = tl::shard_ident::read(data)?; + let count = u64::read_from(data)?; + shards_messages_count.insert(shard_ident, count); + } + + let hash = tl::hash_bytes::read(data)?; + + Ok(DiffInfo { + max_message, + shards_messages_count, + hash, + }) + } +}