Skip to content

Commit

Permalink
Merge branch 'feature/int-queue-persistent'
Browse files Browse the repository at this point in the history
  • Loading branch information
Rexagon committed Jun 8, 2024
2 parents 95841d4 + 748057d commit a012a6f
Show file tree
Hide file tree
Showing 37 changed files with 804 additions and 711 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions cli/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use futures_util::future::BoxFuture;
use tracing_subscriber::Layer;
use tycho_block_util::state::{MinRefMcStateTracker, ShardStateStuff};
use tycho_collator::collator::CollatorStdImplFactory;
use tycho_collator::internal_queue::persistent::persistent_state::PersistentStateImplFactory;
use tycho_collator::internal_queue::queue::{QueueFactory, QueueFactoryStdImpl};
use tycho_collator::internal_queue::session::session_state::SessionStateImplFactory;
use tycho_collator::internal_queue::state::persistent::persistent_state::PersistentStateImplFactory;
use tycho_collator::internal_queue::state::session::session_state::SessionStateImplFactory;
use tycho_collator::manager::CollationManager;
use tycho_collator::mempool::MempoolAdapterStdImpl;
use tycho_collator::queue_adapter::MessageQueueAdapterStdImpl;
Expand Down
2 changes: 2 additions & 0 deletions collator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ base64 = { workspace = true }
bytes = { workspace = true }
bytesize = { workspace = true }
futures-util = { workspace = true }
humantime = { workspace = true }
indexmap = { workspace = true }
metrics = { workspace = true }
parking_lot = { workspace = true }
Expand All @@ -30,6 +31,7 @@ tracing = { workspace = true }
tracing-subscriber = { workspace = true }
ton_executor = { workspace = true }
trait-variant = { workspace = true }
weedb = { workspace = true }

