Skip to content

Commit

Permalink
refactor(validator): network request timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
drmick committed Apr 24, 2024
1 parent 68422da commit 4110af2
Show file tree
Hide file tree
Showing 15 changed files with 336 additions and 235 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions collator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,14 @@ tycho-network = { workspace = true }
tycho-storage = { workspace = true }
tycho-util = { workspace = true }
tycho-block-util = { workspace = true }
log = "0.4.21"

[dev-dependencies]
tempfile = { workspace = true }
tracing-test = { workspace = true }
tycho-core = { workspace = true, features = ["test"] }
tycho-storage = { workspace = true, features = ["test"] }
tycho-util = { workspace = true, features = ["test"] }

[features]
test = []
Expand Down
6 changes: 4 additions & 2 deletions collator/src/manager/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ use everscale_types::{
models::{Block, BlockId, BlockIdShort, ShardIdent, Signature},
};

use tycho_util::FastHashMap;

use crate::types::BlockCandidate;

pub(super) type BlockCacheKey = BlockIdShort;
Expand All @@ -20,7 +22,7 @@ pub(super) struct BlocksCache {
pub struct BlockCandidateEntry {
pub key: BlockCacheKey,
pub candidate: BlockCandidate,
pub signatures: HashMap<HashBytes, Signature>,
pub signatures: FastHashMap<HashBytes, Signature>,
}

pub enum SendSyncStatus {
Expand Down Expand Up @@ -105,7 +107,7 @@ impl BlockCandidateContainer {
&mut self,
is_valid: bool,
already_synced: bool,
signatures: HashMap<HashBytes, Signature>,
signatures: FastHashMap<HashBytes, Signature>,
) {
if let Some(ref mut entry) = self.entry {
entry.signatures = signatures;
Expand Down
2 changes: 1 addition & 1 deletion collator/src/manager/utils.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use anyhow::Result;
use everscale_crypto::ed25519::PublicKey;
use everscale_types::boc::BocRepr;
use everscale_types::models::{Block, ValidatorDescription};
use everscale_types::models::ValidatorDescription;
use tycho_block_util::block::{BlockStuff, BlockStuffAug};

use crate::types::{BlockStuffForSync, CollationConfig};
Expand Down
2 changes: 1 addition & 1 deletion collator/src/mempool/mempool_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use async_trait::async_trait;

use everscale_types::{
cell::{CellBuilder, CellSliceRange, HashBytes},
models::{account, ExtInMsgInfo, IntAddr, MsgInfo, OwnedMessage, StdAddr},
models::{ExtInMsgInfo, IntAddr, MsgInfo, OwnedMessage, StdAddr},
};
use rand::Rng;
use tycho_block_util::state::ShardStateStuff;
Expand Down
107 changes: 107 additions & 0 deletions collator/src/test_utils.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,19 @@
use std::net::Ipv4Addr;
use std::sync::Arc;
use std::time::Duration;

use everscale_crypto::ed25519;
use everscale_types::boc::Boc;
use everscale_types::cell::HashBytes;
use everscale_types::models::{BlockId, ShardStateUnsplit};
use futures_util::future::BoxFuture;
use futures_util::FutureExt;
use sha2::Digest;
use tycho_block_util::state::{MinRefMcStateTracker, ShardStateStuff};
use tycho_core::block_strider::provider::{BlockProvider, OptionalBlockStuff};

use tycho_network::{DhtConfig, DhtService, Network, OverlayService, PeerId, Router};
use tycho_storage::{BlockMetaData, Db, DbOptions, Storage};

use crate::types::NodeNetwork;

Expand Down Expand Up @@ -58,3 +68,100 @@ pub fn create_node_network() -> NodeNetwork {
dht_client,
}
}

pub async fn prepare_test_storage() -> anyhow::Result<(DummyArchiveProvider, Arc<Storage>)> {
let provider = DummyArchiveProvider;
let temp = tempfile::tempdir().unwrap();
let db = Db::open(temp.path().to_path_buf(), DbOptions::default()).unwrap();
let storage = Storage::new(db, temp.path().join("file"), 1_000_000).unwrap();
let tracker = MinRefMcStateTracker::default();

// master state
let master_bytes = include_bytes!("../src/state_node/tests/data/test_state_2_master.boc");
let master_file_hash: HashBytes = sha2::Sha256::digest(master_bytes).into();
let master_root = Boc::decode(master_bytes)?;
let master_root_hash = *master_root.repr_hash();
let master_state = master_root.parse::<ShardStateUnsplit>()?;

let mc_state_extra = master_state.load_custom()?;
let mc_state_extra = mc_state_extra.unwrap();
let mut shard_info_opt = None;
for shard_info in mc_state_extra.shards.iter() {
shard_info_opt = Some(shard_info?);
break;
}
let shard_info = shard_info_opt.unwrap();

let master_id = BlockId {
shard: master_state.shard_ident,
seqno: master_state.seqno,
root_hash: master_root_hash,
file_hash: master_file_hash,
};
let master_state_stuff =
ShardStateStuff::from_state_and_root(master_id, master_state, master_root, &tracker)?;

let (handle, _) = storage.block_handle_storage().create_or_load_handle(
&master_id,
BlockMetaData {
is_key_block: mc_state_extra.after_key_block,
gen_utime: master_state_stuff.state().gen_utime,
mc_ref_seqno: Some(0),
},
)?;

storage
.shard_state_storage()
.store_state(&handle, &master_state_stuff)
.await?;

// shard state
let shard_bytes = include_bytes!("../src/state_node/tests/data/test_state_2_0:80.boc");
let shard_file_hash: HashBytes = sha2::Sha256::digest(shard_bytes).into();
let shard_root = Boc::decode(shard_bytes)?;
let shard_root_hash = *shard_root.repr_hash();
let shard_state = shard_root.parse::<ShardStateUnsplit>()?;
let shard_id = BlockId {
shard: shard_info.0,
seqno: shard_info.1.seqno,
root_hash: shard_info.1.root_hash,
file_hash: shard_info.1.file_hash,
};
let shard_state_stuff =
ShardStateStuff::from_state_and_root(shard_id, shard_state, shard_root, &tracker)?;

let (handle, _) = storage.block_handle_storage().create_or_load_handle(
&shard_id,
BlockMetaData {
is_key_block: false,
gen_utime: shard_state_stuff.state().gen_utime,
mc_ref_seqno: Some(0),
},
)?;

storage
.shard_state_storage()
.store_state(&handle, &shard_state_stuff)
.await?;

storage
.node_state()
.store_last_mc_block_id(&master_id)
.unwrap();

Ok((provider, storage))
}

pub struct DummyArchiveProvider;
impl BlockProvider for DummyArchiveProvider {
type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;

fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
futures_util::future::ready(None).boxed()
}

fn get_block<'a>(&'a self, block_id: &'a BlockId) -> Self::GetBlockFut<'a> {
futures_util::future::ready(None).boxed()
}
}
5 changes: 3 additions & 2 deletions collator/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use everscale_types::models::{
use tycho_block_util::block::{BlockStuffAug, ValidatorSubsetInfo};
use tycho_block_util::state::{MinRefMcStateTracker, ShardStateStuff};
use tycho_network::{DhtClient, OverlayService, PeerResolver};
use tycho_util::FastHashMap;

pub struct CollationConfig {
pub key_pair: KeyPair,
Expand Down Expand Up @@ -121,7 +122,7 @@ impl OnValidatedBlockEvent {

#[derive(Default, Clone)]
pub struct BlockSignatures {
pub signatures: HashMap<HashBytes, Signature>,
pub signatures: FastHashMap<HashBytes, Signature>,
}

pub struct ValidatedBlock {
Expand Down Expand Up @@ -160,7 +161,7 @@ pub struct BlockStuffForSync {
//TODO: remove `block_id` and make `block_stuff: BlockStuff` when collator will generate real blocks
pub block_id: BlockId,
pub block_stuff_aug: BlockStuffAug,
pub signatures: HashMap<HashBytes, Signature>,
pub signatures: FastHashMap<HashBytes, Signature>,
pub prev_blocks_ids: Vec<BlockId>,
pub top_shard_blocks_ids: Vec<BlockId>,
}
Expand Down
8 changes: 7 additions & 1 deletion collator/src/utils/async_queued_dispatcher.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::{future::Future, pin::Pin};
use std::{future::Future, pin::Pin, usize};

use anyhow::{anyhow, Result};
use log::trace;
use tokio::sync::{mpsc, oneshot};

use crate::tracing_targets;
Expand Down Expand Up @@ -33,6 +34,11 @@ where
pub fn run(mut worker: W, mut receiver: mpsc::Receiver<AsyncTaskDesc<W, R>>) {
tokio::spawn(async move {
while let Some(task) = receiver.recv().await {
trace!(
target: tracing_targets::ASYNC_QUEUE_DISPATCHER,
"Task #{} ({}): received",
task.id(),
task.get_descr());
let (task_id, task_descr) = (task.id(), task.get_descr());
let (func, responder) = task.extract();
tracing::trace!(
Expand Down
3 changes: 2 additions & 1 deletion collator/src/validator/network/dto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::collections::HashMap;
use everscale_types::cell::HashBytes;
use everscale_types::models::{BlockIdShort, Signature};
use tl_proto::{TlRead, TlWrite};
use tycho_util::FastHashMap;

#[derive(Debug, Clone, TlRead, TlWrite)]
#[tl(boxed, id = 0x11112222)]
Expand All @@ -17,7 +18,7 @@ impl SignaturesQuery {
pub(crate) fn create(
session_seqno: u32,
block_header: BlockIdShort,
current_signatures: &HashMap<HashBytes, Signature>,
current_signatures: &FastHashMap<HashBytes, Signature>,
) -> Self {
let signatures = current_signatures.iter().map(|(k, v)| (k.0, v.0)).collect();
Self {
Expand Down
26 changes: 13 additions & 13 deletions collator/src/validator/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,15 @@ use anyhow::{bail, Context};
use everscale_types::cell::HashBytes;
use everscale_types::models::{BlockId, BlockIdShort, Signature};

use tycho_network::PrivateOverlay;

use crate::validator::types::{
BlockValidationCandidate, ValidationResult, ValidationSessionInfo, ValidatorInfo,
};
use tycho_network::PrivateOverlay;
use tycho_util::FastHashMap;

struct SignatureMaps {
valid_signatures: HashMap<HashBytes, Signature>,
invalid_signatures: HashMap<HashBytes, Signature>,
valid_signatures: FastHashMap<HashBytes, Signature>,
invalid_signatures: FastHashMap<HashBytes, Signature>,
}

/// Represents the state of validation for blocks and sessions.
Expand All @@ -35,8 +35,8 @@ pub trait ValidationState: Send + Sync + 'static {
pub struct SessionInfo {
session_id: u32,
max_weight: u64,
blocks_signatures: HashMap<BlockIdShort, (BlockId, SignatureMaps)>,
cached_signatures: HashMap<BlockIdShort, HashMap<HashBytes, Signature>>,
blocks_signatures: FastHashMap<BlockIdShort, (BlockId, SignatureMaps)>,
cached_signatures: FastHashMap<BlockIdShort, FastHashMap<HashBytes, Signature>>,
validation_session_info: Arc<ValidationSessionInfo>,
private_overlay: PrivateOverlay,
}
Expand Down Expand Up @@ -108,6 +108,7 @@ impl SessionInfo {

/// Determines the validation status of a block.
pub fn validation_status(&self, block_id_short: &BlockIdShort) -> ValidationResult {
let valid_weight = self.max_weight * 2 / 3 + 1;
if let Some((_, signature_maps)) = self.blocks_signatures.get(block_id_short) {
let total_valid_weight: u64 = signature_maps
.valid_signatures
Expand All @@ -120,16 +121,15 @@ impl SessionInfo {
})
.sum();

let valid_weight = self.max_weight * 2 / 3 + 1;
if total_valid_weight >= valid_weight {
ValidationResult::Valid
} else if self.is_invalid(signature_maps, valid_weight) {
ValidationResult::Invalid
} else {
ValidationResult::Insufficient
ValidationResult::Insufficient(total_valid_weight, valid_weight)
}
} else {
ValidationResult::Insufficient
ValidationResult::Insufficient(0, valid_weight)
}
}
/// Lists validators without signatures for a given block.
Expand Down Expand Up @@ -187,11 +187,11 @@ impl SessionInfo {
pub fn get_valid_signatures(
&self,
block_id_short: &BlockIdShort,
) -> HashMap<HashBytes, Signature> {
) -> FastHashMap<HashBytes, Signature> {
if let Some((_, signature_maps)) = self.blocks_signatures.get(block_id_short) {
signature_maps.valid_signatures.clone()
} else {
HashMap::new()
FastHashMap::default()
}
}

Expand All @@ -211,8 +211,8 @@ impl SessionInfo {
(
*block_id,
SignatureMaps {
valid_signatures: HashMap::new(),
invalid_signatures: HashMap::new(),
valid_signatures: FastHashMap::default(),
invalid_signatures: FastHashMap::default(),
},
)
});
Expand Down
3 changes: 2 additions & 1 deletion collator/src/validator/test_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use everscale_crypto::ed25519::{KeyPair, PublicKey};
use everscale_types::models::{BlockId, BlockIdShort, Signature};

use tycho_block_util::state::ShardStateStuff;
use tycho_util::FastHashMap;

use crate::tracing_targets;
use crate::types::{BlockSignatures, OnValidatedBlockEvent, ValidatorNetwork};
Expand Down Expand Up @@ -75,7 +76,7 @@ where
_session_seqno: u32,
current_validator_keypair: KeyPair,
) -> Result<ValidatorTaskResult> {
let mut signatures = HashMap::new();
let mut signatures = FastHashMap::default();
signatures.insert(
current_validator_keypair.public_key.to_bytes().into(),
Signature::default(),
Expand Down
2 changes: 1 addition & 1 deletion collator/src/validator/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,5 +106,5 @@ pub(crate) struct OverlayNumber {
pub enum ValidationResult {
Valid,
Invalid,
Insufficient,
Insufficient(u64, u64),
}
Loading

0 comments on commit 4110af2

Please sign in to comment.