Skip to content

Commit

Permalink
Merge branch 'feature/int-queue-session-rocksdb'
Browse files Browse the repository at this point in the history
  • Loading branch information
Rexagon committed Jul 3, 2024
2 parents 69ff05b + 4bf8af8 commit 6dec9bb
Show file tree
Hide file tree
Showing 33 changed files with 1,453 additions and 1,397 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions cli/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ use tracing_subscriber::Layer;
use tycho_block_util::state::{MinRefMcStateTracker, ShardStateStuff};
use tycho_collator::collator::CollatorStdImplFactory;
use tycho_collator::internal_queue::queue::{QueueFactory, QueueFactoryStdImpl};
use tycho_collator::internal_queue::state::persistent::persistent_state::PersistentStateImplFactory;
use tycho_collator::internal_queue::state::session::session_state::SessionStateImplFactory;
use tycho_collator::internal_queue::state::persistent_state::PersistentStateImplFactory;
use tycho_collator::internal_queue::state::session_state::SessionStateImplFactory;
use tycho_collator::manager::CollationManager;
use tycho_collator::mempool::MempoolAdapterStdImpl;
use tycho_collator::queue_adapter::MessageQueueAdapterStdImpl;
Expand Down Expand Up @@ -589,8 +589,7 @@ impl Node {
// Create collator
tracing::info!("starting collator");

let shards = vec![];
let session_state_factory = SessionStateImplFactory::new(shards);
let session_state_factory = SessionStateImplFactory::new(self.storage.clone());
let persistent_state_factory = PersistentStateImplFactory::new(self.storage.clone());

let queue_factory = QueueFactoryStdImpl {
Expand Down
3 changes: 1 addition & 2 deletions collator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,11 @@ trait-variant = { workspace = true }
weedb = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true, features = ["preserve_order"] }

rayon = { workspace = true }
everscale-types = { workspace = true }
everscale-crypto = { workspace = true }

# local deps
#tycho-consensus = { workspace = true }
tycho-block-util = { workspace = true }
tycho-consensus = { workspace = true }
tycho-network = { workspace = true }
Expand Down
30 changes: 19 additions & 11 deletions collator/src/collator/do_collate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ impl CollatorStdImpl {
block_limits
);

// TODO: get from anchor
let created_by = self
.last_imported_anchor_author
.map(|a| HashBytes(a.0))
Expand Down Expand Up @@ -220,7 +221,7 @@ impl CollatorStdImpl {
let mut block_limits_reached = false;

// indicate that there are still unprocessed internals when collation loop finished
let mut has_pending_internals = false;
let has_pending_internals;

let mut fill_msgs_total_elapsed = Duration::ZERO;
let mut execute_msgs_total_elapsed = Duration::ZERO;
Expand Down Expand Up @@ -250,6 +251,7 @@ impl CollatorStdImpl {
};
collation_data.read_ext_msgs += ext_msgs.len() as u64;

let read_timer = std::time::Instant::now();
// 2. Then iterate through existing internals and try to fill the set
let mut remaining_capacity = max_messages_per_set - ext_msgs.len();
while remaining_capacity > 0 && !all_existing_internals_finished {
Expand Down Expand Up @@ -284,9 +286,12 @@ impl CollatorStdImpl {
}
}

tracing::debug!(target: tracing_targets::COLLATOR,
ext_count = ext_msgs.len(), int_count = internal_messages_sources.len(),
"read externals and internals",
tracing::debug!(
target: tracing_targets::COLLATOR,
ext_count = ext_msgs.len(),
int_count = internal_messages_sources.len(),
elapsed = ?read_timer.elapsed(),
"read externals and internals"
);

// 3. Join existing internals and externals
Expand Down Expand Up @@ -479,8 +484,6 @@ impl CollatorStdImpl {
collation_data.processed_upto.processed_offset,
);

has_pending_internals = internal_messages_iterator.peek(true)?.is_some();

process_txs_total_elapsed += timer.elapsed();

if block_limits_reached {
Expand Down Expand Up @@ -521,7 +524,9 @@ impl CollatorStdImpl {
let histogram_create_queue_diff =
HistogramGuard::begin_with_labels("tycho_do_collate_create_queue_diff_time", labels);

let diff = Arc::new(internal_messages_iterator.take_diff());
// internal_messages_iterator.fill_processed_upto();

let diff = internal_messages_iterator.take_diff();

// update internal messages processed_upto info in collation_data
for (shard_ident, message_key) in diff.processed_upto.iter() {
Expand All @@ -543,6 +548,8 @@ impl CollatorStdImpl {

let create_queue_diff_elapsed = histogram_create_queue_diff.finish();

has_pending_internals = internal_messages_iterator.next(true)?.is_some();

// build block candidate and new state
let finalize_block_timer = std::time::Instant::now();
// TODO: Move into rayon
Expand Down Expand Up @@ -570,12 +577,13 @@ impl CollatorStdImpl {
}
});

let diff_messages_len = diff.messages.len();
let apply_queue_diff_elapsed;
{
let histogram =
HistogramGuard::begin_with_labels("tycho_do_collate_apply_queue_diff_time", labels);
self.mq_adapter
.apply_diff(diff.clone(), candidate.block_id.as_short_id())
.apply_diff(diff, candidate.block_id.as_short_id())
.await?;
apply_queue_diff_elapsed = histogram.finish();
}
Expand Down Expand Up @@ -678,16 +686,16 @@ impl CollatorStdImpl {
new_msgs_created={}, new_msgs_added={}, \
in_msgs={}, out_msgs={}, \
read_ext_msgs={}, read_int_msgs={}, \
read_new_msgs_from_iterator={}, inserted_new_msgs_to_iterator={}",
read_new_msgs_from_iterator={}, inserted_new_msgs_to_iterator={} has_pending_internals={}. block = {:?}",
block_time_diff,
total_elapsed.as_millis(), elapsed_from_prev_block.as_millis(), collation_mngmnt_overhead.as_millis(),
collation_data.start_lt, collation_data.next_lt, collation_data.execute_count_all,
collation_data.execute_count_ext, collation_data.execute_count_int, collation_data.execute_count_new_int,
collation_data.int_enqueue_count, collation_data.int_dequeue_count,
collation_data.new_msgs_created, diff.messages.len(),
collation_data.new_msgs_created, diff_messages_len,
collation_data.in_msgs.len(), collation_data.out_msgs.len(),
collation_data.read_ext_msgs, collation_data.read_int_msgs_from_iterator,
collation_data.read_new_msgs_from_iterator, collation_data.inserted_new_msgs_to_iterator,
collation_data.read_new_msgs_from_iterator, collation_data.inserted_new_msgs_to_iterator, has_pending_internals, collation_data.block_id_short,
);

assert_eq!(
Expand Down
11 changes: 5 additions & 6 deletions collator/src/collator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ impl CollatorStdImpl {

async fn load_has_internals(&mut self) -> Result<()> {
let mut iterator = self.init_internal_mq_iterator().await?;
let has_internals = iterator.peek(true)?.is_some();
let has_internals = iterator.next(true)?.is_some();
self.update_working_state_pending_internals(Some(has_internals))?;
Ok(())
}
Expand Down Expand Up @@ -593,13 +593,12 @@ impl CollatorStdImpl {
});
}

ranges_to.insert(ShardIdent::new_full(-1), InternalMessageKey {
lt: self.working_state().mc_data.mc_state_stuff().state().gen_lt,
hash: HashBytes([255; 32]),
});
ranges_to.insert(ShardIdent::new_full(-1), InternalMessageKey::MAX);

// for current shard read until last message
ranges_to.insert(self.shard_id, InternalMessageKey::MAX);
if !self.shard_id.is_masterchain() {
ranges_to.insert(self.shard_id, InternalMessageKey::MAX);
}

let internal_messages_iterator = self
.mq_adapter
Expand Down
Loading

0 comments on commit 6dec9bb

Please sign in to comment.