everscale-types = { workspace = true }
everscale-crypto = { workspace = true }
Expand Down
13 changes: 13 additions & 0 deletions collator/src/collator/do_collate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,12 @@ impl CollatorStdImpl {
.finalize_block(&mut collation_data, exec_manager)
.await?;

tracing::info!(target: tracing_targets::COLLATOR,
"created block candidate: start_lt={}, end_lt={}, txs={}, new_msgs={}, in_msgs={}, out_msgs={}",
collation_data.start_lt, collation_data.next_lt, block_transactions_count,
diff.messages.len(), collation_data.in_msgs.len(), collation_data.out_msgs.len(),
);

self.mq_adapter
.apply_diff(diff.clone(), candidate.block_id.as_short_id())
.await?;
Expand All @@ -439,6 +445,13 @@ impl CollatorStdImpl {
new_state_stuff: new_state_stuff.clone(),
has_pending_internals,
};

// log collation_result
tracing::info!(target: tracing_targets::COLLATOR,
"Created block candidate: collated_file_hash={}, block_id={}",
collation_result.candidate.collated_file_hash, collation_result.candidate.block_id
);

self.listener.on_block_candidate(collation_result).await?;

tracing::info!(target: tracing_targets::COLLATOR,
Expand Down
2 changes: 1 addition & 1 deletion collator/src/collator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ pub struct CollatorStdImpl {
/// Updated in the `import_next_anchor()` and `read_next_externals()`
has_pending_externals: bool,

/// State tracker for creating ShardStateStuff locally
/// State tracker for creating `ShardStateStuff` locally
state_tracker: MinRefMcStateTracker,

stats: CollatorStats,
Expand Down
27 changes: 8 additions & 19 deletions collator/src/internal_queue/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use everscale_types::models::ShardIdent;
use tycho_util::FastHashMap;

use crate::internal_queue::error::QueueError;
use crate::internal_queue::snapshot::{IterRange, MessageWithSource, ShardRange};
use crate::internal_queue::snapshot_manager::SnapshotManager;
use crate::internal_queue::state::state_iterator::{IterRange, MessageWithSource, ShardRange};
use crate::internal_queue::state::states_iterators_manager::StatesIteratorsManager;
use crate::internal_queue::types::{EnqueuedMessage, InternalMessageKey, Lt, QueueDiff};
pub trait QueueIterator: Send {
/// Get next message
Expand All @@ -32,12 +32,12 @@ pub struct QueueIteratorImpl {
commited_current_position: HashMap<ShardIdent, InternalMessageKey>,
messages_for_current_shard: BinaryHeap<Reverse<Arc<MessageWithSource>>>,
new_messages: HashMap<InternalMessageKey, Arc<EnqueuedMessage>>,
snapshot_manager: SnapshotManager,
snapshot_manager: StatesIteratorsManager,
}

impl QueueIteratorImpl {
pub fn new(
snapshot_manager: SnapshotManager,
snapshot_manager: StatesIteratorsManager,
for_shard: ShardIdent,
) -> Result<Self, QueueError> {
let messages_for_current_shard = BinaryHeap::default();
Expand Down Expand Up @@ -75,7 +75,7 @@ fn update_shard_range(

impl QueueIterator for QueueIteratorImpl {
fn next(&mut self, with_new: bool) -> Result<Option<IterItem>> {
if let Some(next_message) = self.snapshot_manager.next() {
if let Some(next_message) = self.snapshot_manager.next()? {
return Ok(Some(IterItem {
message_with_source: next_message.clone(),
is_new: false,
Expand All @@ -84,17 +84,9 @@ impl QueueIterator for QueueIteratorImpl {

if with_new {
if let Some(next_message) = self.messages_for_current_shard.pop() {
tracing::trace!(
target: crate::tracing_targets::MQ,
"Popping message from current shard messages: {:?}",
next_message.0);
let message_key = next_message.0.message.key();

if self.new_messages.remove(&message_key).is_some() {
tracing::trace!(
target: crate::tracing_targets::MQ,
"Message deleted new messages and in current shard messages: {:?}",
message_key);
return Ok(Some(IterItem {
message_with_source: next_message.0.clone(),
is_new: true,
Expand All @@ -112,7 +104,7 @@ impl QueueIterator for QueueIteratorImpl {
}

fn peek(&mut self, with_new: bool) -> Result<Option<IterItem>> {
if let Some(next_message) = self.snapshot_manager.peek() {
if let Some(next_message) = self.snapshot_manager.peek()? {
return Ok(Some(IterItem {
message_with_source: next_message.clone(),
is_new: false,
Expand Down Expand Up @@ -145,7 +137,7 @@ impl QueueIterator for QueueIteratorImpl {
"Taking diff from iterator. New messages count: {}",
self.new_messages.len());

let mut diff = QueueDiff::new();
let mut diff = QueueDiff::default();
for (shard_id, lt) in self.commited_current_position.iter() {
diff.processed_upto.insert(*shard_id, lt.clone());
}
Expand Down Expand Up @@ -189,10 +181,7 @@ impl QueueIterator for QueueIteratorImpl {
&& self.for_shard.workchain() == dest_workchain as i32
{
let message_with_source = MessageWithSource::new(self.for_shard, message.clone());
tracing::trace!(
target: crate::tracing_targets::MQ,
"Adding messages directly because it's for current shard: {:?}",
message_with_source);

self.messages_for_current_shard
.push(Reverse(Arc::new(message_with_source)));
};
Expand Down
5 changes: 1 addition & 4 deletions collator/src/internal_queue/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
pub mod error;
pub mod iterator;
pub mod persistent;
pub mod queue;
pub mod session;
mod shard;
pub mod snapshot;
pub mod snapshot_manager;
pub mod state;
pub mod types;
2 changes: 0 additions & 2 deletions collator/src/internal_queue/persistent/mod.rs

This file was deleted.

This file was deleted.

104 changes: 36 additions & 68 deletions collator/src/internal_queue/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@ use everscale_types::models::{BlockIdShort, ShardIdent};
use tokio::sync::{Mutex, RwLock};

use crate::internal_queue::error::QueueError;
use crate::internal_queue::persistent::persistent_state::{
use crate::internal_queue::state::persistent::persistent_state::{
PersistentState, PersistentStateConfig, PersistentStateFactory, PersistentStateImplFactory,
PersistentStateStdImpl,
};
use crate::internal_queue::session::session_state::{
use crate::internal_queue::state::session::session_state::{
SessionState, SessionStateFactory, SessionStateImplFactory, SessionStateStdImpl,
};
use crate::internal_queue::snapshot::{ShardRange, StateSnapshot};
use crate::internal_queue::state::state_iterator::{ShardRange, StateIterator};
use crate::internal_queue::types::QueueDiff;

// FACTORY

pub struct QueueConfig {
Expand Down Expand Up @@ -48,11 +47,11 @@ pub struct QueueFactoryStdImpl {

#[trait_variant::make(Queue: Send)]
pub trait LocalQueue {
async fn snapshot(
async fn iterator(
&self,
ranges: HashMap<ShardIdent, ShardRange>,
ranges: &HashMap<ShardIdent, ShardRange>,
for_shard_id: ShardIdent,
) -> Vec<Box<dyn StateSnapshot>>;
) -> Vec<Box<dyn StateIterator>>;
async fn split_shard(&self, shard_id: &ShardIdent) -> Result<(), QueueError>;
async fn merge_shards(
&self,
Expand Down Expand Up @@ -80,8 +79,8 @@ impl QueueFactory for QueueFactoryStdImpl {
let session_state = self.session_state_factory.create();
let persistent_state = self.persistent_state_factory.create();
QueueImpl {
session_state: Mutex::new(session_state),
persistent_state: RwLock::new(persistent_state),
session_state: Arc::new(Mutex::new(session_state)),
persistent_state: Arc::new(RwLock::new(persistent_state)),
}
}
}
Expand All @@ -91,41 +90,44 @@ where
S: SessionState,
P: PersistentState,
{
session_state: Mutex<S>,
persistent_state: RwLock<P>,
session_state: Arc<Mutex<S>>,
persistent_state: Arc<RwLock<P>>,
}

impl<S, P> Queue for QueueImpl<S, P>
where
S: SessionState + Send,
P: PersistentState + Send + Sync,
{
async fn snapshot(
async fn iterator(
&self,
ranges: HashMap<ShardIdent, ShardRange>,
ranges: &HashMap<ShardIdent, ShardRange>,
for_shard_id: ShardIdent,
) -> Vec<Box<dyn StateSnapshot>> {
let session_state_lock = self.session_state.lock().await;
let _persistent_state_lock = self.persistent_state.read().await;
vec![
// TODO parallel
session_state_lock.snapshot(ranges, for_shard_id).await,
// persistent_state_lock.snapshot().await,
]
) -> Vec<Box<dyn StateIterator>> {
let session_iter = {
let session_state_lock = self.session_state.lock().await;
session_state_lock.iterator(ranges, for_shard_id).await
};

// let persistent_state_lock = self.persistent_state.read().await;
// let persistent_iter = persistent_state_lock.iterator(for_shard_id);

// vec![session_iter, persistent_iter]
vec![session_iter]
}

async fn split_shard(&self, shard_id: &ShardIdent) -> Result<(), QueueError> {
self.session_state.lock().await.split_shard(shard_id).await
let session_state_lock = self.session_state.lock().await;
session_state_lock.split_shard(shard_id).await
}

async fn merge_shards(
&self,
shard_1_id: &ShardIdent,
shard_2_id: &ShardIdent,
) -> Result<(), QueueError> {
self.session_state
.lock()
.await
let session_state_lock = self.session_state.lock().await;
session_state_lock
.merge_shards(shard_1_id, shard_2_id)
.await
}
Expand All @@ -135,63 +137,29 @@ where
diff: Arc<QueueDiff>,
block_id_short: BlockIdShort,
) -> Result<(), QueueError> {
self.session_state
.lock()
.await
.apply_diff(diff, block_id_short)
.await
let session_state_lock = self.session_state.lock().await;
session_state_lock.apply_diff(diff, block_id_short).await
}

async fn add_shard(&self, shard_id: &ShardIdent) -> Result<(), QueueError> {
self.session_state.lock().await.add_shard(shard_id).await
let session_state_lock = self.session_state.lock().await;
session_state_lock.add_shard(shard_id).await
}

async fn commit_diff(
&self,
diff_id: &BlockIdShort,
) -> Result<Option<Arc<QueueDiff>>, QueueError> {
let session_state_lock = self.session_state.lock().await;
let persistent_state_lock = self.persistent_state.write().await;
let diff = session_state_lock.remove_diff(diff_id).await?;
let diff = {
let session_state_lock = self.session_state.lock().await;
session_state_lock.remove_diff(diff_id).await?
};
if let Some(diff) = &diff {
let persistent_state_lock = self.persistent_state.write().await;
persistent_state_lock
.add_messages(*diff_id, diff.messages.clone())
.await?;
}
Ok(diff)
}
}
// #[cfg(test)]
// mod tests {
// use everscale_types::models::ShardIdent;
//
// use super::*;
// use crate::internal_queue::persistent::persistent_state::{
// PersistentStateImplFactory, PersistentStateStdImpl,
// };
//
// #[tokio::test]
// async fn test_new_queue() {
// let base_shard = ShardIdent::new_full(0);
// let config = QueueConfig {
// persistent_state_config: PersistentStateConfig {
// database_url: "db_url".to_string(),
// },
// };
//
// let session_state_factory = SessionStateImplFactory::new(vec![ShardIdent::new_full(0)]);
// let persistent_state_factory =
// PersistentStateImplFactory::new(config.persistent_state_config);
//
// let queue_factory = QueueFactoryStdImpl {
// session_state_factory,
// persistent_state_factory,
// };
//
// let queue = queue_factory.create();
//
// Queue::split_shard(&queue, &base_shard).await.unwrap();
//
// assert_eq!(queue.session_state.lock().await.shards_count().await, 3);
// }
// }
2 changes: 0 additions & 2 deletions collator/src/internal_queue/session/mod.rs

This file was deleted.

Loading

0 comments on commit a012a6f

Please sign in to comment.