From 52f38bd7904b65a03193f8f867de244e5c14969a Mon Sep 17 00:00:00 2001 From: Maksim Greshnyakov Date: Wed, 12 Feb 2025 16:22:01 +0100 Subject: [PATCH] feature(collator): int queue version --- collator/src/internal_queue/queue.rs | 45 +-- .../internal_queue/state/commited_state.rs | 11 +- .../internal_queue/state/uncommitted_state.rs | 11 +- collator/src/manager/mod.rs | 16 +- collator/src/queue_adapter.rs | 12 +- collator/tests/internal_queue.rs | 145 ++++++++-- core/src/block_strider/starter/cold_boot.rs | 2 +- storage/src/db/kv_db/mod.rs | 1 + storage/src/db/kv_db/tables.rs | 15 + storage/src/store/internal_queue/mod.rs | 256 ++++++++++-------- 10 files changed, 348 insertions(+), 166 deletions(-) diff --git a/collator/src/internal_queue/queue.rs b/collator/src/internal_queue/queue.rs index 4bc2ce986..4966ddcb5 100644 --- a/collator/src/internal_queue/queue.rs +++ b/collator/src/internal_queue/queue.rs @@ -3,9 +3,9 @@ use std::marker::PhantomData; use std::sync::Arc; use std::time::Duration; -use anyhow::{bail, Result}; +use anyhow::{anyhow, bail, Result}; use everscale_types::cell::HashBytes; -use everscale_types::models::{BlockIdShort, ShardIdent}; +use everscale_types::models::{BlockId, BlockIdShort, ShardIdent}; use serde::{Deserialize, Serialize}; use tycho_block_util::queue::{QueueKey, QueuePartitionIdx}; use tycho_util::metrics::HistogramGuard; @@ -91,7 +91,7 @@ where max_message: QueueKey, ) -> Result<()>; /// Move messages from uncommitted state to committed state and update gc ranges - fn commit_diff(&self, mc_top_blocks: &[(BlockIdShort, bool)]) -> Result<()>; + fn commit_diff(&self, mc_top_blocks: &[(BlockId, bool)]) -> Result<()>; /// remove all data in uncommitted state storage fn clear_uncommitted_state(&self) -> Result<()>; /// Returns the number of diffs in cache for the given shard @@ -110,6 +110,8 @@ where fn get_diff(&self, shard_ident: ShardIdent, seqno: u32) -> Option; /// Check if diff exists in the cache fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> bool; + /// Get last applied mc block id from committed state + fn get_last_applied_mc_block_id(&self) -> Result>; } // IMPLEMENTATION @@ -309,29 +311,33 @@ where Ok(()) } - fn commit_diff(&self, mc_top_blocks: &[(BlockIdShort, bool)]) -> Result<()> { + fn commit_diff(&self, mc_top_blocks: &[(BlockId, bool)]) -> Result<()> { + let mc_block_id = mc_top_blocks + .iter() + .find(|(block_id, _)| block_id.is_masterchain()) + .map(|(block_id, _)| block_id) + .ok_or_else(|| anyhow!("Masterchain block not found in commit_diff"))?; + let mut partitions = FastHashSet::default(); // insert default partition because we doesn't store it in router partitions.insert(QueuePartitionIdx::default()); let mut shards_to_commit = FastHashMap::default(); let mut gc_ranges = FastHashMap::default(); - for (block_id_short, top_shard_block_changed) in mc_top_blocks { + for (block_id, top_shard_block_changed) in mc_top_blocks { let mut diffs_to_commit = vec![]; // find all uncommited diffs for the given shard top block - let prev_shard_uncommitted_diffs = - self.uncommitted_diffs.get_mut(&block_id_short.shard); + let prev_shard_uncommitted_diffs = self.uncommitted_diffs.get_mut(&block_id.shard); if let Some(mut shard_uncommitted_diffs) = prev_shard_uncommitted_diffs { // iterate over all uncommitted diffs for the given shard until the top block seqno - shard_uncommitted_diffs - .range(..=block_id_short.seqno) - .for_each(|(block_seqno, shard_diff)| { + shard_uncommitted_diffs.range(..=block_id.seqno).for_each( + |(block_seqno, shard_diff)| { diffs_to_commit.push(*block_seqno); let current_last_key = shards_to_commit - .entry(block_id_short.shard) + .entry(block_id.shard) .or_insert_with(|| *shard_diff.max_message()); // Add all partitions from the router to the partitions set @@ -346,7 +352,7 @@ where } // find min processed_to for each shard for GC - if *block_seqno == block_id_short.seqno && *top_shard_block_changed { + if *block_seqno == block_id.seqno && *top_shard_block_changed { for (shard_ident, processed_to_key) in shard_diff.processed_to().iter() { let last_key = gc_ranges @@ -358,16 +364,15 @@ where } } } - }); + }, + ); // remove all diffs from uncommitted state that are going to be committed for seqno in diffs_to_commit { if let Some(diff) = shard_uncommitted_diffs.remove(&seqno) { // Move the diff to committed_diffs - let mut shard_committed_diffs = self - .committed_diffs - .entry(block_id_short.shard) - .or_default(); + let mut shard_committed_diffs = + self.committed_diffs.entry(block_id.shard).or_default(); shard_committed_diffs.insert(seqno, diff); } } @@ -385,7 +390,7 @@ where // move all uncommitted diffs messages to committed state self.uncommitted_state - .commit(partitions.clone(), &commit_ranges)?; + .commit(partitions.clone(), &commit_ranges, mc_block_id)?; let uncommitted_diffs_count: usize = self.uncommitted_diffs.iter().map(|r| r.value().len()).sum(); @@ -515,4 +520,8 @@ where .get(&block_id_short.shard) .is_some_and(|diffs| diffs.contains_key(&block_id_short.seqno)) } + + fn get_last_applied_mc_block_id(&self) -> Result> { + self.committed_state.get_last_applied_mc_block_id() + } } diff --git a/collator/src/internal_queue/state/commited_state.rs b/collator/src/internal_queue/state/commited_state.rs index 6aad7ced4..1d065d055 100644 --- a/collator/src/internal_queue/state/commited_state.rs +++ b/collator/src/internal_queue/state/commited_state.rs @@ -1,5 +1,5 @@ use anyhow::Result; -use everscale_types::models::{IntAddr, ShardIdent}; +use everscale_types::models::{BlockId, IntAddr, ShardIdent}; use tycho_block_util::queue::QueuePartitionIdx; use tycho_storage::model::ShardsInternalMessagesKey; use tycho_storage::{InternalQueueSnapshot, Storage}; @@ -80,6 +80,9 @@ pub trait CommittedState: Send + Sync { partition: QueuePartitionIdx, range: &[QueueShardRange], ) -> Result<()>; + + /// Get last applied mc block id + fn get_last_applied_mc_block_id(&self) -> Result>; } // IMPLEMENTATION @@ -158,4 +161,10 @@ impl CommittedState for CommittedStateStdImpl { Ok(()) } + + fn get_last_applied_mc_block_id(&self) -> Result> { + self.storage + .internal_queue_storage() + .get_last_applied_mc_block_id() + } } diff --git a/collator/src/internal_queue/state/uncommitted_state.rs b/collator/src/internal_queue/state/uncommitted_state.rs index 126a0d9ed..c2aa6d553 100644 --- a/collator/src/internal_queue/state/uncommitted_state.rs +++ b/collator/src/internal_queue/state/uncommitted_state.rs @@ -2,7 +2,7 @@ use std::collections::BTreeMap; use std::sync::Arc; use anyhow::Result; -use everscale_types::models::{IntAddr, ShardIdent}; +use everscale_types::models::{BlockId, IntAddr, ShardIdent}; use tycho_block_util::queue::{QueueKey, QueuePartitionIdx, RouterAddr}; use tycho_storage::model::{QueueRange, ShardsInternalMessagesKey, StatKey}; use tycho_storage::{InternalQueueSnapshot, InternalQueueTransaction, Storage}; @@ -77,6 +77,7 @@ pub trait LocalUncommittedState { &self, partitions: FastHashSet, ranges: &[QueueShardRange], + mc_block_id: &BlockId, ) -> Result<()>; /// Delete all uncommitted messages and statistics @@ -143,6 +144,7 @@ impl UncommittedState for UncommittedStateStdImpl { &self, partitions: FastHashSet, ranges: &[QueueShardRange], + mc_block_id: &BlockId, ) -> Result<()> { let ranges = partitions.iter().flat_map(|&partition| { ranges.iter().map(move |range| QueueRange { @@ -152,8 +154,13 @@ impl UncommittedState for UncommittedStateStdImpl { to: range.to, }) }); + let snapshot = self.storage.internal_queue_storage().make_snapshot(); + let mut tx = self.storage.internal_queue_storage().begin_transaction(); + + tx.commit_messages(&snapshot, ranges)?; + tx.set_last_applied_mc_block_id(mc_block_id); - self.storage.internal_queue_storage().commit(ranges) + tx.write() } fn truncate(&self) -> Result<()> { diff --git a/collator/src/manager/mod.rs b/collator/src/manager/mod.rs index 4aa3acaae..96b10ceb2 100644 --- a/collator/src/manager/mod.rs +++ b/collator/src/manager/mod.rs @@ -473,9 +473,9 @@ where let mut top_blocks: Vec<_> = top_shard_blocks_info .iter() - .map(|(id, updated)| (id.as_short_id(), *updated)) + .map(|(id, updated)| (*id, *updated)) .collect(); - top_blocks.push((block_id.as_short_id(), true)); + top_blocks.push((*block_id, true)); if let Err(err) = self.mq_adapter.commit_diff(top_blocks) { bail!( @@ -1442,7 +1442,7 @@ where // collect top blocks queue diffs already applied to let queue_diffs_applied_to_top_blocks = if let Some(applied_to_mc_block_id) = - self.get_queue_diffs_applied_to_mc_block_id(last_collated_mc_block_id) + self.get_queue_diffs_applied_to_mc_block_id(last_collated_mc_block_id)? { self.get_top_blocks_seqno(&applied_to_mc_block_id).await? } else { @@ -1702,18 +1702,18 @@ where fn get_queue_diffs_applied_to_mc_block_id( &self, last_collated_mc_block_id: Option, - ) -> Option { + ) -> Result> { let last_processed_mc_block_id = *self.last_processed_mc_block_id.lock(); match (last_processed_mc_block_id, last_collated_mc_block_id) { (Some(last_processed), Some(last_collated)) => { if last_processed.seqno > last_collated.seqno { - Some(last_processed) + Ok(Some(last_processed)) } else { - Some(last_collated) + Ok(Some(last_collated)) } } - (Some(mc_block_id), _) | (_, Some(mc_block_id)) => Some(mc_block_id), - _ => None, + (Some(mc_block_id), _) | (_, Some(mc_block_id)) => Ok(Some(mc_block_id)), + _ => self.mq_adapter.get_last_applied_mc_block_id(), } } diff --git a/collator/src/queue_adapter.rs b/collator/src/queue_adapter.rs index 20a74e232..664e1727f 100644 --- a/collator/src/queue_adapter.rs +++ b/collator/src/queue_adapter.rs @@ -1,6 +1,6 @@ use anyhow::Result; use everscale_types::cell::HashBytes; -use everscale_types::models::{BlockIdShort, ShardIdent}; +use everscale_types::models::{BlockId, BlockIdShort, ShardIdent}; use tracing::instrument; use tycho_block_util::queue::{QueueKey, QueuePartitionIdx}; use tycho_util::metrics::HistogramGuard; @@ -53,7 +53,7 @@ where /// Commit previously applied diff, saving changes to committed state (waiting for the operation to complete). /// Return `None` if specified diff does not exist. - fn commit_diff(&self, mc_top_blocks: Vec<(BlockIdShort, bool)>) -> Result<()>; + fn commit_diff(&self, mc_top_blocks: Vec<(BlockId, bool)>) -> Result<()>; /// Add new messages to the iterator fn add_message_to_iterator( @@ -80,6 +80,8 @@ where fn get_diffs_count_by_shard(&self, shard_ident: &ShardIdent) -> usize; /// Check if diff exists in the cache fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> bool; + /// Get last applied mc block id from committed state + fn get_last_applied_mc_block_id(&self) -> Result>; } impl MessageQueueAdapterStdImpl { @@ -157,7 +159,7 @@ impl MessageQueueAdapter for MessageQueueAdapterStdI Ok(()) } - fn commit_diff(&self, mc_top_blocks: Vec<(BlockIdShort, bool)>) -> Result<()> { + fn commit_diff(&self, mc_top_blocks: Vec<(BlockId, bool)>) -> Result<()> { let time = std::time::Instant::now(); self.queue.commit_diff(&mc_top_blocks)?; @@ -223,4 +225,8 @@ impl MessageQueueAdapter for MessageQueueAdapterStdI fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> bool { self.queue.is_diff_exists(block_id_short) } + + fn get_last_applied_mc_block_id(&self) -> Result> { + self.queue.get_last_applied_mc_block_id() + } } diff --git a/collator/tests/internal_queue.rs b/collator/tests/internal_queue.rs index 608fd7e8a..52f3b6b23 100644 --- a/collator/tests/internal_queue.rs +++ b/collator/tests/internal_queue.rs @@ -222,10 +222,19 @@ async fn test_queue() -> anyhow::Result<()> { let queue: QueueImpl = queue_factory.create(); + let mc_block = BlockId { + shard: ShardIdent::MASTERCHAIN, + seqno: 10, + root_hash: Default::default(), + file_hash: Default::default(), + }; + // create first block with queue diff - let block1 = BlockIdShort { + let block1 = BlockId { shard: ShardIdent::new_full(0), seqno: 0, + root_hash: Default::default(), + file_hash: Default::default(), }; let mut diff = QueueDiffWithMessages::new(); @@ -320,7 +329,7 @@ async fn test_queue() -> anyhow::Result<()> { queue.apply_diff( diff_with_messages, - block1, + block1.as_short_id(), &HashBytes::from([1; 32]), diff_statistics, max_message, @@ -328,9 +337,11 @@ async fn test_queue() -> anyhow::Result<()> { // end block 1 diff // create second block with queue diff - let block2 = BlockIdShort { + let block2 = BlockId { shard: ShardIdent::new_full(0), seqno: 1, + root_hash: Default::default(), + file_hash: Default::default(), }; let mut diff = QueueDiffWithMessages::new(); @@ -425,7 +436,7 @@ async fn test_queue() -> anyhow::Result<()> { queue.apply_diff( diff_with_messages, - block2, + block2.as_short_id(), &HashBytes::from([1; 32]), diff_statistics, max_message, @@ -440,7 +451,7 @@ async fn test_queue() -> anyhow::Result<()> { dest_3_normal_priority, )?; - queue.commit_diff(&[(block1, true)])?; + queue.commit_diff(&[(mc_block, true), (block1, true)])?; test_statistics_check_statistics( &queue, dest_1_low_priority, @@ -466,7 +477,7 @@ async fn test_queue() -> anyhow::Result<()> { ranges.push(queue_range); - let iterators = queue.iterator(1, &ranges, ShardIdent::new_full(-1))?; + let iterators = queue.iterator(1, &ranges, ShardIdent::MASTERCHAIN)?; let mut iterator_manager = StatesIteratorsManager::new(iterators); let mut read_count = 0; @@ -505,7 +516,7 @@ async fn test_queue() -> anyhow::Result<()> { ranges.push(queue_range); - let iterators = queue.iterator(1, &ranges, ShardIdent::new_full(-1))?; + let iterators = queue.iterator(1, &ranges, ShardIdent::MASTERCHAIN)?; let mut iterator_manager = StatesIteratorsManager::new(iterators); let mut read_count = 0; @@ -528,7 +539,7 @@ async fn test_queue() -> anyhow::Result<()> { assert_eq!(read_count, 2000); // test commit all diffs and check statistics - queue.commit_diff(&[(block2, true)])?; + queue.commit_diff(&[(mc_block, true), (block2, true)])?; test_statistics_check_statistics( &queue, dest_1_low_priority, @@ -619,7 +630,7 @@ async fn test_iteration_from_two_shards() -> anyhow::Result<()> { // create second block with queue diff let block2 = BlockIdShort { - shard: ShardIdent::new_full(-1), + shard: ShardIdent::MASTERCHAIN, seqno: 0, }; let mut diff = QueueDiffWithMessages::new(); @@ -678,7 +689,7 @@ async fn test_iteration_from_two_shards() -> anyhow::Result<()> { }; let queue_range2 = QueueShardRange { - shard_ident: ShardIdent::new_full(-1), + shard_ident: ShardIdent::MASTERCHAIN, from: QueueKey { lt: 10000, hash: HashBytes::default(), @@ -705,7 +716,7 @@ async fn test_iteration_from_two_shards() -> anyhow::Result<()> { }; let stat_range2 = QueueShardRange { - shard_ident: ShardIdent::new_full(-1), + shard_ident: ShardIdent::MASTERCHAIN, from: QueueKey { lt: 1, hash: HashBytes::default(), @@ -725,7 +736,7 @@ async fn test_iteration_from_two_shards() -> anyhow::Result<()> { .unwrap_or_default(); assert_eq!(stat, 30000); - let iterators = queue.iterator(1, &ranges, ShardIdent::new_full(-1))?; + let iterators = queue.iterator(1, &ranges, ShardIdent::MASTERCHAIN)?; let mut iterator_manager = StatesIteratorsManager::new(iterators); let mut read_count = 0; @@ -1053,14 +1064,19 @@ async fn test_queue_tail() -> anyhow::Result<()> { let queue: QueueImpl = queue_factory.create(); - let block_mc1 = BlockIdShort { - shard: ShardIdent::new_full(-1), + + let block_mc1 = BlockId { + shard: ShardIdent::MASTERCHAIN, seqno: 0, + root_hash: Default::default(), + file_hash: Default::default(), }; - let block_mc2 = BlockIdShort { - shard: ShardIdent::new_full(-1), + let block_mc2 = BlockId { + shard: ShardIdent::MASTERCHAIN, seqno: 1, + root_hash: Default::default(), + file_hash: Default::default(), }; let mut diff_mc1 = QueueDiffWithMessages::new(); let mut diff_mc2 = QueueDiffWithMessages::new(); @@ -1106,7 +1122,7 @@ async fn test_queue_tail() -> anyhow::Result<()> { // apply two diffs queue.apply_diff( diff_mc1, - block_mc1, + block_mc1.as_short_id(), &HashBytes::from([1; 32]), statistics_mc1, max_message, @@ -1115,31 +1131,114 @@ async fn test_queue_tail() -> anyhow::Result<()> { let max_message = *diff_mc2.messages.keys().last().unwrap(); queue.apply_diff( diff_mc2, - block_mc2, + block_mc2.as_short_id(), &HashBytes::from([2; 32]), statistics_mc2, max_message, )?; - let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::new_full(-1)); + let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::MASTERCHAIN); assert_eq!(diff_len_mc, 2); // commit first diff queue.commit_diff(&[(block_mc1, true)])?; - let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::new_full(-1)); + let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::MASTERCHAIN); assert_eq!(diff_len_mc, 2); // trim first diff - queue.trim_diffs(&ShardIdent::new_full(-1), &end_key_mc1)?; - let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::new_full(-1)); + queue.trim_diffs(&ShardIdent::MASTERCHAIN, &end_key_mc1)?; + let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::MASTERCHAIN); assert_eq!(diff_len_mc, 1); // clear uncommitted state with second diff queue.clear_uncommitted_state()?; - let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::new_full(-1)); + let diff_len_mc = queue.get_diffs_count_by_shard(&ShardIdent::MASTERCHAIN); assert_eq!(diff_len_mc, 0); Ok(()) } + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn test_version() -> anyhow::Result<()> { + let (storage, _tmp_dir) = Storage::new_temp().await?; + + let queue_factory = QueueFactoryStdImpl { + uncommitted_state_factory: UncommittedStateImplFactory { + storage: storage.clone(), + }, + committed_state_factory: CommittedStateImplFactory { storage }, + config: QueueConfig { + gc_interval: Duration::from_secs(1), + }, + }; + + let queue: QueueImpl = + queue_factory.create(); + + let block_mc1 = BlockId { + shard: ShardIdent::MASTERCHAIN, + seqno: 0, + root_hash: HashBytes::from([11; 32]), + file_hash: HashBytes::from([12; 32]), + }; + + let block_mc2 = BlockId { + shard: ShardIdent::MASTERCHAIN, + seqno: 1, + + root_hash: HashBytes::from([1; 32]), + file_hash: HashBytes::from([2; 32]), + }; + + let mut diff_mc1 = QueueDiffWithMessages::new(); + let mut diff_mc2 = QueueDiffWithMessages::new(); + + let stored_objects = vec![create_stored_object(1, RouterAddr { + workchain: -1, + account: HashBytes::from([1; 32]), + })?]; + + if let Some(stored_object) = stored_objects.first() { + diff_mc1 + .messages + .insert(stored_object.key(), stored_object.clone()); + } + + for stored_object in &stored_objects { + diff_mc2 + .messages + .insert(stored_object.key(), stored_object.clone()); + } + + let statistics_mc1 = (&diff_mc1, block_mc1.shard).into(); + + let max_message = *diff_mc1.messages.keys().last().unwrap(); + + let version = queue.get_last_applied_mc_block_id()?; + assert_eq!(version, None); + + queue.apply_diff( + diff_mc1, + block_mc1.as_short_id(), + &HashBytes::from([1; 32]), + statistics_mc1, + max_message, + )?; + + let version = queue.get_last_applied_mc_block_id()?; + assert_eq!(version, None); + + queue.commit_diff(&[(block_mc1, true)])?; + + let version = queue.get_last_applied_mc_block_id()?; + assert_eq!(version, Some(block_mc1)); + + queue.commit_diff(&[(block_mc2, true)])?; + + let version = queue.get_last_applied_mc_block_id()?; + assert_eq!(version, Some(block_mc2)); + + Ok(()) +} diff --git a/core/src/block_strider/starter/cold_boot.rs b/core/src/block_strider/starter/cold_boot.rs index ecfc9ee56..57ebb734d 100644 --- a/core/src/block_strider/starter/cold_boot.rs +++ b/core/src/block_strider/starter/cold_boot.rs @@ -780,7 +780,7 @@ impl StarterInner { }; internal_queue - .import_from_file(block_id.shard, top_update, file) + .import_from_file(top_update, file, *block_id) .await?; remove_state_file.await; diff --git a/storage/src/db/kv_db/mod.rs b/storage/src/db/kv_db/mod.rs index acfc74a30..27eecfd16 100644 --- a/storage/src/db/kv_db/mod.rs +++ b/storage/src/db/kv_db/mod.rs @@ -136,6 +136,7 @@ weedb::tables! { pub shard_internal_messages_uncommitted: tables::ShardInternalMessagesUncommited, pub internal_message_stats: tables::InternalMessageStats, pub internal_message_stats_uncommitted: tables::InternalMessageStatsUncommited, + pub internal_message_var: tables::InternalMessageVar, } } diff --git a/storage/src/db/kv_db/tables.rs b/storage/src/db/kv_db/tables.rs index 64e6f3363..5fe3d5bf4 100644 --- a/storage/src/db/kv_db/tables.rs +++ b/storage/src/db/kv_db/tables.rs @@ -503,6 +503,21 @@ impl ColumnFamilyOptions for InternalMessageStatsUncommited { } } +pub struct InternalMessageVar; +impl ColumnFamily for InternalMessageVar { + const NAME: &'static str = "int_msg_var"; + + fn read_options(opts: &mut ReadOptions) { + opts.set_verify_checksums(true); + } +} + +impl ColumnFamilyOptions for InternalMessageVar { + fn options(opts: &mut Options, caches: &mut Caches) { + zstd_block_based_table_factory(opts, caches); + } +} + fn archive_data_merge( _: &[u8], current_value: Option<&[u8]>, diff --git a/storage/src/store/internal_queue/mod.rs b/storage/src/store/internal_queue/mod.rs index 13cee4188..c9621be32 100644 --- a/storage/src/store/internal_queue/mod.rs +++ b/storage/src/store/internal_queue/mod.rs @@ -1,7 +1,7 @@ use std::fs::File; use anyhow::Result; -use everscale_types::models::{IntAddr, Message, MsgInfo, OutMsgQueueUpdates, ShardIdent}; +use everscale_types::models::{BlockId, IntAddr, Message, MsgInfo, OutMsgQueueUpdates, ShardIdent}; use tycho_block_util::queue::{QueueKey, QueuePartitionIdx, RouterAddr, RouterPartitions}; use tycho_util::FastHashMap; use weedb::rocksdb::{DBRawIterator, WriteBatch}; @@ -18,6 +18,8 @@ pub mod model; pub struct InternalQueueStorage { db: BaseDb, } +// Constant for the last applied mc block id key +const INT_QUEUE_LAST_APPLIED_MC_BLOCK_ID_KEY: &[u8] = b"last_applied_mc_block_id"; impl InternalQueueStorage { pub fn new(db: BaseDb) -> Self { @@ -41,9 +43,9 @@ impl InternalQueueStorage { pub async fn import_from_file( &self, - shard_ident: ShardIdent, top_update: &OutMsgQueueUpdates, file: File, + block_id: BlockId, ) -> Result<()> { use everscale_types::boc::ser::BocHeader; @@ -69,6 +71,7 @@ impl InternalQueueStorage { let messages_cf = this.db.shard_internal_messages.cf(); let stats_cf = this.db.internal_message_stats.cf(); + let var_cf = this.db.internal_message_var.cf(); let mut batch = weedb::rocksdb::WriteBatch::default(); @@ -108,7 +111,7 @@ impl InternalQueueStorage { let key = ShardsInternalMessagesKey { partition, - shard_ident, + shard_ident: block_id.shard, internal_message_key: QueueKey { lt: int_msg_info.created_lt, hash: *msg_hash, @@ -126,10 +129,17 @@ impl InternalQueueStorage { } let queue_diff = part.queue_diff(); + + batch.put_cf( + &var_cf, + INT_QUEUE_LAST_APPLIED_MC_BLOCK_ID_KEY, + block_id.to_vec(), + ); + for (partition, statistics) in statistics.drain() { for (dest, count) in statistics.iter() { let key = StatKey { - shard_ident, + shard_ident: block_id.shard, partition, min_message: queue_diff.min_message, max_message: queue_diff.max_message, @@ -149,112 +159,6 @@ impl InternalQueueStorage { .await? } - pub fn commit>(&self, ranges: I) -> Result<()> { - let snapshot = self.db.rocksdb().snapshot(); - - let db = self.db.rocksdb().as_ref(); - let mut batch = WriteBatch::default(); - - let mut commit_range = |source_iter: &mut DBRawIterator<'_>, - from_key: &[u8], - to_key: &[u8], - source_cf: &BoundedCfHandle<'_>, - target_cf: &BoundedCfHandle<'_>| { - source_iter.seek(from_key); - - loop { - let (key, value) = match source_iter.item() { - Some(item) => item, - None => return source_iter.status(), - }; - - if key > to_key { - break; - } - - batch.delete_cf(source_cf, key); - batch.put_cf(target_cf, key, value); - - source_iter.next(); - } - - Ok(()) - }; - - let messages = &self.db.shard_internal_messages; - let messages_cf = &messages.cf(); - - let uncommited_messages = &self.db.shard_internal_messages_uncommitted; - let uncommited_messages_cf = &uncommited_messages.cf(); - - let mut uncommited_messages_iter = { - let mut readopts = uncommited_messages.new_read_config(); - readopts.set_snapshot(&snapshot); - db.raw_iterator_cf_opt(uncommited_messages_cf, readopts) - }; - - let stats = &self.db.internal_message_stats; - let stats_cf = &stats.cf(); - - let uncommited_stats = &self.db.internal_message_stats_uncommitted; - let uncommited_stats_cf = &uncommited_stats.cf(); - - let mut uncommited_stats_iter = { - let mut readopts = uncommited_stats.new_read_config(); - readopts.set_snapshot(&snapshot); - db.raw_iterator_cf_opt(uncommited_stats_cf, readopts) - }; - - for range in ranges { - // Commit messages for one range - let from_message_key = ShardsInternalMessagesKey { - partition: range.partition, - shard_ident: range.shard_ident, - internal_message_key: range.from, - }; - let to_message_key = ShardsInternalMessagesKey { - partition: range.partition, - shard_ident: range.shard_ident, - internal_message_key: range.to, - }; - - commit_range( - &mut uncommited_messages_iter, - &from_message_key.to_vec(), - &to_message_key.to_vec(), - uncommited_messages_cf, - messages_cf, - )?; - - // Commit stats for one range - let from_stat_key = StatKey { - shard_ident: range.shard_ident, - partition: range.partition, - min_message: range.from, - max_message: QueueKey::MIN, - dest: RouterAddr::MIN, - }; - let to_stat_key = StatKey { - shard_ident: range.shard_ident, - partition: range.partition, - min_message: range.to, - max_message: QueueKey::MAX, - dest: RouterAddr::MAX, - }; - - commit_range( - &mut uncommited_stats_iter, - &from_stat_key.to_vec(), - &to_stat_key.to_vec(), - uncommited_stats_cf, - stats_cf, - )?; - } - - // Apply batch - self.db.rocksdb().write(batch).map_err(Into::into) - } - pub fn delete>(&self, ranges: I) -> Result<()> { fn delete_range<'a>( batch: &mut WriteBatch, @@ -368,6 +272,20 @@ impl InternalQueueStorage { db.compact_range_cf(stats_cf, None::<[u8; 0]>, None::<[u8; 0]>); Ok(()) } + + /// Retrieves the queue version from the `internal_message_version` column family under the key `mc_version` + pub fn get_last_applied_mc_block_id(&self) -> Result> { + let cf = self.db.internal_message_var.cf(); + let data = self + .db + .rocksdb() + .get_cf(&cf, INT_QUEUE_LAST_APPLIED_MC_BLOCK_ID_KEY)?; + if let Some(bytes) = data { + return Ok(Some(BlockId::from_slice(&bytes))); + } + + Ok(None) + } } pub struct InternalQueueTransaction { @@ -406,6 +324,124 @@ impl InternalQueueTransaction { self.batch.put_cf(&cf, key.to_vec(), self.buffer.as_slice()); } + + pub fn commit_messages>( + &mut self, + snapshot: &InternalQueueSnapshot, + ranges: I, + ) -> Result<()> { + let db = self.db.rocksdb().as_ref(); + + let mut commit_range = |source_iter: &mut DBRawIterator<'_>, + from_key: &[u8], + to_key: &[u8], + source_cf: &BoundedCfHandle<'_>, + target_cf: &BoundedCfHandle<'_>| { + source_iter.seek(from_key); + + loop { + let (key, value) = match source_iter.item() { + Some(item) => item, + None => return source_iter.status(), + }; + + if key > to_key { + break; + } + + self.batch.delete_cf(source_cf, key); + self.batch.put_cf(target_cf, key, value); + + source_iter.next(); + } + + Ok(()) + }; + + let messages = &self.db.shard_internal_messages; + let messages_cf = &messages.cf(); + + let uncommited_messages = &self.db.shard_internal_messages_uncommitted; + let uncommited_messages_cf = &uncommited_messages.cf(); + + let mut uncommited_messages_iter = { + let mut readopts = uncommited_messages.new_read_config(); + readopts.set_snapshot(&snapshot.snapshot); + db.raw_iterator_cf_opt(uncommited_messages_cf, readopts) + }; + + let stats = &self.db.internal_message_stats; + let stats_cf = &stats.cf(); + + let uncommited_stats = &self.db.internal_message_stats_uncommitted; + let uncommited_stats_cf = &uncommited_stats.cf(); + + let mut uncommited_stats_iter = { + let mut readopts = uncommited_stats.new_read_config(); + readopts.set_snapshot(&snapshot.snapshot); + db.raw_iterator_cf_opt(uncommited_stats_cf, readopts) + }; + + for range in ranges { + // Commit messages for one range + let from_message_key = ShardsInternalMessagesKey { + partition: range.partition, + shard_ident: range.shard_ident, + internal_message_key: range.from, + }; + let to_message_key = ShardsInternalMessagesKey { + partition: range.partition, + shard_ident: range.shard_ident, + internal_message_key: range.to, + }; + + commit_range( + &mut uncommited_messages_iter, + &from_message_key.to_vec(), + &to_message_key.to_vec(), + uncommited_messages_cf, + messages_cf, + )?; + + // Commit stats for one range + let from_stat_key = StatKey { + shard_ident: range.shard_ident, + partition: range.partition, + min_message: range.from, + max_message: QueueKey::MIN, + dest: RouterAddr::MIN, + }; + let to_stat_key = StatKey { + shard_ident: range.shard_ident, + partition: range.partition, + min_message: range.to, + max_message: QueueKey::MAX, + dest: RouterAddr::MAX, + }; + + commit_range( + &mut uncommited_stats_iter, + &from_stat_key.to_vec(), + &to_stat_key.to_vec(), + uncommited_stats_cf, + stats_cf, + )?; + } + + Ok(()) + } + + /// Stores the queue version in the `internal_message_version` column family under the key `mc_version` + pub fn set_last_applied_mc_block_id(&mut self, mc_block_id: &BlockId) { + // Retrieve the column family handle for "internal_message_version" + let cf = self.db.internal_message_var.cf(); + // Convert the version into a little-endian byte array and store it + self.batch.put_cf( + &cf, + INT_QUEUE_LAST_APPLIED_MC_BLOCK_ID_KEY, + mc_block_id.to_vec(), + ); + } } pub struct InternalQueueSnapshot {