Skip to content

Commit

Permalink
feature(collator): int queue version
Browse files Browse the repository at this point in the history
  • Loading branch information
drmick committed Feb 12, 2025
1 parent 164d06b commit 52f38bd
Show file tree
Hide file tree
Showing 10 changed files with 348 additions and 166 deletions.
45 changes: 27 additions & 18 deletions collator/src/internal_queue/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@ use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;

use anyhow::{bail, Result};
use anyhow::{anyhow, bail, Result};
use everscale_types::cell::HashBytes;
use everscale_types::models::{BlockIdShort, ShardIdent};
use everscale_types::models::{BlockId, BlockIdShort, ShardIdent};
use serde::{Deserialize, Serialize};
use tycho_block_util::queue::{QueueKey, QueuePartitionIdx};
use tycho_util::metrics::HistogramGuard;
Expand Down Expand Up @@ -91,7 +91,7 @@ where
max_message: QueueKey,
) -> Result<()>;
/// Move messages from uncommitted state to committed state and update gc ranges
fn commit_diff(&self, mc_top_blocks: &[(BlockIdShort, bool)]) -> Result<()>;
fn commit_diff(&self, mc_top_blocks: &[(BlockId, bool)]) -> Result<()>;
/// remove all data in uncommitted state storage
fn clear_uncommitted_state(&self) -> Result<()>;
/// Returns the number of diffs in cache for the given shard
Expand All @@ -110,6 +110,8 @@ where
fn get_diff(&self, shard_ident: ShardIdent, seqno: u32) -> Option<ShortQueueDiff>;
/// Check if diff exists in the cache
fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> bool;
/// Get last applied mc block id from committed state
fn get_last_applied_mc_block_id(&self) -> Result<Option<BlockId>>;
}

// IMPLEMENTATION
Expand Down Expand Up @@ -309,29 +311,33 @@ where
Ok(())
}

fn commit_diff(&self, mc_top_blocks: &[(BlockIdShort, bool)]) -> Result<()> {
fn commit_diff(&self, mc_top_blocks: &[(BlockId, bool)]) -> Result<()> {
let mc_block_id = mc_top_blocks
.iter()
.find(|(block_id, _)| block_id.is_masterchain())
.map(|(block_id, _)| block_id)
.ok_or_else(|| anyhow!("Masterchain block not found in commit_diff"))?;

let mut partitions = FastHashSet::default();
// insert default partition because we doesn't store it in router
partitions.insert(QueuePartitionIdx::default());
let mut shards_to_commit = FastHashMap::default();
let mut gc_ranges = FastHashMap::default();

for (block_id_short, top_shard_block_changed) in mc_top_blocks {
for (block_id, top_shard_block_changed) in mc_top_blocks {
let mut diffs_to_commit = vec![];

// find all uncommited diffs for the given shard top block
let prev_shard_uncommitted_diffs =
self.uncommitted_diffs.get_mut(&block_id_short.shard);
let prev_shard_uncommitted_diffs = self.uncommitted_diffs.get_mut(&block_id.shard);

if let Some(mut shard_uncommitted_diffs) = prev_shard_uncommitted_diffs {
// iterate over all uncommitted diffs for the given shard until the top block seqno
shard_uncommitted_diffs
.range(..=block_id_short.seqno)
.for_each(|(block_seqno, shard_diff)| {
shard_uncommitted_diffs.range(..=block_id.seqno).for_each(
|(block_seqno, shard_diff)| {
diffs_to_commit.push(*block_seqno);

let current_last_key = shards_to_commit
.entry(block_id_short.shard)
.entry(block_id.shard)
.or_insert_with(|| *shard_diff.max_message());

// Add all partitions from the router to the partitions set
Expand All @@ -346,7 +352,7 @@ where
}

// find min processed_to for each shard for GC
if *block_seqno == block_id_short.seqno && *top_shard_block_changed {
if *block_seqno == block_id.seqno && *top_shard_block_changed {
for (shard_ident, processed_to_key) in shard_diff.processed_to().iter()
{
let last_key = gc_ranges
Expand All @@ -358,16 +364,15 @@ where
}
}
}
});
},
);

// remove all diffs from uncommitted state that are going to be committed
for seqno in diffs_to_commit {
if let Some(diff) = shard_uncommitted_diffs.remove(&seqno) {
// Move the diff to committed_diffs
let mut shard_committed_diffs = self
.committed_diffs
.entry(block_id_short.shard)
.or_default();
let mut shard_committed_diffs =
self.committed_diffs.entry(block_id.shard).or_default();
shard_committed_diffs.insert(seqno, diff);
}
}
Expand All @@ -385,7 +390,7 @@ where

// move all uncommitted diffs messages to committed state
self.uncommitted_state
.commit(partitions.clone(), &commit_ranges)?;
.commit(partitions.clone(), &commit_ranges, mc_block_id)?;

let uncommitted_diffs_count: usize =
self.uncommitted_diffs.iter().map(|r| r.value().len()).sum();
Expand Down Expand Up @@ -515,4 +520,8 @@ where
.get(&block_id_short.shard)
.is_some_and(|diffs| diffs.contains_key(&block_id_short.seqno))
}

fn get_last_applied_mc_block_id(&self) -> Result<Option<BlockId>> {
self.committed_state.get_last_applied_mc_block_id()
}
}
11 changes: 10 additions & 1 deletion collator/src/internal_queue/state/commited_state.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use anyhow::Result;
use everscale_types::models::{IntAddr, ShardIdent};
use everscale_types::models::{BlockId, IntAddr, ShardIdent};
use tycho_block_util::queue::QueuePartitionIdx;
use tycho_storage::model::ShardsInternalMessagesKey;
use tycho_storage::{InternalQueueSnapshot, Storage};
Expand Down Expand Up @@ -80,6 +80,9 @@ pub trait CommittedState<V: InternalMessageValue>: Send + Sync {
partition: QueuePartitionIdx,
range: &[QueueShardRange],
) -> Result<()>;

/// Get last applied mc block id
fn get_last_applied_mc_block_id(&self) -> Result<Option<BlockId>>;
}

// IMPLEMENTATION
Expand Down Expand Up @@ -158,4 +161,10 @@ impl<V: InternalMessageValue> CommittedState<V> for CommittedStateStdImpl {

Ok(())
}

fn get_last_applied_mc_block_id(&self) -> Result<Option<BlockId>> {
self.storage
.internal_queue_storage()
.get_last_applied_mc_block_id()
}
}
11 changes: 9 additions & 2 deletions collator/src/internal_queue/state/uncommitted_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::BTreeMap;
use std::sync::Arc;

