From 6eac0eefb197f97e38a9bc5f413964f36d3f722f Mon Sep 17 00:00:00 2001 From: Ivan Kalinin Date: Mon, 29 Apr 2024 15:40:04 +0200 Subject: [PATCH] refactor(blockchain-server): unify models --- core/src/block_strider/provider.rs | 6 +- .../mod.rs => blockchain_rpc/client.rs} | 37 +- core/src/blockchain_rpc/mod.rs | 7 + core/src/blockchain_rpc/service.rs | 415 +++++++++++++++++ core/src/lib.rs | 3 +- core/src/overlay_server/mod.rs | 424 ------------------ core/src/proto.tl | 115 +++-- core/src/proto/blockchain.rs | 113 +++++ core/src/proto/mod.rs | 76 ++++ core/src/proto/overlay.rs | 192 +------- core/tests/block_strider.rs | 4 +- core/tests/common/node.rs | 4 +- core/tests/overlay_server.rs | 25 +- 13 files changed, 714 insertions(+), 707 deletions(-) rename core/src/{blockchain_client/mod.rs => blockchain_rpc/client.rs} (75%) create mode 100644 core/src/blockchain_rpc/mod.rs create mode 100644 core/src/blockchain_rpc/service.rs delete mode 100644 core/src/overlay_server/mod.rs create mode 100644 core/src/proto/blockchain.rs diff --git a/core/src/block_strider/provider.rs b/core/src/block_strider/provider.rs index 1b77a1a70..d23d84360 100644 --- a/core/src/block_strider/provider.rs +++ b/core/src/block_strider/provider.rs @@ -7,8 +7,8 @@ use futures_util::future::BoxFuture; use tycho_block_util::block::{BlockStuff, BlockStuffAug}; use tycho_storage::Storage; -use crate::blockchain_client::BlockchainClient; -use crate::proto::overlay::BlockFull; +use crate::blockchain_rpc::BlockchainRpcClient; +use crate::proto::blockchain::BlockFull; pub type OptionalBlockStuff = Option>; @@ -82,7 +82,7 @@ impl BlockProvider for ChainBlockProvider< } } -impl BlockProvider for BlockchainClient { +impl BlockProvider for BlockchainRpcClient { type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>; type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>; diff --git a/core/src/blockchain_client/mod.rs b/core/src/blockchain_rpc/client.rs similarity index 75% rename from core/src/blockchain_client/mod.rs rename to core/src/blockchain_rpc/client.rs index 06af585e1..7eccbb9b0 100644 --- a/core/src/blockchain_client/mod.rs +++ b/core/src/blockchain_rpc/client.rs @@ -5,15 +5,14 @@ use anyhow::Result; use everscale_types::models::BlockId; use crate::overlay_client::{PublicOverlayClient, QueryResponse}; -use crate::proto::overlay::rpc::*; -use crate::proto::overlay::*; +use crate::proto::blockchain::*; -pub struct BlockchainClientConfig { +pub struct BlockchainRpcClientConfig { pub get_next_block_polling_interval: Duration, pub get_block_polling_interval: Duration, } -impl Default for BlockchainClientConfig { +impl Default for BlockchainRpcClientConfig { fn default() -> Self { Self { get_block_polling_interval: Duration::from_millis(50), @@ -24,17 +23,17 @@ impl Default for BlockchainClientConfig { #[derive(Clone)] #[repr(transparent)] -pub struct BlockchainClient { +pub struct BlockchainRpcClient { inner: Arc, } struct Inner { overlay_client: PublicOverlayClient, - config: BlockchainClientConfig, + config: BlockchainRpcClientConfig, } -impl BlockchainClient { - pub fn new(overlay_client: PublicOverlayClient, config: BlockchainClientConfig) -> Self { +impl BlockchainRpcClient { + pub fn new(overlay_client: PublicOverlayClient, config: BlockchainRpcClientConfig) -> Self { Self { inner: Arc::new(Inner { overlay_client, @@ -47,7 +46,7 @@ impl BlockchainClient { &self.inner.overlay_client } - pub fn config(&self) -> &BlockchainClientConfig { + pub fn config(&self) -> &BlockchainRpcClientConfig { &self.inner.config } @@ -58,8 +57,8 @@ impl BlockchainClient { ) -> Result> { let client = &self.inner.overlay_client; let data = client - .query::<_, KeyBlockIds>(&GetNextKeyBlockIds { - block: *block, + .query::<_, KeyBlockIds>(&rpc::GetNextKeyBlockIds { + block_id: *block, max_size, }) .await?; @@ -69,7 +68,7 @@ impl BlockchainClient { pub async fn get_block_full(&self, block: &BlockId) -> Result> { let client = &self.inner.overlay_client; let data = client - .query::<_, BlockFull>(GetBlockFull { block: *block }) + .query::<_, BlockFull>(&rpc::GetBlockFull { block_id: *block }) .await?; Ok(data) } @@ -80,8 +79,8 @@ impl BlockchainClient { ) -> Result> { let client = &self.inner.overlay_client; let data = client - .query::<_, BlockFull>(GetNextBlockFull { - prev_block: *prev_block, + .query::<_, BlockFull>(&rpc::GetNextBlockFull { + prev_block_id: *prev_block, }) .await?; Ok(data) @@ -90,7 +89,7 @@ impl BlockchainClient { pub async fn get_archive_info(&self, mc_seqno: u32) -> Result> { let client = &self.inner.overlay_client; let data = client - .query::<_, ArchiveInfo>(GetArchiveInfo { mc_seqno }) + .query::<_, ArchiveInfo>(&rpc::GetArchiveInfo { mc_seqno }) .await?; Ok(data) } @@ -103,7 +102,7 @@ impl BlockchainClient { ) -> Result> { let client = &self.inner.overlay_client; let data = client - .query::<_, Data>(GetArchiveSlice { + .query::<_, Data>(&rpc::GetArchiveSlice { archive_id, offset, max_size, @@ -121,9 +120,9 @@ impl BlockchainClient { ) -> Result> { let client = &self.inner.overlay_client; let data = client - .query::<_, PersistentStatePart>(GetPersistentStatePart { - block: *block, - mc_block: *mc_block, + .query::<_, PersistentStatePart>(&rpc::GetPersistentStatePart { + block_id: *block, + mc_block_id: *mc_block, offset, max_size, }) diff --git a/core/src/blockchain_rpc/mod.rs b/core/src/blockchain_rpc/mod.rs new file mode 100644 index 000000000..dac1d9510 --- /dev/null +++ b/core/src/blockchain_rpc/mod.rs @@ -0,0 +1,7 @@ +pub use self::client::{BlockchainRpcClient, BlockchainRpcClientConfig}; +pub use self::service::{BlockchainRpcService, BlockchainRpcServiceConfig}; + +mod client; +mod service; + +pub const INTERNAL_ERROR_CODE: u32 = 1; diff --git a/core/src/blockchain_rpc/service.rs b/core/src/blockchain_rpc/service.rs new file mode 100644 index 000000000..8f642536e --- /dev/null +++ b/core/src/blockchain_rpc/service.rs @@ -0,0 +1,415 @@ +use std::sync::Arc; + +use bytes::Buf; +use serde::{Deserialize, Serialize}; +use tycho_network::{Response, Service, ServiceRequest}; +use tycho_storage::{BlockConnection, KeyBlocksDirection, Storage}; +use tycho_util::futures::BoxFutureOrNoop; + +use crate::blockchain_rpc::INTERNAL_ERROR_CODE; +use crate::proto::blockchain::*; +use crate::proto::overlay; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(default)] +#[non_exhaustive] +pub struct BlockchainRpcServiceConfig { + /// The maximum number of key blocks in the response. + /// + /// Default: 8. + pub max_key_blocks_list_len: usize, + + /// Whether to serve persistent states. + /// + /// Default: yes. + pub serve_persistent_states: bool, +} + +impl Default for BlockchainRpcServiceConfig { + fn default() -> Self { + Self { + max_key_blocks_list_len: 8, + serve_persistent_states: true, + } + } +} + +#[derive(Clone)] +#[repr(transparent)] +pub struct BlockchainRpcService { + inner: Arc, +} + +impl BlockchainRpcService { + pub fn new(storage: Arc, config: BlockchainRpcServiceConfig) -> Self { + Self { + inner: Arc::new(Inner { storage, config }), + } + } +} + +impl Service for BlockchainRpcService { + type QueryResponse = Response; + type OnQueryFuture = BoxFutureOrNoop>; + type OnMessageFuture = futures_util::future::Ready<()>; + type OnDatagramFuture = futures_util::future::Ready<()>; + + #[tracing::instrument( + level = "debug", + name = "on_blockchain_server_query", + skip_all, + fields(peer_id = %req.metadata.peer_id, addr = %req.metadata.remote_address) + )] + fn on_query(&self, req: ServiceRequest) -> Self::OnQueryFuture { + let (constructor, body) = match self.inner.try_handle_prefix(&req) { + Ok(rest) => rest, + Err(e) => { + tracing::debug!("failed to deserialize query: {e}"); + return BoxFutureOrNoop::Noop; + } + }; + + tycho_network::match_tl_request!(body, tag = constructor, { + rpc::GetNextKeyBlockIds as req => { + tracing::debug!( + block_id = %req.block_id, + max_size = req.max_size, + "getNextKeyBlockIds", + ); + + let inner = self.inner.clone(); + BoxFutureOrNoop::future(async move { + let res = inner.handle_get_next_key_block_ids(&req); + Some(Response::from_tl(res)) + }) + }, + rpc::GetBlockFull as req => { + tracing::debug!(block_id = %req.block_id, "getBlockFull"); + + let inner = self.inner.clone(); + BoxFutureOrNoop::future(async move { + let res = inner.handle_get_block_full(&req).await; + Some(Response::from_tl(res)) + }) + }, + rpc::GetNextBlockFull as req => { + tracing::debug!(prev_block_id = %req.prev_block_id, "getNextBlockFull"); + + let inner = self.inner.clone(); + BoxFutureOrNoop::future(async move { + let res = inner.handle_get_next_block_full(&req).await; + Some(Response::from_tl(res)) + }) + }, + rpc::GetPersistentStatePart as req => { + tracing::debug!( + block_id = %req.block_id, + mc_block_id = %req.mc_block_id, + offset = %req.offset, + max_size = %req.max_size, + "getPersistentStatePart" + ); + + let inner = self.inner.clone(); + BoxFutureOrNoop::future(async move { + let res = inner.handle_get_persistent_state_part(&req).await; + Some(Response::from_tl(res)) + }) + }, + rpc::GetArchiveInfo as req => { + tracing::debug!(mc_seqno = %req.mc_seqno, "getArchiveInfo"); + + let inner = self.inner.clone(); + BoxFutureOrNoop::future(async move { + let res = inner.handle_get_archive_info(&req).await; + Some(Response::from_tl(res)) + }) + }, + rpc::GetArchiveSlice as req => { + tracing::debug!( + archive_id = %req.archive_id, + offset = %req.offset, + max_size = %req.max_size, + "getArchiveSlice" + ); + + let inner = self.inner.clone(); + BoxFutureOrNoop::future(async move { + let res = inner.handle_get_archive_slice(&req).await; + Some(Response::from_tl(res)) + }) + }, + }, e => { + tracing::debug!("failed to deserialize query: {e}"); + BoxFutureOrNoop::Noop + }) + } + + #[inline] + fn on_message(&self, _req: ServiceRequest) -> Self::OnMessageFuture { + futures_util::future::ready(()) + } + + #[inline] + fn on_datagram(&self, _req: ServiceRequest) -> Self::OnDatagramFuture { + futures_util::future::ready(()) + } +} + +struct Inner { + storage: Arc, + config: BlockchainRpcServiceConfig, +} + +impl Inner { + fn storage(&self) -> &Storage { + self.storage.as_ref() + } + + fn try_handle_prefix<'a>( + &self, + req: &'a ServiceRequest, + ) -> Result<(u32, &'a [u8]), tl_proto::TlError> { + let body = req.as_ref(); + if body.len() < 4 { + return Err(tl_proto::TlError::UnexpectedEof); + } + + let constructor = std::convert::identity(body).get_u32_le(); + Ok((constructor, body)) + } + + fn handle_get_next_key_block_ids( + &self, + req: &rpc::GetNextKeyBlockIds, + ) -> overlay::Response { + let block_handle_storage = self.storage().block_handle_storage(); + + let limit = std::cmp::min(req.max_size as usize, self.config.max_key_blocks_list_len); + + let get_next_key_block_ids = || { + if !req.block_id.shard.is_masterchain() { + anyhow::bail!("first block id is not from masterchain"); + } + + let mut iterator = block_handle_storage + .key_blocks_iterator(KeyBlocksDirection::ForwardFrom(req.block_id.seqno)) + .take(limit + 1); + + if let Some(id) = iterator.next().transpose()? { + anyhow::ensure!( + id.root_hash == req.block_id.root_hash, + "first block root hash mismatch" + ); + anyhow::ensure!( + id.file_hash == req.block_id.file_hash, + "first block file hash mismatch" + ); + } + + let mut ids = Vec::with_capacity(limit); + while let Some(id) = iterator.next().transpose()? { + ids.push(id); + if ids.len() >= limit { + break; + } + } + + Ok::<_, anyhow::Error>(ids) + }; + + match get_next_key_block_ids() { + Ok(ids) => { + let incomplete = ids.len() < limit; + overlay::Response::Ok(KeyBlockIds { + block_ids: ids, + incomplete, + }) + } + Err(e) => { + tracing::warn!("get_next_key_block_ids failed: {e:?}"); + overlay::Response::Err(INTERNAL_ERROR_CODE) + } + } + } + + async fn handle_get_block_full(&self, req: &rpc::GetBlockFull) -> overlay::Response { + let block_handle_storage = self.storage().block_handle_storage(); + let block_storage = self.storage().block_storage(); + + let get_block_full = async { + let mut is_link = false; + let block = match block_handle_storage.load_handle(&req.block_id)? { + Some(handle) + if handle.meta().has_data() && handle.has_proof_or_link(&mut is_link) => + { + let block = block_storage.load_block_data_raw(&handle).await?; + let proof = block_storage.load_block_proof_raw(&handle, is_link).await?; + + BlockFull::Found { + block_id: req.block_id, + proof: proof.into(), + block: block.into(), + is_link, + } + } + _ => BlockFull::Empty, + }; + + Ok::<_, anyhow::Error>(block) + }; + + match get_block_full.await { + Ok(block_full) => overlay::Response::Ok(block_full), + Err(e) => { + tracing::warn!("get_block_full failed: {e:?}"); + overlay::Response::Err(INTERNAL_ERROR_CODE) + } + } + } + + async fn handle_get_next_block_full( + &self, + req: &rpc::GetNextBlockFull, + ) -> overlay::Response { + let block_handle_storage = self.storage().block_handle_storage(); + let block_connection_storage = self.storage().block_connection_storage(); + let block_storage = self.storage().block_storage(); + + let get_next_block_full = async { + let next_block_id = match block_handle_storage.load_handle(&req.prev_block_id)? { + Some(handle) if handle.meta().has_next1() => block_connection_storage + .load_connection(&req.prev_block_id, BlockConnection::Next1)?, + _ => return Ok(BlockFull::Empty), + }; + + let mut is_link = false; + let block = match block_handle_storage.load_handle(&next_block_id)? { + Some(handle) + if handle.meta().has_data() && handle.has_proof_or_link(&mut is_link) => + { + let block = block_storage.load_block_data_raw(&handle).await?; + let proof = block_storage.load_block_proof_raw(&handle, is_link).await?; + + BlockFull::Found { + block_id: next_block_id, + proof: proof.into(), + block: block.into(), + is_link, + } + } + _ => BlockFull::Empty, + }; + + Ok::<_, anyhow::Error>(block) + }; + + match get_next_block_full.await { + Ok(block_full) => overlay::Response::Ok(block_full), + Err(e) => { + tracing::warn!("get_next_block_full failed: {e:?}"); + overlay::Response::Err(INTERNAL_ERROR_CODE) + } + } + } + + async fn handle_get_persistent_state_part( + &self, + req: &rpc::GetPersistentStatePart, + ) -> overlay::Response { + const PART_MAX_SIZE: u64 = 1 << 21; + + let persistent_state_storage = self.storage().persistent_state_storage(); + + let persistent_state_request_validation = || { + anyhow::ensure!( + self.config.serve_persistent_states, + "persistent states are disabled" + ); + anyhow::ensure!(req.max_size <= PART_MAX_SIZE, "too large max_size"); + Ok::<_, anyhow::Error>(()) + }; + + if let Err(e) = persistent_state_request_validation() { + tracing::warn!("persistent_state_request_validation failed: {e:?}"); + return overlay::Response::Err(INTERNAL_ERROR_CODE); + } + + if !persistent_state_storage.state_exists(&req.mc_block_id, &req.block_id) { + return overlay::Response::Ok(PersistentStatePart::NotFound); + } + + match persistent_state_storage + .read_state_part(&req.mc_block_id, &req.block_id, req.offset, req.max_size) + .await + { + Some(data) => overlay::Response::Ok(PersistentStatePart::Found { data }), + None => overlay::Response::Ok(PersistentStatePart::NotFound), + } + } + + async fn handle_get_archive_info( + &self, + req: &rpc::GetArchiveInfo, + ) -> overlay::Response { + let mc_seqno = req.mc_seqno; + let node_state = self.storage.node_state(); + + let get_archive_id = || { + let last_applied_mc_block = node_state.load_last_mc_block_id()?; + let shards_client_mc_block_id = node_state.load_shards_client_mc_block_id()?; + Ok::<_, anyhow::Error>((last_applied_mc_block, shards_client_mc_block_id)) + }; + + match get_archive_id() { + Ok((last_applied_mc_block, shards_client_mc_block_id)) => { + if mc_seqno > last_applied_mc_block.seqno { + return overlay::Response::Ok(ArchiveInfo::NotFound); + } + + if mc_seqno > shards_client_mc_block_id.seqno { + return overlay::Response::Ok(ArchiveInfo::NotFound); + } + + let block_storage = self.storage().block_storage(); + + overlay::Response::Ok(match block_storage.get_archive_id(mc_seqno) { + Some(id) => ArchiveInfo::Found { id: id as u64 }, + None => ArchiveInfo::NotFound, + }) + } + Err(e) => { + tracing::warn!("get_archive_id failed: {e:?}"); + overlay::Response::Err(INTERNAL_ERROR_CODE) + } + } + } + + async fn handle_get_archive_slice( + &self, + req: &rpc::GetArchiveSlice, + ) -> overlay::Response { + let block_storage = self.storage.block_storage(); + + let get_archive_slice = || { + let Some(archive_slice) = block_storage.get_archive_slice( + req.archive_id as u32, + req.offset as usize, + req.max_size as usize, + )? + else { + anyhow::bail!("archive not found"); + }; + + Ok::<_, anyhow::Error>(archive_slice) + }; + + match get_archive_slice() { + Ok(data) => overlay::Response::Ok(Data { data: data.into() }), + Err(e) => { + tracing::warn!("get_archive_slice failed: {e:?}"); + overlay::Response::Err(INTERNAL_ERROR_CODE) + } + } + } +} diff --git a/core/src/lib.rs b/core/src/lib.rs index d680d4a13..81040b37d 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -1,6 +1,5 @@ pub mod block_strider; -pub mod blockchain_client; +pub mod blockchain_rpc; pub mod internal_queue; pub mod overlay_client; -pub mod overlay_server; pub mod proto; diff --git a/core/src/overlay_server/mod.rs b/core/src/overlay_server/mod.rs deleted file mode 100644 index 08424d860..000000000 --- a/core/src/overlay_server/mod.rs +++ /dev/null @@ -1,424 +0,0 @@ -use std::sync::Arc; - -use bytes::Buf; -use tycho_network::{Response, Service, ServiceRequest}; -use tycho_storage::{BlockConnection, KeyBlocksDirection, Storage}; -use tycho_util::futures::BoxFutureOrNoop; - -use crate::proto; - -pub struct OverlayServer(Arc); - -impl OverlayServer { - pub fn new(storage: Arc, support_persistent_states: bool) -> Self { - Self(Arc::new(OverlayServerInner { - storage, - support_persistent_states, - })) - } -} - -impl Service for OverlayServer { - type QueryResponse = Response; - type OnQueryFuture = BoxFutureOrNoop>; - type OnMessageFuture = futures_util::future::Ready<()>; - type OnDatagramFuture = futures_util::future::Ready<()>; - - #[tracing::instrument( - level = "debug", - name = "on_overlay_server_query", - skip_all, - fields(peer_id = %req.metadata.peer_id, addr = %req.metadata.remote_address) - )] - fn on_query(&self, req: ServiceRequest) -> Self::OnQueryFuture { - let (constructor, body) = match self.0.try_handle_prefix(&req) { - Ok(rest) => rest, - Err(e) => { - tracing::debug!("failed to deserialize query: {e}"); - return BoxFutureOrNoop::Noop; - } - }; - - tycho_network::match_tl_request!(body, tag = constructor, { - proto::overlay::rpc::GetNextKeyBlockIds as req => { - BoxFutureOrNoop::future({ - tracing::debug!(blockId = %req.block, max_size = req.max_size, "getNextKeyBlockIds"); - - let inner = self.0.clone(); - - async move { - let res = inner.handle_get_next_key_block_ids(req); - Some(Response::from_tl(res)) - } - }) - }, - proto::overlay::rpc::GetBlockFull as req => { - BoxFutureOrNoop::future({ - tracing::debug!(blockId = %req.block, "getBlockFull"); - - let inner = self.0.clone(); - - async move { - let res = inner.handle_get_block_full(req).await; - Some(Response::from_tl(res)) - } - }) - }, - proto::overlay::rpc::GetNextBlockFull as req => { - BoxFutureOrNoop::future({ - tracing::debug!(prevBlockId = %req.prev_block, "getNextBlockFull"); - - let inner = self.0.clone(); - - async move { - let res = inner.handle_get_next_block_full(req).await; - Some(Response::from_tl(res)) - } - }) - }, - proto::overlay::rpc::GetPersistentStatePart as req => { - BoxFutureOrNoop::future({ - tracing::debug!( - block = %req.block, - mc_block = %req.mc_block, - offset = %req.offset, - max_size = %req.max_size, - "пetPersistentStatePart" - ); - - let inner = self.0.clone(); - - async move { - let res = inner.handle_get_persistent_state_part(req).await; - Some(Response::from_tl(res)) - } - }) - }, - proto::overlay::rpc::GetArchiveInfo as req => { - BoxFutureOrNoop::future({ - tracing::debug!(mc_seqno = %req.mc_seqno, "getArchiveInfo"); - - let inner = self.0.clone(); - - async move { - let res = inner.handle_get_archive_info(req).await; - Some(Response::from_tl(res)) - } - }) - }, - proto::overlay::rpc::GetArchiveSlice as req => { - BoxFutureOrNoop::future({ - tracing::debug!( - archive_id = %req.archive_id, - offset = %req.offset, - max_size = %req.max_size, - "getArchiveSlice" - ); - - let inner = self.0.clone(); - - async move { - let res = inner.handle_get_archive_slice(req).await; - Some(Response::from_tl(res)) - } - }) - }, - }, e => { - tracing::debug!("failed to deserialize query: {e}"); - BoxFutureOrNoop::Noop - }) - } - - #[inline] - fn on_message(&self, _req: ServiceRequest) -> Self::OnMessageFuture { - futures_util::future::ready(()) - } - - #[inline] - fn on_datagram(&self, _req: ServiceRequest) -> Self::OnDatagramFuture { - futures_util::future::ready(()) - } -} - -struct OverlayServerInner { - storage: Arc, - support_persistent_states: bool, -} - -impl OverlayServerInner { - fn storage(&self) -> &Storage { - self.storage.as_ref() - } - - fn supports_persistent_state_handling(&self) -> bool { - self.support_persistent_states - } - - fn try_handle_prefix<'a>(&self, req: &'a ServiceRequest) -> anyhow::Result<(u32, &'a [u8])> { - let body = req.as_ref(); - anyhow::ensure!(body.len() >= 4, tl_proto::TlError::UnexpectedEof); - - let constructor = std::convert::identity(body).get_u32_le(); - - Ok((constructor, body)) - } - - fn handle_get_next_key_block_ids( - &self, - req: proto::overlay::rpc::GetNextKeyBlockIds, - ) -> proto::overlay::Response { - const NEXT_KEY_BLOCKS_LIMIT: usize = 8; - - let block_handle_storage = self.storage().block_handle_storage(); - - let limit = std::cmp::min(req.max_size as usize, NEXT_KEY_BLOCKS_LIMIT); - - let get_next_key_block_ids = || { - let start_block_id = &req.block; - if !start_block_id.shard.is_masterchain() { - return Err(OverlayServerError::BlockNotFromMasterChain.into()); - } - - let mut iterator = block_handle_storage - .key_blocks_iterator(KeyBlocksDirection::ForwardFrom(start_block_id.seqno)) - .take(limit) - .peekable(); - - if let Some(Ok(id)) = iterator.peek() { - if id.root_hash != start_block_id.root_hash { - return Err(OverlayServerError::InvalidRootHash.into()); - } - if id.file_hash != start_block_id.file_hash { - return Err(OverlayServerError::InvalidFileHash.into()); - } - } - - let mut ids = Vec::with_capacity(limit); - while let Some(id) = iterator.next().transpose()? { - ids.push(id); - if ids.len() >= limit { - break; - } - } - - Ok::<_, anyhow::Error>(ids) - }; - - match get_next_key_block_ids() { - Ok(ids) => { - let incomplete = ids.len() < limit; - proto::overlay::Response::Ok(proto::overlay::KeyBlockIds { - blocks: ids, - incomplete, - }) - } - Err(e) => { - tracing::warn!("get_next_key_block_ids failed: {e:?}"); - proto::overlay::Response::Err(DEFAULT_ERROR_CODE) - } - } - } - - async fn handle_get_block_full( - &self, - req: proto::overlay::rpc::GetBlockFull, - ) -> proto::overlay::Response { - let block_handle_storage = self.storage().block_handle_storage(); - let block_storage = self.storage().block_storage(); - - let get_block_full = || async { - let mut is_link = false; - let block = match block_handle_storage.load_handle(&req.block)? { - Some(handle) - if handle.meta().has_data() && handle.has_proof_or_link(&mut is_link) => - { - let block = block_storage.load_block_data_raw(&handle).await?; - let proof = block_storage.load_block_proof_raw(&handle, is_link).await?; - - proto::overlay::BlockFull::Found { - block_id: req.block, - proof: proof.into(), - block: block.into(), - is_link, - } - } - _ => proto::overlay::BlockFull::Empty, - }; - - Ok::<_, anyhow::Error>(block) - }; - - match get_block_full().await { - Ok(block_full) => proto::overlay::Response::Ok(block_full), - Err(e) => { - tracing::warn!("get_block_full failed: {e:?}"); - proto::overlay::Response::Err(DEFAULT_ERROR_CODE) - } - } - } - - async fn handle_get_next_block_full( - &self, - req: proto::overlay::rpc::GetNextBlockFull, - ) -> proto::overlay::Response { - let block_handle_storage = self.storage().block_handle_storage(); - let block_connection_storage = self.storage().block_connection_storage(); - let block_storage = self.storage().block_storage(); - - let get_next_block_full = || async { - let next_block_id = match block_handle_storage.load_handle(&req.prev_block)? { - Some(handle) if handle.meta().has_next1() => block_connection_storage - .load_connection(&req.prev_block, BlockConnection::Next1)?, - _ => return Ok(proto::overlay::BlockFull::Empty), - }; - - let mut is_link = false; - let block = match block_handle_storage.load_handle(&next_block_id)? { - Some(handle) - if handle.meta().has_data() && handle.has_proof_or_link(&mut is_link) => - { - let block = block_storage.load_block_data_raw(&handle).await?; - let proof = block_storage.load_block_proof_raw(&handle, is_link).await?; - - proto::overlay::BlockFull::Found { - block_id: next_block_id, - proof: proof.into(), - block: block.into(), - is_link, - } - } - _ => proto::overlay::BlockFull::Empty, - }; - - Ok::<_, anyhow::Error>(block) - }; - - match get_next_block_full().await { - Ok(block_full) => proto::overlay::Response::Ok(block_full), - Err(e) => { - tracing::warn!("get_next_block_full failed: {e:?}"); - proto::overlay::Response::Err(DEFAULT_ERROR_CODE) - } - } - } - - async fn handle_get_persistent_state_part( - &self, - req: proto::overlay::rpc::GetPersistentStatePart, - ) -> proto::overlay::Response { - const PART_MAX_SIZE: u64 = 1 << 21; - - let persistent_state_request_validation = || { - anyhow::ensure!( - self.supports_persistent_state_handling(), - "Get persistent state not supported" - ); - - anyhow::ensure!(req.max_size <= PART_MAX_SIZE, "Unsupported max size"); - - Ok::<_, anyhow::Error>(()) - }; - - if let Err(e) = persistent_state_request_validation() { - tracing::warn!("persistent_state_request_validation failed: {e:?}"); - return proto::overlay::Response::Err(DEFAULT_ERROR_CODE); - } - - let persistent_state_storage = self.storage().persistent_state_storage(); - if !persistent_state_storage.state_exists(&req.mc_block, &req.block) { - return proto::overlay::Response::Ok(proto::overlay::PersistentStatePart::NotFound); - } - - let persistent_state_storage = self.storage.persistent_state_storage(); - match persistent_state_storage - .read_state_part(&req.mc_block, &req.block, req.offset, req.max_size) - .await - { - Some(data) => { - proto::overlay::Response::Ok(proto::overlay::PersistentStatePart::Found { data }) - } - None => proto::overlay::Response::Ok(proto::overlay::PersistentStatePart::NotFound), - } - } - - async fn handle_get_archive_info( - &self, - req: proto::overlay::rpc::GetArchiveInfo, - ) -> proto::overlay::Response { - let mc_seqno = req.mc_seqno; - let node_state = self.storage.node_state(); - - let get_archive_id = || { - let last_applied_mc_block = node_state.load_last_mc_block_id()?; - let shards_client_mc_block_id = node_state.load_shards_client_mc_block_id()?; - - Ok::<_, anyhow::Error>((last_applied_mc_block, shards_client_mc_block_id)) - }; - - match get_archive_id() { - Ok((last_applied_mc_block, shards_client_mc_block_id)) => { - if mc_seqno > last_applied_mc_block.seqno { - return proto::overlay::Response::Ok(proto::overlay::ArchiveInfo::NotFound); - } - - if mc_seqno > shards_client_mc_block_id.seqno { - return proto::overlay::Response::Ok(proto::overlay::ArchiveInfo::NotFound); - } - - let block_storage = self.storage().block_storage(); - - let res = match block_storage.get_archive_id(mc_seqno) { - Some(id) => proto::overlay::ArchiveInfo::Found { id: id as u64 }, - None => proto::overlay::ArchiveInfo::NotFound, - }; - - proto::overlay::Response::Ok(res) - } - Err(e) => { - tracing::warn!("get_archive_id failed: {e:?}"); - proto::overlay::Response::Err(DEFAULT_ERROR_CODE) - } - } - } - - async fn handle_get_archive_slice( - &self, - req: proto::overlay::rpc::GetArchiveSlice, - ) -> proto::overlay::Response { - let block_storage = self.storage.block_storage(); - - let get_archive_slice = || { - let archive_slice = block_storage - .get_archive_slice( - req.archive_id as u32, - req.offset as usize, - req.max_size as usize, - )? - .ok_or(OverlayServerError::ArchiveNotFound)?; - - Ok::<_, anyhow::Error>(archive_slice) - }; - - match get_archive_slice() { - Ok(data) => proto::overlay::Response::Ok(proto::overlay::Data { data: data.into() }), - Err(e) => { - tracing::warn!("get_archive_slice failed: {e:?}"); - proto::overlay::Response::Err(DEFAULT_ERROR_CODE) - } - } - } -} - -pub const DEFAULT_ERROR_CODE: u32 = 10; - -#[derive(Debug, thiserror::Error)] -enum OverlayServerError { - #[error("Block is not from masterchain")] - BlockNotFromMasterChain, - #[error("Invalid root hash")] - InvalidRootHash, - #[error("Invalid file hash")] - InvalidFileHash, - #[error("Archive not found")] - ArchiveNotFound, -} diff --git a/core/src/proto.tl b/core/src/proto.tl index d5963ff62..082db0c07 100644 --- a/core/src/proto.tl +++ b/core/src/proto.tl @@ -3,124 +3,149 @@ ---types--- -/** -* Public overlay ping model -* @param value unix timestamp in millis when ping was sent -*/ -overlay.ping - = overlay.Ping; - -/** -* Public overlay pong model. Sending pong back to sender should follow receiving ping model -* @param value unix timestamp in millis when ping was sent -*/ -overlay.pong - = overlay.Pong; +overlay.ping = overlay.Ping; +overlay.pong = overlay.Pong; /** * A successful response for the overlay query * * @param value an existing value */ -publicOverlay.response.ok value:T = publicOverlay.Response T; +overlay.response.ok value:T = overlay.Response T; /** * An unsuccessul response for the overlay query */ -publicOverlay.response.error error:bytes = publicOverlay.Response T; +overlay.response.err code:int = overlay.Response T; + +// Blockchain public overlay +//////////////////////////////////////////////////////////////////////////////// + +---types--- + +/** +* A full block id +*/ +blockchain.blockId + workchain:int + shard:long + seqno:int + root_hash:int256 + file_hash:int256 + = blockchain.BlockId; /** -* A response for the `publicOverlay.getNextKeyBlockIds` query -* @param blocks list of key blocks +* A response for the `getNextKeyBlockIds` query +* +* @param block_ids list of key block ids * @param incomplete flag points to finishinig query */ -publicOverlay.keyBlockIds blocks:(vector publicOverlay.blockId) incomplete:Bool = publicOverlay.KeyBlockIds; +blockchain.keyBlockIds block_ids:(vector blockchain.blockId) incomplete:Bool = blockchain.KeyBlockIds; /** * A response for getting full block info -* @param id block id +* +* @param block_id block id * @param proof block proof raw * @param block block data raw * @param is_link block proof link flag */ -publicOverlay.blockFull.found id:publicOverlay.blockId proof:bytes block:bytes is_link:Bool = publicOverlay.BlockFull; - +blockchain.blockFull.found block_id:blockchain.blockId proof:bytes block:bytes is_link:Bool = blockchain.BlockFull; /** * A response for getting empty block */ -publicOverlay.blockFull.empty = publicOverlay.BlockFull; +blockchain.blockFull.empty = blockchain.BlockFull; /** * An unsuccessul response for the 'getArchiveInfo' query */ -publicOverlay.archiveNotFound = publicOverlay.ArchiveInfo; +blockchain.archiveNotFound = blockchain.ArchiveInfo; /** * A successul response for the 'getArchiveInfo' query * * @param id archive id */ -publicOverlay.archiveInfo id:long = publicOverlay.ArchiveInfo; +blockchain.archiveInfo id:long = blockchain.ArchiveInfo; /** * An unsuccessul response for the 'getPersistentStatePart' query */ -publicOverlay.persistentStatePart.notFound = publicOverlay.PersistentStatePart; +blockchain.persistentStatePart.notFound = blockchain.PersistentStatePart; /** * A successul response for the 'getPersistentStatePart' query * * @param data persistent state part */ -publicOverlay.persistentStatePart.found data:bytes = publicOverlay.PersistentStatePart; +blockchain.persistentStatePart.found data:bytes = blockchain.PersistentStatePart; /** * Raw data bytes */ -publicOverlay.data data:bytes = publicOverlay.Data; +blockchain.data data:bytes = blockchain.Data; ---functions--- /** * Get list of next key block ids. * -* @param block block to start with +* @param block_id first key block id +* @param count max number of items in the response */ -publicOverlay.getNextKeyBlockIds block:publicOverlay.blockId max_size:int = publicOverlay.Response publicOverlay.KeyBlockIds; +blockchain.getNextKeyBlockIds + block_id:blockchain.blockId + count:int + = overlay.Response blockchain.KeyBlockIds; /** * Get full block info * -* @param block block id to get +* @param block_id target block id */ -publicOverlay.getBlockFull block:publicOverlay.blockId = publicOverlay.Response publicOverlay.blockFull; +blockchain.getBlockFull + block_id:blockchain.blockId + = overlay.Response blockchain.blockFull; /** * Get next full block info * -* @param prev_block previous block id +* @param prev_block_id previous block id */ -publicOverlay.getNextBlockFull prev_block:publicOverlay.blockId = publicOverlay.Response publicOverlay.blockFull; +blockchain.getNextBlockFull + prev_block_id:blockchain.blockId + = overlay.Response blockchain.blockFull; /** * Get archive info * -* @param mac_seqno masterchain sequence number +* @param mc_seqno masterchain block seqno */ -publicOverlay.getArchiveInfo mc_seqno:int = publicOverlay.ArchiveInfo; +blockchain.getArchiveInfo + mc_seqno:int + = overlay.Response blockchain.ArchiveInfo; /** * Get archive slice * -* @param archive_id -* @param offset -* @param max_size +* @param archive_id archive id (masterchain seqno) +* @param offset part offset in bytes +* @param max_size max response size in bytes */ -publicOverlay.getArchiveSlice archive_id:long offset:long max_size:int = publicOverlay.Data; +blockchain.getArchiveSlice + archive_id:long + offset:long + max_size:int + = overlay.Response blockchain.Data; /** * Get persisten state part * -* @param block -* @param masterchain_block -* @param offset -* @param max_size -*/ -publicOverlay.getPersistentStatePart block:publicOverlay.blockId mc_block:publicOverlay.blockId offset:long max_size:long = publicOverlay.PersistentStatePart; +* @param block_id requested block id +* @param mc_block_id reference masterchain block id +* @param offset part offset in bytes +* @param max_size max response size in bytes +*/ +blockchain.getPersistentStatePart + block_id:blockchain.blockId + mc_block_id:blockchain.blockId + offset:long + max_size:long + = overlay.Response blockchain.PersistentStatePart; diff --git a/core/src/proto/blockchain.rs b/core/src/proto/blockchain.rs new file mode 100644 index 000000000..a75713638 --- /dev/null +++ b/core/src/proto/blockchain.rs @@ -0,0 +1,113 @@ +use bytes::Bytes; +use tl_proto::{TlRead, TlWrite}; + +use crate::proto::{tl_block_id, tl_block_id_vec}; + +#[derive(Debug, Clone, PartialEq, Eq, TlRead, TlWrite)] +#[tl(boxed, id = "blockchain.data", scheme = "proto.tl")] +pub struct Data { + pub data: Bytes, +} + +#[derive(Debug, Clone, PartialEq, Eq, TlRead, TlWrite)] +#[tl(boxed, id = "blockchain.keyBlockIds", scheme = "proto.tl")] +pub struct KeyBlockIds { + #[tl(with = "tl_block_id_vec")] + pub block_ids: Vec, + pub incomplete: bool, +} + +#[derive(Debug, Clone, PartialEq, Eq, TlRead, TlWrite)] +#[tl(boxed, scheme = "proto.tl")] +pub enum BlockFull { + #[tl(id = "blockchain.blockFull.found")] + Found { + #[tl(with = "tl_block_id")] + block_id: everscale_types::models::BlockId, + proof: Bytes, + block: Bytes, + is_link: bool, + }, + #[tl(id = "blockchain.blockFull.empty")] + Empty, +} + +#[derive(Debug, Clone, PartialEq, Eq, TlRead, TlWrite)] +#[tl(boxed, scheme = "proto.tl")] +pub enum PersistentStatePart { + #[tl(id = "blockchain.persistentStatePart.found")] + Found { data: Bytes }, + #[tl(id = "blockchain.persistentStatePart.notFound")] + NotFound, +} + +#[derive(Debug, Clone, PartialEq, Eq, TlRead, TlWrite)] +#[tl(boxed, scheme = "proto.tl")] +pub enum ArchiveInfo { + #[tl(id = "blockchain.archiveInfo", size_hint = 8)] + Found { id: u64 }, + #[tl(id = "blockchain.archiveNotFound")] + NotFound, +} + +/// Blockchain RPC models. +pub mod rpc { + use super::*; + + #[derive(Clone, TlRead, TlWrite)] + #[tl(boxed, id = "blockchain.getNextKeyBlockIds", scheme = "proto.tl")] + pub struct GetNextKeyBlockIds { + #[tl(with = "tl_block_id")] + pub block_id: everscale_types::models::BlockId, + pub max_size: u32, + } + + #[derive(Clone, TlRead, TlWrite)] + #[tl(boxed, id = "blockchain.getBlockFull", scheme = "proto.tl")] + pub struct GetBlockFull { + #[tl(with = "tl_block_id")] + pub block_id: everscale_types::models::BlockId, + } + + #[derive(Clone, TlRead, TlWrite)] + #[tl(boxed, id = "blockchain.getNextBlockFull", scheme = "proto.tl")] + pub struct GetNextBlockFull { + #[tl(with = "tl_block_id")] + pub prev_block_id: everscale_types::models::BlockId, + } + + #[derive(Clone, TlRead, TlWrite)] + #[tl(boxed, id = "blockchain.getPersistentStatePart", scheme = "proto.tl")] + pub struct GetPersistentStatePart { + #[tl(with = "tl_block_id")] + pub block_id: everscale_types::models::BlockId, + #[tl(with = "tl_block_id")] + pub mc_block_id: everscale_types::models::BlockId, + pub offset: u64, + pub max_size: u64, + } + + #[derive(Clone, TlRead, TlWrite)] + #[tl( + boxed, + id = "blockchain.getArchiveInfo", + size_hint = 4, + scheme = "proto.tl" + )] + pub struct GetArchiveInfo { + pub mc_seqno: u32, + } + + #[derive(Clone, TlRead, TlWrite)] + #[tl( + boxed, + id = "blockchain.getArchiveSlice", + size_hint = 20, + scheme = "proto.tl" + )] + pub struct GetArchiveSlice { + pub archive_id: u64, + pub offset: u64, + pub max_size: u32, + } +} diff --git a/core/src/proto/mod.rs b/core/src/proto/mod.rs index 2b55280ec..379a38ba1 100644 --- a/core/src/proto/mod.rs +++ b/core/src/proto/mod.rs @@ -1 +1,77 @@ +pub mod blockchain; pub mod overlay; + +mod tl_block_id { + use everscale_types::models::{BlockId, ShardIdent}; + use everscale_types::prelude::HashBytes; + use tl_proto::{TlPacket, TlRead, TlResult, TlWrite}; + + pub const SIZE_HINT: usize = 80; + + pub const fn size_hint(_: &BlockId) -> usize { + SIZE_HINT + } + + pub fn write(block_id: &BlockId, packet: &mut P) { + block_id.shard.workchain().write_to(packet); + block_id.shard.prefix().write_to(packet); + block_id.seqno.write_to(packet); + block_id.root_hash.0.write_to(packet); + block_id.file_hash.0.write_to(packet); + } + + pub fn read(packet: &[u8], offset: &mut usize) -> TlResult { + let workchain = i32::read_from(packet, offset)?; + let prefix = u64::read_from(packet, offset)?; + let seqno = u32::read_from(packet, offset)?; + + let shard = ShardIdent::new(workchain, prefix); + + let shard = match shard { + None => return Err(tl_proto::TlError::InvalidData), + Some(shard) => shard, + }; + + let root_hash = HashBytes(<[u8; 32]>::read_from(packet, offset)?); + let file_hash = HashBytes(<[u8; 32]>::read_from(packet, offset)?); + + Ok(BlockId { + shard, + seqno, + root_hash, + file_hash, + }) + } +} + +mod tl_block_id_vec { + + use everscale_types::models::BlockId; + use tl_proto::{TlError, TlPacket, TlRead, TlResult}; + + use super::*; + + pub fn size_hint(ids: &[BlockId]) -> usize { + 4 + ids.len() * tl_block_id::SIZE_HINT + } + + pub fn write(blocks: &[BlockId], packet: &mut P) { + packet.write_u32(blocks.len() as u32); + for block in blocks { + tl_block_id::write(block, packet); + } + } + + pub fn read(packet: &[u8], offset: &mut usize) -> TlResult> { + let len = u32::read_from(packet, offset)?; + if *offset + len as usize * tl_block_id::SIZE_HINT > packet.len() { + return Err(TlError::UnexpectedEof); + } + + let mut ids = Vec::with_capacity(len as usize); + for _ in 0..len { + ids.push(tl_block_id::read(packet, offset)?); + } + Ok(ids) + } +} diff --git a/core/src/proto/overlay.rs b/core/src/proto/overlay.rs index 87e2140b8..a896691df 100644 --- a/core/src/proto/overlay.rs +++ b/core/src/proto/overlay.rs @@ -1,4 +1,3 @@ -use bytes::Bytes; use tl_proto::{TlError, TlPacket, TlRead, TlResult, TlWrite}; #[derive(Debug, Clone, PartialEq, Eq, TlRead, TlWrite)] @@ -17,8 +16,8 @@ pub enum Response { } impl Response { - const OK_ID: u32 = tl_proto::id!("publicOverlay.response.ok", scheme = "proto.tl"); - const ERR_ID: u32 = tl_proto::id!("publicOverlay.response.error", scheme = "proto.tl"); + const OK_ID: u32 = tl_proto::id!("overlay.response.ok", scheme = "proto.tl"); + const ERR_ID: u32 = tl_proto::id!("overlay.response.err", scheme = "proto.tl"); } impl TlWrite for Response @@ -68,190 +67,3 @@ where }) } } - -#[derive(Debug, Clone, PartialEq, Eq, TlRead, TlWrite)] -#[tl(boxed, id = "publicOverlay.data", scheme = "proto.tl")] -pub struct Data { - pub data: Bytes, -} - -#[derive(Debug, Clone, PartialEq, Eq, TlRead, TlWrite)] -#[tl(boxed, id = "publicOverlay.keyBlockIds", scheme = "proto.tl")] -pub struct KeyBlockIds { - #[tl(with = "tl_block_id_vec")] - pub blocks: Vec, - pub incomplete: bool, -} - -#[derive(Debug, Clone, PartialEq, Eq, TlRead, TlWrite)] -#[tl(boxed, scheme = "proto.tl")] -pub enum BlockFull { - #[tl(id = "publicOverlay.blockFull.found")] - Found { - #[tl(with = "tl_block_id")] - block_id: everscale_types::models::BlockId, - proof: Bytes, - block: Bytes, - is_link: bool, - }, - #[tl(id = "publicOverlay.blockFull.empty")] - Empty, -} - -#[derive(Debug, Clone, PartialEq, Eq, TlRead, TlWrite)] -#[tl(boxed, scheme = "proto.tl")] -pub enum PersistentStatePart { - #[tl(id = "publicOverlay.persistentStatePart.found")] - Found { data: Bytes }, - #[tl(id = "publicOverlay.persistentStatePart.notFound")] - NotFound, -} - -#[derive(Debug, Clone, PartialEq, Eq, TlRead, TlWrite)] -#[tl(boxed, scheme = "proto.tl")] -pub enum ArchiveInfo { - #[tl(id = "publicOverlay.archiveInfo", size_hint = 8)] - Found { id: u64 }, - #[tl(id = "publicOverlay.archiveNotFound")] - NotFound, -} - -/// Overlay RPC models. -pub mod rpc { - use super::*; - - #[derive(Clone, TlRead, TlWrite)] - #[tl(boxed, id = "publicOverlay.getNextKeyBlockIds", scheme = "proto.tl")] - pub struct GetNextKeyBlockIds { - #[tl(with = "tl_block_id")] - pub block: everscale_types::models::BlockId, - pub max_size: u32, - } - - #[derive(Clone, TlRead, TlWrite)] - #[tl(boxed, id = "publicOverlay.getBlockFull", scheme = "proto.tl")] - pub struct GetBlockFull { - #[tl(with = "tl_block_id")] - pub block: everscale_types::models::BlockId, - } - - #[derive(Clone, TlRead, TlWrite)] - #[tl(boxed, id = "publicOverlay.getNextBlockFull", scheme = "proto.tl")] - pub struct GetNextBlockFull { - #[tl(with = "tl_block_id")] - pub prev_block: everscale_types::models::BlockId, - } - - #[derive(Clone, TlRead, TlWrite)] - #[tl( - boxed, - id = "publicOverlay.getPersistentStatePart", - scheme = "proto.tl" - )] - pub struct GetPersistentStatePart { - #[tl(with = "tl_block_id")] - pub block: everscale_types::models::BlockId, - #[tl(with = "tl_block_id")] - pub mc_block: everscale_types::models::BlockId, - pub offset: u64, - pub max_size: u64, - } - - #[derive(Clone, TlRead, TlWrite)] - #[tl( - boxed, - id = "publicOverlay.getArchiveInfo", - size_hint = 4, - scheme = "proto.tl" - )] - pub struct GetArchiveInfo { - pub mc_seqno: u32, - } - - #[derive(Clone, TlRead, TlWrite)] - #[tl( - boxed, - id = "publicOverlay.getArchiveSlice", - size_hint = 20, - scheme = "proto.tl" - )] - pub struct GetArchiveSlice { - pub archive_id: u64, - pub offset: u64, - pub max_size: u32, - } -} - -mod tl_block_id { - use everscale_types::models::{BlockId, ShardIdent}; - use everscale_types::prelude::HashBytes; - use tl_proto::{TlPacket, TlRead, TlResult, TlWrite}; - - pub const SIZE_HINT: usize = 80; - - pub const fn size_hint(_: &BlockId) -> usize { - SIZE_HINT - } - - pub fn write(block_id: &BlockId, packet: &mut P) { - block_id.shard.workchain().write_to(packet); - block_id.shard.prefix().write_to(packet); - block_id.seqno.write_to(packet); - block_id.root_hash.0.write_to(packet); - block_id.file_hash.0.write_to(packet); - } - - pub fn read(packet: &[u8], offset: &mut usize) -> TlResult { - let workchain = i32::read_from(packet, offset)?; - let prefix = u64::read_from(packet, offset)?; - let seqno = u32::read_from(packet, offset)?; - - let shard = ShardIdent::new(workchain, prefix); - - let shard = match shard { - None => return Err(tl_proto::TlError::InvalidData), - Some(shard) => shard, - }; - - let root_hash = HashBytes(<[u8; 32]>::read_from(packet, offset)?); - let file_hash = HashBytes(<[u8; 32]>::read_from(packet, offset)?); - - Ok(BlockId { - shard, - seqno, - root_hash, - file_hash, - }) - } -} - -mod tl_block_id_vec { - use everscale_types::models::BlockId; - use tl_proto::{TlError, TlPacket, TlRead, TlResult}; - - use crate::proto::overlay::tl_block_id; - - pub fn size_hint(ids: &[BlockId]) -> usize { - 4 + ids.len() * tl_block_id::SIZE_HINT - } - - pub fn write(blocks: &[BlockId], packet: &mut P) { - packet.write_u32(blocks.len() as u32); - for block in blocks { - tl_block_id::write(block, packet); - } - } - - pub fn read(packet: &[u8], offset: &mut usize) -> TlResult> { - let len = u32::read_from(packet, offset)?; - if *offset + len as usize * tl_block_id::SIZE_HINT > packet.len() { - return Err(TlError::UnexpectedEof); - } - - let mut ids = Vec::with_capacity(len as usize); - for _ in 0..len { - ids.push(tl_block_id::read(packet, offset)?); - } - Ok(ids) - } -} diff --git a/core/tests/block_strider.rs b/core/tests/block_strider.rs index 13b71fb3e..cb82fc617 100644 --- a/core/tests/block_strider.rs +++ b/core/tests/block_strider.rs @@ -4,7 +4,7 @@ use std::time::Duration; use futures_util::stream::FuturesUnordered; use futures_util::StreamExt; use tycho_core::block_strider::provider::BlockProvider; -use tycho_core::blockchain_client::BlockchainClient; +use tycho_core::blockchain_rpc::BlockchainRpcClient; use tycho_core::overlay_client::{PublicOverlayClient, PublicOverlayClientConfig}; use tycho_network::PeerId; @@ -110,7 +110,7 @@ async fn overlay_block_strider() -> anyhow::Result<()> { tracing::info!("making overlay requests..."); let node = nodes.first().unwrap(); - let client = BlockchainClient::new( + let client = BlockchainRpcClient::new( PublicOverlayClient::new( node.network().clone(), node.public_overlay().clone(), diff --git a/core/tests/common/node.rs b/core/tests/common/node.rs index 20b13ab49..93b994421 100644 --- a/core/tests/common/node.rs +++ b/core/tests/common/node.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use std::time::Duration; use everscale_crypto::ed25519; -use tycho_core::overlay_server::OverlayServer; +use tycho_core::blockchain_rpc::BlockchainRpcService; use tycho_network::{ DhtClient, DhtConfig, DhtService, Network, OverlayConfig, OverlayId, OverlayService, @@ -103,7 +103,7 @@ impl Node { } = NodeBase::with_random_key(); let public_overlay = PublicOverlay::builder(PUBLIC_OVERLAY_ID) .with_peer_resolver(peer_resolver) - .build(OverlayServer::new(storage, true)); + .build(BlockchainRpcService::new(storage, Default::default())); overlay_service.add_public_overlay(&public_overlay); let dht_client = dht_service.make_client(&network); diff --git a/core/tests/overlay_server.rs b/core/tests/overlay_server.rs index 0ecca5fa2..b8db2785f 100644 --- a/core/tests/overlay_server.rs +++ b/core/tests/overlay_server.rs @@ -5,10 +5,9 @@ use anyhow::Result; use everscale_types::models::BlockId; use futures_util::stream::FuturesUnordered; use futures_util::StreamExt; -use tycho_core::blockchain_client::BlockchainClient; +use tycho_core::blockchain_rpc::BlockchainRpcClient; use tycho_core::overlay_client::PublicOverlayClient; -use tycho_core::overlay_server::DEFAULT_ERROR_CODE; -use tycho_core::proto::overlay::{BlockFull, KeyBlockIds, PersistentStatePart}; +use tycho_core::proto::blockchain::{BlockFull, KeyBlockIds, PersistentStatePart}; use tycho_network::PeerId; use crate::common::archive::*; @@ -89,7 +88,7 @@ async fn overlay_server_with_empty_storage() -> Result<()> { let node = nodes.first().unwrap(); - let client = BlockchainClient::new( + let client = BlockchainRpcClient::new( PublicOverlayClient::new( node.network().clone(), node.public_overlay().clone(), @@ -117,7 +116,7 @@ async fn overlay_server_with_empty_storage() -> Result<()> { if let Ok(response) = &result { let ids = KeyBlockIds { - blocks: vec![], + block_ids: vec![], incomplete: true, }; assert_eq!(response.data(), &ids); @@ -135,23 +134,9 @@ async fn overlay_server_with_empty_storage() -> Result<()> { let result = client.get_archive_info(0).await; assert!(result.is_err()); - if let Err(e) = &result { - assert_eq!( - e.to_string(), - format!("Failed to get response: {DEFAULT_ERROR_CODE}") - ); - } - let result = client.get_archive_slice(0, 0, 100).await; assert!(result.is_err()); - if let Err(e) = &result { - assert_eq!( - e.to_string(), - format!("Failed to get response: {DEFAULT_ERROR_CODE}") - ); - } - tmp_dir.close()?; tracing::info!("done!"); @@ -232,7 +217,7 @@ async fn overlay_server_blocks() -> Result<()> { let node = nodes.first().unwrap(); - let client = BlockchainClient::new( + let client = BlockchainRpcClient::new( PublicOverlayClient::new( node.network().clone(), node.public_overlay().clone(),