diff --git a/core/src/block_strider/provider.rs b/core/src/block_strider/provider.rs index a74c2e2cc..3ab1799f0 100644 --- a/core/src/block_strider/provider.rs +++ b/core/src/block_strider/provider.rs @@ -4,9 +4,8 @@ use std::sync::Arc; use everscale_types::models::BlockId; use futures_util::future::BoxFuture; -use tycho_block_util::archive::WithArchiveData; use tycho_block_util::block::{BlockStuff, BlockStuffAug}; -use tycho_storage::{BlockConnection, Storage}; +use tycho_storage::Storage; use crate::blockchain_client::BlockchainClient; use crate::proto::overlay::BlockFull; @@ -103,7 +102,7 @@ impl BlockProvider for BlockchainClient { BlockFull::Empty => unreachable!(), }; - match BlockStuff::deserialize_checked(block_id, data) { + match BlockStuff::deserialize(block_id, data) { Ok(block) => { res.mark_response(true); Some(Ok(BlockStuffAug::new(block, data.clone()))) @@ -150,7 +149,7 @@ impl BlockProvider for BlockchainClient { block_id, block: data, .. - } => match BlockStuff::deserialize_checked(*block_id, data) { + } => match BlockStuff::deserialize(*block_id, data) { Ok(block) => Some(Ok(BlockStuffAug::new(block, data.clone()))), Err(e) => { res.mark_response(false); diff --git a/core/src/overlay_client/public_overlay_client.rs b/core/src/overlay_client/public_overlay_client.rs index 6c2c967a5..531e30014 100644 --- a/core/src/overlay_client/public_overlay_client.rs +++ b/core/src/overlay_client/public_overlay_client.rs @@ -70,7 +70,7 @@ impl PublicOverlayClient { } pub async fn entries_removed(&self) { - self.0.overlay.entries_removed().notified().await + self.0.overlay.entries_removed().notified().await; } pub fn neighbour_update_interval_ms(&self) -> u64 { self.0.settings.neighbours_update_interval diff --git a/core/tests/block_strider.rs b/core/tests/block_strider.rs index 3b46f1e9d..c65424d9f 100644 --- a/core/tests/block_strider.rs +++ b/core/tests/block_strider.rs @@ -1,7 +1,6 @@ use std::collections::BTreeMap; use std::time::Duration; -use everscale_types::models::BlockId; use futures_util::stream::FuturesUnordered; use futures_util::StreamExt; use tycho_core::block_strider::provider::BlockProvider; @@ -18,11 +17,15 @@ async fn storage_block_strider() -> anyhow::Result<()> { let (storage, tmp_dir) = common::storage::init_storage().await?; - let block = storage.get_block(&BlockId::default()).await; - assert!(block.is_none()); + let block_ids = common::storage::get_block_ids()?; + for block_id in block_ids { + if block_id.shard.is_masterchain() { + let block = storage.get_block(&block_id).await; - let next_block = storage.get_next_block(&BlockId::default()).await; - assert!(next_block.is_none()); + assert!(block.is_some()); + assert_eq!(&block_id, block.unwrap()?.id()); + } + } tmp_dir.close()?; @@ -42,7 +45,7 @@ async fn overlay_block_strider() -> anyhow::Result<()> { let (storage, tmp_dir) = common::storage::init_storage().await?; - const NODE_COUNT: usize = 5; + const NODE_COUNT: usize = 10; let nodes = common::node::make_network(storage, NODE_COUNT); tracing::info!("discovering nodes"); @@ -113,11 +116,17 @@ async fn overlay_block_strider() -> anyhow::Result<()> { Default::default(), ); - let block = client.get_block(&BlockId::default()).await; - assert!(block.is_none()); + let block_ids = common::storage::get_block_ids()?; + for block_id in block_ids { + if block_id.shard.is_masterchain() { + let block = client.get_block(&block_id).await; + + assert!(block.is_some()); + assert_eq!(&block_id, block.unwrap()?.id()); - let block = client.get_next_block(&BlockId::default()).await; - assert!(block.is_none()); + break; + } + } tmp_dir.close()?; diff --git a/core/tests/common/archive.rs b/core/tests/common/archive.rs new file mode 100644 index 000000000..c5a6fcf60 --- /dev/null +++ b/core/tests/common/archive.rs @@ -0,0 +1,104 @@ +#![allow(clippy::map_err_ignore)] + +use std::collections::BTreeMap; + +use anyhow::Result; +use everscale_types::cell::Load; +use everscale_types::models::{Block, BlockId, BlockIdShort, BlockProof}; +use sha2::Digest; + +use tycho_block_util::archive::{ArchiveEntryId, ArchiveReader}; + +pub struct Archive { + pub blocks: BTreeMap, +} + +impl Archive { + pub fn new(data: &[u8]) -> Result { + let reader = ArchiveReader::new(data)?; + + let mut res = Archive { + blocks: Default::default(), + }; + + for data in reader { + let entry = data?; + match ArchiveEntryId::from_filename(entry.name)? { + ArchiveEntryId::Block(id) => { + let block = deserialize_block(&id, entry.data)?; + res.blocks.entry(id).or_default().block = Some(block); + } + ArchiveEntryId::Proof(id) if id.shard.workchain() == -1 => { + let proof = deserialize_block_proof(&id, entry.data, false)?; + res.blocks.entry(id).or_default().proof = Some(proof); + } + ArchiveEntryId::ProofLink(id) if id.shard.workchain() != -1 => { + let proof = deserialize_block_proof(&id, entry.data, true)?; + res.blocks.entry(id).or_default().proof = Some(proof); + } + _ => continue, + } + } + Ok(res) + } +} + +#[derive(Default)] +pub struct ArchiveDataEntry { + pub block: Option, + pub proof: Option, +} + +pub(crate) fn deserialize_block(id: &BlockId, data: &[u8]) -> Result { + let file_hash = sha2::Sha256::digest(data); + if id.file_hash.as_slice() != file_hash.as_slice() { + Err(ArchiveDataError::InvalidFileHash(id.as_short_id())) + } else { + let root = everscale_types::boc::Boc::decode(data) + .map_err(|_| ArchiveDataError::InvalidBlockData)?; + if &id.root_hash != root.repr_hash() { + return Err(ArchiveDataError::InvalidRootHash); + } + + Block::load_from(&mut root.as_slice()?).map_err(|_| ArchiveDataError::InvalidBlockData) + } +} + +pub(crate) fn deserialize_block_proof( + block_id: &BlockId, + data: &[u8], + is_link: bool, +) -> Result { + let root = + everscale_types::boc::Boc::decode(data).map_err(|_| ArchiveDataError::InvalidBlockProof)?; + let proof = BlockProof::load_from(&mut root.as_slice()?) + .map_err(|_| ArchiveDataError::InvalidBlockProof)?; + + if &proof.proof_for != block_id { + return Err(ArchiveDataError::ProofForAnotherBlock); + } + + if !block_id.shard.workchain() == -1 && !is_link { + Err(ArchiveDataError::ProofForNonMasterchainBlock) + } else { + Ok(proof) + } +} + +#[derive(thiserror::Error, Debug)] +pub(crate) enum ArchiveDataError { + #[error("Invalid file hash {0}")] + InvalidFileHash(BlockIdShort), + #[error("Invalid root hash")] + InvalidRootHash, + #[error("Invalid block data")] + InvalidBlockData, + #[error("Invalid block proof")] + InvalidBlockProof, + #[error("Proof for another block")] + ProofForAnotherBlock, + #[error("Proof for non-masterchain block")] + ProofForNonMasterchainBlock, + #[error(transparent)] + TypeError(#[from] everscale_types::error::Error), +} diff --git a/core/tests/common/mod.rs b/core/tests/common/mod.rs index 4dc8b5c77..7b1740ed6 100644 --- a/core/tests/common/mod.rs +++ b/core/tests/common/mod.rs @@ -1,2 +1,3 @@ +pub mod archive; pub mod node; pub mod storage; diff --git a/core/tests/common/storage.rs b/core/tests/common/storage.rs index 796e62563..5d5c3c5e4 100644 --- a/core/tests/common/storage.rs +++ b/core/tests/common/storage.rs @@ -1,11 +1,15 @@ use std::sync::Arc; -use anyhow::Result; +use anyhow::{Context, Result}; use bytesize::ByteSize; +use everscale_types::models::BlockId; use tempfile::TempDir; -use tycho_storage::{Db, DbOptions, Storage}; +use tycho_block_util::block::{BlockProofStuff, BlockProofStuffAug, BlockStuff, BlockStuffAug}; +use tycho_storage::{BlockMetaData, Db, DbOptions, Storage}; -pub(crate) async fn init_storage() -> Result<(Arc, TempDir)> { +use crate::common::*; + +pub(crate) async fn init_empty_storage() -> Result<(Arc, TempDir)> { let tmp_dir = tempfile::tempdir()?; let root_path = tmp_dir.path(); @@ -26,3 +30,107 @@ pub(crate) async fn init_storage() -> Result<(Arc, TempDir)> { Ok((storage, tmp_dir)) } + +pub(crate) fn get_block_ids() -> Result> { + let data = include_bytes!("../../tests/data/00001"); + let archive = archive::Archive::new(data)?; + + let block_ids = archive + .blocks + .into_iter() + .map(|(block_id, _)| block_id) + .collect(); + + Ok(block_ids) +} + +pub(crate) fn get_archive() -> Result { + let data = include_bytes!("../../tests/data/00001"); + let archive = archive::Archive::new(data)?; + + Ok(archive) +} + +pub(crate) async fn init_storage() -> Result<(Arc, TempDir)> { + let (storage, tmp_dir) = init_empty_storage().await?; + + let data = include_bytes!("../../tests/data/00001"); + let provider = archive::Archive::new(data)?; + + for (block_id, archive) in provider.blocks { + if block_id.shard.is_masterchain() { + let block = archive.block.unwrap(); + let proof = archive.proof.unwrap(); + + let info = block.info.load().context("Failed to load block info")?; + + let meta = BlockMetaData { + is_key_block: info.key_block, + gen_utime: info.gen_utime, + mc_ref_seqno: info + .master_ref + .map(|r| { + r.load() + .context("Failed to load master ref") + .map(|mr| mr.seqno) + }) + .transpose() + .context("Failed to process master ref")?, + }; + + let block_data = everscale_types::boc::BocRepr::encode(&block)?; + let block_stuff = + BlockStuffAug::new(BlockStuff::with_block(block_id, block.clone()), block_data); + + let block_result = storage + .block_storage() + .store_block_data(&block_stuff, meta) + .await?; + + assert!(block_result.new); + + let handle = storage + .block_handle_storage() + .load_handle(&block_id)? + .unwrap(); + + assert_eq!(handle.id(), block_stuff.data.id()); + + let bs = storage + .block_storage() + .load_block_data(&block_result.handle) + .await?; + + assert_eq!(bs.id(), &block_id); + assert_eq!(bs.block(), &block); + + let block_proof = BlockProofStuff::deserialize( + block_id, + everscale_types::boc::BocRepr::encode(&proof)?.as_slice(), + false, + )?; + + let block_proof_with_data = BlockProofStuffAug::new( + block_proof.clone(), + everscale_types::boc::BocRepr::encode(&proof)?, + ); + + let handle = storage + .block_storage() + .store_block_proof(&block_proof_with_data, handle.into()) + .await? + .handle; + + let bp = storage + .block_storage() + .load_block_proof(&handle, false) + .await?; + + assert_eq!(bp.is_link(), block_proof.is_link()); + assert_eq!(bp.proof().root, block_proof.proof().root); + assert_eq!(bp.proof().proof_for, block_proof.proof().proof_for); + } + } + + Ok((storage, tmp_dir)) +} diff --git a/core/tests/overlay_server.rs b/core/tests/overlay_server.rs index 1640d96b2..5d85d9977 100644 --- a/core/tests/overlay_server.rs +++ b/core/tests/overlay_server.rs @@ -1,29 +1,16 @@ use std::collections::BTreeMap; -use std::net::Ipv4Addr; -use std::str::FromStr; -use std::sync::Arc; use std::time::Duration; use anyhow::Result; -use bytesize::ByteSize; -use everscale_crypto::ed25519; use everscale_types::models::BlockId; use futures_util::stream::FuturesUnordered; use futures_util::StreamExt; -use tl_proto::{TlRead, TlWrite}; -use tycho_core::block_strider::provider::BlockProvider; use tycho_core::blockchain_client::BlockchainClient; use tycho_core::overlay_client::public_overlay_client::PublicOverlayClient; use tycho_core::overlay_client::settings::OverlayClientSettings; -use tycho_core::overlay_server::{OverlayServer, DEFAULT_ERROR_CODE}; -use tycho_core::proto::overlay::{ - ArchiveInfo, BlockFull, Data, KeyBlockIds, PersistentStatePart, Response, -}; -use tycho_network::{ - DhtClient, DhtConfig, DhtService, Network, OverlayConfig, OverlayId, OverlayService, PeerId, - PeerResolver, PublicOverlay, Request, Router, Service, ServiceRequest, -}; -use tycho_storage::{Db, DbOptions, Storage}; +use tycho_core::overlay_server::DEFAULT_ERROR_CODE; +use tycho_core::proto::overlay::{BlockFull, KeyBlockIds, PersistentStatePart}; +use tycho_network::PeerId; mod common; @@ -37,9 +24,9 @@ async fn overlay_server_with_empty_storage() -> Result<()> { known_by: usize, } - let (storage, tmp_dir) = common::storage::init_storage().await?; + let (storage, tmp_dir) = common::storage::init_empty_storage().await?; - const NODE_COUNT: usize = 5; + const NODE_COUNT: usize = 10; let nodes = common::node::make_network(storage, NODE_COUNT); tracing::info!("discovering nodes"); @@ -170,3 +157,116 @@ async fn overlay_server_with_empty_storage() -> Result<()> { tracing::info!("done!"); Ok(()) } + +#[tokio::test] +async fn overlay_server_blocks() -> Result<()> { + tycho_util::test::init_logger("overlay_server_blocks"); + + #[derive(Debug, Default)] + struct PeerState { + knows_about: usize, + known_by: usize, + } + + let (storage, tmp_dir) = common::storage::init_storage().await?; + + const NODE_COUNT: usize = 10; + let nodes = common::node::make_network(storage, NODE_COUNT); + + tracing::info!("discovering nodes"); + loop { + tokio::time::sleep(Duration::from_secs(1)).await; + + let mut peer_states = BTreeMap::<&PeerId, PeerState>::new(); + + for (i, left) in nodes.iter().enumerate() { + for (j, right) in nodes.iter().enumerate() { + if i == j { + continue; + } + + let left_id = left.network().peer_id(); + let right_id = right.network().peer_id(); + + if left.public_overlay().read_entries().contains(right_id) { + peer_states.entry(left_id).or_default().knows_about += 1; + peer_states.entry(right_id).or_default().known_by += 1; + } + } + } + + tracing::info!("{peer_states:#?}"); + + let total_filled = peer_states + .values() + .filter(|state| state.knows_about == nodes.len() - 1) + .count(); + + tracing::info!( + "peers with filled overlay: {} / {}", + total_filled, + nodes.len() + ); + if total_filled == nodes.len() { + break; + } + } + + tracing::info!("resolving entries..."); + for node in &nodes { + let resolved = FuturesUnordered::new(); + for entry in node.public_overlay().read_entries().iter() { + let handle = entry.resolver_handle.clone(); + resolved.push(async move { handle.wait_resolved().await }); + } + + // Ensure all entries are resolved. + resolved.collect::>().await; + tracing::info!( + peer_id = %node.network().peer_id(), + "all entries resolved", + ); + } + + tracing::info!("making overlay requests..."); + + let node = nodes.first().unwrap(); + + let client = BlockchainClient::new( + PublicOverlayClient::new( + node.network().clone(), + node.public_overlay().clone(), + OverlayClientSettings::default(), + ) + .await, + Default::default(), + ); + + let archive = common::storage::get_archive()?; + for (block_id, block) in archive.blocks { + if block_id.shard.is_masterchain() { + let result = client.get_block_full(block_id.clone()).await; + assert!(result.is_ok()); + + if let Ok(response) = &result { + let proof = everscale_types::boc::BocRepr::encode(block.proof.unwrap())?.into(); + let block = everscale_types::boc::BocRepr::encode(block.block.unwrap())?.into(); + + assert_eq!( + response.data(), + &BlockFull::Found { + block_id, + block, + proof, + is_link: false, + } + ); + } + } + } + + tmp_dir.close()?; + + tracing::info!("done!"); + Ok(()) +} diff --git a/network/src/overlay/public_overlay.rs b/network/src/overlay/public_overlay.rs index a43df9726..69e9f4572 100644 --- a/network/src/overlay/public_overlay.rs +++ b/network/src/overlay/public_overlay.rs @@ -315,7 +315,7 @@ impl PublicOverlay { && !this.banned_peer_ids.contains(&item.entry.peer_id) }); - self.inner.entries_removed.notify_waiters() + self.inner.entries_removed.notify_waiters(); } fn prepend_prefix_to_body(&self, body: &mut Bytes) {