From 4110af22306768bc62dc0ca60a47052fa16b8201 Mon Sep 17 00:00:00 2001
From: Maksim Greshnyakov
Date: Wed, 24 Apr 2024 13:03:12 +0500
Subject: [PATCH] refactor(validator): network request timeout

---
 Cargo.lock                                    |   1 +
 collator/Cargo.toml                           |   2 +
 collator/src/manager/types.rs                 |   6 +-
 collator/src/manager/utils.rs                 |   2 +-
 collator/src/mempool/mempool_adapter.rs       |   2 +-
 collator/src/test_utils.rs                    | 107 ++++++++++
 collator/src/types.rs                         |   5 +-
 collator/src/utils/async_queued_dispatcher.rs |   8 +-
 collator/src/validator/network/dto.rs         |   3 +-
 collator/src/validator/state.rs               |  26 +--
 collator/src/validator/test_impl.rs           |   3 +-
 collator/src/validator/types.rs               |   2 +-
 collator/src/validator/validator_processor.rs | 191 ++++++++++++------
 collator/tests/collation_tests.rs             |  98 +--------
 collator/tests/validator_tests.rs             | 115 ++++++-----
 15 files changed, 336 insertions(+), 235 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index f02fcbabf..19282b647 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2102,6 +2102,7 @@ dependencies = [
  "everscale-crypto",
  "everscale-types",
  "futures-util",
+ "log",
  "rand",
  "sha2",
  "tempfile",
diff --git a/collator/Cargo.toml b/collator/Cargo.toml
index 0526fcdf4..c69dd6fc2 100644
--- a/collator/Cargo.toml
+++ b/collator/Cargo.toml
@@ -32,12 +32,14 @@ tycho-network = { workspace = true }
 tycho-storage = { workspace = true }
 tycho-util = { workspace = true }
 tycho-block-util = { workspace = true }
+log = "0.4.21"
 
 [dev-dependencies]
 tempfile = { workspace = true }
 tracing-test = { workspace = true }
 tycho-core = { workspace = true, features = ["test"] }
 tycho-storage = { workspace = true, features = ["test"] }
+tycho-util = { workspace = true, features = ["test"] }
 
 [features]
 test = []
diff --git a/collator/src/manager/types.rs b/collator/src/manager/types.rs
index 666e7aff2..ad3d8ea42 100644
--- a/collator/src/manager/types.rs
+++ b/collator/src/manager/types.rs
@@ -7,6 +7,8 @@ use everscale_types::{
     models::{Block, BlockId, BlockIdShort, ShardIdent, Signature},
 };
 
+use tycho_util::FastHashMap;
+
 use crate::types::BlockCandidate;
 
 pub(super) type BlockCacheKey = BlockIdShort;
@@ -20,7 +22,7 @@ pub(super) struct BlocksCache {
 pub struct BlockCandidateEntry {
     pub key: BlockCacheKey,
     pub candidate: BlockCandidate,
-    pub signatures: HashMap<HashBytes, Signature>,
+    pub signatures: FastHashMap<HashBytes, Signature>,
 }
 
 pub enum SendSyncStatus {
@@ -105,7 +107,7 @@ impl BlockCandidateContainer {
         &mut self,
         is_valid: bool,
         already_synced: bool,
-        signatures: HashMap<HashBytes, Signature>,
+        signatures: FastHashMap<HashBytes, Signature>,
     ) {
         if let Some(ref mut entry) = self.entry {
             entry.signatures = signatures;
diff --git a/collator/src/manager/utils.rs b/collator/src/manager/utils.rs
index 343336d68..4c285c389 100644
--- a/collator/src/manager/utils.rs
+++ b/collator/src/manager/utils.rs
@@ -1,7 +1,7 @@
 use anyhow::Result;
 use everscale_crypto::ed25519::PublicKey;
 use everscale_types::boc::BocRepr;
-use everscale_types::models::{Block, ValidatorDescription};
+use everscale_types::models::ValidatorDescription;
 use tycho_block_util::block::{BlockStuff, BlockStuffAug};
 
 use crate::types::{BlockStuffForSync, CollationConfig};
diff --git a/collator/src/mempool/mempool_adapter.rs b/collator/src/mempool/mempool_adapter.rs
index b74538294..f40e8a9b7 100644
--- a/collator/src/mempool/mempool_adapter.rs
+++ b/collator/src/mempool/mempool_adapter.rs
@@ -8,7 +8,7 @@ use async_trait::async_trait;
 
 use everscale_types::{
     cell::{CellBuilder, CellSliceRange, HashBytes},
-    models::{account, ExtInMsgInfo, IntAddr, MsgInfo, OwnedMessage, StdAddr},
+    models::{ExtInMsgInfo, IntAddr, MsgInfo, OwnedMessage, StdAddr},
 };
 use rand::Rng;
 use tycho_block_util::state::ShardStateStuff;
diff --git a/collator/src/test_utils.rs b/collator/src/test_utils.rs
index fd567f7b4..44ec18e32 100644
--- a/collator/src/test_utils.rs
+++ b/collator/src/test_utils.rs
@@ -1,9 +1,19 @@
 use std::net::Ipv4Addr;
+use std::sync::Arc;
 use std::time::Duration;
 
 use everscale_crypto::ed25519;
+use everscale_types::boc::Boc;
+use everscale_types::cell::HashBytes;
+use everscale_types::models::{BlockId, ShardStateUnsplit};
+use futures_util::future::BoxFuture;
+use futures_util::FutureExt;
+use sha2::Digest;
+use tycho_block_util::state::{MinRefMcStateTracker, ShardStateStuff};
+use tycho_core::block_strider::provider::{BlockProvider, OptionalBlockStuff};
 use tycho_network::{DhtConfig, DhtService, Network, OverlayService, PeerId, Router};
+use tycho_storage::{BlockMetaData, Db, DbOptions, Storage};
 
 use crate::types::NodeNetwork;
@@ -58,3 +68,100 @@ pub fn create_node_network() -> NodeNetwork {
         dht_client,
     }
 }
+
+pub async fn prepare_test_storage() -> anyhow::Result<(DummyArchiveProvider, Arc<Storage>)> {
+    let provider = DummyArchiveProvider;
+    let temp = tempfile::tempdir().unwrap();
+    let db = Db::open(temp.path().to_path_buf(), DbOptions::default()).unwrap();
+    let storage = Storage::new(db, temp.path().join("file"), 1_000_000).unwrap();
+    let tracker = MinRefMcStateTracker::default();
+
+    // master state
+    let master_bytes = include_bytes!("../src/state_node/tests/data/test_state_2_master.boc");
+    let master_file_hash: HashBytes = sha2::Sha256::digest(master_bytes).into();
+    let master_root = Boc::decode(master_bytes)?;
+    let master_root_hash = *master_root.repr_hash();
+    let master_state = master_root.parse::<ShardStateUnsplit>()?;
+
+    let mc_state_extra = master_state.load_custom()?;
+    let mc_state_extra = mc_state_extra.unwrap();
+    let mut shard_info_opt = None;
+    for shard_info in mc_state_extra.shards.iter() {
+        shard_info_opt = Some(shard_info?);
+        break;
+    }
+    let shard_info = shard_info_opt.unwrap();
+
+    let master_id = BlockId {
+        shard: master_state.shard_ident,
+        seqno: master_state.seqno,
+        root_hash: master_root_hash,
+        file_hash: master_file_hash,
+    };
+    let master_state_stuff =
+        ShardStateStuff::from_state_and_root(master_id, master_state, master_root, &tracker)?;
+
+    let (handle, _) = storage.block_handle_storage().create_or_load_handle(
+        &master_id,
+        BlockMetaData {
+            is_key_block: mc_state_extra.after_key_block,
+            gen_utime: master_state_stuff.state().gen_utime,
+            mc_ref_seqno: Some(0),
+        },
+    )?;
+
+    storage
+        .shard_state_storage()
+        .store_state(&handle, &master_state_stuff)
+        .await?;
+
+    // shard state
+    let shard_bytes = include_bytes!("../src/state_node/tests/data/test_state_2_0:80.boc");
+    let shard_file_hash: HashBytes = sha2::Sha256::digest(shard_bytes).into();
+    let shard_root = Boc::decode(shard_bytes)?;
+    let shard_root_hash = *shard_root.repr_hash();
+    let shard_state = shard_root.parse::<ShardStateUnsplit>()?;
+    let shard_id = BlockId {
+        shard: shard_info.0,
+        seqno: shard_info.1.seqno,
+        root_hash: shard_info.1.root_hash,
+        file_hash: shard_info.1.file_hash,
+    };
+    let shard_state_stuff =
+        ShardStateStuff::from_state_and_root(shard_id, shard_state, shard_root, &tracker)?;
+
+    let (handle, _) = storage.block_handle_storage().create_or_load_handle(
+        &shard_id,
+        BlockMetaData {
+            is_key_block: false,
+            gen_utime: shard_state_stuff.state().gen_utime,
+            mc_ref_seqno: Some(0),
+        },
+    )?;
+
+    storage
+        .shard_state_storage()
+        .store_state(&handle, &shard_state_stuff)
+        .await?;
+
+    storage
+        .node_state()
+        .store_last_mc_block_id(&master_id)
+        .unwrap();
+
+    Ok((provider, storage))
+}
+
+pub struct DummyArchiveProvider;
+impl BlockProvider for DummyArchiveProvider {
+    type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
+    type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
+
+    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
+        futures_util::future::ready(None).boxed()
+    }
+
+    fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> {
+        futures_util::future::ready(None).boxed()
+    }
+}
diff --git a/collator/src/types.rs b/collator/src/types.rs
index 2069642cf..9a0c028ea 100644
--- a/collator/src/types.rs
+++ b/collator/src/types.rs
@@ -11,6 +11,7 @@ use everscale_types::models::{
 use tycho_block_util::block::{BlockStuffAug, ValidatorSubsetInfo};
 use tycho_block_util::state::{MinRefMcStateTracker, ShardStateStuff};
 use tycho_network::{DhtClient, OverlayService, PeerResolver};
+use tycho_util::FastHashMap;
 
 pub struct CollationConfig {
     pub key_pair: KeyPair,
@@ -121,7 +122,7 @@ impl OnValidatedBlockEvent {
 
 #[derive(Default, Clone)]
 pub struct BlockSignatures {
-    pub signatures: HashMap<HashBytes, Signature>,
+    pub signatures: FastHashMap<HashBytes, Signature>,
 }
 
 pub struct ValidatedBlock {
@@ -160,7 +161,7 @@ pub struct BlockStuffForSync {
     //TODO: remove `block_id` and make `block_stuff: BlockStuff` when collator will generate real blocks
     pub block_id: BlockId,
     pub block_stuff_aug: BlockStuffAug,
-    pub signatures: HashMap<HashBytes, Signature>,
+    pub signatures: FastHashMap<HashBytes, Signature>,
     pub prev_blocks_ids: Vec<BlockId>,
     pub top_shard_blocks_ids: Vec<BlockId>,
 }
diff --git a/collator/src/utils/async_queued_dispatcher.rs b/collator/src/utils/async_queued_dispatcher.rs
index 81e93c606..588ede355 100644
--- a/collator/src/utils/async_queued_dispatcher.rs
+++ b/collator/src/utils/async_queued_dispatcher.rs
@@ -1,6 +1,7 @@
-use std::{future::Future, pin::Pin};
+use std::{future::Future, pin::Pin, usize};
 
 use anyhow::{anyhow, Result};
+use log::trace;
 use tokio::sync::{mpsc, oneshot};
 
 use crate::tracing_targets;
@@ -33,6 +34,11 @@ where
     pub fn run(mut worker: W, mut receiver: mpsc::Receiver>) {
         tokio::spawn(async move {
             while let Some(task) = receiver.recv().await {
+                trace!(
+                    target: tracing_targets::ASYNC_QUEUE_DISPATCHER,
+                    "Task #{} ({}): received",
+                    task.id(),
+                    task.get_descr());
                 let (task_id, task_descr) = (task.id(), task.get_descr());
                 let (func, responder) = task.extract();
                 tracing::trace!(
diff --git a/collator/src/validator/network/dto.rs b/collator/src/validator/network/dto.rs
index f8d0bcbcb..1785fc8eb 100644
--- a/collator/src/validator/network/dto.rs
+++ b/collator/src/validator/network/dto.rs
@@ -3,6 +3,7 @@ use std::collections::HashMap;
 use everscale_types::cell::HashBytes;
 use everscale_types::models::{BlockIdShort, Signature};
 use tl_proto::{TlRead, TlWrite};
+use tycho_util::FastHashMap;
 
 #[derive(Debug, Clone, TlRead, TlWrite)]
 #[tl(boxed, id = 0x11112222)]
@@ -17,7 +18,7 @@ impl SignaturesQuery {
     pub(crate) fn create(
         session_seqno: u32,
         block_header: BlockIdShort,
-        current_signatures: &HashMap<HashBytes, Signature>,
+        current_signatures: &FastHashMap<HashBytes, Signature>,
     ) -> Self {
         let signatures = current_signatures.iter().map(|(k, v)| (k.0, v.0)).collect();
         Self {
diff --git a/collator/src/validator/state.rs b/collator/src/validator/state.rs
index 99e37c9ab..f9df9b273 100644
--- a/collator/src/validator/state.rs
+++ b/collator/src/validator/state.rs
@@ -5,15 +5,15 @@
 use anyhow::{bail, Context};
 
 use everscale_types::cell::HashBytes;
 use everscale_types::models::{BlockId, BlockIdShort, Signature};
-use tycho_network::PrivateOverlay;
-
use crate::validator::types::{ BlockValidationCandidate, ValidationResult, ValidationSessionInfo, ValidatorInfo, }; +use tycho_network::PrivateOverlay; +use tycho_util::FastHashMap; struct SignatureMaps { - valid_signatures: HashMap, - invalid_signatures: HashMap, + valid_signatures: FastHashMap, + invalid_signatures: FastHashMap, } /// Represents the state of validation for blocks and sessions. @@ -35,8 +35,8 @@ pub trait ValidationState: Send + Sync + 'static { pub struct SessionInfo { session_id: u32, max_weight: u64, - blocks_signatures: HashMap, - cached_signatures: HashMap>, + blocks_signatures: FastHashMap, + cached_signatures: FastHashMap>, validation_session_info: Arc, private_overlay: PrivateOverlay, } @@ -108,6 +108,7 @@ impl SessionInfo { /// Determines the validation status of a block. pub fn validation_status(&self, block_id_short: &BlockIdShort) -> ValidationResult { + let valid_weight = self.max_weight * 2 / 3 + 1; if let Some((_, signature_maps)) = self.blocks_signatures.get(block_id_short) { let total_valid_weight: u64 = signature_maps .valid_signatures @@ -120,16 +121,15 @@ impl SessionInfo { }) .sum(); - let valid_weight = self.max_weight * 2 / 3 + 1; if total_valid_weight >= valid_weight { ValidationResult::Valid } else if self.is_invalid(signature_maps, valid_weight) { ValidationResult::Invalid } else { - ValidationResult::Insufficient + ValidationResult::Insufficient(total_valid_weight, valid_weight) } } else { - ValidationResult::Insufficient + ValidationResult::Insufficient(0, valid_weight) } } /// Lists validators without signatures for a given block. @@ -187,11 +187,11 @@ impl SessionInfo { pub fn get_valid_signatures( &self, block_id_short: &BlockIdShort, - ) -> HashMap { + ) -> FastHashMap { if let Some((_, signature_maps)) = self.blocks_signatures.get(block_id_short) { signature_maps.valid_signatures.clone() } else { - HashMap::new() + FastHashMap::default() } } @@ -211,8 +211,8 @@ impl SessionInfo { ( *block_id, SignatureMaps { - valid_signatures: HashMap::new(), - invalid_signatures: HashMap::new(), + valid_signatures: FastHashMap::default(), + invalid_signatures: FastHashMap::default(), }, ) }); diff --git a/collator/src/validator/test_impl.rs b/collator/src/validator/test_impl.rs index 47f0e694d..e7c3607a2 100644 --- a/collator/src/validator/test_impl.rs +++ b/collator/src/validator/test_impl.rs @@ -7,6 +7,7 @@ use everscale_crypto::ed25519::{KeyPair, PublicKey}; use everscale_types::models::{BlockId, BlockIdShort, Signature}; use tycho_block_util::state::ShardStateStuff; +use tycho_util::FastHashMap; use crate::tracing_targets; use crate::types::{BlockSignatures, OnValidatedBlockEvent, ValidatorNetwork}; @@ -75,7 +76,7 @@ where _session_seqno: u32, current_validator_keypair: KeyPair, ) -> Result { - let mut signatures = HashMap::new(); + let mut signatures = FastHashMap::default(); signatures.insert( current_validator_keypair.public_key.to_bytes().into(), Signature::default(), diff --git a/collator/src/validator/types.rs b/collator/src/validator/types.rs index 6cd3c0010..75ad70ca9 100644 --- a/collator/src/validator/types.rs +++ b/collator/src/validator/types.rs @@ -106,5 +106,5 @@ pub(crate) struct OverlayNumber { pub enum ValidationResult { Valid, Invalid, - Insufficient, + Insufficient(u64, u64), } diff --git a/collator/src/validator/validator_processor.rs b/collator/src/validator/validator_processor.rs index a6503bac5..05f43e8ad 100644 --- a/collator/src/validator/validator_processor.rs +++ b/collator/src/validator/validator_processor.rs @@ -1,4 +1,3 
@@
-use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;
 
@@ -8,13 +7,13 @@ use everscale_crypto::ed25519::KeyPair;
 use everscale_types::cell::HashBytes;
 use everscale_types::models::{BlockId, BlockIdShort, Signature};
 use tokio::sync::broadcast;
-use tokio::time::interval;
 use tracing::warn;
-use tracing::{debug, error, trace};
+use tracing::{debug, trace};
 
 use crate::types::{BlockSignatures, OnValidatedBlockEvent, ValidatorNetwork};
 use tycho_block_util::state::ShardStateStuff;
 use tycho_network::{OverlayId, PeerId, PrivateOverlay, Request};
+use tycho_util::FastHashMap;
 
 use crate::validator::network::dto::SignaturesQuery;
 use crate::validator::network::network_service::NetworkService;
@@ -29,13 +28,15 @@ use crate::{
 
 use super::{ValidatorEventEmitter, ValidatorEventListener};
 
-const MAX_VALIDATION_ATTEMPTS: u32 = 1000;
-const VALIDATION_RETRY_TIMEOUT_SEC: u64 = 3;
+const NETWORK_TIMEOUT: Duration = Duration::from_millis(1000);
+const INITIAL_BACKOFF: Duration = Duration::from_millis(100);
+const MAX_BACKOFF: Duration = Duration::from_secs(10);
+const BACKOFF_FACTOR: u32 = 2; // Factor by which the timeout will increase
 
 #[derive(PartialEq, Debug)]
 pub enum ValidatorTaskResult {
     Void,
-    Signatures(HashMap<HashBytes, Signature>),
+    Signatures(FastHashMap<HashBytes, Signature>),
     ValidationStatus(ValidationResult),
 }
 
@@ -93,7 +94,6 @@ where
     ) -> Result<ValidatorTaskResult> {
         self.on_block_validated_event(candidate_id, OnValidatedBlockEvent::ValidByState)
             .await?;
-        println!("VALIDATED BY STATE");
         Ok(ValidatorTaskResult::Void)
     }
     async fn get_block_signatures(
@@ -172,6 +172,7 @@ where
         &mut self,
         session: Arc<ValidationSessionInfo>,
     ) -> Result<ValidatorTaskResult> {
+        trace!(target: tracing_targets::VALIDATOR, "Trying to add session seqno {:?}", session.seqno);
         if self.validation_state.get_session(session.seqno).is_none() {
             let (peer_resolver, local_peer_id) = {
                 let network = self.network.clone();
@@ -184,6 +185,7 @@ where
             let overlay_id = OverlayNumber {
                 session_seqno: session.seqno,
             };
+            trace!(target: tracing_targets::VALIDATOR, overlay_id = ?session.seqno, "Creating private overlay");
             let overlay_id = OverlayId(tl_proto::hash(overlay_id));
             let network_service = NetworkService::new(self.get_dispatcher().clone());
 
@@ -197,7 +199,7 @@ where
                 .add_private_overlay(&private_overlay);
 
             if !overlay_added {
-                bail!("Failed to add private overlay");
+                panic!("Failed to add private overlay");
             }
 
             self.validation_state
@@ -210,8 +212,10 @@ where
                     continue;
                 }
                 entries.insert(&PeerId(validator.public_key.to_bytes()));
+                trace!(target: tracing_targets::VALIDATOR, validator_pubkey = ?validator.public_key.as_bytes(), "Added validator to overlay");
             }
         }
+        trace!(target: tracing_targets::VALIDATOR, "Session seqno {:?} added", session.seqno);
         Ok(ValidatorTaskResult::Void)
     }
 
@@ -223,6 +227,7 @@ where
         current_validator_keypair: KeyPair,
     ) -> Result<ValidatorTaskResult> {
         let mut stop_receiver = self.stop_sender.subscribe();
+        trace!(target: tracing_targets::VALIDATOR, block = %candidate_id, "Starting candidate validation");
 
         // Simplify session retrieval with clear, concise error handling.
         let session = self
@@ -230,36 +235,67 @@ where
            .validation_state
            .get_mut_session(session_seqno)
            .ok_or_else(|| anyhow!("Failed to start candidate validation.
Session not found"))?; + trace!(target: tracing_targets::VALIDATOR, block = %candidate_id, "Signing block"); + let our_signature = sign_block(¤t_validator_keypair, &candidate_id)?; let current_validator_signature = HashBytes(current_validator_keypair.public_key.to_bytes()); + + trace!(target: tracing_targets::VALIDATOR, block = %candidate_id, "Adding block to session"); session.add_block(candidate_id)?; + trace!(target: tracing_targets::VALIDATOR, block = %candidate_id, "Adding our signature to session"); + let enqueue_task_result = self - .dispatcher - .enqueue_task(method_to_async_task_closure!( - process_candidate_signature_response, + .process_candidate_signature_response( session_seqno, candidate_id.as_short_id(), - vec![(current_validator_signature.0, our_signature.0)] - )) + vec![(current_validator_signature.0, our_signature.0)], + ) .await; + trace!(target: tracing_targets::VALIDATOR, block = %candidate_id, "Enqueued task for processing signatures response"); if let Err(e) = enqueue_task_result { + trace!(target: tracing_targets::VALIDATOR, block = %candidate_id, "Failed to enqueue task for processing signatures response {e:?}"); bail!("Failed to enqueue task for processing signatures response {e:?}"); } + let session = self + .validation_state + .get_session(session_seqno) + .ok_or_else(|| anyhow!("Failed to start candidate validation. Session not found"))?; + + let validation_status = session.validation_status(&candidate_id.as_short_id()); + + if validation_status == ValidationResult::Valid + || validation_status == ValidationResult::Invalid + { + trace!(target: tracing_targets::VALIDATOR, block = %candidate_id, "Validation status is already set for block {:?}", candidate_id); + return Ok(ValidatorTaskResult::Void); + } + let dispatcher = self.get_dispatcher().clone(); let current_validator_pubkey = current_validator_keypair.public_key; + trace!(target: tracing_targets::VALIDATOR, block = %candidate_id, "Starting validation loop"); tokio::spawn(async move { - let mut retry_interval = interval(Duration::from_secs(VALIDATION_RETRY_TIMEOUT_SEC)); - let max_retries = MAX_VALIDATION_ATTEMPTS; - let mut attempts = 0; + let mut iteration = 0; + loop { + let interval_duration = if iteration == 0 { + Duration::from_millis(0) + } else { + let exponential_backoff = INITIAL_BACKOFF * BACKOFF_FACTOR.pow(iteration - 1); + let calculated_duration = exponential_backoff + NETWORK_TIMEOUT; + + if calculated_duration > MAX_BACKOFF { + MAX_BACKOFF + } else { + calculated_duration + } + }; + + trace!(target: tracing_targets::VALIDATOR, block = %candidate_id, interval = ?interval_duration, "Waiting for next validation attempt"); - while attempts < max_retries { - trace!(target: tracing_targets::VALIDATOR, block = %candidate_id, "Attempt to validate block"); - attempts += 1; let dispatcher_clone = dispatcher.clone(); let cloned_candidate = candidate_id; @@ -270,7 +306,8 @@ where break; } }, - _ = retry_interval.tick() => { + _ = tokio::time::sleep(interval_duration) => { + let validation_task_result = dispatcher_clone.enqueue_task_with_responder( method_to_async_task_closure!( get_validation_status, @@ -278,6 +315,8 @@ where &cloned_candidate.as_short_id()) ).await; + trace!(target: tracing_targets::VALIDATOR, block = %cloned_candidate, "Enqueued task for getting validation status"); + match validation_task_result { Ok(receiver) => match receiver.await.unwrap() { Ok(ValidatorTaskResult::ValidationStatus(validation_status)) => { @@ -286,27 +325,26 @@ where break; } + trace!(target: 
tracing_targets::VALIDATOR, block = %cloned_candidate, "Validation status is not set yet. Enqueueing validation task"); dispatcher_clone.enqueue_task(method_to_async_task_closure!( validate_candidate, cloned_candidate, session_seqno, current_validator_pubkey )).await.expect("Failed to validate candidate"); + trace!(target: tracing_targets::VALIDATOR, block = %cloned_candidate, "Enqueued validation task"); }, Ok(e) => panic!("Unexpected response from get_validation_status: {:?}", e), Err(e) => panic!("Failed to get validation status: {:?}", e), }, Err(e) => panic!("Failed to enqueue validation task: {:?}", e), } - - if attempts >= max_retries { - warn!(target: tracing_targets::VALIDATOR, "Max retries reached without successful validation for block {:?}.", cloned_candidate); - break; - } } } + iteration += 1; } }); + Ok(ValidatorTaskResult::Void) } @@ -333,6 +371,7 @@ where block_id_short: BlockIdShort, signatures: Vec<([u8; 32], [u8; 64])>, ) -> Result { + trace!(target: tracing_targets::VALIDATOR, block = %block_id_short, "Processing candidate signature response"); // Simplified session retrieval let session = self .validation_state @@ -379,11 +418,17 @@ where .await?; } ValidationResult::Invalid => { + trace!(target: tracing_targets::VALIDATOR, block = %block_id_short, "Block is invalid"); self.on_block_validated_event(block, OnValidatedBlockEvent::Invalid) .await?; } - ValidationResult::Insufficient => { - debug!("Insufficient signatures for block {:?}", block_id_short); + ValidationResult::Insufficient(total_valid_weight, valid_weight) => { + trace!( + "Insufficient signatures for block {:?}. Total valid weight: {}. Required weight: {}", + block_id_short, + total_valid_weight, + valid_weight + ); } } } else { @@ -424,6 +469,7 @@ where session_seqno: u32, current_validator_pubkey: everscale_crypto::ed25519::PublicKey, ) -> Result { + trace!(target: tracing_targets::VALIDATOR, block = %candidate_id, "Validating candidate"); let block_id_short = candidate_id.as_short_id(); let validation_state = &self.validation_state; @@ -431,22 +477,22 @@ where .get_session(session_seqno) .ok_or(anyhow!("Session not found"))?; + trace!(target: tracing_targets::VALIDATOR, block = %candidate_id, "Getting validators"); let dispatcher = self.get_dispatcher(); - - let block_from_state = self - .state_node_adapter - .load_block_handle(&candidate_id) - .await?; + let state_node_adapter = self.state_node_adapter.clone(); let validators = session.validators_without_signatures(&block_id_short); let private_overlay = session.get_overlay().clone(); - let current_signatures = session.get_valid_signatures(&candidate_id.as_short_id()); - let network = self.network.clone(); tokio::spawn(async move { + let block_from_state = state_node_adapter + .load_block_handle(&candidate_id) + .await + .expect("Failed to load block from state"); + if block_from_state.is_some() { let result = dispatcher .clone() @@ -457,8 +503,7 @@ where .await; if let Err(e) = result { - error!(err = %e, "Failed to validate block by state"); - panic!("Failed to validate block by state {e}"); + panic!("Failed to validate block by state {e:?}"); } } else { let payload = SignaturesQuery::create( @@ -468,43 +513,57 @@ where ); for validator in validators { - if validator.public_key != current_validator_pubkey { - trace!(target: tracing_targets::VALIDATOR, validator_pubkey=?validator.public_key.as_bytes(), "trying to send request for getting signatures from validator"); - let response = private_overlay - .query( - network.dht_client.network(), - 
&PeerId(validator.public_key.to_bytes()), - Request::from_tl(payload.clone()), + let cloned_private_overlay = private_overlay.clone(); + let cloned_network = network.dht_client.network().clone(); + let cloned_payload = Request::from_tl(payload.clone()); + let cloned_dispatcher = dispatcher.clone(); + tokio::spawn(async move { + if validator.public_key != current_validator_pubkey { + trace!(target: tracing_targets::VALIDATOR, validator_pubkey=?validator.public_key.as_bytes(), "trying to send request for getting signatures from validator"); + + let response = tokio::time::timeout( + Duration::from_secs(1), + cloned_private_overlay.query( + &cloned_network, + &PeerId(validator.public_key.to_bytes()), + cloned_payload, + ), ) .await; - match response { - Ok(response) => { - let response = response.parse_tl::(); - match response { - Ok(signatures) => { - let enqueue_task_result = dispatcher - .enqueue_task(method_to_async_task_closure!( - process_candidate_signature_response, - signatures.session_seqno, - signatures.block_id_short, - signatures.signatures - )) - .await; - - if let Err(e) = enqueue_task_result { - error!(err = %e, "Failed to enqueue task for processing signatures response"); + + match response { + Ok(Ok(response)) => { + let response = response.parse_tl::(); + trace!(target: tracing_targets::VALIDATOR, "Received response from overlay"); + match response { + Ok(signatures) => { + let enqueue_task_result = cloned_dispatcher + .enqueue_task(method_to_async_task_closure!( + process_candidate_signature_response, + signatures.session_seqno, + signatures.block_id_short, + signatures.signatures + )) + .await; + trace!(target: tracing_targets::VALIDATOR, "Enqueued task for processing signatures response"); + if let Err(e) = enqueue_task_result { + panic!("Failed to enqueue task for processing signatures response: {e}"); + } + } + Err(e) => { + panic!("Failed convert signatures response to SignaturesQuery: {e}"); } - } - Err(e) => { - error!(err = %e, "Failed convert signatures response to SignaturesQuery"); } } - } - Err(e) => { - error!(err = %e, "Failed to get response from overlay"); + Ok(Err(e)) => { + warn!("Failed to get response from overlay: {e}"); + } + Err(e) => { + warn!("Network request timed out: {e}"); + } } } - } + }); } } }); diff --git a/collator/tests/collation_tests.rs b/collator/tests/collation_tests.rs index 6c74cccbc..17873a28a 100644 --- a/collator/tests/collation_tests.rs +++ b/collator/tests/collation_tests.rs @@ -11,6 +11,7 @@ use futures_util::{future::BoxFuture, FutureExt}; use sha2::Digest; use tycho_block_util::state::{MinRefMcStateTracker, ShardStateStuff}; use tycho_collator::manager::CollationManager; +use tycho_collator::test_utils::prepare_test_storage; use tycho_collator::{ mempool::{MempoolAdapterBuilder, MempoolAdapterBuilderStdImpl, MempoolAdapterStdImpl}, state_node::{StateNodeAdapterBuilder, StateNodeAdapterBuilderStdImpl}, @@ -117,100 +118,3 @@ fn supported_capabilities() -> u64 { | GlobalCapability::CapsTvmBugfixes2022 as u64; caps } - -struct DummyArchiveProvider; -impl BlockProvider for DummyArchiveProvider { - type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>; - type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>; - - fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> { - futures_util::future::ready(None).boxed() - } - - fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> { - futures_util::future::ready(None).boxed() - } -} - -async fn prepare_test_storage() -> 
Result<(DummyArchiveProvider, Arc)> { - let provider = DummyArchiveProvider; - let temp = tempfile::tempdir().unwrap(); - let db = Db::open(temp.path().to_path_buf(), DbOptions::default()).unwrap(); - let storage = Storage::new(db, temp.path().join("file"), 1_000_000).unwrap(); - let tracker = MinRefMcStateTracker::default(); - - // master state - let master_bytes = include_bytes!("../src/state_node/tests/data/test_state_2_master.boc"); - let master_file_hash: HashBytes = sha2::Sha256::digest(master_bytes).into(); - let master_root = Boc::decode(master_bytes)?; - let master_root_hash = *master_root.repr_hash(); - let master_state = master_root.parse::()?; - - let mc_state_extra = master_state.load_custom()?; - let mc_state_extra = mc_state_extra.unwrap(); - let mut shard_info_opt = None; - for shard_info in mc_state_extra.shards.iter() { - shard_info_opt = Some(shard_info?); - break; - } - let shard_info = shard_info_opt.unwrap(); - - let master_id = BlockId { - shard: master_state.shard_ident, - seqno: master_state.seqno, - root_hash: master_root_hash, - file_hash: master_file_hash, - }; - let master_state_stuff = - ShardStateStuff::from_state_and_root(master_id, master_state, master_root, &tracker)?; - - let (handle, _) = storage.block_handle_storage().create_or_load_handle( - &master_id, - BlockMetaData { - is_key_block: mc_state_extra.after_key_block, - gen_utime: master_state_stuff.state().gen_utime, - mc_ref_seqno: Some(0), - }, - )?; - - storage - .shard_state_storage() - .store_state(&handle, &master_state_stuff) - .await?; - - // shard state - let shard_bytes = include_bytes!("../src/state_node/tests/data/test_state_2_0:80.boc"); - let shard_file_hash: HashBytes = sha2::Sha256::digest(shard_bytes).into(); - let shard_root = Boc::decode(shard_bytes)?; - let shard_root_hash = *shard_root.repr_hash(); - let shard_state = shard_root.parse::()?; - let shard_id = BlockId { - shard: shard_info.0, - seqno: shard_info.1.seqno, - root_hash: shard_info.1.root_hash, - file_hash: shard_info.1.file_hash, - }; - let shard_state_stuff = - ShardStateStuff::from_state_and_root(shard_id, shard_state, shard_root, &tracker)?; - - let (handle, _) = storage.block_handle_storage().create_or_load_handle( - &shard_id, - BlockMetaData { - is_key_block: false, - gen_utime: shard_state_stuff.state().gen_utime, - mc_ref_seqno: Some(0), - }, - )?; - - storage - .shard_state_storage() - .store_state(&handle, &shard_state_stuff) - .await?; - - storage - .node_state() - .store_last_mc_block_id(&master_id) - .unwrap(); - - Ok((provider, storage)) -} diff --git a/collator/tests/validator_tests.rs b/collator/tests/validator_tests.rs index e1d24833c..a3d3d5083 100644 --- a/collator/tests/validator_tests.rs +++ b/collator/tests/validator_tests.rs @@ -17,17 +17,19 @@ use tokio::sync::{Mutex, Notify}; use tracing::debug; use tycho_block_util::block::ValidatorSubsetInfo; -use tycho_block_util::state::ShardStateStuff; +use tycho_block_util::state::{MinRefMcStateTracker, ShardStateStuff}; use tycho_collator::state_node::{ StateNodeAdapterBuilder, StateNodeAdapterBuilderStdImpl, StateNodeEventListener, }; -use tycho_collator::test_utils::try_init_test_tracing; +use tycho_collator::test_utils::{prepare_test_storage, try_init_test_tracing}; use tycho_collator::types::{CollationSessionInfo, OnValidatedBlockEvent, ValidatorNetwork}; use tycho_collator::validator::state::{ValidationState, ValidationStateStdImpl}; use tycho_collator::validator::types::ValidationSessionInfo; use tycho_collator::validator::validator::{Validator, 
ValidatorEventListener, ValidatorStdImpl}; use tycho_collator::validator::validator_processor::ValidatorProcessorStdImpl; -use tycho_core::block_strider::prepare_state_apply; +use tycho_core::block_strider::state::BlockStriderState; +use tycho_core::block_strider::subscriber::test::PrintSubscriber; +use tycho_core::block_strider::{prepare_state_apply, BlockStrider}; use tycho_network::{ DhtClient, DhtConfig, DhtService, Network, OverlayService, PeerId, PeerResolver, Router, }; @@ -54,6 +56,11 @@ impl TestValidatorEventListener { let mut received = self.received_notifications.lock().await; *received += 1; if *received == *self.expected_notifications.lock().await { + println!( + "received: {}, expected: {}", + *received, + *self.expected_notifications.lock().await + ); self.notify.notify_one(); } } @@ -69,7 +76,6 @@ impl ValidatorEventListener for TestValidatorEventListener { let mut validated_blocks = self.validated_blocks.lock().await; validated_blocks.push(block_id); self.increment_and_check().await; - println!("block validated event"); Ok(()) } } @@ -162,7 +168,16 @@ async fn test_validator_accept_block_by_state() -> anyhow::Result<()> { let test_listener = TestValidatorEventListener::new(1); let _state_node_event_listener: Arc = test_listener.clone(); - let (_, storage) = prepare_state_apply().await?; + let (provider, storage) = prepare_test_storage().await.unwrap(); + + let block_strider = BlockStrider::builder() + .with_provider(provider) + .with_subscriber(PrintSubscriber) + .with_state(storage.clone()) + .build_with_state_applier(MinRefMcStateTracker::default(), storage.clone()); + + block_strider.run().await.unwrap(); + let state_node_adapter = Arc::new(StateNodeAdapterBuilderStdImpl::new(storage.clone()).build(test_listener.clone())); let _validation_state = ValidationStateStdImpl::new(); @@ -227,7 +242,7 @@ async fn test_validator_accept_block_by_state() -> anyhow::Result<()> { prev_total_weight: 0, }; - let block_id = BlockId::from_str("-1:8000000000000000:0:58ffca1a178daff705de54216e5433c9bd2e7d850070d334d38997847ab9e845:d270b87b2952b5ba7daa70aaf0a8c361befcf4d8d2db92f9640d5443070838e4")?; + let block_id = storage.load_last_traversed_master_block_id(); let block_handle = storage.block_handle_storage().load_handle(&block_id)?; assert!(block_handle.is_some(), "Block handle not found in storage."); @@ -253,34 +268,36 @@ async fn test_validator_accept_block_by_state() -> anyhow::Result<()> { .unwrap(); test_listener.notify.notified().await; - let validated_blocks = test_listener.validated_blocks.lock().await; - assert!(!validated_blocks.is_empty(), "No blocks were validated."); + assert_eq!( + validated_blocks.len() as u32, + 1, + "Expected each validator to validate the block once." + ); Ok(()) } #[tokio::test] async fn test_validator_accept_block_by_network() -> anyhow::Result<()> { try_init_test_tracing(tracing_subscriber::filter::LevelFilter::DEBUG); + tycho_util::test::init_logger("test_validator_accept_block_by_network"); - let network_nodes = make_network(3); - let blocks_amount = 1; // Assuming you expect 3 validation per node. 
- - let expected_validations = network_nodes.len() as u32; // Expecting each node to validate - let _test_listener = TestValidatorEventListener::new(expected_validations); + let network_nodes = make_network(2); + let blocks_amount = 100; + let sessions = 2; let mut validators = vec![]; let mut listeners = vec![]; // Track listeners for later validation for node in network_nodes { // Create a unique listener for each validator - let test_listener = TestValidatorEventListener::new(blocks_amount); + let test_listener = TestValidatorEventListener::new(blocks_amount * sessions); listeners.push(test_listener.clone()); let state_node_adapter = Arc::new( StateNodeAdapterBuilderStdImpl::new(build_tmp_storage()?).build(test_listener.clone()), ); - let _validation_state = ValidationStateStdImpl::new(); + let network = ValidatorNetwork { overlay_service: node.overlay_service.clone(), dht_client: node.dht_client.clone(), @@ -295,9 +312,8 @@ async fn test_validator_accept_block_by_network() -> anyhow::Result<()> { } let mut validators_descriptions = vec![]; - for (_validator, node) in &validators { + for (_, node) in &validators { let peer_id = node.network.peer_id(); - let _keypair = node.keypair; validators_descriptions.push(ValidatorDescription { public_key: (*peer_id.as_bytes()).into(), weight: 1, @@ -307,55 +323,56 @@ async fn test_validator_accept_block_by_network() -> anyhow::Result<()> { }); } - let blocks = create_blocks(blocks_amount); - let validators_subset_info = ValidatorSubsetInfo { validators: validators_descriptions, short_hash: 0, }; - for (validator, _node) in &validators { - let collator_session_info = Arc::new(CollationSessionInfo::new( - 1, - validators_subset_info.clone(), - Some(_node.keypair), // Ensure you use the node's keypair correctly here - )); - // Assuming this setup is correct and necessary for each validator - - let validation_session = - Arc::new(ValidationSessionInfo::try_from(collator_session_info.clone()).unwrap()); - validator - .enqueue_add_session(validation_session) - .await - .unwrap(); - } - tokio::time::sleep(Duration::from_secs(1)).await; + for session in 1..=sessions { + let blocks = create_blocks(blocks_amount); - for (validator, _node) in &validators { - let collator_session_info = Arc::new(CollationSessionInfo::new( - 1, - validators_subset_info.clone(), - Some(_node.keypair), // Ensure you use the node's keypair correctly here - )); + for (validator, _node) in &validators { + let collator_session_info = Arc::new(CollationSessionInfo::new( + session, + validators_subset_info.clone(), + Some(_node.keypair), // Ensure you use the node's keypair correctly here + )); + // Assuming this setup is correct and necessary for each validator - for block in blocks.iter() { + let validation_session = + Arc::new(ValidationSessionInfo::try_from(collator_session_info.clone()).unwrap()); validator - .enqueue_candidate_validation( - *block, - collator_session_info.seqno(), - *collator_session_info.current_collator_keypair().unwrap(), - ) + .enqueue_add_session(validation_session) .await .unwrap(); } + + for (validator, _node) in &validators { + let collator_session_info = Arc::new(CollationSessionInfo::new( + session, + validators_subset_info.clone(), + Some(_node.keypair), // Ensure you use the node's keypair correctly here + )); + + for block in blocks.iter() { + validator + .enqueue_candidate_validation( + *block, + collator_session_info.seqno(), + *collator_session_info.current_collator_keypair().unwrap(), + ) + .await + .unwrap(); + } + } } for listener in listeners 
{
        listener.notify.notified().await;
        let validated_blocks = listener.validated_blocks.lock().await;
        assert_eq!(
            validated_blocks.len() as u32,
            sessions * blocks_amount,
            "Expected each validator to validate the block once."
        );
    }