Skip to content

Commit

Permalink
feat(collator): int queue persistent
Browse files Browse the repository at this point in the history
  • Loading branch information
drmick authored and Rexagon committed Jun 8, 2024
1 parent 2430d68 commit e9a8016
Show file tree
Hide file tree
Showing 31 changed files with 685 additions and 685 deletions.
4 changes: 2 additions & 2 deletions cli/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,9 @@ use futures_util::future::BoxFuture;
use tracing_subscriber::Layer;
use tycho_block_util::state::{MinRefMcStateTracker, ShardStateStuff};
use tycho_collator::collator::CollatorStdImplFactory;
use tycho_collator::internal_queue::persistent::persistent_state::PersistentStateImplFactory;
use tycho_collator::internal_queue::queue::{QueueFactory, QueueFactoryStdImpl};
use tycho_collator::internal_queue::session::session_state::SessionStateImplFactory;
use tycho_collator::internal_queue::state::persistent::persistent_state::PersistentStateImplFactory;
use tycho_collator::internal_queue::state::session::session_state::SessionStateImplFactory;
use tycho_collator::manager::CollationManager;
use tycho_collator::mempool::MempoolAdapterStdImpl;
use tycho_collator::queue_adapter::MessageQueueAdapterStdImpl;
Expand Down
2 changes: 1 addition & 1 deletion collator/src/collator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ pub struct CollatorStdImpl {
/// Updated in the `import_next_anchor()` and `read_next_externals()`
has_pending_externals: bool,

/// State tracker for creating ShardStateStuff locally
/// State tracker for creating `ShardStateStuff` locally
state_tracker: MinRefMcStateTracker,

stats: CollatorStats,
Expand Down
16 changes: 9 additions & 7 deletions collator/src/internal_queue/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use everscale_types::models::ShardIdent;
use tycho_util::FastHashMap;

use crate::internal_queue::error::QueueError;
use crate::internal_queue::snapshot::{IterRange, MessageWithSource, ShardRange};
use crate::internal_queue::snapshot_manager::SnapshotManager;
use crate::internal_queue::state::state_iterator::{IterRange, MessageWithSource, ShardRange};
use crate::internal_queue::state::states_iterators_manager::StatesIteratorsManager;
use crate::internal_queue::types::{EnqueuedMessage, InternalMessageKey, Lt, QueueDiff};
pub trait QueueIterator: Send {
/// Get next message
Expand All @@ -32,12 +32,12 @@ pub struct QueueIteratorImpl {
commited_current_position: HashMap<ShardIdent, InternalMessageKey>,
messages_for_current_shard: BinaryHeap<Reverse<Arc<MessageWithSource>>>,
new_messages: HashMap<InternalMessageKey, Arc<EnqueuedMessage>>,
snapshot_manager: SnapshotManager,
snapshot_manager: StatesIteratorsManager,
}

impl QueueIteratorImpl {
pub fn new(
snapshot_manager: SnapshotManager,
snapshot_manager: StatesIteratorsManager,
for_shard: ShardIdent,
) -> Result<Self, QueueError> {
let messages_for_current_shard = BinaryHeap::default();
Expand Down Expand Up @@ -75,7 +75,7 @@ fn update_shard_range(

impl QueueIterator for QueueIteratorImpl {
fn next(&mut self, with_new: bool) -> Result<Option<IterItem>> {
if let Some(next_message) = self.snapshot_manager.next() {
if let Some(next_message) = self.snapshot_manager.next()? {
return Ok(Some(IterItem {
message_with_source: next_message.clone(),
is_new: false,
Expand Down Expand Up @@ -112,7 +112,7 @@ impl QueueIterator for QueueIteratorImpl {
}

fn peek(&mut self, with_new: bool) -> Result<Option<IterItem>> {
if let Some(next_message) = self.snapshot_manager.peek() {
if let Some(next_message) = self.snapshot_manager.peek()? {
return Ok(Some(IterItem {
message_with_source: next_message.clone(),
is_new: false,
Expand Down Expand Up @@ -145,7 +145,7 @@ impl QueueIterator for QueueIteratorImpl {
"Taking diff from iterator. New messages count: {}",
self.new_messages.len());

let mut diff = QueueDiff::new();
let mut diff = QueueDiff::default();
for (shard_id, lt) in self.commited_current_position.iter() {
diff.processed_upto.insert(*shard_id, lt.clone());
}
Expand Down Expand Up @@ -189,10 +189,12 @@ impl QueueIterator for QueueIteratorImpl {
&& self.for_shard.workchain() == dest_workchain as i32
{
let message_with_source = MessageWithSource::new(self.for_shard, message.clone());

tracing::trace!(
target: crate::tracing_targets::MQ,
"Adding messages directly because it's for current shard: {:?}",
message_with_source);

self.messages_for_current_shard
.push(Reverse(Arc::new(message_with_source)));
};
Expand Down
5 changes: 1 addition & 4 deletions collator/src/internal_queue/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
pub mod error;
pub mod iterator;
pub mod persistent;
pub mod queue;
pub mod session;
mod shard;
pub mod snapshot;
pub mod snapshot_manager;
pub mod state;
pub mod types;
2 changes: 0 additions & 2 deletions collator/src/internal_queue/persistent/mod.rs

This file was deleted.

105 changes: 34 additions & 71 deletions collator/src/internal_queue/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,33 @@ use std::sync::Arc;

use everscale_types::models::{BlockIdShort, ShardIdent};
use tokio::sync::{Mutex, RwLock};
use tracing::info;

use crate::internal_queue::error::QueueError;
use crate::internal_queue::persistent::persistent_state::{
use crate::internal_queue::state::persistent::persistent_state::{
PersistentState, PersistentStateConfig, PersistentStateFactory, PersistentStateImplFactory,
PersistentStateStdImpl,
};
use crate::internal_queue::session::session_state::{
use crate::internal_queue::state::session::session_state::{
SessionState, SessionStateFactory, SessionStateImplFactory, SessionStateStdImpl,
};
use crate::internal_queue::snapshot::{ShardRange, StateSnapshot};
use crate::internal_queue::state::state_iterator::{ShardRange, StateIterator};
use crate::internal_queue::types::QueueDiff;
use crate::tracing_targets;

// FACTORY

pub struct QueueConfig {
pub persistent_state_config: PersistentStateConfig,
}

pub trait QueueFactory {
type Queue: Queue;
type Queue: LocalQueue;

fn create(&self) -> Self::Queue;
}

impl<F, R> QueueFactory for F
where
F: Fn() -> R,
R: Queue,
R: LocalQueue,
{
type Queue = R;

Expand All @@ -50,11 +47,11 @@ pub struct QueueFactoryStdImpl {

#[trait_variant::make(Queue: Send)]
pub trait LocalQueue {
async fn snapshot(
async fn iterator(
&self,
ranges: &HashMap<ShardIdent, ShardRange>,
for_shard_id: ShardIdent,
) -> Vec<Box<dyn StateSnapshot>>;
) -> Vec<Box<dyn StateIterator>>;
async fn split_shard(&self, shard_id: &ShardIdent) -> Result<(), QueueError>;
async fn merge_shards(
&self,
Expand Down Expand Up @@ -82,8 +79,8 @@ impl QueueFactory for QueueFactoryStdImpl {
let session_state = self.session_state_factory.create();
let persistent_state = self.persistent_state_factory.create();
QueueImpl {
session_state: Mutex::new(session_state),
persistent_state: RwLock::new(persistent_state),
session_state: Arc::new(Mutex::new(session_state)),
persistent_state: Arc::new(RwLock::new(persistent_state)),
}
}
}
Expand All @@ -93,43 +90,43 @@ where
S: SessionState,
P: PersistentState,
{
session_state: Mutex<S>,
persistent_state: RwLock<P>,
session_state: Arc<Mutex<S>>,
persistent_state: Arc<RwLock<P>>,
}

impl<S, P> Queue for QueueImpl<S, P>
where
S: SessionState + Send,
P: PersistentState + Send + Sync,
{
async fn snapshot(
async fn iterator(
&self,
ranges: &HashMap<ShardIdent, ShardRange>,
for_shard_id: ShardIdent,
) -> Vec<Box<dyn StateSnapshot>> {
let session_state_lock = self.session_state.lock().await;
) -> Vec<Box<dyn StateIterator>> {
let session_iter = {
let session_state_lock = self.session_state.lock().await;
session_state_lock.iterator(ranges, for_shard_id).await
};

let persistent_state_lock = self.persistent_state.read().await;
vec![
// TODO parallel
session_state_lock
.snapshot(ranges, for_shard_id.clone())
.await,
persistent_state_lock.snapshot(ranges, for_shard_id).await,
]
let persistent_iter = persistent_state_lock.iterator(for_shard_id);

vec![session_iter, persistent_iter]
}

async fn split_shard(&self, shard_id: &ShardIdent) -> Result<(), QueueError> {
self.session_state.lock().await.split_shard(shard_id).await
let session_state_lock = self.session_state.lock().await;
session_state_lock.split_shard(shard_id).await
}

async fn merge_shards(
&self,
shard_1_id: &ShardIdent,
shard_2_id: &ShardIdent,
) -> Result<(), QueueError> {
self.session_state
.lock()
.await
let session_state_lock = self.session_state.lock().await;
session_state_lock
.merge_shards(shard_1_id, shard_2_id)
.await
}
Expand All @@ -139,63 +136,29 @@ where
diff: Arc<QueueDiff>,
block_id_short: BlockIdShort,
) -> Result<(), QueueError> {
self.session_state
.lock()
.await
.apply_diff(diff, block_id_short)
.await
let session_state_lock = self.session_state.lock().await;
session_state_lock.apply_diff(diff, block_id_short).await
}

async fn add_shard(&self, shard_id: &ShardIdent) -> Result<(), QueueError> {
self.session_state.lock().await.add_shard(shard_id).await
let session_state_lock = self.session_state.lock().await;
session_state_lock.add_shard(shard_id).await
}

async fn commit_diff(
&self,
diff_id: &BlockIdShort,
) -> Result<Option<Arc<QueueDiff>>, QueueError> {
let session_state_lock = self.session_state.lock().await;
let persistent_state_lock = self.persistent_state.write().await;
let diff = session_state_lock.remove_diff(diff_id).await?;
let diff = {
let session_state_lock = self.session_state.lock().await;
session_state_lock.remove_diff(diff_id).await?
};
if let Some(diff) = &diff {
let persistent_state_lock = self.persistent_state.write().await;
persistent_state_lock
.add_messages(*diff_id, diff.messages.clone())
.await?;
}
Ok(diff)
}
}
// #[cfg(test)]
// mod tests {
// use everscale_types::models::ShardIdent;
//
// use super::*;
// use crate::internal_queue::persistent::persistent_state::{
// PersistentStateImplFactory, PersistentStateStdImpl,
// };
//
// #[tokio::test]
// async fn test_new_queue() {
// let base_shard = ShardIdent::new_full(0);
// let config = QueueConfig {
// persistent_state_config: PersistentStateConfig {
// database_url: "db_url".to_string(),
// },
// };
//
// let session_state_factory = SessionStateImplFactory::new(vec![ShardIdent::new_full(0)]);
// let persistent_state_factory =
// PersistentStateImplFactory::new(config.persistent_state_config);
//
// let queue_factory = QueueFactoryStdImpl {
// session_state_factory,
// persistent_state_factory,
// };
//
// let queue = queue_factory.create();
//
// Queue::split_shard(&queue, &base_shard).await.unwrap();
//
// assert_eq!(queue.session_state.lock().await.shards_count().await, 3);
// }
// }
2 changes: 0 additions & 2 deletions collator/src/internal_queue/session/mod.rs

This file was deleted.

Loading

0 comments on commit e9a8016

Please sign in to comment.