Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature(collator): int queue version #599

Open
wants to merge 4 commits into
base: feat/imp-sync-long-tail
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file added 11.log
Empty file.
2 changes: 1 addition & 1 deletion block-util/src/queue/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ pub struct QueueKey {
}

impl QueueKey {
const SIZE_HINT: usize = 8 + 32;
pub const SIZE_HINT: usize = 8 + 32;

pub const MIN: Self = Self {
lt: 0,
Expand Down
43 changes: 9 additions & 34 deletions collator/src/collator/do_collate/finalize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,16 @@ use crate::collator::types::{
BlockCollationData, ExecuteResult, FinalizeBlockResult, FinalizeMessagesReaderResult,
PreparedInMsg, PreparedOutMsg,
};
use crate::internal_queue::types::EnqueuedMessage;
use crate::internal_queue::types::{
EnqueuedMessage
};
use crate::queue_adapter::MessageQueueAdapter;
use crate::tracing_targets;
use crate::types::processed_upto::{ProcessedUptoInfoExtension, ProcessedUptoInfoStuff};
use crate::types::{BlockCandidate, CollationSessionInfo, CollatorConfig, McData, ShardHashesExt};
use crate::types::{
BlockCandidate, CollationSessionInfo, CollatorConfig, McData,
ShardHashesExt,
};
use crate::utils::block::detect_top_processed_to_anchor;

pub struct FinalizeState {
Expand Down Expand Up @@ -69,36 +74,6 @@ impl Phase<FinalizeState> {
.cloned()
.unwrap_or_default();

// getting top shard blocks
let top_shard_blocks = if self.state.collation_data.block_id_short.is_masterchain() {
self.state
.collation_data
.top_shard_blocks
.iter()
.map(|b| (b.block_id.shard, b.block_id.seqno))
.collect()
} else {
let mut top_blocks: FastHashMap<ShardIdent, u32> = self
.state
.mc_data
.shards
.iter()
.filter(|(shard, descr)| {
descr.top_sc_block_updated && shard != &self.state.shard_id
})
.map(|(shard_ident, descr)| (*shard_ident, descr.seqno))
.collect();

top_blocks.insert(
self.state.mc_data.block_id.shard,
self.state.mc_data.block_id.seqno,
);

top_blocks
};

let diffs = mq_adapter.get_diffs(top_shard_blocks);

// get queue diff and check for pending internals
let create_queue_diff_elapsed;
let FinalizedMessagesReader {
Expand All @@ -111,8 +86,8 @@ impl Phase<FinalizeState> {
"tycho_do_collate_create_queue_diff_time",
&labels,
);
let finalize_message_reader_res =
messages_reader.finalize(self.extra.executor.min_next_lt(), diffs)?;
let finalize_message_reader_res = messages_reader
.finalize(self.extra.executor.min_next_lt(), &self.state.diffs_info)?;
create_queue_diff_elapsed = histogram_create_queue_diff.finish();
finalize_message_reader_res
};
Expand Down
102 changes: 92 additions & 10 deletions collator/src/collator/do_collate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use humantime::format_duration;
use phase::{ActualState, Phase};
use prepare::PrepareState;
use tycho_block_util::config::{apply_price_factor, compute_gas_price_factor};
use tycho_block_util::queue::QueueKey;
use tycho_block_util::queue::{QueueDiffStuff, QueueKey};
use tycho_block_util::state::MinRefMcStateTracker;
use tycho_storage::{NewBlockMeta, StoreStateHint};
use tycho_util::futures::JoinTask;
Expand All @@ -26,13 +26,16 @@ use super::types::{
use super::{CollatorStdImpl, ForceMasterCollation};
use crate::collator::do_collate::finalize::FinalizeBlockContext;
use crate::collator::types::RandSeed;
use crate::internal_queue::types::EnqueuedMessage;
use crate::internal_queue::types::{
DiffStatistics, EnqueuedMessage, PartitionRouter, QueueShardRange,
};
use crate::queue_adapter::MessageQueueAdapter;
use crate::tracing_targets;
use crate::types::{
BlockCollationResult, BlockIdExt, CollationSessionInfo, CollatorConfig,
DisplayBlockIdsIntoIter, DisplayBlockIdsIter, McData, ProcessedTo, ShardDescriptionShort,
TopBlockDescription, TopShardBlockInfo,
DisplayBlockIdsIntoIter, DisplayBlockIdsIter, McData, ProcessedTo,
ShardDescriptionExt as OtherShardDescriptionExt, ShardDescriptionShort, TopBlockDescription,
TopShardBlockInfo,
};

#[cfg(test)]
Expand Down Expand Up @@ -110,12 +113,82 @@ impl CollatorStdImpl {
)?;

let anchors_cache = std::mem::take(&mut self.anchors_cache);

let mq_adapter = self.mq_adapter.clone();
let state_node_adapter = self.state_node_adapter.clone();

// getting top shard blocks
let top_shard_blocks = if collation_data.block_id_short.is_masterchain() {
collation_data
.top_shard_blocks
.iter()
.map(|b| b.block_id)
.collect()
} else {
let mut top_blocks: Vec<BlockId> = mc_data
.shards
.iter()
.filter(|(shard, descr)| descr.top_sc_block_updated && shard != &self.shard_id)
.map(|(shard_ident, descr)| descr.get_block_id(*shard_ident))
.collect();

top_blocks.push(mc_data.block_id);

top_blocks
};

let mut diffs_info = FastHashMap::default();

for top_shard_block in top_shard_blocks.iter() {
if top_shard_block.seqno == 0 {
continue;
}
let diff: QueueDiffStuff = state_node_adapter
.load_diff(&top_shard_block)
.await?
.ok_or_else(|| anyhow!("Top shard block diff not found. block{top_shard_block:?}"))?;
let partition_router = PartitionRouter::with_partitions(
&diff.diff().router_partitions_src,
&diff.diff().router_partitions_dst,
);

// TODO use dynamic shards
let range_mc = QueueShardRange {
shard_ident: ShardIdent::MASTERCHAIN,
from: diff.diff().min_message,
to: diff.diff().max_message,
};

let range_shard = QueueShardRange {
shard_ident: ShardIdent::new_full(0),
from: diff.diff().min_message,
to: diff.diff().max_message,
};

let ranges = vec![range_mc, range_shard];
let queue_statistic = mq_adapter.get_statistics(0, &ranges)?;

let mut diff_statistics = FastHashMap::default();
diff_statistics.insert(0, queue_statistic.statistics().clone());

let diff_statistic = DiffStatistics::new(
top_shard_block.shard,
diff.diff().min_message,
diff.diff().max_message,
diff_statistics,
queue_statistic.shard_messages_count(),
);

diffs_info.insert(top_shard_block.shard, (partition_router, diff_statistic));
}

let state = Box::new(ActualState {
collation_config,
collation_data,
mc_data,
prev_shard_data,
shard_id: self.shard_id,
diffs_info,
});

let CollationResult {
Expand All @@ -127,7 +200,7 @@ impl CollatorStdImpl {
} = tycho_util::sync::rayon_run_fifo({
let collation_session = self.collation_session.clone();
let config = self.config.clone();
let mq_adapter = self.mq_adapter.clone();

let span = tracing::Span::current();
move || {
let _span = span.enter();
Expand Down Expand Up @@ -238,6 +311,7 @@ impl CollatorStdImpl {
let labels = [("workchain", shard_id.workchain().to_string())];
let mc_data = state.mc_data.clone();

let block_id = state.collation_data.block_id_short.clone();
// prepare execution
let histogram_prepare =
HistogramGuard::begin_with_labels("tycho_do_collate_prepare_time", &labels);
Expand Down Expand Up @@ -346,11 +420,19 @@ impl CollatorStdImpl {
&mc_data.shards_processed_to,
);

// trim outdated diffs and calc queue diffs tail lenght
if let Some(value) = min_processed_to {
mq_adapter.trim_diffs(&shard_id, &value)?;
};
let diff_tail_len = mq_adapter.get_diffs_count_by_shard(&shard_id) as u32 + 1;
let diff_tail_len = mq_adapter.get_diffs_tail_len(
&shard_id,
&min_processed_to.unwrap_or_default().next_value(),
) + 1;

tracing::info!(target: "local_debug",
"block tail len {:?} {} {:?}",
block_id,
diff_tail_len,
min_processed_to
);

// let diff_tail_len = 1;

let span = tracing::Span::current();
let (finalize_phase_result, update_queue_task_result) = rayon::join(
Expand Down
3 changes: 3 additions & 0 deletions collator/src/collator/do_collate/phase.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::sync::Arc;

use everscale_types::models::{CollationConfig, ShardIdent};
use tycho_util::FastHashMap;

use super::{BlockCollationData, PrevData};
use crate::internal_queue::types::{DiffStatistics, PartitionRouter};
use crate::types::McData;

pub struct Phase<S: PhaseState> {
Expand All @@ -18,4 +20,5 @@ pub struct ActualState {
pub mc_data: Arc<McData>,
pub prev_shard_data: PrevData,
pub shard_id: ShardIdent,
pub diffs_info: FastHashMap<ShardIdent, (PartitionRouter, DiffStatistics)>,
}
22 changes: 10 additions & 12 deletions collator/src/collator/messages_reader/internals_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ impl InternalsPartitionReader {
while current_block_seqno < self.block_seqno {
let diff = self
.mq_adapter
.get_diff(self.for_shard_id, current_block_seqno)
.get_diff(&self.for_shard_id, current_block_seqno)?
.ok_or_else(|| {
anyhow!(
"cannot get diff for block {}:{}",
Expand All @@ -425,9 +425,7 @@ impl InternalsPartitionReader {
)
})?;

messages_count += diff
.statistics()
.get_messages_count_by_shard(&self.for_shard_id);
messages_count += diff.get_messages_count_by_shard(&self.for_shard_id);

if messages_count > max_messages as u64 {
break;
Expand All @@ -446,14 +444,14 @@ impl InternalsPartitionReader {

let shard_range_to = if shard_id == self.for_shard_id {
if range_seqno != self.block_seqno {
let diff =
self.mq_adapter
.get_diff(shard_id, range_seqno)
.ok_or_else(|| {
anyhow!("cannot get diff for block {shard_id}:{range_seqno}")
})?;

*diff.max_message()
let diff = self
.mq_adapter
.get_diff(&shard_id, range_seqno)?
.ok_or_else(|| {
anyhow!("cannot get diff for block {shard_id}:{range_seqno}")
})?;

diff.max_message
} else {
QueueKey::max_for_lt(self.prev_state_gen_lt)
}
Expand Down
20 changes: 9 additions & 11 deletions collator/src/collator/messages_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use anyhow::{Context, Result};
use everscale_types::cell::HashBytes;
use everscale_types::models::{MsgsExecutionParams, ShardIdent};
use tycho_block_util::queue::{QueueKey, QueuePartitionIdx};
use tycho_util::FastHashSet;
use tycho_util::{FastHashMap, FastHashSet};

use self::externals_reader::*;
use self::internals_reader::*;
Expand All @@ -15,9 +15,8 @@ pub(super) use self::reader_state::*;
use super::messages_buffer::{DisplayMessageGroup, MessageGroup, MessagesBufferLimits};
use super::types::{AnchorsCache, MsgsExecutionParamsExtension};
use crate::collator::messages_buffer::DebugMessageGroup;
use crate::internal_queue::queue::ShortQueueDiff;
use crate::internal_queue::types::{
EnqueuedMessage, PartitionRouter, QueueDiffWithMessages, QueueStatistics,
DiffStatistics, EnqueuedMessage, PartitionRouter, QueueDiffWithMessages, QueueStatistics,
};
use crate::queue_adapter::MessageQueueAdapter;
use crate::tracing_targets;
Expand Down Expand Up @@ -367,7 +366,7 @@ impl MessagesReader {
pub fn finalize(
mut self,
current_next_lt: u64,
diffs: Vec<(ShardIdent, ShortQueueDiff)>,
diffs_info: &FastHashMap<ShardIdent, (PartitionRouter, DiffStatistics)>,
) -> Result<FinalizedMessagesReader> {
let mut has_unprocessed_messages = self.has_messages_in_buffers()
|| self.has_pending_new_messages()
Expand Down Expand Up @@ -435,7 +434,7 @@ impl MessagesReader {
&mut queue_diff_with_msgs.partition_router,
aggregated_stats,
self.for_shard_id,
diffs,
diffs_info,
)?;

// metrics: accounts count in isolated partitions
Expand Down Expand Up @@ -498,7 +497,7 @@ impl MessagesReader {
partition_router: &mut PartitionRouter,
aggregated_stats: QueueStatistics,
for_shard_id: ShardIdent,
top_block_diffs: Vec<(ShardIdent, ShortQueueDiff)>,
diffs_info: &FastHashMap<ShardIdent, (PartitionRouter, DiffStatistics)>,
) -> Result<FastHashSet<HashBytes>> {
let par_0_msgs_count_limit = msgs_exec_params.par_0_int_msgs_count_limit as u64;
let mut moved_from_par_0_accounts = FastHashSet::default();
Expand Down Expand Up @@ -531,7 +530,7 @@ impl MessagesReader {
dest_int_address,
);
// if we have account for another shard then take info from that shard
let acc_shard_diff_info = top_block_diffs
let acc_shard_diff_info = diffs_info
.iter()
.find(|(shard_id, _)| shard_id.contains_address(&dest_int_address))
.map(|(_, diff)| diff);
Expand All @@ -546,14 +545,13 @@ impl MessagesReader {
);
msgs_count
}
Some(diff) => {
Some((router, statistics)) => {
tracing::trace!(target: tracing_targets::COLLATOR,
"use diff for address {} because we have diff",
dest_int_address,
);
// getting remote shard partition from diff
let remote_shard_partition =
diff.router().get_partition(None, &dest_int_address);
let remote_shard_partition = router.get_partition(None, &dest_int_address);

tracing::trace!(target: tracing_targets::COLLATOR,
"remote shard partition for address {} is {}",
Expand All @@ -571,7 +569,7 @@ impl MessagesReader {
}

// if remote partition == 0 then we need to check statistics
let remote_msgs_count = match diff.statistics().partition(0) {
let remote_msgs_count = match statistics.partition(0) {
None => {
tracing::trace!(target: tracing_targets::COLLATOR,
"use aggregated stats for address {} because we do not have partition 0 stats in diff",
Expand Down
3 changes: 2 additions & 1 deletion collator/src/collator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ mod messages_reader;
mod types;

pub use error::CollationCancelReason;
use tycho_util::FastHashMap;
pub use types::ForceMasterCollation;

#[cfg(test)]
Expand Down Expand Up @@ -1315,7 +1316,7 @@ impl CollatorStdImpl {
mut reader_state, ..
} = messages_reader.finalize(
0, // can pass 0 because new messages reader was not initialized in this case
vec![],
&FastHashMap::default(),
)?;
std::mem::swap(&mut working_state.reader_state, &mut reader_state);

Expand Down
Loading
Loading