Skip to content

Commit

Permalink
feature(collator): persistent cache for diffs
Browse files Browse the repository at this point in the history
  • Loading branch information
drmick committed Feb 23, 2025
1 parent 1bc4ef4 commit 210c733
Show file tree
Hide file tree
Showing 19 changed files with 905 additions and 195 deletions.
Empty file added 11.log
Empty file.
2 changes: 1 addition & 1 deletion block-util/src/queue/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ pub struct QueueKey {
}

impl QueueKey {
const SIZE_HINT: usize = 8 + 32;
pub const SIZE_HINT: usize = 8 + 32;

pub const MIN: Self = Self {
lt: 0,
Expand Down
43 changes: 9 additions & 34 deletions collator/src/collator/do_collate/finalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,16 @@ use crate::collator::types::{
BlockCollationData, ExecuteResult, FinalizeBlockResult, FinalizeMessagesReaderResult,
PreparedInMsg, PreparedOutMsg,
};
use crate::internal_queue::types::EnqueuedMessage;
use crate::internal_queue::types::{
EnqueuedMessage
};
use crate::queue_adapter::MessageQueueAdapter;
use crate::tracing_targets;
use crate::types::processed_upto::{ProcessedUptoInfoExtension, ProcessedUptoInfoStuff};
use crate::types::{BlockCandidate, CollationSessionInfo, CollatorConfig, McData, ShardHashesExt};
use crate::types::{
BlockCandidate, CollationSessionInfo, CollatorConfig, McData,
ShardHashesExt,
};
use crate::utils::block::detect_top_processed_to_anchor;

pub struct FinalizeState {
Expand Down Expand Up @@ -69,36 +74,6 @@ impl Phase<FinalizeState> {
.cloned()
.unwrap_or_default();

// getting top shard blocks
let top_shard_blocks = if self.state.collation_data.block_id_short.is_masterchain() {
self.state
.collation_data
.top_shard_blocks
.iter()
.map(|b| (b.block_id.shard, b.block_id.seqno))
.collect()
} else {
let mut top_blocks: FastHashMap<ShardIdent, u32> = self
.state
.mc_data
.shards
.iter()
.filter(|(shard, descr)| {
descr.top_sc_block_updated && shard != &self.state.shard_id
})
.map(|(shard_ident, descr)| (*shard_ident, descr.seqno))
.collect();

top_blocks.insert(
self.state.mc_data.block_id.shard,
self.state.mc_data.block_id.seqno,
);

top_blocks
};

let diffs = mq_adapter.get_diffs(top_shard_blocks);

// get queue diff and check for pending internals
let create_queue_diff_elapsed;
let FinalizedMessagesReader {
Expand All @@ -111,8 +86,8 @@ impl Phase<FinalizeState> {
"tycho_do_collate_create_queue_diff_time",
&labels,
);
let finalize_message_reader_res =
messages_reader.finalize(self.extra.executor.min_next_lt(), diffs)?;
let finalize_message_reader_res = messages_reader
.finalize(self.extra.executor.min_next_lt(), &self.state.diffs_info)?;
create_queue_diff_elapsed = histogram_create_queue_diff.finish();
finalize_message_reader_res
};
Expand Down
93 changes: 88 additions & 5 deletions collator/src/collator/do_collate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use humantime::format_duration;
use phase::{ActualState, Phase};
use prepare::PrepareState;
use tycho_block_util::config::{apply_price_factor, compute_gas_price_factor};
use tycho_block_util::queue::QueueKey;
use tycho_block_util::queue::{QueueDiffStuff, QueueKey};
use tycho_block_util::state::MinRefMcStateTracker;
use tycho_storage::{NewBlockMeta, StoreStateHint};
use tycho_util::futures::JoinTask;
Expand All @@ -26,13 +26,16 @@ use super::types::{
use super::{CollatorStdImpl, ForceMasterCollation};
use crate::collator::do_collate::finalize::FinalizeBlockContext;
use crate::collator::types::RandSeed;
use crate::internal_queue::types::EnqueuedMessage;
use crate::internal_queue::types::{
DiffStatistics, EnqueuedMessage, PartitionRouter, QueueShardRange,
};
use crate::queue_adapter::MessageQueueAdapter;
use crate::tracing_targets;
use crate::types::{
BlockCollationResult, BlockIdExt, CollationSessionInfo, CollatorConfig,
DisplayBlockIdsIntoIter, DisplayBlockIdsIter, McData, ProcessedTo, ShardDescriptionShort,
TopBlockDescription, TopShardBlockInfo,
DisplayBlockIdsIntoIter, DisplayBlockIdsIter, McData, ProcessedTo,
ShardDescriptionExt as OtherShardDescriptionExt, ShardDescriptionShort, TopBlockDescription,
TopShardBlockInfo,
};

#[cfg(test)]
Expand Down Expand Up @@ -110,12 +113,82 @@ impl CollatorStdImpl {
)?;

let anchors_cache = std::mem::take(&mut self.anchors_cache);

let mq_adapter = self.mq_adapter.clone();
let state_node_adapter = self.state_node_adapter.clone();

// getting top shard blocks
let top_shard_blocks = if collation_data.block_id_short.is_masterchain() {
collation_data
.top_shard_blocks
.iter()
.map(|b| b.block_id)
.collect()
} else {
let mut top_blocks: Vec<BlockId> = mc_data
.shards
.iter()
.filter(|(shard, descr)| descr.top_sc_block_updated && shard != &self.shard_id)
.map(|(shard_ident, descr)| descr.get_block_id(*shard_ident))
.collect();

top_blocks.push(mc_data.block_id);

top_blocks
};

let mut diffs_info = FastHashMap::default();

for top_shard_block in top_shard_blocks.iter() {
if top_shard_block.seqno == 0 {
continue;
}
let diff: QueueDiffStuff = state_node_adapter
.load_diff(&top_shard_block)
.await?
.ok_or_else(|| anyhow!("Top shard block diff not found. block{top_shard_block:?}"))?;
let partition_router = PartitionRouter::with_partitions(
&diff.diff().router_partitions_src,
&diff.diff().router_partitions_dst,
);

// TODO use dynamic shards
let range_mc = QueueShardRange {
shard_ident: ShardIdent::MASTERCHAIN,
from: diff.diff().min_message,
to: diff.diff().max_message,
};

let range_shard = QueueShardRange {
shard_ident: ShardIdent::new_full(0),
from: diff.diff().min_message,
to: diff.diff().max_message,
};

let ranges = vec![range_mc, range_shard];
let queue_statistic = mq_adapter.get_statistics(0, &ranges)?;

let mut diff_statistics = FastHashMap::default();
diff_statistics.insert(0, queue_statistic.statistics().clone());

let diff_statistic = DiffStatistics::new(
top_shard_block.shard,
diff.diff().min_message,
diff.diff().max_message,
diff_statistics,
queue_statistic.shard_messages_count(),
);

diffs_info.insert(top_shard_block.shard, (partition_router, diff_statistic));
}

let state = Box::new(ActualState {
collation_config,
collation_data,
mc_data,
prev_shard_data,
shard_id: self.shard_id,
diffs_info,
});

let CollationResult {
Expand All @@ -127,7 +200,7 @@ impl CollatorStdImpl {
} = tycho_util::sync::rayon_run_fifo({
let collation_session = self.collation_session.clone();
let config = self.config.clone();
let mq_adapter = self.mq_adapter.clone();

let span = tracing::Span::current();
move || {
let _span = span.enter();
Expand Down Expand Up @@ -238,6 +311,7 @@ impl CollatorStdImpl {
let labels = [("workchain", shard_id.workchain().to_string())];
let mc_data = state.mc_data.clone();

let block_id = state.collation_data.block_id_short.clone();
// prepare execution
let histogram_prepare =
HistogramGuard::begin_with_labels("tycho_do_collate_prepare_time", &labels);
Expand Down Expand Up @@ -351,6 +425,15 @@ impl CollatorStdImpl {
&min_processed_to.unwrap_or_default().next_value(),
) + 1;

tracing::info!(target: "local_debug",
"block tail len {:?} {} {:?}",
block_id,
diff_tail_len,
min_processed_to
);

// let diff_tail_len = 1;

let span = tracing::Span::current();
let (finalize_phase_result, update_queue_task_result) = rayon::join(
|| {
Expand Down
3 changes: 3 additions & 0 deletions collator/src/collator/do_collate/phase.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::sync::Arc;

use everscale_types::models::{CollationConfig, ShardIdent};
use tycho_util::FastHashMap;

use super::{BlockCollationData, PrevData};
use crate::internal_queue::types::{DiffStatistics, PartitionRouter};
use crate::types::McData;

pub struct Phase<S: PhaseState> {
Expand All @@ -18,4 +20,5 @@ pub struct ActualState {
pub mc_data: Arc<McData>,
pub prev_shard_data: PrevData,
pub shard_id: ShardIdent,
pub diffs_info: FastHashMap<ShardIdent, (PartitionRouter, DiffStatistics)>,
}
22 changes: 10 additions & 12 deletions collator/src/collator/messages_reader/internals_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ impl InternalsPartitionReader {
while current_block_seqno < self.block_seqno {
let diff = self
.mq_adapter
.get_diff(self.for_shard_id, current_block_seqno)
.get_diff(&self.for_shard_id, current_block_seqno)?
.ok_or_else(|| {
anyhow!(
"cannot get diff for block {}:{}",
Expand All @@ -425,9 +425,7 @@ impl InternalsPartitionReader {
)
})?;

messages_count += diff
.statistics()
.get_messages_count_by_shard(&self.for_shard_id);
messages_count += diff.get_messages_count_by_shard(&self.for_shard_id);

if messages_count > max_messages as u64 {
break;
Expand All @@ -446,14 +444,14 @@ impl InternalsPartitionReader {

let shard_range_to = if shard_id == self.for_shard_id {
if range_seqno != self.block_seqno {
let diff =
self.mq_adapter
.get_diff(shard_id, range_seqno)
.ok_or_else(|| {
anyhow!("cannot get diff for block {shard_id}:{range_seqno}")
})?;

*diff.max_message()
let diff = self
.mq_adapter
.get_diff(&shard_id, range_seqno)?
.ok_or_else(|| {
anyhow!("cannot get diff for block {shard_id}:{range_seqno}")
})?;

diff.max_message
} else {
QueueKey::max_for_lt(self.prev_state_gen_lt)
}
Expand Down
20 changes: 9 additions & 11 deletions collator/src/collator/messages_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use anyhow::{Context, Result};
use everscale_types::cell::HashBytes;
use everscale_types::models::{MsgsExecutionParams, ShardIdent};
use tycho_block_util::queue::{QueueKey, QueuePartitionIdx};
use tycho_util::FastHashSet;
use tycho_util::{FastHashMap, FastHashSet};

use self::externals_reader::*;
use self::internals_reader::*;
Expand All @@ -15,9 +15,8 @@ pub(super) use self::reader_state::*;
use super::messages_buffer::{DisplayMessageGroup, MessageGroup, MessagesBufferLimits};
use super::types::{AnchorsCache, MsgsExecutionParamsExtension};
use crate::collator::messages_buffer::DebugMessageGroup;
use crate::internal_queue::queue::ShortQueueDiff;
use crate::internal_queue::types::{
EnqueuedMessage, PartitionRouter, QueueDiffWithMessages, QueueStatistics,
DiffStatistics, EnqueuedMessage, PartitionRouter, QueueDiffWithMessages, QueueStatistics,
};
use crate::queue_adapter::MessageQueueAdapter;
use crate::tracing_targets;
Expand Down Expand Up @@ -367,7 +366,7 @@ impl MessagesReader {
pub fn finalize(
mut self,
current_next_lt: u64,
diffs: Vec<(ShardIdent, ShortQueueDiff)>,
diffs_info: &FastHashMap<ShardIdent, (PartitionRouter, DiffStatistics)>,
) -> Result<FinalizedMessagesReader> {
let mut has_unprocessed_messages = self.has_messages_in_buffers()
|| self.has_pending_new_messages()
Expand Down Expand Up @@ -435,7 +434,7 @@ impl MessagesReader {
&mut queue_diff_with_msgs.partition_router,
aggregated_stats,
self.for_shard_id,
diffs,
diffs_info,
)?;

// metrics: accounts count in isolated partitions
Expand Down Expand Up @@ -498,7 +497,7 @@ impl MessagesReader {
partition_router: &mut PartitionRouter,
aggregated_stats: QueueStatistics,
for_shard_id: ShardIdent,
top_block_diffs: Vec<(ShardIdent, ShortQueueDiff)>,
diffs_info: &FastHashMap<ShardIdent, (PartitionRouter, DiffStatistics)>,
) -> Result<FastHashSet<HashBytes>> {
let par_0_msgs_count_limit = msgs_exec_params.par_0_int_msgs_count_limit as u64;
let mut moved_from_par_0_accounts = FastHashSet::default();
Expand Down Expand Up @@ -531,7 +530,7 @@ impl MessagesReader {
dest_int_address,
);
// if we have account for another shard then take info from that shard
let acc_shard_diff_info = top_block_diffs
let acc_shard_diff_info = diffs_info
.iter()
.find(|(shard_id, _)| shard_id.contains_address(&dest_int_address))
.map(|(_, diff)| diff);
Expand All @@ -546,14 +545,13 @@ impl MessagesReader {
);
msgs_count
}
Some(diff) => {
Some((router, statistics)) => {
tracing::trace!(target: tracing_targets::COLLATOR,
"use diff for address {} because we have diff",
dest_int_address,
);
// getting remote shard partition from diff
let remote_shard_partition =
diff.router().get_partition(None, &dest_int_address);
let remote_shard_partition = router.get_partition(None, &dest_int_address);

tracing::trace!(target: tracing_targets::COLLATOR,
"remote shard partition for address {} is {}",
Expand All @@ -571,7 +569,7 @@ impl MessagesReader {
}

// if remote partition == 0 then we need to check statistics
let remote_msgs_count = match diff.statistics().partition(0) {
let remote_msgs_count = match statistics.partition(0) {
None => {
tracing::trace!(target: tracing_targets::COLLATOR,
"use aggregated stats for address {} because we do not have partition 0 stats in diff",
Expand Down
3 changes: 2 additions & 1 deletion collator/src/collator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ mod messages_reader;
mod types;

pub use error::CollationCancelReason;
use tycho_util::FastHashMap;
pub use types::ForceMasterCollation;

#[cfg(test)]
Expand Down Expand Up @@ -1315,7 +1316,7 @@ impl CollatorStdImpl {
mut reader_state, ..
} = messages_reader.finalize(
0, // can pass 0 because new messages reader was not initialized in this case
vec![],
&FastHashMap::default(),
)?;
std::mem::swap(&mut working_state.reader_state, &mut reader_state);

Expand Down
Loading

0 comments on commit 210c733

Please sign in to comment.