use anyhow::Result;
use everscale_types::models::{IntAddr, ShardIdent};
use everscale_types::models::{BlockId, IntAddr, ShardIdent};
use tycho_block_util::queue::{QueueKey, QueuePartitionIdx, RouterAddr};
use tycho_storage::model::{QueueRange, ShardsInternalMessagesKey, StatKey};
use tycho_storage::{InternalQueueSnapshot, InternalQueueTransaction, Storage};
Expand Down Expand Up @@ -77,6 +77,7 @@ pub trait LocalUncommittedState<V: InternalMessageValue> {
&self,
partitions: FastHashSet<QueuePartitionIdx>,
ranges: &[QueueShardRange],
mc_block_id: &BlockId,
) -> Result<()>;

/// Delete all uncommitted messages and statistics
Expand Down Expand Up @@ -143,6 +144,7 @@ impl<V: InternalMessageValue> UncommittedState<V> for UncommittedStateStdImpl {
&self,
partitions: FastHashSet<QueuePartitionIdx>,
ranges: &[QueueShardRange],
mc_block_id: &BlockId,
) -> Result<()> {
let ranges = partitions.iter().flat_map(|&partition| {
ranges.iter().map(move |range| QueueRange {
Expand All @@ -152,8 +154,13 @@ impl<V: InternalMessageValue> UncommittedState<V> for UncommittedStateStdImpl {
to: range.to,
})
});
let snapshot = self.storage.internal_queue_storage().make_snapshot();
let mut tx = self.storage.internal_queue_storage().begin_transaction();

tx.commit_messages(&snapshot, ranges)?;
tx.set_last_applied_mc_block_id(mc_block_id);

self.storage.internal_queue_storage().commit(ranges)
tx.write()
}

fn truncate(&self) -> Result<()> {
Expand Down
16 changes: 8 additions & 8 deletions collator/src/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -473,9 +473,9 @@ where

let mut top_blocks: Vec<_> = top_shard_blocks_info
.iter()
.map(|(id, updated)| (id.as_short_id(), *updated))
.map(|(id, updated)| (*id, *updated))
.collect();
top_blocks.push((block_id.as_short_id(), true));
top_blocks.push((*block_id, true));

if let Err(err) = self.mq_adapter.commit_diff(top_blocks) {
bail!(
Expand Down Expand Up @@ -1442,7 +1442,7 @@ where

// collect top blocks queue diffs already applied to
let queue_diffs_applied_to_top_blocks = if let Some(applied_to_mc_block_id) =
self.get_queue_diffs_applied_to_mc_block_id(last_collated_mc_block_id)
self.get_queue_diffs_applied_to_mc_block_id(last_collated_mc_block_id)?
{
self.get_top_blocks_seqno(&applied_to_mc_block_id).await?
} else {
Expand Down Expand Up @@ -1702,18 +1702,18 @@ where
fn get_queue_diffs_applied_to_mc_block_id(
&self,
last_collated_mc_block_id: Option<BlockId>,
) -> Option<BlockId> {
) -> Result<Option<BlockId>> {
let last_processed_mc_block_id = *self.last_processed_mc_block_id.lock();
match (last_processed_mc_block_id, last_collated_mc_block_id) {
(Some(last_processed), Some(last_collated)) => {
if last_processed.seqno > last_collated.seqno {
Some(last_processed)
Ok(Some(last_processed))
} else {
Some(last_collated)
Ok(Some(last_collated))
}
}
(Some(mc_block_id), _) | (_, Some(mc_block_id)) => Some(mc_block_id),
_ => None,
(Some(mc_block_id), _) | (_, Some(mc_block_id)) => Ok(Some(mc_block_id)),
_ => self.mq_adapter.get_last_applied_mc_block_id(),
}
}

Expand Down
12 changes: 9 additions & 3 deletions collator/src/queue_adapter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use anyhow::Result;
use everscale_types::cell::HashBytes;
use everscale_types::models::{BlockIdShort, ShardIdent};
use everscale_types::models::{BlockId, BlockIdShort, ShardIdent};
use tracing::instrument;
use tycho_block_util::queue::{QueueKey, QueuePartitionIdx};
use tycho_util::metrics::HistogramGuard;
Expand Down Expand Up @@ -53,7 +53,7 @@ where

/// Commit previously applied diff, saving changes to committed state (waiting for the operation to complete).
/// Return `None` if specified diff does not exist.
fn commit_diff(&self, mc_top_blocks: Vec<(BlockIdShort, bool)>) -> Result<()>;
fn commit_diff(&self, mc_top_blocks: Vec<(BlockId, bool)>) -> Result<()>;

/// Add new messages to the iterator
fn add_message_to_iterator(
Expand All @@ -80,6 +80,8 @@ where
fn get_diffs_count_by_shard(&self, shard_ident: &ShardIdent) -> usize;
/// Check if diff exists in the cache
fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> bool;
/// Get last applied mc block id from committed state
fn get_last_applied_mc_block_id(&self) -> Result<Option<BlockId>>;
}

impl<V: InternalMessageValue> MessageQueueAdapterStdImpl<V> {
Expand Down Expand Up @@ -157,7 +159,7 @@ impl<V: InternalMessageValue> MessageQueueAdapter<V> for MessageQueueAdapterStdI
Ok(())
}

fn commit_diff(&self, mc_top_blocks: Vec<(BlockIdShort, bool)>) -> Result<()> {
fn commit_diff(&self, mc_top_blocks: Vec<(BlockId, bool)>) -> Result<()> {
let time = std::time::Instant::now();

self.queue.commit_diff(&mc_top_blocks)?;
Expand Down Expand Up @@ -223,4 +225,8 @@ impl<V: InternalMessageValue> MessageQueueAdapter<V> for MessageQueueAdapterStdI
fn is_diff_exists(&self, block_id_short: &BlockIdShort) -> bool {
self.queue.is_diff_exists(block_id_short)
}

fn get_last_applied_mc_block_id(&self) -> Result<Option<BlockId>> {
self.queue.get_last_applied_mc_block_id()
}
}
Loading

0 comments on commit 52f38bd

Please sign in to comment.