From 45f288572962117dea2f3159ffdb8f97afdbdb95 Mon Sep 17 00:00:00 2001 From: Maksim Greshniakov Date: Wed, 1 May 2024 12:48:08 +0200 Subject: [PATCH 1/6] fix(validataor): improve concurrency --- Cargo.lock | 41 +- Cargo.toml | 2 +- block-util/src/config/mod.rs | 4 +- collator/Cargo.toml | 1 + collator/src/collator/types.rs | 10 +- collator/src/lib.rs | 2 +- collator/src/manager/collation_manager.rs | 15 +- collator/src/manager/collation_processor.rs | 12 +- collator/src/state_node.rs | 8 + collator/src/test_utils.rs | 6 +- collator/src/types.rs | 1 + collator/src/validator/config.rs | 4 + collator/src/validator/mod.rs | 3 +- collator/src/validator/network/handlers.rs | 73 +-- .../src/validator/network/network_service.rs | 51 +- collator/src/validator/state.rs | 360 +++++++---- collator/src/validator/test_impl.rs | 78 +-- collator/src/validator/types.rs | 9 + collator/src/validator/validator.rs | 391 ++++++++++-- collator/src/validator/validator_processor.rs | 590 ------------------ collator/tests/collation_tests.rs | 7 +- collator/tests/validator_tests.rs | 250 ++++---- 22 files changed, 860 insertions(+), 1058 deletions(-) create mode 100644 collator/src/validator/config.rs delete mode 100644 collator/src/validator/validator_processor.rs diff --git a/Cargo.lock b/Cargo.lock index cc7a8c03d..0f478bae4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -588,9 +588,9 @@ dependencies = [ [[package]] name = "everscale-crypto" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d3b3e4fc7882223c86a7cfd8ccdb58e017b89a9f91d90114beafa0e8d35b45fb" +checksum = "0b0304a55e328ca4f354e59e6816bccb43b03f681b85b31c6bd10ea7233d62b5" dependencies = [ "curve25519-dalek", "generic-array", @@ -641,9 +641,9 @@ dependencies = [ [[package]] name = "fastrand" -version = "2.0.2" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "658bd65b1cf4c852a3cc96f18a8ce7b5640f6b703f905c7d74532294c2a63984" +checksum = "9fc0510504f03c51ada170672ac806f1f105a88aa97a5281117e1ddc3368e51a" [[package]] name = "fdlimit" @@ -865,9 +865,9 @@ checksum = "01cda141df6706de531b6c46c3a33ecca755538219bd484262fa09410c13539c" [[package]] name = "lock_api" -version = "0.4.11" +version = "0.4.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3c168f8615b12bc01f9c17e2eb0cc07dcae1940121185446edc3744920e8ef45" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" dependencies = [ "autocfg", "scopeguard", @@ -1066,9 +1066,9 @@ checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" [[package]] name = "parking_lot" -version = "0.12.1" +version = "0.12.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +checksum = "7e4af0ca4f6caed20e900d564c242b8e5d4903fdacf31d3daf527b66fe6f42fb" dependencies = [ "lock_api", "parking_lot_core", @@ -1076,15 +1076,15 @@ dependencies = [ [[package]] name = "parking_lot_core" -version = "0.9.9" +version = "0.9.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" dependencies = [ "cfg-if", "libc", "redox_syscall", "smallvec", - "windows-targets 0.48.5", + "windows-targets 0.52.5", ] [[package]] @@ -1375,11 +1375,11 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.4.1" +version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4722d768eff46b75989dd134e5c353f0d6296e5aaa3132e776cbdb56be7731aa" +checksum = "469052894dcb553421e483e4209ee581a45100d31b4018de03e5a7ad86374a7e" dependencies = [ - "bitflags 1.3.2", + "bitflags 2.5.0", ] [[package]] @@ -1520,9 +1520,9 @@ dependencies = [ [[package]] name = "rustls" -version = "0.21.11" +version = "0.21.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fecbfb7b1444f477b345853b1fce097a2c6fb637b2bfb87e6bc5db0f043fae4" +checksum = "3f56a14d1f48b391359b22f731fd4bd7e43c97f3c50eee276f3aa09c94784d3e" dependencies = [ "log", "ring 0.17.8", @@ -1576,18 +1576,18 @@ checksum = "92d43fe69e652f3df9bdc2b85b2854a0825b86e4fb76bc44d945137d053639ca" [[package]] name = "serde" -version = "1.0.198" +version = "1.0.199" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9846a40c979031340571da2545a4e5b7c4163bdae79b301d5f86d03979451fcc" +checksum = "0c9f6e76df036c77cd94996771fb40db98187f096dd0b9af39c6c6e452ba966a" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.198" +version = "1.0.199" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e88edab869b01783ba905e7d0153f9fc1a6505a96e4ad3018011eedb838566d9" +checksum = "11bd257a6541e141e42ca6d24ae26f7714887b47e89aa739099104c7e4d3b7fc" dependencies = [ "proc-macro2", "quote", @@ -2109,6 +2109,7 @@ dependencies = [ "tempfile", "tl-proto", "tokio", + "tokio-util", "tracing", "tracing-subscriber", "tracing-test", diff --git a/Cargo.toml b/Cargo.toml index 0fdedfacf..d056deb96 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,7 +76,7 @@ tikv-jemallocator = { version = "0.5", features = [ ] } tl-proto = "0.4" tokio = { version = "1", default-features = false } -tokio-util = { version = "0.7", features = ["codec"] } +tokio-util = { version = "0.7.10", features = ["codec"] } tracing = "0.1" tracing-appender = "0.2.3" tracing-subscriber = { version = "0.3", features = ["env-filter"] } diff --git a/block-util/src/config/mod.rs b/block-util/src/config/mod.rs index 88f94d32a..163e5fc1e 100644 --- a/block-util/src/config/mod.rs +++ b/block-util/src/config/mod.rs @@ -21,8 +21,8 @@ pub trait BlockchainConfigExt { impl BlockchainConfigExt for BlockchainConfig { fn valid_config_data( &self, - relax_par0: bool, - mandatory_params: Option>, + _relax_par0: bool, + _mandatory_params: Option>, ) -> Result { //TODO: refer to https://github.com/everx-labs/ever-block/blob/master/src/config_params.rs#L452 //STUB: currently should not be invoked in prototype diff --git a/collator/Cargo.toml b/collator/Cargo.toml index 413aecb67..ae9cac810 100644 --- a/collator/Cargo.toml +++ b/collator/Cargo.toml @@ -19,6 +19,7 @@ sha2 = { workspace = true } tl-proto = { workspace = true } tempfile = { workspace = true } tokio = { workspace = true, features = ["macros", "rt", "signal"] } +tokio-util = { workspace = true } tracing = { workspace = true } tracing-subscriber = { workspace = true } diff --git a/collator/src/collator/types.rs b/collator/src/collator/types.rs index 007cf254b..2421dc916 100644 --- a/collator/src/collator/types.rs +++ b/collator/src/collator/types.rs @@ -160,8 +160,8 @@ impl McData { BlockRef { end_lt, seqno: block_id.seqno, - root_hash: block_id.root_hash.clone(), - file_hash: block_id.file_hash.clone(), + root_hash: block_id.root_hash, + file_hash: block_id.file_hash, } } @@ -270,7 +270,7 @@ impl PrevData { } pub fn get_blocks_ref(&self) -> Result { - if self.pure_states.len() < 1 || self.pure_states.len() > 2 { + if self.pure_states.is_empty() || self.pure_states.len() > 2 { bail!( "There should be 1 or 2 prev states. Actual count is {}", self.pure_states.len() @@ -282,8 +282,8 @@ impl PrevData { block_refs.push(BlockRef { end_lt: state.state().gen_lt, seqno: state.block_id().seqno, - root_hash: state.block_id().root_hash.clone(), - file_hash: state.block_id().file_hash.clone(), + root_hash: state.block_id().root_hash, + file_hash: state.block_id().file_hash, }); } diff --git a/collator/src/lib.rs b/collator/src/lib.rs index baa4b0a5c..4462f4b28 100644 --- a/collator/src/lib.rs +++ b/collator/src/lib.rs @@ -9,4 +9,4 @@ pub mod types; mod utils; pub mod validator; -pub use validator::test_impl as validator_test_impl; +// pub use validator::test_impl as validator_test_impl; diff --git a/collator/src/manager/collation_manager.rs b/collator/src/manager/collation_manager.rs index d3b1de9fa..3126a8007 100644 --- a/collator/src/manager/collation_manager.rs +++ b/collator/src/manager/collation_manager.rs @@ -26,10 +26,7 @@ use crate::{ async_queued_dispatcher::{AsyncQueuedDispatcher, STANDARD_DISPATCHER_QUEUE_BUFFER_SIZE}, schedule_async_action, }, - validator::{ - validator_processor::{ValidatorProcessor, ValidatorProcessorStdImpl}, - Validator, ValidatorEventListener, ValidatorStdImpl, - }, + validator::{Validator, ValidatorEventListener, ValidatorStdImpl}, }; use super::collation_processor::CollationProcessor; @@ -84,7 +81,7 @@ where { CollationManagerGenImpl::< CollatorStdImpl, _, _, _>, - ValidatorStdImpl, _>, + ValidatorStdImpl<_>, MessageQueueAdapterStdImpl, MP, ST, @@ -96,7 +93,7 @@ where ) } #[allow(private_bounds)] -pub fn create_std_manager_with_validator( +pub fn create_std_manager_with_validator( config: CollationConfig, mpool_adapter_builder: impl MempoolAdapterBuilder + Send, state_adapter_builder: impl StateNodeAdapterBuilder + Send, @@ -105,11 +102,10 @@ pub fn create_std_manager_with_validator( where MP: MempoolAdapter, ST: StateNodeAdapter, - V: ValidatorProcessor, { CollationManagerGenImpl::< CollatorStdImpl, _, _, _>, - ValidatorStdImpl, + ValidatorStdImpl<_>, MessageQueueAdapterStdImpl, MP, ST, @@ -154,9 +150,10 @@ where // create validator and start its tasks queue let validator = Validator::create( - dispatcher.clone(), + vec![dispatcher.clone()], state_node_adapter.clone(), node_network.into(), + config.key_pair, ); // create collation processor that will use these adapters diff --git a/collator/src/manager/collation_processor.rs b/collator/src/manager/collation_processor.rs index d2fe916b1..ee4cd4b7b 100644 --- a/collator/src/manager/collation_processor.rs +++ b/collator/src/manager/collation_processor.rs @@ -560,7 +560,7 @@ where // notify validator, it will start overlay initialization self.validator - .enqueue_add_session(Arc::new(new_session_info.clone().try_into()?)) + .add_session(Arc::new(new_session_info.clone().try_into()?)) .await?; } else { tracing::info!( @@ -683,13 +683,9 @@ where candidate_id.as_short_id(), candidate_chain_time, ); - let current_collator_keypair = self.config.key_pair; - self.validator - .enqueue_candidate_validation( - candidate_id, - session_info.seqno(), - current_collator_keypair, - ) + let _handle = self + .validator + .validate(candidate_id, session_info.seqno()) .await?; // chek if master block min interval elapsed and it needs to collate new master block diff --git a/collator/src/state_node.rs b/collator/src/state_node.rs index b88abba6c..272b6aefe 100644 --- a/collator/src/state_node.rs +++ b/collator/src/state_node.rs @@ -71,6 +71,7 @@ pub trait StateNodeAdapter: BlockProvider + Send + Sync + 'static { pub struct StateNodeAdapterStdImpl { listener: Arc, blocks: Arc>>>, + blocks_mapping: Arc>>, storage: Arc, broadcaster: broadcast::Sender, } @@ -99,6 +100,7 @@ impl StateNodeAdapterStdImpl { storage, blocks: Default::default(), broadcaster, + blocks_mapping: Arc::new(Default::default()), } } @@ -258,10 +260,16 @@ impl StateNodeAdapter for StateNodeAdapterStdImpl { .last() .ok_or(anyhow!("no prev block"))?; + self.blocks_mapping + .lock() + .await + .insert(block.block_id, prev_block_id); + blocks .entry(block.block_id.shard) .or_insert_with(BTreeMap::new) .insert(prev_block_id.seqno, block); + prev_block_id } false => { diff --git a/collator/src/test_utils.rs b/collator/src/test_utils.rs index 44ec18e32..f09c0a7bb 100644 --- a/collator/src/test_utils.rs +++ b/collator/src/test_utils.rs @@ -117,9 +117,7 @@ pub async fn prepare_test_storage() -> anyhow::Result<(DummyArchiveProvider, Arc // shard state let shard_bytes = include_bytes!("../src/state_node/tests/data/test_state_2_0:80.boc"); - let shard_file_hash: HashBytes = sha2::Sha256::digest(shard_bytes).into(); let shard_root = Boc::decode(shard_bytes)?; - let shard_root_hash = *shard_root.repr_hash(); let shard_state = shard_root.parse::()?; let shard_id = BlockId { shard: shard_info.0, @@ -157,11 +155,11 @@ impl BlockProvider for DummyArchiveProvider { type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>; type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>; - fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> { + fn get_next_block<'a>(&'a self, _prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> { futures_util::future::ready(None).boxed() } - fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> { + fn get_block<'a>(&'a self, _block_id: &'a BlockId) -> Self::GetBlockFut<'a> { futures_util::future::ready(None).boxed() } } diff --git a/collator/src/types.rs b/collator/src/types.rs index ba41c5bca..819cdfce8 100644 --- a/collator/src/types.rs +++ b/collator/src/types.rs @@ -108,6 +108,7 @@ impl ShardStateStuffExt for ShardStateStuff { } } +#[derive(Clone)] pub enum OnValidatedBlockEvent { ValidByState, Invalid, diff --git a/collator/src/validator/config.rs b/collator/src/validator/config.rs new file mode 100644 index 000000000..678ce3664 --- /dev/null +++ b/collator/src/validator/config.rs @@ -0,0 +1,4 @@ +struct ValidatorConfig { + base_elapsed_time: u64, + +} \ No newline at end of file diff --git a/collator/src/validator/mod.rs b/collator/src/validator/mod.rs index b8cf1a17d..4b437ecd5 100644 --- a/collator/src/validator/mod.rs +++ b/collator/src/validator/mod.rs @@ -1,9 +1,8 @@ pub(crate) use validator::*; +pub mod config; pub mod network; pub mod state; -pub mod test_impl; pub mod types; #[allow(clippy::module_inception)] pub mod validator; -pub mod validator_processor; diff --git a/collator/src/validator/network/handlers.rs b/collator/src/validator/network/handlers.rs index 996bde8b7..4f5158358 100644 --- a/collator/src/validator/network/handlers.rs +++ b/collator/src/validator/network/handlers.rs @@ -1,67 +1,46 @@ -use std::sync::Arc; - -use anyhow::anyhow; +use crate::validator::network::dto::SignaturesQuery; +use crate::validator::state::SessionInfo; +use crate::validator::{process_candidate_signature_response, ValidatorEventListener}; use everscale_types::models::BlockIdShort; - +use std::sync::Arc; use tycho_network::Response; -use crate::method_to_async_task_closure; -use crate::state_node::StateNodeAdapter; -use crate::utils::async_queued_dispatcher::AsyncQueuedDispatcher; -use crate::validator::network::dto::SignaturesQuery; - -use crate::validator::validator_processor::{ValidatorProcessor, ValidatorTaskResult}; - -pub async fn handle_signatures_query( - dispatcher: &Arc>, +pub async fn handle_signatures_query( + session: Option>, session_seqno: u32, block_id_short: BlockIdShort, signatures: Vec<([u8; 32], [u8; 64])>, + listeners: Vec>, ) -> Result, anyhow::Error> where - W: ValidatorProcessor + Send + Sync, - ST: StateNodeAdapter + Send + Sync, { - let receiver = dispatcher - .enqueue_task_with_responder(method_to_async_task_closure!( - get_block_signatures, - session_seqno, - &block_id_short - )) - .await - .map_err(|e| anyhow!("Error getting receiver: {:?}", e))?; - - let task_result = receiver - .await - .map_err(|e| anyhow!("Receiver error: {:?}", e))?; - - dispatcher - .enqueue_task(method_to_async_task_closure!( - process_candidate_signature_response, + let response = match session { + None => SignaturesQuery { session_seqno, block_id_short, - signatures - )) - .await - .map_err(|e| anyhow!("Error enqueueing task: {:?}", e))?; + signatures: vec![], + }, + Some(session) => { + process_candidate_signature_response( + session.clone(), + block_id_short, + signatures, + listeners, + ) + .await?; - match task_result { - Ok(ValidatorTaskResult::Signatures(received_signatures)) => { - let signatures = received_signatures + let signatures = session + .get_valid_signatures(&block_id_short) + .await .into_iter() .map(|(k, v)| (k.0, v.0)) .collect::>(); - - let response = SignaturesQuery { + SignaturesQuery { session_seqno, block_id_short, signatures, - }; - Ok(Some(Response::from_tl(response))) + } } - Ok(ValidatorTaskResult::Void | ValidatorTaskResult::ValidationStatus(_)) => Err(anyhow!( - "Invalid response type received from get_block_signatures." - )), - Err(e) => Err(anyhow!("Error processing task result: {:?}", e)), - } + }; + Ok(Some(Response::from_tl(response))) } diff --git a/collator/src/validator/network/network_service.rs b/collator/src/validator/network/network_service.rs index e9380d92e..09f5cbea3 100644 --- a/collator/src/validator/network/network_service.rs +++ b/collator/src/validator/network/network_service.rs @@ -11,31 +11,27 @@ use tycho_network::{Response, Service, ServiceRequest}; use crate::validator::network::dto::SignaturesQuery; use crate::validator::network::handlers::handle_signatures_query; -use crate::{ - state_node::StateNodeAdapter, - utils::async_queued_dispatcher::AsyncQueuedDispatcher, - validator::validator_processor::{ValidatorProcessor, ValidatorTaskResult}, -}; +use crate::validator::state::{SessionInfo, ValidationState, ValidationStateStdImpl}; +use crate::validator::ValidatorEventListener; +use crate::{state_node::StateNodeAdapter, utils::async_queued_dispatcher::AsyncQueuedDispatcher}; #[derive(Clone)] -pub struct NetworkService -where - W: ValidatorProcessor + Send + Sync, - ST: StateNodeAdapter + Send + Sync, -{ - dispatcher: Arc>, - _marker: PhantomData, +pub struct NetworkService { + listeners: Vec>, + state: Arc, + session_seqno: u32, } -impl NetworkService -where - W: ValidatorProcessor + Send + Sync, - ST: StateNodeAdapter + Send + Sync, -{ - pub fn new(dispatcher: Arc>) -> Self { +impl NetworkService { + pub fn new( + listeners: Vec>, + state: Arc, + session_seqno: u32, + ) -> Self { Self { - dispatcher, - _marker: Default::default(), + listeners, + state, + session_seqno, } } } @@ -44,11 +40,7 @@ where #[repr(transparent)] pub struct OverlayId(pub [u8; 32]); -impl Service for NetworkService -where - W: ValidatorProcessor + Send + Sync, - ST: StateNodeAdapter + Send + Sync, -{ +impl Service for NetworkService { type QueryResponse = Response; type OnQueryFuture = Pin> + Send>>; type OnMessageFuture = Ready<()>; @@ -57,8 +49,8 @@ where fn on_query(&self, req: ServiceRequest) -> Self::OnQueryFuture { let query_result = req.parse_tl(); - let dispatcher = Arc::clone(&self.dispatcher); - + let state = self.state.clone(); + let listeners = self.listeners.clone(); async move { match query_result { Ok(query) => { @@ -68,17 +60,18 @@ where signatures, } = query; { + let session = state.get_session(session_seqno).await; match handle_signatures_query( - &dispatcher, + session, session_seqno, block_id_short, signatures, + listeners, ) .await { Ok(response_option) => response_option, Err(e) => { - error!("Error handling signatures query: {:?}", e); panic!("Error handling signatures query: {:?}", e); } } diff --git a/collator/src/validator/state.rs b/collator/src/validator/state.rs index f9df9b273..a1d449492 100644 --- a/collator/src/validator/state.rs +++ b/collator/src/validator/state.rs @@ -2,148 +2,167 @@ use std::collections::HashMap; use std::sync::Arc; use anyhow::{bail, Context}; +use async_trait::async_trait; use everscale_types::cell::HashBytes; use everscale_types::models::{BlockId, BlockIdShort, Signature}; +use tokio::sync::{Mutex, RwLock}; +use tracing::{debug, trace}; +use crate::types::{BlockSignatures, OnValidatedBlockEvent}; use crate::validator::types::{ BlockValidationCandidate, ValidationResult, ValidationSessionInfo, ValidatorInfo, }; +use crate::validator::ValidatorEventListener; use tycho_network::PrivateOverlay; -use tycho_util::FastHashMap; +use tycho_util::{FastDashMap, FastHashMap}; struct SignatureMaps { valid_signatures: FastHashMap, invalid_signatures: FastHashMap, + event_dispatched: Mutex, } /// Represents the state of validation for blocks and sessions. -pub trait ValidationState: Send + Sync + 'static { +pub trait ValidationState: Send + Sync { /// Creates a new instance of a type implementing `ValidationState`. fn new() -> Self; /// Adds a new validation session. - fn add_session(&mut self, session: Arc, private_overlay: PrivateOverlay); - + fn try_add_session( + &self, + session: Arc, + ) -> impl std::future::Future> + Send; /// Retrieves an immutable reference to a session by its ID. - fn get_session(&self, session_id: u32) -> Option<&SessionInfo>; - - /// Retrieves a mutable reference to a session by its ID. - fn get_mut_session(&mut self, session_id: u32) -> Option<&mut SessionInfo>; + fn get_session( + &self, + session_id: u32, + ) -> impl std::future::Future>> + Send; } /// Holds information about a validation session. pub struct SessionInfo { - session_id: u32, + seqno: u32, max_weight: u64, - blocks_signatures: FastHashMap, - cached_signatures: FastHashMap>, + blocks_signatures: FastDashMap, + cached_signatures: FastDashMap>, validation_session_info: Arc, private_overlay: PrivateOverlay, } impl SessionInfo { + pub fn new( + seqno: u32, + validation_session_info: Arc, + private_overlay: PrivateOverlay, + ) -> Arc { + let max_weight = validation_session_info + .validators + .values() + .map(|vi| vi.weight) + .sum(); + Arc::new(Self { + seqno, + max_weight, + blocks_signatures: Default::default(), + cached_signatures: Default::default(), + validation_session_info, + private_overlay, + }) + } + + pub fn get_seqno(&self) -> u32 { + self.seqno + } + + pub fn get_cached_signatures_by_block( + &self, + block_id_short: &BlockIdShort, + ) -> Option<(BlockIdShort, FastHashMap)> { + self.cached_signatures.remove(block_id_short) + } + /// Returns the associated `PrivateOverlay`. pub(crate) fn get_overlay(&self) -> &PrivateOverlay { &self.private_overlay } /// Returns the `ValidationSessionInfo`. - pub fn get_validation_session_info(&self) -> &ValidationSessionInfo { - &self.validation_session_info + pub fn get_validation_session_info(&self) -> Arc { + self.validation_session_info.clone() + } + + pub async fn is_validator_signed( + &self, + block_id_short: &BlockIdShort, + validator_id: HashBytes, + ) -> bool { + if let Some(ref_data) = self.blocks_signatures.get(block_id_short) { + ref_data.1.valid_signatures.contains_key(&validator_id) + || ref_data.1.invalid_signatures.contains_key(&validator_id) + } else { + false + } } /// Adds a block to the session, moving cached signatures to block signatures. - pub fn add_block(&mut self, block: BlockId) -> anyhow::Result<()> { + pub async fn add_block(&self, block: BlockId) -> anyhow::Result<()> { let block_header = block.as_short_id(); + self.blocks_signatures .entry(block_header) .or_insert_with(|| { ( block, SignatureMaps { - valid_signatures: Default::default(), - invalid_signatures: Default::default(), + valid_signatures: FastHashMap::default(), + invalid_signatures: FastHashMap::default(), + event_dispatched: Mutex::new(false), }, ) }); - - if let Some(cached_signatures) = self.cached_signatures.remove(&block_header) { - let candidate: BlockValidationCandidate = block.into(); - for (validator_id, signature) in cached_signatures { - let validator = self - .validation_session_info - .validators - .get(&validator_id) - .context("Validator not found in session")?; - let signature_is_valid = validator - .public_key - .verify(candidate.as_bytes(), &signature.0); - if let Some((_, signature_maps)) = self.blocks_signatures.get_mut(&block_header) { - if signature_is_valid { - signature_maps - .valid_signatures - .insert(validator_id, signature); - } else { - signature_maps - .invalid_signatures - .insert(validator_id, signature); - } - } else { - bail!("Block not found in session but was added before"); - } - } - } Ok(()) } - pub fn get_block(&self, block_id_short: &BlockIdShort) -> Option<&BlockId> { + pub async fn get_block(&self, block_id_short: &BlockIdShort) -> Option { self.blocks_signatures .get(block_id_short) - .map(|(block, _)| block) + .map(|ref_data| ref_data.0) } - pub(crate) fn blocks_count(&self) -> usize { + pub(crate) async fn blocks_count(&self) -> usize { self.blocks_signatures.len() } /// Determines the validation status of a block. - pub fn validation_status(&self, block_id_short: &BlockIdShort) -> ValidationResult { - let valid_weight = self.max_weight * 2 / 3 + 1; - if let Some((_, signature_maps)) = self.blocks_signatures.get(block_id_short) { - let total_valid_weight: u64 = signature_maps - .valid_signatures - .keys() - .map(|validator_id| { - self.validation_session_info - .validators - .get(validator_id) - .map_or(0, |vi| vi.weight) - }) - .sum(); + pub async fn get_validation_status( + &self, + block_id_short: &BlockIdShort, + ) -> anyhow::Result { + trace!("Getting validation status for block {:?}", block_id_short); + // Bind the lock result to a variable to extend its lifetime + // let block_signatures_guard = self.blocks_signatures; + let signatures = self.blocks_signatures.get(block_id_short); - if total_valid_weight >= valid_weight { - ValidationResult::Valid - } else if self.is_invalid(signature_maps, valid_weight) { - ValidationResult::Invalid - } else { - ValidationResult::Insufficient(total_valid_weight, valid_weight) - } + if let Some(ref_data) = signatures { + Ok(self.validation_status(&ref_data.1).await) } else { - ValidationResult::Insufficient(0, valid_weight) + Ok(ValidationResult::Insufficient(0, 0)) } } + /// Lists validators without signatures for a given block. - pub fn validators_without_signatures( + pub async fn validators_without_signatures( &self, block_id_short: &BlockIdShort, ) -> Vec> { // Retrieve the block signatures (both valid and invalid) if they exist. - if let Some((_, signature_maps)) = self.blocks_signatures.get(block_id_short) { + if let Some(ref_data) = self.blocks_signatures.get(block_id_short) { // Create a combined set of validator IDs who have signed (either validly or invalidly). - let validators_with_signatures: std::collections::HashSet<_> = signature_maps + let validators_with_signatures: std::collections::HashSet<_> = ref_data + .1 .valid_signatures .keys() - .chain(signature_maps.invalid_signatures.keys()) + .chain(ref_data.1.invalid_signatures.keys()) .collect(); // Filter validators who haven't provided a signature. @@ -169,8 +188,8 @@ impl SessionInfo { } /// Adds cached signatures for a block. - pub fn add_cached_signatures( - &mut self, + pub async fn add_cached_signatures( + &self, block_id_short: &BlockIdShort, signatures: Vec<(HashBytes, Signature)>, ) { @@ -178,41 +197,46 @@ impl SessionInfo { .insert(*block_id_short, signatures.into_iter().collect()); } - // /// Checks if a block exists within the session. - // pub fn is_block_exists(&self, block_id_short: &BlockIdShort) -> bool { - // self.blocks_signatures.contains_key(block_id_short) - // } - /// Retrieves valid signatures for a block. - pub fn get_valid_signatures( + pub async fn get_valid_signatures( &self, block_id_short: &BlockIdShort, ) -> FastHashMap { - if let Some((_, signature_maps)) = self.blocks_signatures.get(block_id_short) { - signature_maps.valid_signatures.clone() + let cached_signatures = self.cached_signatures.len(); + let normal_signatures = self.blocks_signatures.len(); + let block_signatures = self.blocks_signatures.get(block_id_short); + let valid_signatures = block_signatures.map(|ref_data| ref_data.1.valid_signatures.clone()); + let block_signatures = self.blocks_signatures.get(block_id_short); + let invalid_signatures = + block_signatures.map(|ref_data| ref_data.1.invalid_signatures.clone()); + + if let Some(ref_data) = self.blocks_signatures.get(block_id_short) { + ref_data.1.valid_signatures.clone() } else { FastHashMap::default() } } /// Adds a signature for a block. - pub fn add_signature( - &mut self, + pub async fn add_signature( + &self, block_id: &BlockId, validator_id: HashBytes, signature: Signature, is_valid: bool, ) { let block_header = block_id.as_short_id(); - let entry = self + // let mut write_guard = self.blocks_signatures.write().await; // Hold onto the lock + let mut entry = self .blocks_signatures - .entry(block_header) + .entry(block_header) // Use the guard to access the map .or_insert_with(|| { ( *block_id, SignatureMaps { valid_signatures: FastHashMap::default(), invalid_signatures: FastHashMap::default(), + event_dispatched: Mutex::new(false), }, ) }); @@ -245,11 +269,145 @@ impl SessionInfo { .sum::(); total_possible_weight - total_invalid_weight < valid_weight } + + pub async fn process_signatures_and_update_status( + &self, + block_id_short: BlockIdShort, + signatures: Vec<([u8; 32], [u8; 64])>, + listeners: Vec>, + ) -> anyhow::Result<()> { + trace!( + "Processing signatures for block in state {:?}", + block_id_short + ); + let mut entry = self + .blocks_signatures + .entry(block_id_short) + .or_insert_with(|| { + ( + BlockId::default(), // Default should be replaced with actual block retrieval logic if necessary + SignatureMaps { + valid_signatures: FastHashMap::default(), + invalid_signatures: FastHashMap::default(), + event_dispatched: Mutex::new(false), + }, + ) + }); + + let mut event_guard = entry.1.event_dispatched.lock().await; + if *event_guard { + debug!( + "Validation event already dispatched for block {:?}", + block_id_short + ); + return Ok(()); + } + + // Drop the guard to allow mutable access below + drop(event_guard); + + // Process each signature + for (pub_key_bytes, sig_bytes) in signatures { + let validator_id = HashBytes(pub_key_bytes); + let signature = Signature(sig_bytes); + let block_validation_candidate = BlockValidationCandidate::from(entry.0); + + let is_valid = self + .get_validation_session_info() + .validators + .get(&validator_id) + .context("Validator not found")? + .public_key + .verify(block_validation_candidate.as_bytes(), &signature.0); + + if is_valid { + entry.1.valid_signatures.insert(validator_id, signature); + } else { + entry.1.invalid_signatures.insert(validator_id, signature); + } + } + + let validation_status = self.validation_status(&entry.1).await; + // Check if the validation status qualifies for dispatching the event + match validation_status { + ValidationResult::Valid => { + let mut event_guard = entry.1.event_dispatched.lock().await; + *event_guard = true; // Prevent further event dispatching for this block + drop(event_guard); // Drop guard as soon as possible + let event = OnValidatedBlockEvent::Valid(BlockSignatures { + signatures: entry.1.valid_signatures.clone(), + }); + Self::notify_listeners(entry.0, event, listeners); + } + ValidationResult::Invalid => { + let mut event_guard = entry.1.event_dispatched.lock().await; + *event_guard = true; // Prevent further event dispatching for this block + drop(event_guard); // Drop guard as soon as possible + let event = OnValidatedBlockEvent::Invalid; + Self::notify_listeners(entry.0, event, listeners); + } + + ValidationResult::Insufficient(_, _) => {} + } + + Ok(()) + } + + async fn validation_status(&self, signature_maps: &SignatureMaps) -> ValidationResult { + let total_valid_weight: u64 = signature_maps + .valid_signatures + .keys() + .map(|validator_id| { + self.validation_session_info + .validators + .get(validator_id) + .map_or(0, |vi| vi.weight) + }) + .sum(); + + let total_invalid_weight: u64 = signature_maps + .invalid_signatures + .keys() + .map(|validator_id| { + self.validation_session_info + .validators + .get(validator_id) + .map_or(0, |vi| vi.weight) + }) + .sum(); + + let valid_weight_threshold = self.max_weight * 2 / 3 + 1; + let invalid_weight_threshold = self.max_weight / 3 + 1; + + if total_valid_weight >= valid_weight_threshold { + ValidationResult::Valid + } else if total_invalid_weight >= invalid_weight_threshold { + ValidationResult::Invalid + } else { + ValidationResult::Insufficient(total_valid_weight, valid_weight_threshold) + } + } + + fn notify_listeners( + block: BlockId, + event: OnValidatedBlockEvent, + listeners: Vec>, + ) { + for listener in listeners { + let cloned_event = event.clone(); + tokio::spawn(async move { + listener + .on_block_validated(block, cloned_event) + .await + .expect("Failed to notify listener"); + }); + } + } } /// Standard implementation of `ValidationState`. pub struct ValidationStateStdImpl { - sessions: HashMap, + sessions: RwLock>>, } impl ValidationState for ValidationStateStdImpl { @@ -259,27 +417,19 @@ impl ValidationState for ValidationStateStdImpl { } } - fn add_session( - &mut self, - session: Arc, - private_overlay: PrivateOverlay, - ) { - let session_info = SessionInfo { - session_id: session.seqno, - max_weight: session.validators.values().map(|info| info.weight).sum(), - blocks_signatures: Default::default(), - cached_signatures: Default::default(), - validation_session_info: session, - private_overlay, - }; - self.sessions.insert(session_info.session_id, session_info); - } + async fn try_add_session(&self, session: Arc) -> anyhow::Result<()> { + let seqno = session.seqno; + + let session = self.sessions.write().await.insert(seqno, session); - fn get_session(&self, session_id: u32) -> Option<&SessionInfo> { - self.sessions.get(&session_id) + if session.is_some() { + bail!("Session already exists with seqno: {seqno}"); + } + + Ok(()) } - fn get_mut_session(&mut self, session_id: u32) -> Option<&mut SessionInfo> { - self.sessions.get_mut(&session_id) + async fn get_session(&self, session_id: u32) -> Option> { + self.sessions.read().await.get(&session_id).cloned() } } diff --git a/collator/src/validator/test_impl.rs b/collator/src/validator/test_impl.rs index 74348d6be..8ddb89fa4 100644 --- a/collator/src/validator/test_impl.rs +++ b/collator/src/validator/test_impl.rs @@ -5,6 +5,7 @@ use async_trait::async_trait; use everscale_crypto::ed25519::{KeyPair, PublicKey}; use everscale_types::models::{BlockId, BlockIdShort, Signature}; +use tokio::sync::Semaphore; use tycho_block_util::state::ShardStateStuff; use tycho_util::FastHashMap; @@ -13,9 +14,9 @@ use crate::tracing_targets; use crate::types::{BlockSignatures, OnValidatedBlockEvent, ValidatorNetwork}; use crate::validator::types::ValidationSessionInfo; use crate::{state_node::StateNodeAdapter, utils::async_queued_dispatcher::AsyncQueuedDispatcher}; +use crate::validator::state::SessionInfo; use super::{ - validator_processor::{ValidatorProcessor, ValidatorTaskResult}, ValidatorEventEmitter, ValidatorEventListener, }; @@ -57,7 +58,6 @@ where } fn new( - _dispatcher: Arc>, listener: Arc, _state_node_adapter: Arc, _network: ValidatorNetwork, @@ -71,11 +71,12 @@ where } async fn start_candidate_validation( - &mut self, + // &self, candidate_id: BlockId, - _session_seqno: u32, + session: &Arc, current_validator_keypair: KeyPair, - ) -> Result { + listener: Vec>, + ) -> Result<()> { let mut signatures = FastHashMap::default(); signatures.insert( current_validator_keypair.public_key.to_bytes().into(), @@ -86,64 +87,15 @@ where "Validator (block: {}): STUB: emulated validation via signatures request", candidate_id.as_short_id(), ); - self.listener - .on_block_validated( - candidate_id, - OnValidatedBlockEvent::Valid(BlockSignatures { signatures }), - ) - .await?; - - Ok(ValidatorTaskResult::Void) - } - - fn get_dispatcher(&self) -> Arc> { - self._dispatcher.clone() - } - - async fn try_add_session( - &mut self, - _session: Arc, - ) -> Result { - Ok(ValidatorTaskResult::Void) - } - - async fn stop_candidate_validation( - &self, - _candidate_id: BlockId, - ) -> Result { - todo!() - } - - async fn get_block_signatures( - &mut self, - _session_seqno: u32, - _block_id_short: &BlockIdShort, - ) -> Result { - todo!() - } - async fn process_candidate_signature_response( - &mut self, - _session_seqno: u32, - _block_id_short: BlockIdShort, - _signatures: Vec<([u8; 32], [u8; 64])>, - ) -> Result { - todo!() - } - - async fn validate_candidate( - &mut self, - _candidate_id: BlockId, - _session_seqno: u32, - _current_validator_pubkey: PublicKey, - ) -> Result { - todo!() - } + for listener in listener.iter() { + listener + .on_block_validated( + candidate_id, + OnValidatedBlockEvent::Valid(BlockSignatures { signatures: signatures.clone() }), + ) + .await?; + } - async fn get_validation_status( - &mut self, - _session_seqno: u32, - _block_id_short: &BlockIdShort, - ) -> Result { - todo!() + Ok(()) } } diff --git a/collator/src/validator/types.rs b/collator/src/validator/types.rs index 75ad70ca9..01945c7d3 100644 --- a/collator/src/validator/types.rs +++ b/collator/src/validator/types.rs @@ -108,3 +108,12 @@ pub enum ValidationResult { Invalid, Insufficient(u64, u64), } + +impl ValidationResult { + pub fn is_finished(&self) -> bool { + match self { + ValidationResult::Valid | ValidationResult::Invalid => true, + ValidationResult::Insufficient(..) => false, + } + } +} diff --git a/collator/src/validator/validator.rs b/collator/src/validator/validator.rs index 3e20921a9..5dc9eefb0 100644 --- a/collator/src/validator/validator.rs +++ b/collator/src/validator/validator.rs @@ -1,12 +1,27 @@ +use std::mem::take; use std::sync::Arc; +use std::time::Duration; -use anyhow::Result; +use anyhow::{anyhow, bail, Context, Result}; use async_trait::async_trait; use everscale_crypto::ed25519::KeyPair; -use everscale_types::models::BlockId; +use everscale_types::cell::HashBytes; +use everscale_types::models::{BlockId, BlockIdShort, Signature}; +use futures_util::future::join_all; +use tokio::select; +use tokio::task::{JoinError, JoinHandle}; +use tracing::{debug, trace, warn}; +use tycho_network::{OverlayId, PeerId, PrivateOverlay, Request}; +use tycho_util::FastHashMap; -use crate::types::{OnValidatedBlockEvent, ValidatorNetwork}; -use crate::validator::types::ValidationSessionInfo; +use crate::state_node::StateNodeAdapterStdImpl; +use crate::types::{BlockSignatures, OnValidatedBlockEvent, ValidatorNetwork}; +use crate::validator::network::dto::SignaturesQuery; +use crate::validator::network::network_service::NetworkService; +use crate::validator::state::{SessionInfo, ValidationState, ValidationStateStdImpl}; +use crate::validator::types::{ + BlockValidationCandidate, OverlayNumber, ValidationResult, ValidationSessionInfo, ValidatorInfo, +}; use crate::{ method_to_async_task_closure, state_node::StateNodeAdapter, @@ -16,9 +31,6 @@ use crate::{ }, }; -use super::validator_processor::{ValidatorProcessor, ValidatorTaskResult}; -const VALIDATOR_BUFFER_SIZE: usize = 1usize; -//TODO: remove emitter #[async_trait] pub trait ValidatorEventEmitter { /// When shard or master block was validated by validator @@ -45,93 +57,356 @@ where ST: StateNodeAdapter, { fn create( - listener: Arc, + listeners: Vec>, state_node_adapter: Arc, network: ValidatorNetwork, + keypair: KeyPair, ) -> Self; /// Enqueue block candidate validation task - async fn enqueue_candidate_validation( - &self, - candidate: BlockId, - session_seqno: u32, - current_validator_keypair: KeyPair, - ) -> Result<()>; + async fn validate(&self, candidate: BlockId, session_seqno: u32) -> Result<()>; async fn enqueue_stop_candidate_validation(&self, candidate: BlockId) -> Result<()>; - async fn enqueue_add_session(&self, session_info: Arc) -> Result<()>; + async fn add_session(&self, validators_session_info: Arc) -> Result<()>; + fn get_keypair(&self) -> &KeyPair; } #[allow(private_bounds)] -pub struct ValidatorStdImpl +pub struct ValidatorStdImpl where - W: ValidatorProcessor, ST: StateNodeAdapter, { _marker_state_node_adapter: std::marker::PhantomData, - dispatcher: Arc>, + validation_state: Arc, + validation_semaphore: Arc, + listeners: Vec>, + network: ValidatorNetwork, + state_node_adapter: Arc, + keypair: KeyPair, } #[async_trait] -impl Validator for ValidatorStdImpl +impl Validator for ValidatorStdImpl where - W: ValidatorProcessor, ST: StateNodeAdapter, { fn create( - listener: Arc, + listeners: Vec>, state_node_adapter: Arc, network: ValidatorNetwork, + keypair: KeyPair, ) -> Self { tracing::info!(target: tracing_targets::VALIDATOR, "Creating validator..."); - // create dispatcher for own async tasks queue - let (dispatcher, receiver) = AsyncQueuedDispatcher::new(VALIDATOR_BUFFER_SIZE); - let dispatcher = Arc::new(dispatcher); - - // create validation processor and run dispatcher for own tasks queue - let processor = - ValidatorProcessor::new(dispatcher.clone(), listener, state_node_adapter, network); - AsyncQueuedDispatcher::run(processor, receiver); - tracing::trace!(target: tracing_targets::VALIDATOR, "Tasks queue dispatcher started"); + let validation_state = Arc::new(ValidationStateStdImpl::new()); - tracing::info!(target: tracing_targets::VALIDATOR, "Validator created"); - - // create validator instance Self { _marker_state_node_adapter: std::marker::PhantomData, - dispatcher, + validation_semaphore: Arc::new(tokio::sync::Semaphore::new(1)), + validation_state, + listeners, + network, + state_node_adapter, + keypair, } } - async fn enqueue_candidate_validation( - &self, - candidate: BlockId, - session_seqno: u32, - current_validator_keypair: KeyPair, - ) -> Result<()> { - self.dispatcher - .enqueue_task(method_to_async_task_closure!( - start_candidate_validation, - candidate, - session_seqno, - current_validator_keypair - )) + async fn validate(&self, candidate: BlockId, session_seqno: u32) -> Result<()> { + let session = self + .validation_state + .get_session(session_seqno) .await + .ok_or_else(|| { + anyhow::anyhow!("Validation session not found for seqno: {}", session_seqno) + })? + .clone(); + + start_candidate_validation( + candidate, + session, + &self.keypair, + self.listeners.clone(), + self.network.clone(), + self.state_node_adapter.clone(), + ) + .await?; + Ok(()) } - async fn enqueue_stop_candidate_validation(&self, candidate: BlockId) -> Result<()> { - self.dispatcher - .enqueue_task(method_to_async_task_closure!( - stop_candidate_validation, - candidate - )) - .await + async fn enqueue_stop_candidate_validation(&self, _candidate: BlockId) -> Result<()> { + Ok(()) } - async fn enqueue_add_session(&self, session_info: Arc) -> Result<()> { - self.dispatcher - .enqueue_task(method_to_async_task_closure!(try_add_session, session_info)) - .await + fn get_keypair(&self) -> &KeyPair { + &self.keypair + } + + async fn add_session(&self, validators_session_info: Arc) -> Result<()> { + trace!(target: tracing_targets::VALIDATOR, "Trying to add session seqno {:?}", validators_session_info.seqno); + let (peer_resolver, local_peer_id) = { + let network = self.network.clone(); + ( + network.clone().peer_resolver, + network.dht_client.network().peer_id().0, + ) + }; + + let overlay_id = OverlayNumber { + session_seqno: validators_session_info.seqno, + }; + trace!(target: tracing_targets::VALIDATOR, overlay_id = ?validators_session_info.seqno, "Creating private overlay"); + let overlay_id = OverlayId(tl_proto::hash(overlay_id)); + + let seqno = validators_session_info.seqno; + + let network_service = + NetworkService::new(self.listeners.clone(), self.validation_state.clone(), seqno); + + let private_overlay = PrivateOverlay::builder(overlay_id) + .with_peer_resolver(peer_resolver) + .build(network_service.clone()); + + let overlay_added = self + .network + .overlay_service + .add_private_overlay(&private_overlay.clone()); + + if !overlay_added { + bail!("Failed to add private overlay"); + } + + let session_info = SessionInfo::new( + validators_session_info.seqno, + validators_session_info.clone(), + private_overlay.clone(), + ); + + self.validation_state.try_add_session(session_info).await?; + + let mut entries = private_overlay.write_entries(); + + for validator in validators_session_info.validators.values() { + if validator.public_key.to_bytes() == local_peer_id { + continue; + } + entries.insert(&PeerId(validator.public_key.to_bytes())); + trace!(target: tracing_targets::VALIDATOR, validator_pubkey = ?validator.public_key.as_bytes(), "Added validator to overlay"); + } + + trace!(target: tracing_targets::VALIDATOR, "Session seqno {:?} added", validators_session_info.seqno); + Ok(()) + } +} + +fn sign_block(key_pair: &KeyPair, block: &BlockId) -> anyhow::Result { + let block_validation_candidate = BlockValidationCandidate::from(*block); + let signature = Signature(key_pair.sign(block_validation_candidate.as_bytes())); + Ok(signature) +} + +async fn start_candidate_validation( + block_id: BlockId, + session: Arc, + current_validator_keypair: &KeyPair, + listeners: Vec>, + network: ValidatorNetwork, + state_node_adapter: Arc, +) -> Result<()> { + let base_delay_ms = 100; + let cancellation_token = tokio_util::sync::CancellationToken::new(); + let short_id = block_id.as_short_id(); + let our_signature = sign_block(current_validator_keypair, &block_id)?; + + session.add_block(block_id).await?; + let current_validator_pubkey = HashBytes(current_validator_keypair.public_key.to_bytes()); + + let mut initial_signatures = vec![(current_validator_pubkey.0, our_signature.0)]; + + let cached_signatures = session.get_cached_signatures_by_block(&block_id.as_short_id()); + + if let Some(cached_signatures) = cached_signatures { + initial_signatures.extend(cached_signatures.1.into_iter().map(|(k, v)| (k.0, v.0))); + } + + let is_validation_finished = process_candidate_signature_response( + session.clone(), + short_id, + vec![(current_validator_pubkey.0, our_signature.0)], + listeners.clone(), + ) + .await?; + trace!(target: tracing_targets::VALIDATOR, "Validation finished: {:?}", is_validation_finished); + + if is_validation_finished { + cancellation_token.cancel(); // Cancel all tasks if validation is finished + return Ok(()); + } + + let validators = session.validators_without_signatures(&short_id).await; + trace!(target: tracing_targets::VALIDATOR, "Validators without signatures: {:?}", validators.len()); + let filtered_validators: Vec> = validators + .iter() + .filter(|validator| validator.public_key != current_validator_keypair.public_key) + .cloned() + .collect(); + + let block_from_state = state_node_adapter.load_block_handle(&block_id).await?; + + if block_from_state.is_some() { + for listener in listeners.iter() { + let cloned_listener = listener.clone(); + tokio::spawn(async move { + cloned_listener + .on_block_validated(block_id, OnValidatedBlockEvent::ValidByState) + .await + .expect("Failed to notify listener"); + }); + } + + return Ok(()); + } + + let mut handlers: Vec>> = Vec::new(); + + for validator in filtered_validators { + let cloned_private_overlay = session.get_overlay().clone(); + let cloned_network = network.dht_client.network().clone(); + let cloned_listeners = listeners.clone(); + let cloned_session = session.clone(); + let token_clone = cancellation_token.clone(); + + let handler = tokio::spawn(async move { + let mut attempt = 0; + loop { + if token_clone.is_cancelled() { + trace!(target: tracing_targets::VALIDATOR, "Validation task cancelled"); + return Ok(()); + } + + let already_signed = cloned_session + .is_validator_signed(&short_id, HashBytes(validator.public_key.to_bytes())) + .await; + if already_signed { + trace!(target: tracing_targets::VALIDATOR, "Validator {:?} already signed", validator.public_key.to_bytes()); + return Ok(()); + } + + let validation_finished = cloned_session + .get_validation_status(&short_id) + .await? + .is_finished(); + if validation_finished { + trace!(target: tracing_targets::VALIDATOR, "Validation is finished"); + token_clone.cancel(); // Signal cancellation to all tasks + return Ok(()); + } + + let payload = SignaturesQuery::create( + cloned_session.get_seqno(), + short_id, + &cloned_session.get_valid_signatures(&short_id).await, + ); + + let response = tokio::time::timeout( + Duration::from_secs(1), + cloned_private_overlay.query( + &cloned_network, + &PeerId(validator.public_key.to_bytes()), + Request::from_tl(payload), + ), + ) + .await; + + match response { + Ok(Ok(response)) => { + if let Ok(signatures) = response.parse_tl::() { + trace!(target: tracing_targets::VALIDATOR, "Received signatures from validator {:?}", validator.public_key.to_bytes()); + + let is_finished = process_candidate_signature_response( + cloned_session.clone(), + short_id, + signatures.signatures, + cloned_listeners.clone(), + ) + .await?; + + if is_finished { + trace!(target: tracing_targets::VALIDATOR, "Validation is finished for block {:?}", short_id); + token_clone.cancel(); + return Ok(()); + } + } + } + Err(e) => { + warn!(target: tracing_targets::VALIDATOR, "Error receiving signatures from validator {:?}: {:?}", validator.public_key.to_bytes(), e); + let delay = base_delay_ms * 2_u64.pow(attempt); + tokio::time::sleep(Duration::from_millis(delay)).await; + attempt += 1; + } + Ok(Err(e)) => { + warn!(target: tracing_targets::VALIDATOR, "Error receiving signatures from validator {:?}: {:?}", validator.public_key.to_bytes(), e); + let delay = base_delay_ms * 2_u64.pow(attempt); + tokio::time::sleep(Duration::from_millis(delay)).await; + attempt += 1; + } + } + tokio::time::sleep(Duration::from_millis(base_delay_ms)).await; + } + }); + + handlers.push(handler); + } + + let results = futures_util::future::join_all(handlers).await; + results + .into_iter() + .collect::, _>>() + .context("One or more validation tasks failed")?; + Ok(()) +} + +pub async fn process_candidate_signature_response( + session: Arc, + block_id_short: BlockIdShort, + signatures: Vec<([u8; 32], [u8; 64])>, + listeners: Vec>, +) -> Result { + trace!(target: tracing_targets::VALIDATOR, block = %block_id_short, "Processing candidate signature response"); + let validation_status = session.get_validation_status(&block_id_short).await?; + trace!(target: tracing_targets::VALIDATOR, block = %block_id_short, "Validation status: {:?}", validation_status); + if validation_status == ValidationResult::Valid + || validation_status == ValidationResult::Invalid + { + debug!( + "Validation status is already set for block {:?}.", + block_id_short + ); + return Ok(true); + } + + if session.get_block(&block_id_short).await.is_some() { + session + .process_signatures_and_update_status(block_id_short, signatures, listeners) + .await?; + } else { + trace!(target: tracing_targets::VALIDATOR, "Caching signatures for block {:?}", block_id_short); + if block_id_short.seqno > 0 { + let previous_block = + BlockIdShort::from((block_id_short.shard, block_id_short.seqno - 1)); + let previous_block = session.get_block(&previous_block).await; + + if previous_block.is_some() { + session + .add_cached_signatures( + &block_id_short, + signatures + .into_iter() + .map(|(k, v)| (HashBytes(k), Signature(v))) + .collect(), + ) + .await; + } + } } + Ok(false) } diff --git a/collator/src/validator/validator_processor.rs b/collator/src/validator/validator_processor.rs deleted file mode 100644 index a4fb9fff2..000000000 --- a/collator/src/validator/validator_processor.rs +++ /dev/null @@ -1,590 +0,0 @@ -use std::sync::Arc; -use std::time::Duration; - -use anyhow::{anyhow, bail, Context, Result}; -use async_trait::async_trait; -use everscale_crypto::ed25519::KeyPair; -use everscale_types::cell::HashBytes; -use everscale_types::models::{BlockId, BlockIdShort, Signature}; -use tokio::sync::broadcast; -use tracing::warn; -use tracing::{debug, trace}; - -use crate::types::{BlockSignatures, OnValidatedBlockEvent, ValidatorNetwork}; -use tycho_block_util::state::ShardStateStuff; -use tycho_network::{OverlayId, PeerId, PrivateOverlay, Request}; -use tycho_util::FastHashMap; - -use crate::validator::network::dto::SignaturesQuery; -use crate::validator::network::network_service::NetworkService; -use crate::validator::state::{ValidationState, ValidationStateStdImpl}; -use crate::validator::types::{ - BlockValidationCandidate, OverlayNumber, ValidationResult, ValidationSessionInfo, -}; -use crate::{ - method_to_async_task_closure, state_node::StateNodeAdapter, tracing_targets, - utils::async_queued_dispatcher::AsyncQueuedDispatcher, -}; - -use super::{ValidatorEventEmitter, ValidatorEventListener}; - -const NETWORK_TIMEOUT: Duration = Duration::from_millis(1000); -const INITIAL_BACKOFF: Duration = Duration::from_millis(100); -const MAX_BACKOFF: Duration = Duration::from_secs(10); -const BACKOFF_FACTOR: u32 = 2; // Factor by which the timeout will increase - -#[derive(PartialEq, Debug)] -pub enum ValidatorTaskResult { - Void, - Signatures(FastHashMap), - ValidationStatus(ValidationResult), -} - -#[derive(Debug, Clone, PartialEq)] -pub struct StopMessage { - block_id: BlockId, -} - -#[allow(private_bounds)] -#[async_trait] -pub trait ValidatorProcessor: ValidatorEventEmitter + Sized + Send + Sync + 'static -where - ST: StateNodeAdapter, -{ - fn new( - dispatcher: Arc>, - listener: Arc, - state_node_adapter: Arc, - network: ValidatorNetwork, - ) -> Self; - - fn get_dispatcher(&self) -> Arc>; - - async fn try_add_session( - &mut self, - session: Arc, - ) -> Result; - - /// Start block candidate validation process - async fn start_candidate_validation( - &mut self, - candidate_id: BlockId, - session_seqno: u32, - current_validator_keypair: KeyPair, - ) -> Result; - - async fn stop_candidate_validation(&self, candidate_id: BlockId) - -> Result; - - async fn enqueue_process_new_mc_block_state( - &self, - mc_state: Arc, - ) -> Result<()>; - - async fn process_candidate_signature_response( - &mut self, - session_seqno: u32, - block_id_short: BlockIdShort, - signatures: Vec<([u8; 32], [u8; 64])>, - ) -> Result; - - async fn validate_candidate_by_block_from_bc( - &mut self, - candidate_id: BlockId, - ) -> Result { - self.on_block_validated_event(candidate_id, OnValidatedBlockEvent::ValidByState) - .await?; - Ok(ValidatorTaskResult::Void) - } - async fn get_block_signatures( - &mut self, - session_seqno: u32, - block_id_short: &BlockIdShort, - ) -> Result; - - async fn validate_candidate( - &mut self, - candidate_id: BlockId, - session_seqno: u32, - current_validator_pubkey: everscale_crypto::ed25519::PublicKey, - ) -> Result; - async fn get_validation_status( - &mut self, - session_seqno: u32, - block_id_short: &BlockIdShort, - ) -> Result; -} - -pub struct ValidatorProcessorStdImpl -where - ST: StateNodeAdapter, -{ - dispatcher: Arc>, - listener: Arc, - validation_state: ValidationStateStdImpl, - state_node_adapter: Arc, - network: ValidatorNetwork, - stop_sender: broadcast::Sender, -} - -#[async_trait] -impl ValidatorEventEmitter for ValidatorProcessorStdImpl -where - ST: StateNodeAdapter, -{ - async fn on_block_validated_event( - &self, - block: BlockId, - event: OnValidatedBlockEvent, - ) -> Result<()> { - self.listener.on_block_validated(block, event).await - } -} - -#[async_trait] -impl ValidatorProcessor for ValidatorProcessorStdImpl -where - ST: StateNodeAdapter, -{ - fn new( - dispatcher: Arc>, - listener: Arc, - state_node_adapter: Arc, - network: ValidatorNetwork, - ) -> Self { - let (stop_sender, _) = broadcast::channel(1000); - let validation_state = ValidationStateStdImpl::new(); - Self { - dispatcher, - listener, - state_node_adapter, - validation_state, - network, - stop_sender, - } - } - - fn get_dispatcher(&self) -> Arc> { - self.dispatcher.clone() - } - - async fn try_add_session( - &mut self, - session: Arc, - ) -> Result { - trace!(target: tracing_targets::VALIDATOR, "Trying to add session seqno {:?}", session.seqno); - if self.validation_state.get_session(session.seqno).is_none() { - let (peer_resolver, local_peer_id) = { - let network = self.network.clone(); - ( - network.clone().peer_resolver, - network.dht_client.network().peer_id().0, - ) - }; - - let overlay_id = OverlayNumber { - session_seqno: session.seqno, - }; - trace!(target: tracing_targets::VALIDATOR, overlay_id = ?session.seqno, "Creating private overlay"); - let overlay_id = OverlayId(tl_proto::hash(overlay_id)); - let network_service = NetworkService::new(self.get_dispatcher().clone()); - - let private_overlay = PrivateOverlay::builder(overlay_id) - .with_peer_resolver(peer_resolver) - .build(network_service); - - let overlay_added = self - .network - .overlay_service - .add_private_overlay(&private_overlay); - - if !overlay_added { - panic!("Failed to add private overlay"); - } - - self.validation_state - .add_session(session.clone(), private_overlay.clone()); - - let mut entries = private_overlay.write_entries(); - - for validator in session.validators.values() { - if validator.public_key.to_bytes() == local_peer_id { - continue; - } - entries.insert(&PeerId(validator.public_key.to_bytes())); - trace!(target: tracing_targets::VALIDATOR, validator_pubkey = ?validator.public_key.as_bytes(), "Added validator to overlay"); - } - } - trace!(target: tracing_targets::VALIDATOR, "Session seqno {:?} added", session.seqno); - Ok(ValidatorTaskResult::Void) - } - - /// Start block candidate validation process - async fn start_candidate_validation( - &mut self, - candidate_id: BlockId, - session_seqno: u32, - current_validator_keypair: KeyPair, - ) -> Result { - let mut stop_receiver = self.stop_sender.subscribe(); - trace!(target: tracing_targets::VALIDATOR, block = %candidate_id, "Starting candidate validation"); - - // Simplify session retrieval with clear, concise error handling. - let session = self - .validation_state - .get_mut_session(session_seqno) - .ok_or_else(|| anyhow!("Failed to start candidate validation. Session not found"))?; - - trace!(target: tracing_targets::VALIDATOR, block = %candidate_id, "Signing block"); - - let our_signature = sign_block(¤t_validator_keypair, &candidate_id)?; - let current_validator_signature = - HashBytes(current_validator_keypair.public_key.to_bytes()); - - trace!(target: tracing_targets::VALIDATOR, block = %candidate_id, "Adding block to session"); - session.add_block(candidate_id)?; - - trace!(target: tracing_targets::VALIDATOR, block = %candidate_id, "Adding our signature to session"); - - let enqueue_task_result = self - .process_candidate_signature_response( - session_seqno, - candidate_id.as_short_id(), - vec![(current_validator_signature.0, our_signature.0)], - ) - .await; - - trace!(target: tracing_targets::VALIDATOR, block = %candidate_id, "Enqueued task for processing signatures response"); - if let Err(e) = enqueue_task_result { - trace!(target: tracing_targets::VALIDATOR, block = %candidate_id, "Failed to enqueue task for processing signatures response {e:?}"); - bail!("Failed to enqueue task for processing signatures response {e:?}"); - } - - let session = self - .validation_state - .get_session(session_seqno) - .ok_or_else(|| anyhow!("Failed to start candidate validation. Session not found"))?; - - let validation_status = session.validation_status(&candidate_id.as_short_id()); - - if validation_status == ValidationResult::Valid - || validation_status == ValidationResult::Invalid - { - trace!(target: tracing_targets::VALIDATOR, block = %candidate_id, "Validation status is already set for block {:?}", candidate_id); - return Ok(ValidatorTaskResult::Void); - } - - let dispatcher = self.get_dispatcher().clone(); - let current_validator_pubkey = current_validator_keypair.public_key; - trace!(target: tracing_targets::VALIDATOR, block = %candidate_id, "Starting validation loop"); - - tokio::spawn(async move { - let mut iteration = 0; - loop { - let interval_duration = if iteration == 0 { - Duration::from_millis(0) - } else { - let exponential_backoff = INITIAL_BACKOFF * BACKOFF_FACTOR.pow(iteration - 1); - - if exponential_backoff > MAX_BACKOFF { - MAX_BACKOFF - } else { - exponential_backoff - } - }; - - trace!(target: tracing_targets::VALIDATOR, block = %candidate_id, interval = ?interval_duration, "Waiting for next validation attempt"); - - let dispatcher_clone = dispatcher.clone(); - let cloned_candidate = candidate_id; - - tokio::select! { - Ok(message) = stop_receiver.recv() => { - if message.block_id == cloned_candidate { - trace!(target: tracing_targets::VALIDATOR, "Stopping validation for block {:?}", cloned_candidate); - break; - } - }, - _ = tokio::time::sleep(interval_duration) => { - - let validation_task_result = dispatcher_clone.enqueue_task_with_responder( - method_to_async_task_closure!( - get_validation_status, - session_seqno, - &cloned_candidate.as_short_id()) - ).await; - - trace!(target: tracing_targets::VALIDATOR, block = %cloned_candidate, "Enqueued task for getting validation status"); - - match validation_task_result { - Ok(receiver) => match receiver.await.unwrap() { - Ok(ValidatorTaskResult::ValidationStatus(validation_status)) => { - if validation_status == ValidationResult::Valid || validation_status == ValidationResult::Invalid { - trace!(target: tracing_targets::VALIDATOR, "Validation status is already set for block {:?}", cloned_candidate); - break; - } - - trace!(target: tracing_targets::VALIDATOR, block = %cloned_candidate, "Validation status is not set yet. Enqueueing validation task"); - dispatcher_clone.enqueue_task(method_to_async_task_closure!( - validate_candidate, - cloned_candidate, - session_seqno, - current_validator_pubkey - )).await.expect("Failed to validate candidate"); - trace!(target: tracing_targets::VALIDATOR, block = %cloned_candidate, "Enqueued validation task"); - }, - Ok(e) => panic!("Unexpected response from get_validation_status: {:?}", e), - Err(e) => panic!("Failed to get validation status: {:?}", e), - }, - Err(e) => panic!("Failed to enqueue validation task: {:?}", e), - } - } - } - iteration += 1; - } - }); - - Ok(ValidatorTaskResult::Void) - } - - async fn stop_candidate_validation( - &self, - candidate_id: BlockId, - ) -> Result { - self.stop_sender.send(StopMessage { - block_id: candidate_id, - })?; - Ok(ValidatorTaskResult::Void) - } - - async fn enqueue_process_new_mc_block_state( - &self, - _mc_state: Arc, - ) -> Result<()> { - todo!() - } - - async fn process_candidate_signature_response( - &mut self, - session_seqno: u32, - block_id_short: BlockIdShort, - signatures: Vec<([u8; 32], [u8; 64])>, - ) -> Result { - trace!(target: tracing_targets::VALIDATOR, block = %block_id_short, "Processing candidate signature response"); - // Simplified session retrieval - let session = self - .validation_state - .get_mut_session(session_seqno) - .context("failed to process_candidate_signature_response. session not found")?; - - // Check if validation status is already determined - let validation_status = session.validation_status(&block_id_short); - if validation_status == ValidationResult::Valid - || validation_status == ValidationResult::Invalid - { - debug!( - "Validation status is already set for block {:?}.", - block_id_short - ); - return Ok(ValidatorTaskResult::Void); - } - - if let Some(block) = session.get_block(&block_id_short).cloned() { - // Process each signature for the existing block - for (pub_key_bytes, sig_bytes) in signatures { - let validator_id = HashBytes(pub_key_bytes); - let signature = Signature(sig_bytes); - let block_validation_candidate = BlockValidationCandidate::from(block); - - let is_valid = session - .get_validation_session_info() - .validators - .get(&validator_id) - .context("validator not found")? - .public_key - .verify(block_validation_candidate.as_bytes(), &signature.0); - - session.add_signature(&block, validator_id, signature, is_valid); - } - - match session.validation_status(&block_id_short) { - ValidationResult::Valid => { - let signatures = BlockSignatures { - signatures: session.get_valid_signatures(&block_id_short), - }; - - self.on_block_validated_event(block, OnValidatedBlockEvent::Valid(signatures)) - .await?; - } - ValidationResult::Invalid => { - trace!(target: tracing_targets::VALIDATOR, block = %block_id_short, "Block is invalid"); - self.on_block_validated_event(block, OnValidatedBlockEvent::Invalid) - .await?; - } - ValidationResult::Insufficient(total_valid_weight, valid_weight) => { - trace!( - "Insufficient signatures for block {:?}. Total valid weight: {}. Required weight: {}", - block_id_short, - total_valid_weight, - valid_weight - ); - } - } - } else { - // add signatures to cache if previous block exists - let previous_block = BlockIdShort::from((block_id_short.shard, block_id_short.seqno)); - let previous_block = session.get_block(&previous_block); - let blocks_count = session.blocks_count(); - - if blocks_count == 0 || previous_block.is_some() { - session.add_cached_signatures( - &block_id_short, - signatures - .into_iter() - .map(|(k, v)| (HashBytes(k), Signature(v))) - .collect(), - ); - } - } - Ok(ValidatorTaskResult::Void) - } - - async fn get_block_signatures( - &mut self, - session_seqno: u32, - block_id_short: &BlockIdShort, - ) -> Result { - let session = self - .validation_state - .get_session(session_seqno) - .context("session not found")?; - let signatures = session.get_valid_signatures(block_id_short); - Ok(ValidatorTaskResult::Signatures(signatures)) - } - - async fn validate_candidate( - &mut self, - candidate_id: BlockId, - session_seqno: u32, - current_validator_pubkey: everscale_crypto::ed25519::PublicKey, - ) -> Result { - trace!(target: tracing_targets::VALIDATOR, block = %candidate_id, "Validating candidate"); - let block_id_short = candidate_id.as_short_id(); - - let validation_state = &self.validation_state; - let session = validation_state - .get_session(session_seqno) - .ok_or(anyhow!("Session not found"))?; - - trace!(target: tracing_targets::VALIDATOR, block = %candidate_id, "Getting validators"); - let dispatcher = self.get_dispatcher(); - let state_node_adapter = self.state_node_adapter.clone(); - - let validators = session.validators_without_signatures(&block_id_short); - - let private_overlay = session.get_overlay().clone(); - let current_signatures = session.get_valid_signatures(&candidate_id.as_short_id()); - let network = self.network.clone(); - - tokio::spawn(async move { - let block_from_state = state_node_adapter - .load_block_handle(&candidate_id) - .await - .expect("Failed to load block from state"); - - if block_from_state.is_some() { - let result = dispatcher - .clone() - .enqueue_task(method_to_async_task_closure!( - validate_candidate_by_block_from_bc, - candidate_id - )) - .await; - - if let Err(e) = result { - panic!("Failed to validate block by state {e:?}"); - } - } else { - let payload = SignaturesQuery::create( - session_seqno, - candidate_id.as_short_id(), - ¤t_signatures, - ); - - for validator in validators { - let cloned_private_overlay = private_overlay.clone(); - let cloned_network = network.dht_client.network().clone(); - let cloned_payload = Request::from_tl(payload.clone()); - let cloned_dispatcher = dispatcher.clone(); - tokio::spawn(async move { - if validator.public_key != current_validator_pubkey { - trace!(target: tracing_targets::VALIDATOR, validator_pubkey=?validator.public_key.as_bytes(), "trying to send request for getting signatures from validator"); - - let response = tokio::time::timeout( - NETWORK_TIMEOUT, - cloned_private_overlay.query( - &cloned_network, - &PeerId(validator.public_key.to_bytes()), - cloned_payload, - ), - ) - .await; - - match response { - Ok(Ok(response)) => { - let response = response.parse_tl::(); - trace!(target: tracing_targets::VALIDATOR, "Received response from overlay"); - match response { - Ok(signatures) => { - let enqueue_task_result = cloned_dispatcher - .enqueue_task(method_to_async_task_closure!( - process_candidate_signature_response, - signatures.session_seqno, - signatures.block_id_short, - signatures.signatures - )) - .await; - trace!(target: tracing_targets::VALIDATOR, "Enqueued task for processing signatures response"); - if let Err(e) = enqueue_task_result { - panic!("Failed to enqueue task for processing signatures response: {e}"); - } - } - Err(e) => { - panic!("Failed convert signatures response to SignaturesQuery: {e}"); - } - } - } - Ok(Err(e)) => { - warn!("Failed to get response from overlay: {e}"); - } - Err(e) => { - warn!("Network request timed out: {e}"); - } - } - } - }); - } - } - }); - Ok(ValidatorTaskResult::Void) - } - - async fn get_validation_status( - &mut self, - session_seqno: u32, - block_id_short: &BlockIdShort, - ) -> Result { - let session = self - .validation_state - .get_session(session_seqno) - .context("session not found")?; - let validation_status = session.validation_status(block_id_short); - Ok(ValidatorTaskResult::ValidationStatus(validation_status)) - } -} - -fn sign_block(key_pair: &KeyPair, block: &BlockId) -> Result { - let block_validation_candidate = BlockValidationCandidate::from(*block); - let signature = Signature(key_pair.sign(block_validation_candidate.as_bytes())); - Ok(signature) -} diff --git a/collator/tests/collation_tests.rs b/collator/tests/collation_tests.rs index 29afa1258..bcf5552cd 100644 --- a/collator/tests/collation_tests.rs +++ b/collator/tests/collation_tests.rs @@ -2,7 +2,6 @@ use everscale_types::models::GlobalCapability; use tycho_block_util::state::MinRefMcStateTracker; use tycho_collator::test_utils::prepare_test_storage; -use tycho_collator::validator_test_impl::ValidatorProcessorTestImpl; use tycho_collator::{ manager::CollationManager, mempool::{MempoolAdapterBuilder, MempoolAdapterBuilderStdImpl, MempoolAdapterStdImpl}, @@ -52,11 +51,7 @@ async fn test_collation_process_on_stubs() { let node_network = tycho_collator::test_utils::create_node_network(); - let _manager = tycho_collator::manager::create_std_manager_with_validator::< - _, - _, - ValidatorProcessorTestImpl<_>, - >( + let _manager = tycho_collator::manager::create_std_manager_with_validator::<_, _>( config, mpool_adapter_builder, state_node_adapter_builder, diff --git a/collator/tests/validator_tests.rs b/collator/tests/validator_tests.rs index c3c80fb22..7f250cc0e 100644 --- a/collator/tests/validator_tests.rs +++ b/collator/tests/validator_tests.rs @@ -1,6 +1,7 @@ use std::collections::HashMap; use std::net::Ipv4Addr; use std::str::FromStr; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use async_trait::async_trait; @@ -13,20 +14,21 @@ use everscale_crypto::ed25519::KeyPair; use everscale_types::models::{BlockId, ValidatorDescription}; use rand::prelude::ThreadRng; use tokio::sync::{Mutex, Notify}; +use tokio::time::sleep; -use tracing::debug; +use tracing::{debug, error}; use tycho_block_util::block::ValidatorSubsetInfo; use tycho_block_util::state::{MinRefMcStateTracker, ShardStateStuff}; use tycho_collator::state_node::{ - StateNodeAdapterBuilder, StateNodeAdapterBuilderStdImpl, StateNodeEventListener, + StateNodeAdapter, StateNodeAdapterBuilder, StateNodeAdapterBuilderStdImpl, + StateNodeAdapterStdImpl, StateNodeEventListener, }; use tycho_collator::test_utils::{prepare_test_storage, try_init_test_tracing}; use tycho_collator::types::{CollationSessionInfo, OnValidatedBlockEvent, ValidatorNetwork}; use tycho_collator::validator::state::{ValidationState, ValidationStateStdImpl}; use tycho_collator::validator::types::ValidationSessionInfo; use tycho_collator::validator::validator::{Validator, ValidatorEventListener, ValidatorStdImpl}; -use tycho_collator::validator::validator_processor::ValidatorProcessorStdImpl; use tycho_core::block_strider::state::BlockStriderState; use tycho_core::block_strider::subscriber::test::PrintSubscriber; use tycho_core::block_strider::{prepare_state_apply, BlockStrider}; @@ -40,27 +42,29 @@ pub struct TestValidatorEventListener { notify: Arc, expected_notifications: Mutex, received_notifications: Mutex, + global_validated_blocks: Arc, } impl TestValidatorEventListener { - pub fn new(expected_count: u32) -> Arc { + pub fn new(expected_count: u32, global_validated_blocks: Arc) -> Arc { Arc::new(Self { validated_blocks: Mutex::new(vec![]), notify: Arc::new(Notify::new()), expected_notifications: Mutex::new(expected_count), received_notifications: Mutex::new(0), + global_validated_blocks, }) } pub async fn increment_and_check(&self) { let mut received = self.received_notifications.lock().await; *received += 1; + error!( + "received: {}, expected: {}", + *received, + *self.expected_notifications.lock().await + ); if *received == *self.expected_notifications.lock().await { - println!( - "received: {}, expected: {}", - *received, - *self.expected_notifications.lock().await - ); self.notify.notify_one(); } } @@ -72,9 +76,15 @@ impl ValidatorEventListener for TestValidatorEventListener { &self, block_id: BlockId, _event: OnValidatedBlockEvent, - ) -> anyhow::Result<()> { + ) -> Result<()> { let mut validated_blocks = self.validated_blocks.lock().await; - validated_blocks.push(block_id); + if validated_blocks.contains(&block_id) { + return Ok(()); + } + + let current_count = self.global_validated_blocks.fetch_add(1, Ordering::SeqCst); + debug!("Block validated, new global count: {}", current_count); + self.increment_and_check().await; Ok(()) } @@ -88,8 +98,8 @@ impl StateNodeEventListener for TestValidatorEventListener { async fn on_block_accepted_external( &self, - block_id: &BlockId, - state: Option>, + _block_id: &BlockId, + _state: Option>, ) -> Result<()> { unimplemented!("Not implemented"); } @@ -165,7 +175,9 @@ fn make_network(node_count: usize) -> Vec { #[tokio::test] async fn test_validator_accept_block_by_state() -> anyhow::Result<()> { - let test_listener = TestValidatorEventListener::new(1); + let global_validated_blocks = Arc::new(AtomicUsize::new(0)); + + let test_listener = TestValidatorEventListener::new(1, global_validated_blocks); let _state_node_event_listener: Arc = test_listener.clone(); let (provider, storage) = prepare_test_storage().await.unwrap(); @@ -215,16 +227,15 @@ async fn test_validator_accept_block_by_state() -> anyhow::Result<()> { dht_client, }; - let validator = ValidatorStdImpl::, _>::create( - test_listener.clone(), + let validator = ValidatorStdImpl::<_>::create( + vec![test_listener.clone()], state_node_adapter, validator_network, + KeyPair::generate(&mut ThreadRng::default()), ); - let v_keypair = KeyPair::generate(&mut ThreadRng::default()); - let validator_description = ValidatorDescription { - public_key: v_keypair.public_key.to_bytes().into(), + public_key: validator.get_keypair().public_key.to_bytes().into(), weight: 1, adnl_addr: None, mc_seqno_since: 0, @@ -257,13 +268,10 @@ async fn test_validator_accept_block_by_state() -> anyhow::Result<()> { let validation_session = Arc::new(ValidationSessionInfo::try_from(collator_session_info.clone()).unwrap()); - validator - .enqueue_add_session(validation_session) - .await - .unwrap(); + validator.add_session(validation_session).await.unwrap(); validator - .enqueue_candidate_validation(block_id, collator_session_info.seqno(), v_keypair) + .validate(block_id, collator_session_info.seqno()) .await .unwrap(); @@ -277,23 +285,55 @@ async fn test_validator_accept_block_by_state() -> anyhow::Result<()> { Ok(()) } +fn create_blocks(amount: u32) -> Vec { + let mut blocks = vec![]; + for i in 0..amount { + blocks.push(BlockId { + shard: Default::default(), + seqno: i, + root_hash: Default::default(), + file_hash: Default::default(), + }); + } + blocks +} #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn test_validator_accept_block_by_network() -> Result<()> { try_init_test_tracing(tracing_subscriber::filter::LevelFilter::DEBUG); tycho_util::test::init_logger("test_validator_accept_block_by_network"); - let network_nodes = make_network(13); - let blocks_amount = 1000; - let sessions = 1; + let node_count = 13u32; + let network_nodes = make_network(node_count as usize); + let blocks_amount = 100u32; + let sessions = 1u32; + let max_concurrent_blocks = 1; // Limit to processing ten blocks at a time + let required_validations = blocks_amount * node_count; // Total required validations for all validators together + let global_validated_blocks = Arc::new(AtomicUsize::new(0)); - let mut validators = vec![]; - let mut listeners = vec![]; // Track listeners for later validation + let mut tasks = vec![]; - for node in network_nodes { - // Create a unique listener for each validator - let test_listener = TestValidatorEventListener::new(blocks_amount * sessions); - listeners.push(test_listener.clone()); + let mut validators_descriptions = Vec::new(); + for node in &network_nodes { + let peer_id = node.network.peer_id(); + validators_descriptions.push(ValidatorDescription { + public_key: (*peer_id.as_bytes()).into(), + weight: 1, + adnl_addr: None, + mc_seqno_since: 0, + prev_total_weight: 0, + }); + } + let validators_subset_info = ValidatorSubsetInfo { + validators: validators_descriptions, + short_hash: 0, + }; + + for node in network_nodes { + let test_listener = TestValidatorEventListener::new( + blocks_amount * sessions, + global_validated_blocks.clone(), + ); let state_node_adapter = Arc::new( StateNodeAdapterBuilderStdImpl::new(build_tmp_storage()?).build(test_listener.clone()), ); @@ -303,96 +343,90 @@ async fn test_validator_accept_block_by_network() -> Result<()> { dht_client: node.dht_client.clone(), peer_resolver: node.peer_resolver.clone(), }; - let validator = ValidatorStdImpl::, _>::create( - test_listener.clone(), + + let validator = Arc::new(ValidatorStdImpl::<_>::create( + vec![test_listener.clone()], state_node_adapter, network, - ); - validators.push((validator, node)); + node.keypair.clone(), + )); + + let semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrent_blocks)); + let task = tokio::spawn(handle_validator( + validator, + semaphore.clone(), + test_listener, + blocks_amount, + sessions, + validators_subset_info.clone(), + required_validations, + global_validated_blocks.clone(), + )); + tasks.push(task); } - let mut validators_descriptions = vec![]; - for (_, node) in &validators { - let peer_id = node.network.peer_id(); - validators_descriptions.push(ValidatorDescription { - public_key: (*peer_id.as_bytes()).into(), - weight: 1, - adnl_addr: None, - mc_seqno_since: 0, - prev_total_weight: 0, - }); + // Await all validator tasks to complete + for task in tasks { + task.await?; } - let validators_subset_info = ValidatorSubsetInfo { - validators: validators_descriptions, - short_hash: 0, - }; + // Assert that all validations are completed as expected + assert_eq!( + global_validated_blocks.load(Ordering::SeqCst), + required_validations as usize, + "Not all required validations were completed" + ); + Ok(()) +} + +async fn handle_validator( + validator: Arc>, + semaphore: Arc, + listener: Arc, + blocks_amount: u32, + sessions: u32, + validators_subset_info: ValidatorSubsetInfo, + required_validations: u32, + global_validated_blocks: Arc, +) -> Result<()> { for session in 1..=sessions { let blocks = create_blocks(blocks_amount); - - for (validator, _node) in &validators { - let collator_session_info = Arc::new(CollationSessionInfo::new( - session, - validators_subset_info.clone(), - Some(_node.keypair), // Ensure you use the node's keypair correctly here - )); - // Assuming this setup is correct and necessary for each validator - - let validation_session = - Arc::new(ValidationSessionInfo::try_from(collator_session_info.clone()).unwrap()); - validator - .enqueue_add_session(validation_session) - .await - .unwrap(); - } - - let mut i = 0; - for block in blocks.iter() { - i += 1; - for (validator, _node) in &validators { - let collator_session_info = Arc::new(CollationSessionInfo::new( - session, - validators_subset_info.clone(), - Some(_node.keypair), // Ensure you use the node's keypair correctly here - )); - - if i % 10 == 0 { - tokio::time::sleep(Duration::from_millis(10)).await; - } - validator - .enqueue_candidate_validation( - *block, - collator_session_info.seqno(), - *collator_session_info.current_collator_keypair().unwrap(), - ) + let collator_session_info = Arc::new(CollationSessionInfo::new( + session, + validators_subset_info.clone(), + Some(validator.get_keypair().clone()), // Assuming you have access to node's keypair here + )); + + validator + .add_session(Arc::new( + ValidationSessionInfo::try_from(collator_session_info.clone()).unwrap(), + )) + .await?; + + for block in blocks { + let block_clone = block.clone(); + let collator_info_clone = collator_session_info.clone(); + let v = validator.clone(); + + let permit = semaphore.clone().acquire_owned().await.unwrap(); + tokio::spawn(async move { + v.validate(block_clone, collator_info_clone.seqno()) .await .unwrap(); - } + drop(permit); + }); } } - for listener in listeners { - listener.notify.notified().await; - let validated_blocks = listener.validated_blocks.lock().await; - assert_eq!( - validated_blocks.len() as u32, - sessions * blocks_amount, - "Expected each validator to validate the block once." + while global_validated_blocks.load(Ordering::SeqCst) < required_validations as usize { + debug!( + "Validator wait: {:?}", + global_validated_blocks.load(Ordering::SeqCst) ); + sleep(Duration::from_millis(100)).await; } - Ok(()) -} -fn create_blocks(amount: u32) -> Vec { - let mut blocks = vec![]; - for i in 0..amount { - blocks.push(BlockId { - shard: Default::default(), - seqno: i, - root_hash: Default::default(), - file_hash: Default::default(), - }); - } - blocks + listener.notify.notified().await; + Ok(()) } From e4b478cb354717ba09609e26a931d1cd443fc126 Mon Sep 17 00:00:00 2001 From: Maksim Greshniakov Date: Wed, 29 Nov 2023 12:42:46 +0100 Subject: [PATCH 2/6] review fixes --- core/src/queue/queue.rs | 49 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 core/src/queue/queue.rs diff --git a/core/src/queue/queue.rs b/core/src/queue/queue.rs new file mode 100644 index 000000000..ad8167b89 --- /dev/null +++ b/core/src/queue/queue.rs @@ -0,0 +1,49 @@ +use std::path::PathBuf; + +type ShardIdent = i64; +type Lt = u64; + +// need to load types from types crate +type MessageHash = String; +type Address = String; + +struct QueueDiff {} +struct Message {} +struct QueueIterator {} + +struct MessageEnvelope { + lt: u64, + hash: String, + message: Message, + from_contract: Address, + to_contract: Address, +} + +trait MessageQueue { + // Factory methods for initialization and loading + fn init(directory: PathBuf, shard_id: ShardIdent, load_if_exists: bool) -> Self; + + // Methods for queue management + fn apply_diff(&mut self, diff: QueueDiff); + fn save_to_storage(&self); + fn commit_current_state(&mut self); + + // Differential and state management + fn get_current_diff(&self) -> Option; + fn undo_state_to_block(&mut self, diff: Vec); + + // Message handling + fn add_message(&mut self, message: MessageEnvelope); + fn add_processed_upto(&mut self, lt: Lt, hash: MessageHash); + + // Queue navigation + fn create_iterator(&self) -> QueueIterator; +} + +impl Iterator for QueueIterator { + type Item = Message; + + fn next(&mut self) -> Option { + todo!() + } +} From 75490cc4578f54d9d2830dc8faddf8371f619ed5 Mon Sep 17 00:00:00 2001 From: Vitaly Terekhov Date: Mon, 8 Jan 2024 09:19:08 +0000 Subject: [PATCH 3/6] mod renamed --- core/src/lib.rs | 1 + core/src/msg_queue/mod.rs | 1 + core/src/{queue => msg_queue}/queue.rs | 0 3 files changed, 2 insertions(+) create mode 100644 core/src/msg_queue/mod.rs rename core/src/{queue => msg_queue}/queue.rs (100%) diff --git a/core/src/lib.rs b/core/src/lib.rs index fe1f80a1e..995bd840a 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,2 +1,3 @@ pub mod block_strider; pub mod internal_queue; +mod msg_queue; diff --git a/core/src/msg_queue/mod.rs b/core/src/msg_queue/mod.rs new file mode 100644 index 000000000..ae2004c85 --- /dev/null +++ b/core/src/msg_queue/mod.rs @@ -0,0 +1 @@ +mod queue; diff --git a/core/src/queue/queue.rs b/core/src/msg_queue/queue.rs similarity index 100% rename from core/src/queue/queue.rs rename to core/src/msg_queue/queue.rs From 7cab6d2c7679f93e38093b8e95c5323d1bb48c82 Mon Sep 17 00:00:00 2001 From: Vitaly Terekhov Date: Tue, 9 Jan 2024 21:15:17 +0000 Subject: [PATCH 4/6] new msg queue module structure --- core/src/msg_queue/cache_persistent.rs | 68 +++++ core/src/msg_queue/cache_persistent_fs.rs | 0 core/src/msg_queue/config.rs | 31 +++ core/src/msg_queue/diff_mgmt.rs | 0 core/src/msg_queue/iterator.rs | 70 +++++ core/src/msg_queue/loader.rs | 26 ++ core/src/msg_queue/mod.rs | 17 ++ core/src/msg_queue/queue.rs | 260 +++++++++++++++--- core/src/msg_queue/state_persistent.rs | 38 +++ core/src/msg_queue/state_persistent_fs.rs | 0 core/src/msg_queue/storage.rs | 41 +++ core/src/msg_queue/storage_rocksdb.rs | 0 .../msg_queue/tests/test_cache_persistent.rs | 19 ++ core/src/msg_queue/tests/test_config.rs | 25 ++ core/src/msg_queue/tests/test_queue.rs | 8 + core/src/msg_queue/types.rs | 57 ++++ 16 files changed, 627 insertions(+), 33 deletions(-) create mode 100644 core/src/msg_queue/cache_persistent.rs create mode 100644 core/src/msg_queue/cache_persistent_fs.rs create mode 100644 core/src/msg_queue/config.rs create mode 100644 core/src/msg_queue/diff_mgmt.rs create mode 100644 core/src/msg_queue/iterator.rs create mode 100644 core/src/msg_queue/loader.rs create mode 100644 core/src/msg_queue/state_persistent.rs create mode 100644 core/src/msg_queue/state_persistent_fs.rs create mode 100644 core/src/msg_queue/storage.rs create mode 100644 core/src/msg_queue/storage_rocksdb.rs create mode 100644 core/src/msg_queue/tests/test_cache_persistent.rs create mode 100644 core/src/msg_queue/tests/test_config.rs create mode 100644 core/src/msg_queue/tests/test_queue.rs create mode 100644 core/src/msg_queue/types.rs diff --git a/core/src/msg_queue/cache_persistent.rs b/core/src/msg_queue/cache_persistent.rs new file mode 100644 index 000000000..82f55e0c5 --- /dev/null +++ b/core/src/msg_queue/cache_persistent.rs @@ -0,0 +1,68 @@ +use std::{any::Any, fmt::Debug}; + +use anyhow::{anyhow, Result}; + +use super::{state_persistent::PersistentStateService, storage::StorageService, MessageQueueImpl}; + +#[cfg(test)] +#[path = "tests/test_cache_persistent.rs"] +pub(super) mod tests; + +pub trait PersistentCacheService: Debug { + fn new(config: &dyn PersistentCacheConfig) -> Result + where + Self: Sized; +} + +pub trait PersistentCacheConfig: Debug { + fn as_any(&self) -> &dyn Any; +} + +/* +This part of the code contains logic of working with persistent cache. + +We use partials just to separate the codebase on smaller and easier maintainable parts. + */ +impl MessageQueueImpl +where + CH: PersistentCacheService, + ST: PersistentStateService, + DB: StorageService, +{ + fn some_internal_method_for_persistent_cache(&mut self) -> Result<()> { + todo!() + } + pub(super) fn some_moddule_internal_method_for_persistent_cache(&mut self) -> Result<()> { + todo!() + } +} + +// STUBS + +#[derive(Debug)] +pub struct PersistentCacheServiceStubImpl { + pub config: PersistentCacheConfigStubImpl, +} +impl PersistentCacheService for PersistentCacheServiceStubImpl { + fn new(config: &dyn PersistentCacheConfig) -> Result { + let config = config + .as_any() + .downcast_ref::() + .ok_or(anyhow!("error"))? + .clone(); + + let ret = Self { config }; + + Ok(ret) + } +} + +#[derive(Debug, Clone)] +pub struct PersistentCacheConfigStubImpl { + pub cfg_value1: String, +} +impl PersistentCacheConfig for PersistentCacheConfigStubImpl { + fn as_any(&self) -> &dyn Any { + self + } +} diff --git a/core/src/msg_queue/cache_persistent_fs.rs b/core/src/msg_queue/cache_persistent_fs.rs new file mode 100644 index 000000000..e69de29bb diff --git a/core/src/msg_queue/config.rs b/core/src/msg_queue/config.rs new file mode 100644 index 000000000..ff7eab6b8 --- /dev/null +++ b/core/src/msg_queue/config.rs @@ -0,0 +1,31 @@ +use super::cache_persistent::PersistentCacheConfig; + +#[cfg(test)] +#[path = "tests/test_config.rs"] +pub(super) mod tests; + +pub struct MessageQueueBaseConfig {} + +pub struct MessageQueueConfig { + base_config: MessageQueueBaseConfig, + persistent_cache_config: Box, +} + +impl MessageQueueConfig { + pub fn new( + base_config: MessageQueueBaseConfig, + persistent_cache_config: impl PersistentCacheConfig + 'static, + ) -> Self { + MessageQueueConfig { + base_config, + persistent_cache_config: Box::new(persistent_cache_config), + } + } + + pub fn base_config(&self) -> &MessageQueueBaseConfig { + &self.base_config + } + pub fn persistent_cache_config_ref(&self) -> &dyn PersistentCacheConfig { + self.persistent_cache_config.as_ref() + } +} diff --git a/core/src/msg_queue/diff_mgmt.rs b/core/src/msg_queue/diff_mgmt.rs new file mode 100644 index 000000000..e69de29bb diff --git a/core/src/msg_queue/iterator.rs b/core/src/msg_queue/iterator.rs new file mode 100644 index 000000000..98f3b537e --- /dev/null +++ b/core/src/msg_queue/iterator.rs @@ -0,0 +1,70 @@ +/* +There are 2 options to implement iteration: + 1) implement an iterator directly for the MessageQueue trait + 2) implement separate MessageQueueIterator over items in MessageQueue +(you'll find stubs for both options down) + +The next question is what kind of iterator to implement: (a) a consuming iterator +or (b) a non-consuming iterator. Finally, we should remove processed messages from +the current queue state (commit). But also we must have an option to roll back and +process messages again if the collation attempt fails. Moreover, we don't know if +we need to return the item value from the iterator or if we can return just the refs. + +When implementing a non-consuming iterator we can move items to some kind of +"remove" buffer and then clear it on commit. Or we can just remember processed +items and then clear them from the queue state. We should choose the most efficient +implementation regarding the memory and CPU utilization. We also need to consider +that the iterator should have the ability to continue iteration after the commit +with minimal overhead (we shouldn't seek for the last position). + +When implementing the separate MessageQueueIterator it should take ownership of +the source MessageQueue to lazy load more items chunks. After the iteration, we can +convert the iterator into MessageQueue back. + */ + +use super::{cache_persistent::*, state_persistent::*, storage::*, types::*, MessageQueue}; + +// Option (1) - MessageQueue implement iterator by itself + +impl<'a, CH, ST, DB> Iterator for &'a dyn MessageQueue { + type Item = &'a MessageEnvelope; + + fn next(&mut self) -> Option { + todo!() + } +} + +// Option (2) - using the separate MessageQueueIterator + +pub struct MessageQueueIterator +where + CH: PersistentCacheService, + ST: PersistentStateService, + DB: StorageService, +{ + queue: Box>, +} +impl MessageQueueIterator +where + CH: PersistentCacheService, + ST: PersistentStateService, + DB: StorageService, +{ + fn create_iterator(queue: impl MessageQueue + 'static) -> Self { + Self { + queue: Box::new(queue), + } + } +} +impl<'a, CH, ST, DB> Iterator for &'a MessageQueueIterator +where + CH: PersistentCacheService, + ST: PersistentStateService, + DB: StorageService, +{ + type Item = &'a MessageEnvelope; + + fn next(&mut self) -> Option { + todo!() + } +} diff --git a/core/src/msg_queue/loader.rs b/core/src/msg_queue/loader.rs new file mode 100644 index 000000000..26dc9cb3a --- /dev/null +++ b/core/src/msg_queue/loader.rs @@ -0,0 +1,26 @@ +use anyhow::Result; + +use super::{ + cache_persistent::PersistentCacheService, state_persistent::PersistentStateService, + storage::StorageService, MessageQueueImpl, +}; + +/* +This code part contains the logic of messages loading to the queue state, +including lazy loading, etc. + +We use partials just to separate the codebase on smaller and easier maintainable parts. + */ +impl MessageQueueImpl +where + CH: PersistentCacheService, + ST: PersistentStateService, + DB: StorageService, +{ + fn some_internal_method_for_loading_logic(&mut self) -> Result<()> { + todo!() + } + pub(super) fn some_module_internal_method_for_loading_logic(&mut self) -> Result<()> { + todo!() + } +} diff --git a/core/src/msg_queue/mod.rs b/core/src/msg_queue/mod.rs index ae2004c85..7588ec4cf 100644 --- a/core/src/msg_queue/mod.rs +++ b/core/src/msg_queue/mod.rs @@ -1 +1,18 @@ +pub mod config; +pub mod types; + +mod diff_mgmt; +mod iterator; +mod loader; mod queue; + +pub mod cache_persistent; +mod cache_persistent_fs; + +pub mod state_persistent; +mod state_persistent_fs; + +pub mod storage; +mod storage_rocksdb; + +pub use {diff_mgmt::*, iterator::*, queue::*}; diff --git a/core/src/msg_queue/queue.rs b/core/src/msg_queue/queue.rs index ad8167b89..b97deed84 100644 --- a/core/src/msg_queue/queue.rs +++ b/core/src/msg_queue/queue.rs @@ -1,49 +1,243 @@ -use std::path::PathBuf; +use anyhow::Result; -type ShardIdent = i64; -type Lt = u64; +use super::types::ext_types_stubs::*; +use super::{cache_persistent::*, config::*, state_persistent::*, storage::*, types::*}; -// need to load types from types crate -type MessageHash = String; -type Address = String; +#[cfg(test)] +#[path = "tests/test_queue.rs"] +pub(super) mod tests; -struct QueueDiff {} -struct Message {} -struct QueueIterator {} +pub trait MessageQueue +where + CH: PersistentCacheService, + ST: PersistentStateService, + DB: StorageService, +{ + /// Create new queue with persistent cache, persistent state and storage + /// or init from existing storage, persistent state and cache. + /// Queue can be used only after state loading. + fn init(config: MessageQueueConfig) -> Result + where + Self: Sized; -struct MessageEnvelope { - lt: u64, - hash: String, - message: Message, - from_contract: Address, - to_contract: Address, + /// Load queue state on the specified block by id or shard+`seq_no` + /// from local persistent state and storage. + /// When called again fully reload the state according to a new block. + fn reload_state_on_block(&mut self, block_ident: BlockIdent) -> Result<()>; + + /// Add a new message to internal set. They will be used for creating diff. + /// If all existing messages in the queue state are processed and the end of + /// queue storage (EOQS) is reached, then new messages will be used for processing. + fn add_new_message(&mut self, message: EnqueuedMessage); + + /// Remeber processed message by it's LT and HASH. + /// Will be used for creating diff and to check if transaction updates could be committed. + fn add_processed_upto(&mut self, lt: Lt, hash: MessageHash); + + /// Create pending diff containing remaining new messages and top processed upto marks. + /// Diff will be stored to persistent cache. + /// + /// It is like moving current changes to the index in git. + /// + /// Future features: + /// 1. The queue will be ready for further iteration and recording a new diff. + /// + /// Simplified features: + /// 1. Further iteration and diff recording are disallowed until rollback or commit. + fn create_diff( + &mut self, + block_id: BlockId, + shard_id: ShardIdent, + block_seq_no: SeqNo, + ) -> Result>; + + /// Save diff to the storage when `no_save == false`. + /// Remove it from the persistent cache, and remove processed + /// messages related to this diff from the queue state. + /// + /// If the end of queue storage (EOQS) is reached then add the remaining + /// new messages related to this diff to the queue state for further processing. + /// + /// Future features: + /// 1. If more than one pending diff for current shard exists, eg "diff1" and "diff2", + /// and "diff2" is committed first, then we should hold the required changes and apply + /// them only after "diff1" is committed. + fn commit_diff(&mut self, diff_key: QueueDiffKey, no_save: bool) -> Result<()>; + + /// Rollback queue state changes: + /// 1. return processed messages and move back the iterator pointer + /// 2. clear processed upto info + /// 3. remove diff from persistent cache + /// 4. remove new messages + /// 5. revert other related changes + /// + /// If `by_diff` is not specified then rollback all changes from the last commit, + /// including all pending diffs. + /// + /// Future features: + /// 1. When `by_diff` is specified then rollback changes related to this diff. + /// If several pending diffs exist (eg "diff1" and "diff2") and "diff1" is specified, + /// then we should rollback changes from both "diff1" and "diff2". + fn rollback_changes(&mut self, by_diff: Option) -> Result<()>; + + /// Actions: + /// 1. store diff to persistent cache + /// 2. add new messages from diff to internal set + /// 3. mark processed messages + /// + /// The queue should look like after the executing [`MessageQueue::create_diff`]. + /// + /// Next the [`MessageQueue::commit_diff`] or [`MessageQueue::rollback_changes`] should be called. + fn apply_diff(&mut self, diff: QueueDiff) -> Result<()>; + + /// Reload queue from external persistent state: + /// 1. store submitted external persistent state + /// 2. fill queue with messages from external persistent + /// + /// Should execute it first when syncing queue from other nodes. + fn apply_persistent_state(&mut self, p_state: PersistentStateData) -> Result<()>; + + /// Get queue state data on the specified block for syncing to another node: + /// - persistent state if exists + /// - all diffs upto specified block + /// + /// This state data can be loaded to another queue using [`MessageQueue::apply_persistent_state`], + /// [`MessageQueue::apply_diff`], and [`MessageQueue::commit_diff`]. + /// + /// **Use case:** + /// We have an empty queue and know the last block, we want + /// to load the queue to match the state after this last block. + fn get_sync_state_on_block( + &self, + block_ident: BlockIdent, + ) -> Result<(Option, Vec)>; + + /// Get queue state updates for syncing from the specified block. + /// + /// May be used after [`MessageQueue::get_sync_state_on_block`]. + /// + /// **Use case:** + /// We have synced the queue to the previously specified "last block 1". While + /// we were syncing the other nodes produced more blocks, and now we have + /// a new "last block 12". So we need to sync fresh updates. + /// + /// Will return: + /// - new persistent state if it was created after the specified block + /// - all committed and pending diffs after the specified block + fn get_sync_state_from_block( + &self, + block_ident: BlockIdent, + ) -> Result<(Option, Vec)>; +} + +pub struct MessageQueueImpl +where + CH: PersistentCacheService, + ST: PersistentStateService, + DB: StorageService, +{ + config: MessageQueueConfig, + + p_cache_service: CH, + p_state_service: ST, + storage_service: DB, } -trait MessageQueue { - // Factory methods for initialization and loading - fn init(directory: PathBuf, shard_id: ShardIdent, load_if_exists: bool) -> Self; +impl MessageQueue for MessageQueueImpl +where + CH: PersistentCacheService, + ST: PersistentStateService, + DB: StorageService, +{ + fn init(config: MessageQueueConfig) -> Result + where + Self: Sized, + { + let p_cache_cfg = config.persistent_cache_config_ref(); + let p_cache_service = CH::new(p_cache_cfg)?; - // Methods for queue management - fn apply_diff(&mut self, diff: QueueDiff); - fn save_to_storage(&self); - fn commit_current_state(&mut self); + let p_state_service = ST::new()?; + let storage_service = DB::new()?; - // Differential and state management - fn get_current_diff(&self) -> Option; - fn undo_state_to_block(&mut self, diff: Vec); + Ok(Self { + config, + p_cache_service, + p_state_service, + storage_service, + }) + } - // Message handling - fn add_message(&mut self, message: MessageEnvelope); - fn add_processed_upto(&mut self, lt: Lt, hash: MessageHash); + fn reload_state_on_block(&mut self, block_ident: BlockIdent) -> Result<()> { + todo!() + } + + fn add_new_message(&mut self, message: EnqueuedMessage) { + todo!() + } + + fn add_processed_upto(&mut self, lt: Lt, hash: MessageHash) { + todo!() + } + + fn create_diff( + &mut self, + block_id: BlockId, + shard_id: ShardIdent, + block_seq_no: SeqNo, + ) -> Result> { + todo!() + } + + fn commit_diff(&mut self, diff_key: QueueDiffKey, no_save: bool) -> Result<()> { + todo!() + } + + fn rollback_changes(&mut self, by_diff: Option) -> Result<()> { + todo!() + } + + fn apply_diff(&mut self, diff: QueueDiff) -> Result<()> { + todo!() + } + + fn apply_persistent_state(&mut self, p_state: PersistentStateData) -> Result<()> { + todo!() + } - // Queue navigation - fn create_iterator(&self) -> QueueIterator; + fn get_sync_state_on_block( + &self, + block_ident: BlockIdent, + ) -> Result<(Option, Vec)> { + todo!() + } + + fn get_sync_state_from_block( + &self, + block_ident: BlockIdent, + ) -> Result<(Option, Vec)> { + todo!() + } } -impl Iterator for QueueIterator { - type Item = Message; +/* +This part of the code contains logic that cannot be attributed specifically +to the persistent state and cache, storage, loader, diff management. - fn next(&mut self) -> Option { +We use partials just to separate the codebase on smaller and easier maintainable parts. + */ +impl MessageQueueImpl +where + CH: PersistentCacheService, + ST: PersistentStateService, + DB: StorageService, +{ + fn some_internal_method(&mut self) -> Result<()> { todo!() } } + +pub type MessageQueueImplOnStubs = MessageQueueImpl< + PersistentCacheServiceStubImpl, + PersistentStateServiceStubImpl, + StorageServiceStubImpl, +>; diff --git a/core/src/msg_queue/state_persistent.rs b/core/src/msg_queue/state_persistent.rs new file mode 100644 index 000000000..75b78dc72 --- /dev/null +++ b/core/src/msg_queue/state_persistent.rs @@ -0,0 +1,38 @@ +use std::fmt::Debug; + +use anyhow::Result; + +use super::{cache_persistent::PersistentCacheService, storage::StorageService, MessageQueueImpl}; + +pub trait PersistentStateService: Debug + Sized { + fn new() -> Result; +} + +/* +This part of the code contains logic of working with persistent state. + +We use partials just to separate the codebase on smaller and easier maintainable parts. + */ +impl MessageQueueImpl +where + CH: PersistentCacheService, + ST: PersistentStateService, + DB: StorageService, +{ + fn some_internal_method_for_persistent_state(&mut self) -> Result<()> { + todo!() + } + pub(super) fn some_module_internal_method_for_persistent_state(&mut self) -> Result<()> { + todo!() + } +} + +// STUBS + +#[derive(Debug)] +pub struct PersistentStateServiceStubImpl {} +impl PersistentStateService for PersistentStateServiceStubImpl { + fn new() -> Result { + Ok(Self {}) + } +} diff --git a/core/src/msg_queue/state_persistent_fs.rs b/core/src/msg_queue/state_persistent_fs.rs new file mode 100644 index 000000000..e69de29bb diff --git a/core/src/msg_queue/storage.rs b/core/src/msg_queue/storage.rs new file mode 100644 index 000000000..422d6a0d6 --- /dev/null +++ b/core/src/msg_queue/storage.rs @@ -0,0 +1,41 @@ +use std::fmt::Debug; + +use anyhow::Result; + +use super::{ + cache_persistent::PersistentCacheService, state_persistent::PersistentStateService, + MessageQueueImpl, +}; + +pub trait StorageService: Debug + Sized { + fn new() -> Result; +} + +/* +This part of the code contains logic of working with storage. + +We use partials just to separate the codebase on smaller and easier maintainable parts. + */ +impl MessageQueueImpl +where + CH: PersistentCacheService, + ST: PersistentStateService, + DB: StorageService, +{ + fn some_internal_method_for_storage(&mut self) -> Result<()> { + todo!() + } + pub(super) fn some_module_internal_method_for_storage(&mut self) -> Result<()> { + todo!() + } +} + +// STUBS + +#[derive(Debug)] +pub struct StorageServiceStubImpl {} +impl StorageService for StorageServiceStubImpl { + fn new() -> Result { + Ok(Self {}) + } +} diff --git a/core/src/msg_queue/storage_rocksdb.rs b/core/src/msg_queue/storage_rocksdb.rs new file mode 100644 index 000000000..e69de29bb diff --git a/core/src/msg_queue/tests/test_cache_persistent.rs b/core/src/msg_queue/tests/test_cache_persistent.rs new file mode 100644 index 000000000..c55d22fbd --- /dev/null +++ b/core/src/msg_queue/tests/test_cache_persistent.rs @@ -0,0 +1,19 @@ +use super::super::config::tests::init_test_config; + +#[test] +fn test_persistent_cache_init() { + use super::{PersistentCacheService, PersistentCacheServiceStubImpl}; + + let cfg = init_test_config(); + + let p_cache_impl = + PersistentCacheServiceStubImpl::new(cfg.persistent_cache_config_ref()).unwrap(); + + println!("persistent_cache_impl.config: {:?}", p_cache_impl.config); + + assert_eq!(p_cache_impl.config.cfg_value1.as_str(), "test_value_1"); + + let p_cache_dyn: &dyn PersistentCacheService = &p_cache_impl; + + println!("persistent_cache_dyn: {:?}", p_cache_dyn); +} diff --git a/core/src/msg_queue/tests/test_config.rs b/core/src/msg_queue/tests/test_config.rs new file mode 100644 index 000000000..f1944ae39 --- /dev/null +++ b/core/src/msg_queue/tests/test_config.rs @@ -0,0 +1,25 @@ +use super::MessageQueueConfig; + +pub fn init_test_config() -> MessageQueueConfig { + use super::super::cache_persistent::PersistentCacheConfigStubImpl; + use super::MessageQueueBaseConfig; + + MessageQueueConfig::new( + MessageQueueBaseConfig {}, + PersistentCacheConfigStubImpl { + cfg_value1: "test_value_1".to_owned(), + }, + ) +} + +#[test] +fn test_config_init() { + use super::super::cache_persistent::PersistentCacheConfigStubImpl; + + let cfg = init_test_config(); + + let p_cache_cfg = cfg.persistent_cache_config_ref().as_any(); + assert!(p_cache_cfg + .downcast_ref::() + .is_some()); +} diff --git a/core/src/msg_queue/tests/test_queue.rs b/core/src/msg_queue/tests/test_queue.rs new file mode 100644 index 000000000..f4b027fb8 --- /dev/null +++ b/core/src/msg_queue/tests/test_queue.rs @@ -0,0 +1,8 @@ +use super::{super::config::tests::init_test_config, MessageQueue, MessageQueueImplOnStubs}; + +#[test] +fn test_queue_init() { + let cfg = init_test_config(); + + let queue = MessageQueueImplOnStubs::init(cfg).unwrap(); +} diff --git a/core/src/msg_queue/types.rs b/core/src/msg_queue/types.rs new file mode 100644 index 000000000..f31097160 --- /dev/null +++ b/core/src/msg_queue/types.rs @@ -0,0 +1,57 @@ +pub type Lt = u64; +pub type MessageHash = UInt256; + +pub type SeqNo = u32; + +pub enum BlockIdent { + Id(BlockId), + ShardAndSeqNo(ShardIdent, SeqNo), +} + +#[derive(PartialEq, Eq, Clone)] +pub struct QueueDiffKey {} + +pub struct QueueDiff { + key: QueueDiffKey, +} +impl QueueDiff { + fn key(&self) -> &QueueDiffKey { + &self.key + } +} + +pub struct PersistentStateData {} + +// Actually, we may already have [MsgEnvelope] and [EnqueuedMsg] types. +// They little bit different from current declarations. Possibly we'll use existing, +// but own types may be more efficient +pub struct MessageEnvelope { + message: Message, + from_contract: Address, + to_contract: Address, +} +pub struct EnqueuedMessage { + created_lt: Lt, + enqueued_lt: Lt, + hash: MessageHash, + env: MessageEnvelope, +} + +// STUBS FOR EXTERNAL TYPES +// further we should use types crate + +pub(super) mod ext_types_stubs { + pub struct Message {} + pub type Address = String; + pub type ShardIdent = i64; + pub type UInt256 = String; + pub type BlockId = UInt256; + + pub struct BlockIdExt { + pub shard_id: ShardIdent, + pub seq_no: u32, + pub root_hash: UInt256, + pub file_hash: UInt256, + } +} +use ext_types_stubs::*; From 8002a6f4a94ee4823c26af5d0fcbacdd73e71644 Mon Sep 17 00:00:00 2001 From: Vitaly Terekhov Date: Thu, 11 Jan 2024 17:34:37 +0000 Subject: [PATCH 5/6] description for persistent state creation method --- core/src/msg_queue/queue.rs | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/core/src/msg_queue/queue.rs b/core/src/msg_queue/queue.rs index b97deed84..5334417d4 100644 --- a/core/src/msg_queue/queue.rs +++ b/core/src/msg_queue/queue.rs @@ -97,6 +97,20 @@ where /// Should execute it first when syncing queue from other nodes. fn apply_persistent_state(&mut self, p_state: PersistentStateData) -> Result<()>; + /// Shrinks diff history in storage, create and save persistent state. + /// Persistent state is identified by the block on which it was created. + /// + /// Actions: + /// 1. find diff (Dn) in storage by specified block + /// 2. get all uprocessed messages (Mu) from it and prev diffs (from [D0..Dn]) + /// 3. get top "processed upto" marks (Upto) from it and diffs (from [D0..Dn]) + /// 4. create and store persistent state data object with: + /// uprocessed messages (Mu) and top "processed upto" marks (Upto) + /// 5. remove diff (Dn) and all prevs ([D0..Dn]) from storage + /// + /// New persistent state replaces the previous one. + fn create_persistent_state(&mut self, block_ident: BlockIdent) -> Result; + /// Get queue state data on the specified block for syncing to another node: /// - persistent state if exists /// - all diffs upto specified block @@ -204,6 +218,10 @@ where todo!() } + fn create_persistent_state(&mut self, block_ident: BlockIdent) -> Result { + todo!() + } + fn get_sync_state_on_block( &self, block_ident: BlockIdent, From 85b86a221e5d9a8a0f3ec80877c177a8acf7ab26 Mon Sep 17 00:00:00 2001 From: Maksim Greshniakov Date: Wed, 1 May 2024 13:24:37 +0200 Subject: [PATCH 6/6] refactor(validator): fmt clippy --- collator/src/manager/collation_manager.rs | 8 +++ collator/src/manager/collation_processor.rs | 4 +- collator/src/validator/config.rs | 8 ++- collator/src/validator/network/dto.rs | 2 - collator/src/validator/network/handlers.rs | 2 +- .../src/validator/network/network_service.rs | 14 +--- collator/src/validator/state.rs | 41 ++--------- collator/src/validator/validator.rs | 69 +++++++++---------- collator/tests/validator_tests.rs | 43 ++++++------ core/src/msg_queue/cache_persistent_fs.rs | 1 + core/src/msg_queue/diff_mgmt.rs | 1 + core/src/msg_queue/state_persistent_fs.rs | 1 + core/src/msg_queue/storage_rocksdb.rs | 1 + 13 files changed, 81 insertions(+), 114 deletions(-) diff --git a/collator/src/manager/collation_manager.rs b/collator/src/manager/collation_manager.rs index 3126a8007..16d5bcd5f 100644 --- a/collator/src/manager/collation_manager.rs +++ b/collator/src/manager/collation_manager.rs @@ -1,4 +1,5 @@ use std::sync::Arc; +use std::time::Duration; use anyhow::Result; use async_trait::async_trait; @@ -8,6 +9,7 @@ use tycho_block_util::state::ShardStateStuff; use tycho_core::internal_queue::iterator::QueueIteratorImpl; +use crate::validator::config::ValidatorConfig; use crate::{ collator::{ collator_processor::CollatorProcessorStdImpl, Collator, CollatorEventListener, @@ -148,12 +150,18 @@ where let state_node_adapter = state_adapter_builder.build(dispatcher.clone()); let state_node_adapter = Arc::new(state_node_adapter); + let validator_config = ValidatorConfig { + base_loop_delay: Duration::from_millis(50), + max_loop_delay: Duration::from_secs(10), + }; + // create validator and start its tasks queue let validator = Validator::create( vec![dispatcher.clone()], state_node_adapter.clone(), node_network.into(), config.key_pair, + validator_config, ); // create collation processor that will use these adapters diff --git a/collator/src/manager/collation_processor.rs b/collator/src/manager/collation_processor.rs index ee4cd4b7b..af3ffbea4 100644 --- a/collator/src/manager/collation_processor.rs +++ b/collator/src/manager/collation_processor.rs @@ -5,9 +5,7 @@ use std::{ use anyhow::{anyhow, bail, Result}; -use everscale_types::models::{ - BlockId, BlockInfo, ShardIdent, ValidatorDescription, ValidatorSet, ValueFlow, -}; +use everscale_types::models::{BlockId, BlockInfo, ShardIdent, ValueFlow}; use tycho_block_util::{ block::ValidatorSubsetInfo, state::{MinRefMcStateTracker, ShardStateStuff}, diff --git a/collator/src/validator/config.rs b/collator/src/validator/config.rs index 678ce3664..29c749b34 100644 --- a/collator/src/validator/config.rs +++ b/collator/src/validator/config.rs @@ -1,4 +1,6 @@ -struct ValidatorConfig { - base_elapsed_time: u64, +use std::time::Duration; -} \ No newline at end of file +pub struct ValidatorConfig { + pub base_loop_delay: Duration, + pub max_loop_delay: Duration, +} diff --git a/collator/src/validator/network/dto.rs b/collator/src/validator/network/dto.rs index 1785fc8eb..71b192901 100644 --- a/collator/src/validator/network/dto.rs +++ b/collator/src/validator/network/dto.rs @@ -1,5 +1,3 @@ -use std::collections::HashMap; - use everscale_types::cell::HashBytes; use everscale_types::models::{BlockIdShort, Signature}; use tl_proto::{TlRead, TlWrite}; diff --git a/collator/src/validator/network/handlers.rs b/collator/src/validator/network/handlers.rs index 4f5158358..d352e5a72 100644 --- a/collator/src/validator/network/handlers.rs +++ b/collator/src/validator/network/handlers.rs @@ -10,7 +10,7 @@ pub async fn handle_signatures_query( session_seqno: u32, block_id_short: BlockIdShort, signatures: Vec<([u8; 32], [u8; 64])>, - listeners: Vec>, + listeners: &[Arc], ) -> Result, anyhow::Error> where { diff --git a/collator/src/validator/network/network_service.rs b/collator/src/validator/network/network_service.rs index 09f5cbea3..b912d2e95 100644 --- a/collator/src/validator/network/network_service.rs +++ b/collator/src/validator/network/network_service.rs @@ -1,5 +1,4 @@ use std::future::Future; -use std::marker::PhantomData; use std::pin::Pin; use std::sync::Arc; @@ -11,28 +10,21 @@ use tycho_network::{Response, Service, ServiceRequest}; use crate::validator::network::dto::SignaturesQuery; use crate::validator::network::handlers::handle_signatures_query; -use crate::validator::state::{SessionInfo, ValidationState, ValidationStateStdImpl}; +use crate::validator::state::{ValidationState, ValidationStateStdImpl}; use crate::validator::ValidatorEventListener; -use crate::{state_node::StateNodeAdapter, utils::async_queued_dispatcher::AsyncQueuedDispatcher}; #[derive(Clone)] pub struct NetworkService { listeners: Vec>, state: Arc, - session_seqno: u32, } impl NetworkService { pub fn new( listeners: Vec>, state: Arc, - session_seqno: u32, ) -> Self { - Self { - listeners, - state, - session_seqno, - } + Self { listeners, state } } } @@ -66,7 +58,7 @@ impl Service for NetworkService { session_seqno, block_id_short, signatures, - listeners, + &listeners, ) .await { diff --git a/collator/src/validator/state.rs b/collator/src/validator/state.rs index a1d449492..272a0c36a 100644 --- a/collator/src/validator/state.rs +++ b/collator/src/validator/state.rs @@ -2,7 +2,6 @@ use std::collections::HashMap; use std::sync::Arc; use anyhow::{bail, Context}; -use async_trait::async_trait; use everscale_types::cell::HashBytes; use everscale_types::models::{BlockId, BlockIdShort, Signature}; use tokio::sync::{Mutex, RwLock}; @@ -129,10 +128,6 @@ impl SessionInfo { .map(|ref_data| ref_data.0) } - pub(crate) async fn blocks_count(&self) -> usize { - self.blocks_signatures.len() - } - /// Determines the validation status of a block. pub async fn get_validation_status( &self, @@ -202,13 +197,8 @@ impl SessionInfo { &self, block_id_short: &BlockIdShort, ) -> FastHashMap { - let cached_signatures = self.cached_signatures.len(); - let normal_signatures = self.blocks_signatures.len(); - let block_signatures = self.blocks_signatures.get(block_id_short); - let valid_signatures = block_signatures.map(|ref_data| ref_data.1.valid_signatures.clone()); let block_signatures = self.blocks_signatures.get(block_id_short); - let invalid_signatures = - block_signatures.map(|ref_data| ref_data.1.invalid_signatures.clone()); + block_signatures.map(|ref_data| ref_data.1.invalid_signatures.clone()); if let Some(ref_data) = self.blocks_signatures.get(block_id_short) { ref_data.1.valid_signatures.clone() @@ -248,33 +238,11 @@ impl SessionInfo { } } - /// Determines if a block is considered invalid based on the signatures. - fn is_invalid(&self, signature_maps: &SignatureMaps, valid_weight: u64) -> bool { - let total_invalid_weight: u64 = signature_maps - .invalid_signatures - .keys() - .map(|validator_id| { - self.validation_session_info - .validators - .get(validator_id) - .map_or(0, |vi| vi.weight) - }) - .sum(); - - let total_possible_weight = self - .validation_session_info - .validators - .values() - .map(|vi| vi.weight) - .sum::(); - total_possible_weight - total_invalid_weight < valid_weight - } - pub async fn process_signatures_and_update_status( &self, block_id_short: BlockIdShort, signatures: Vec<([u8; 32], [u8; 64])>, - listeners: Vec>, + listeners: &[Arc], ) -> anyhow::Result<()> { trace!( "Processing signatures for block in state {:?}", @@ -294,7 +262,7 @@ impl SessionInfo { ) }); - let mut event_guard = entry.1.event_dispatched.lock().await; + let event_guard = entry.1.event_dispatched.lock().await; if *event_guard { debug!( "Validation event already dispatched for block {:?}", @@ -391,10 +359,11 @@ impl SessionInfo { fn notify_listeners( block: BlockId, event: OnValidatedBlockEvent, - listeners: Vec>, + listeners: &[Arc], ) { for listener in listeners { let cloned_event = event.clone(); + let listener = listener.clone(); tokio::spawn(async move { listener .on_block_validated(block, cloned_event) diff --git a/collator/src/validator/validator.rs b/collator/src/validator/validator.rs index 5dc9eefb0..939eb1be1 100644 --- a/collator/src/validator/validator.rs +++ b/collator/src/validator/validator.rs @@ -1,35 +1,24 @@ -use std::mem::take; use std::sync::Arc; use std::time::Duration; -use anyhow::{anyhow, bail, Context, Result}; +use anyhow::{bail, Context, Result}; use async_trait::async_trait; use everscale_crypto::ed25519::KeyPair; use everscale_types::cell::HashBytes; use everscale_types::models::{BlockId, BlockIdShort, Signature}; -use futures_util::future::join_all; -use tokio::select; -use tokio::task::{JoinError, JoinHandle}; +use tokio::task::JoinHandle; use tracing::{debug, trace, warn}; use tycho_network::{OverlayId, PeerId, PrivateOverlay, Request}; -use tycho_util::FastHashMap; -use crate::state_node::StateNodeAdapterStdImpl; -use crate::types::{BlockSignatures, OnValidatedBlockEvent, ValidatorNetwork}; +use crate::types::{OnValidatedBlockEvent, ValidatorNetwork}; +use crate::validator::config::ValidatorConfig; use crate::validator::network::dto::SignaturesQuery; use crate::validator::network::network_service::NetworkService; use crate::validator::state::{SessionInfo, ValidationState, ValidationStateStdImpl}; use crate::validator::types::{ BlockValidationCandidate, OverlayNumber, ValidationResult, ValidationSessionInfo, ValidatorInfo, }; -use crate::{ - method_to_async_task_closure, - state_node::StateNodeAdapter, - tracing_targets, - utils::async_queued_dispatcher::{ - AsyncQueuedDispatcher, STANDARD_DISPATCHER_QUEUE_BUFFER_SIZE, - }, -}; +use crate::{state_node::StateNodeAdapter, tracing_targets}; #[async_trait] pub trait ValidatorEventEmitter { @@ -61,6 +50,7 @@ where state_node_adapter: Arc, network: ValidatorNetwork, keypair: KeyPair, + config: ValidatorConfig, ) -> Self; /// Enqueue block candidate validation task @@ -78,11 +68,11 @@ where { _marker_state_node_adapter: std::marker::PhantomData, validation_state: Arc, - validation_semaphore: Arc, listeners: Vec>, network: ValidatorNetwork, state_node_adapter: Arc, keypair: KeyPair, + config: ValidatorConfig, } #[async_trait] @@ -95,6 +85,7 @@ where state_node_adapter: Arc, network: ValidatorNetwork, keypair: KeyPair, + config: ValidatorConfig, ) -> Self { tracing::info!(target: tracing_targets::VALIDATOR, "Creating validator..."); @@ -102,12 +93,12 @@ where Self { _marker_state_node_adapter: std::marker::PhantomData, - validation_semaphore: Arc::new(tokio::sync::Semaphore::new(1)), validation_state, listeners, network, state_node_adapter, keypair, + config, } } @@ -125,9 +116,10 @@ where candidate, session, &self.keypair, - self.listeners.clone(), - self.network.clone(), - self.state_node_adapter.clone(), + &self.listeners, + &self.network, + &self.state_node_adapter, + &self.config, ) .await?; Ok(()) @@ -157,10 +149,8 @@ where trace!(target: tracing_targets::VALIDATOR, overlay_id = ?validators_session_info.seqno, "Creating private overlay"); let overlay_id = OverlayId(tl_proto::hash(overlay_id)); - let seqno = validators_session_info.seqno; - let network_service = - NetworkService::new(self.listeners.clone(), self.validation_state.clone(), seqno); + NetworkService::new(self.listeners.clone(), self.validation_state.clone()); let private_overlay = PrivateOverlay::builder(overlay_id) .with_peer_resolver(peer_resolver) @@ -208,11 +198,11 @@ async fn start_candidate_validation( block_id: BlockId, session: Arc, current_validator_keypair: &KeyPair, - listeners: Vec>, - network: ValidatorNetwork, - state_node_adapter: Arc, + listeners: &[Arc], + network: &ValidatorNetwork, + state_node_adapter: &Arc, + config: &ValidatorConfig, ) -> Result<()> { - let base_delay_ms = 100; let cancellation_token = tokio_util::sync::CancellationToken::new(); let short_id = block_id.as_short_id(); let our_signature = sign_block(current_validator_keypair, &block_id)?; @@ -232,7 +222,7 @@ async fn start_candidate_validation( session.clone(), short_id, vec![(current_validator_pubkey.0, our_signature.0)], - listeners.clone(), + listeners, ) .await?; trace!(target: tracing_targets::VALIDATOR, "Validation finished: {:?}", is_validation_finished); @@ -268,6 +258,9 @@ async fn start_candidate_validation( let mut handlers: Vec>> = Vec::new(); + let delay = config.base_loop_delay; + let max_delay = config.max_loop_delay; + let listeners = listeners.to_vec(); for validator in filtered_validators { let cloned_private_overlay = session.get_overlay().clone(); let cloned_network = network.dht_client.network().clone(); @@ -326,7 +319,7 @@ async fn start_candidate_validation( cloned_session.clone(), short_id, signatures.signatures, - cloned_listeners.clone(), + &cloned_listeners, ) .await?; @@ -338,19 +331,21 @@ async fn start_candidate_validation( } } Err(e) => { - warn!(target: tracing_targets::VALIDATOR, "Error receiving signatures from validator {:?}: {:?}", validator.public_key.to_bytes(), e); - let delay = base_delay_ms * 2_u64.pow(attempt); - tokio::time::sleep(Duration::from_millis(delay)).await; + warn!(target: tracing_targets::VALIDATOR, "Elapsed validator response {:?}: {:?}", validator.public_key.to_bytes(), e); + let delay = delay * 2_u32.pow(attempt); + let delay = std::cmp::min(delay, max_delay); + tokio::time::sleep(delay).await; attempt += 1; } Ok(Err(e)) => { warn!(target: tracing_targets::VALIDATOR, "Error receiving signatures from validator {:?}: {:?}", validator.public_key.to_bytes(), e); - let delay = base_delay_ms * 2_u64.pow(attempt); - tokio::time::sleep(Duration::from_millis(delay)).await; + let delay = delay * 2_u32.pow(attempt); + let delay = std::cmp::min(delay, max_delay); + tokio::time::sleep(delay).await; attempt += 1; } } - tokio::time::sleep(Duration::from_millis(base_delay_ms)).await; + tokio::time::sleep(delay).await; } }); @@ -369,7 +364,7 @@ pub async fn process_candidate_signature_response( session: Arc, block_id_short: BlockIdShort, signatures: Vec<([u8; 32], [u8; 64])>, - listeners: Vec>, + listeners: &[Arc], ) -> Result { trace!(target: tracing_targets::VALIDATOR, block = %block_id_short, "Processing candidate signature response"); let validation_status = session.get_validation_status(&block_id_short).await?; diff --git a/collator/tests/validator_tests.rs b/collator/tests/validator_tests.rs index 7f250cc0e..2369bd168 100644 --- a/collator/tests/validator_tests.rs +++ b/collator/tests/validator_tests.rs @@ -1,41 +1,37 @@ -use std::collections::HashMap; use std::net::Ipv4Addr; -use std::str::FromStr; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; - -use async_trait::async_trait; -use bytesize::ByteSize; use std::time::Duration; use anyhow::Result; +use async_trait::async_trait; use everscale_crypto::ed25519; use everscale_crypto::ed25519::KeyPair; use everscale_types::models::{BlockId, ValidatorDescription}; use rand::prelude::ThreadRng; use tokio::sync::{Mutex, Notify}; use tokio::time::sleep; - -use tracing::{debug, error}; +use tracing::debug; use tycho_block_util::block::ValidatorSubsetInfo; use tycho_block_util::state::{MinRefMcStateTracker, ShardStateStuff}; use tycho_collator::state_node::{ - StateNodeAdapter, StateNodeAdapterBuilder, StateNodeAdapterBuilderStdImpl, - StateNodeAdapterStdImpl, StateNodeEventListener, + StateNodeAdapterBuilder, StateNodeAdapterBuilderStdImpl, StateNodeAdapterStdImpl, + StateNodeEventListener, }; use tycho_collator::test_utils::{prepare_test_storage, try_init_test_tracing}; use tycho_collator::types::{CollationSessionInfo, OnValidatedBlockEvent, ValidatorNetwork}; +use tycho_collator::validator::config::ValidatorConfig; use tycho_collator::validator::state::{ValidationState, ValidationStateStdImpl}; use tycho_collator::validator::types::ValidationSessionInfo; use tycho_collator::validator::validator::{Validator, ValidatorEventListener, ValidatorStdImpl}; use tycho_core::block_strider::state::BlockStriderState; use tycho_core::block_strider::subscriber::test::PrintSubscriber; -use tycho_core::block_strider::{prepare_state_apply, BlockStrider}; +use tycho_core::block_strider::BlockStrider; use tycho_network::{ DhtClient, DhtConfig, DhtService, Network, OverlayService, PeerId, PeerResolver, Router, }; -use tycho_storage::{build_tmp_storage, Db, DbOptions, Storage}; +use tycho_storage::build_tmp_storage; pub struct TestValidatorEventListener { validated_blocks: Mutex>, @@ -59,11 +55,6 @@ impl TestValidatorEventListener { pub async fn increment_and_check(&self) { let mut received = self.received_notifications.lock().await; *received += 1; - error!( - "received: {}, expected: {}", - *received, - *self.expected_notifications.lock().await - ); if *received == *self.expected_notifications.lock().await { self.notify.notify_one(); } @@ -80,10 +71,11 @@ impl ValidatorEventListener for TestValidatorEventListener { let mut validated_blocks = self.validated_blocks.lock().await; if validated_blocks.contains(&block_id) { return Ok(()); + } else { + validated_blocks.push(block_id); } - let current_count = self.global_validated_blocks.fetch_add(1, Ordering::SeqCst); - debug!("Block validated, new global count: {}", current_count); + self.global_validated_blocks.fetch_add(1, Ordering::SeqCst); self.increment_and_check().await; Ok(()) @@ -92,7 +84,7 @@ impl ValidatorEventListener for TestValidatorEventListener { #[async_trait] impl StateNodeEventListener for TestValidatorEventListener { - async fn on_block_accepted(&self, block_id: &BlockId) -> Result<()> { + async fn on_block_accepted(&self, _block_id: &BlockId) -> Result<()> { unimplemented!("Not implemented"); } @@ -232,6 +224,10 @@ async fn test_validator_accept_block_by_state() -> anyhow::Result<()> { state_node_adapter, validator_network, KeyPair::generate(&mut ThreadRng::default()), + ValidatorConfig { + base_loop_delay: Duration::from_millis(50), + max_loop_delay: Duration::from_secs(10), + }, ); let validator_description = ValidatorDescription { @@ -343,12 +339,17 @@ async fn test_validator_accept_block_by_network() -> Result<()> { dht_client: node.dht_client.clone(), peer_resolver: node.peer_resolver.clone(), }; + let validator_config = ValidatorConfig { + base_loop_delay: Duration::from_millis(50), + max_loop_delay: Duration::from_secs(10), + }; let validator = Arc::new(ValidatorStdImpl::<_>::create( vec![test_listener.clone()], state_node_adapter, network, node.keypair.clone(), + validator_config, )); let semaphore = Arc::new(tokio::sync::Semaphore::new(max_concurrent_blocks)); @@ -367,7 +368,7 @@ async fn test_validator_accept_block_by_network() -> Result<()> { // Await all validator tasks to complete for task in tasks { - task.await?; + task.await.unwrap().unwrap(); } // Assert that all validations are completed as expected @@ -395,7 +396,7 @@ async fn handle_validator( let collator_session_info = Arc::new(CollationSessionInfo::new( session, validators_subset_info.clone(), - Some(validator.get_keypair().clone()), // Assuming you have access to node's keypair here + Some(*validator.get_keypair()), // Assuming you have access to node's keypair here )); validator diff --git a/core/src/msg_queue/cache_persistent_fs.rs b/core/src/msg_queue/cache_persistent_fs.rs index e69de29bb..8b1378917 100644 --- a/core/src/msg_queue/cache_persistent_fs.rs +++ b/core/src/msg_queue/cache_persistent_fs.rs @@ -0,0 +1 @@ + diff --git a/core/src/msg_queue/diff_mgmt.rs b/core/src/msg_queue/diff_mgmt.rs index e69de29bb..8b1378917 100644 --- a/core/src/msg_queue/diff_mgmt.rs +++ b/core/src/msg_queue/diff_mgmt.rs @@ -0,0 +1 @@ + diff --git a/core/src/msg_queue/state_persistent_fs.rs b/core/src/msg_queue/state_persistent_fs.rs index e69de29bb..8b1378917 100644 --- a/core/src/msg_queue/state_persistent_fs.rs +++ b/core/src/msg_queue/state_persistent_fs.rs @@ -0,0 +1 @@ + diff --git a/core/src/msg_queue/storage_rocksdb.rs b/core/src/msg_queue/storage_rocksdb.rs index e69de29bb..8b1378917 100644 --- a/core/src/msg_queue/storage_rocksdb.rs +++ b/core/src/msg_queue/storage_rocksdb.rs @@ -0,0 +1 @@ +