diff --git a/core/Cargo.toml b/core/Cargo.toml index 514bc3f52..045fd2d36 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -12,6 +12,7 @@ license.workspace = true anyhow = { workspace = true } async-trait = { workspace = true } bytes = { workspace = true, features = ["serde"] } +bytesize = { workspace = true } castaway = { workspace = true } everscale-types = { workspace = true } futures-util = { workspace = true } @@ -33,7 +34,6 @@ tycho-storage = { workspace = true } tycho-util = { workspace = true } [dev-dependencies] -bytesize = { workspace = true } everscale-crypto = { workspace = true } tempfile = { workspace = true } tracing-test = { workspace = true } diff --git a/core/src/blockchain_rpc/client.rs b/core/src/blockchain_rpc/client.rs index 6469fcc12..03f8df412 100644 --- a/core/src/blockchain_rpc/client.rs +++ b/core/src/blockchain_rpc/client.rs @@ -3,9 +3,12 @@ use std::sync::Arc; use anyhow::Result; use everscale_types::models::BlockId; use futures_util::stream::{FuturesUnordered, StreamExt}; +use tycho_block_util::state::ShardStateStuff; use tycho_network::{PublicOverlay, Request}; +use tycho_storage::Storage; +use tycho_util::futures::JoinTask; -use crate::overlay_client::{Error, PublicOverlayClient, QueryResponse}; +use crate::overlay_client::{Error, Neighbour, PublicOverlayClient, QueryResponse}; use crate::proto::blockchain::*; use crate::proto::overlay::BroadcastPrefix; @@ -127,15 +130,28 @@ impl BlockchainRpcClient { pub async fn get_archive_slice( &self, archive_id: u64, + limit: u32, offset: u64, - max_size: u32, ) -> Result, Error> { let client = &self.inner.overlay_client; let data = client .query::<_, Data>(&rpc::GetArchiveSlice { archive_id, + limit, offset, - max_size, + }) + .await?; + Ok(data) + } + + pub async fn get_persistent_state_info( + &self, + block_id: &BlockId, + ) -> Result, Error> { + let client = &self.inner.overlay_client; + let data = client + .query::<_, PersistentStateInfo>(&rpc::GetPersistentStateInfo { + block_id: *block_id, }) .await?; Ok(data) @@ -143,20 +159,143 @@ impl BlockchainRpcClient { pub async fn get_persistent_state_part( &self, - mc_block: &BlockId, - block: &BlockId, - limit: u64, + neighbour: &Neighbour, + block_id: &BlockId, + limit: u32, offset: u64, - ) -> Result, Error> { + ) -> Result, Error> { let client = &self.inner.overlay_client; let data = client - .query::<_, PersistentStatePart>(&rpc::GetPersistentStatePart { - block_id: *block, - mc_block_id: *mc_block, - offset, - max_size: limit, - }) + .query_raw::( + neighbour.clone(), + Request::from_tl(rpc::GetPersistentStatePart { + block_id: *block_id, + limit, + offset, + }), + ) .await?; Ok(data) } + + pub async fn download_and_store_state( + &self, + block_id: &BlockId, + storage: Storage, + ) -> Result { + const PARALLEL_REQUESTS: usize = 10; + const CHUNK_SIZE: u32 = 2 << 20; // 2 MB + const MAX_STATE_SIZE: u64 = 10 << 30; // 10 GB + + // TODO: Iterate through all known (or unknown) neighbours + const NEIGHBOUR_COUNT: usize = 10; + let neighbours = self + .overlay_client() + .neighbours() + .choose_multiple(NEIGHBOUR_COUNT) + .await; + + // Find a neighbour which has the requested state + let (neighbour, max_size) = 'info: { + let req = Request::from_tl(rpc::GetPersistentStateInfo { + block_id: *block_id, + }); + + let mut futures = FuturesUnordered::new(); + for neighbour in neighbours { + futures.push(self.overlay_client().query_raw(neighbour, req.clone())); + } + + let mut err = None; + while let Some(info) = futures.next().await { + let (handle, info) = match info { + Ok(res) => res.split(), + Err(e) => { + err = Some(e); + continue; + } + }; + + match info { + PersistentStateInfo::Found { size } if size <= MAX_STATE_SIZE => { + break 'info (handle.accept(), size) + } + PersistentStateInfo::Found { size } => { + let neighbour = handle.reject(); + tracing::warn!( + peer_id = %neighbour.peer_id(), + size, + "malicious neighbour has a too large state", + ); + return Err(Error::Internal(anyhow::anyhow!("malicious neighbour"))); + } + PersistentStateInfo::NotFound => continue, + } + } + + return match err { + None => Err(Error::Internal(anyhow::anyhow!( + "no neighbour has the requested state" + ))), + Some(err) => Err(err), + }; + }; + + // Download the state + let chunk_count = (max_size + CHUNK_SIZE as u64 - 1) / CHUNK_SIZE as u64; + let mut stream = + futures_util::stream::iter((0..chunk_count).map(|i| i * CHUNK_SIZE as u64)) + .map(|offset| { + let neighbour = neighbour.clone(); + let req = Request::from_tl(rpc::GetPersistentStatePart { + block_id: *block_id, + limit: CHUNK_SIZE, + offset, + }); + + let client = self.overlay_client().clone(); + JoinTask::new(async move { + // TODO: Retry on error + client.query_raw::(neighbour, req).await + }) + }) + .buffered(PARALLEL_REQUESTS); + + let mut store_state_op = storage + .shard_state_storage() + .begin_store_state_raw(block_id) + .map(Box::new) + .map_err(Error::Internal)?; + + // NOTE: Buffered items in stream will be polled because they are spawned as tasks + while let Some(response) = stream.next().await.transpose()? { + let (op, finished) = tokio::task::spawn_blocking(move || { + let (handle, part) = response.split(); + match store_state_op.process_part(part.data) { + Ok(finished) => Ok((store_state_op, finished)), + Err(e) => { + handle.reject(); + Err(e) + } + } + }) + .await + .map_err(|e| Error::Internal(e.into()))? + .map_err(Error::Internal)?; + + if !finished { + store_state_op = op; + continue; + } + + return tokio::task::spawn_blocking(move || op.finalize()) + .await + .map_err(|e| Error::Internal(e.into()))? + .map_err(Error::Internal); + } + + Err(Error::Internal(anyhow::anyhow!( + "downloaded incomplete state" + ))) + } } diff --git a/core/src/blockchain_rpc/mod.rs b/core/src/blockchain_rpc/mod.rs index 690f96bd7..3ac73cbd1 100644 --- a/core/src/blockchain_rpc/mod.rs +++ b/core/src/blockchain_rpc/mod.rs @@ -7,4 +7,6 @@ pub use self::service::{ mod client; mod service; -pub const INTERNAL_ERROR_CODE: u32 = 1; +pub const BAD_REQUEST_ERROR_CODE: u32 = 1; +pub const INTERNAL_ERROR_CODE: u32 = 2; +pub const NOT_FOUND_ERROR_CODE: u32 = 3; diff --git a/core/src/blockchain_rpc/service.rs b/core/src/blockchain_rpc/service.rs index b9ab84483..995361364 100644 --- a/core/src/blockchain_rpc/service.rs +++ b/core/src/blockchain_rpc/service.rs @@ -2,13 +2,14 @@ use std::sync::Arc; use anyhow::Context; use bytes::{Buf, Bytes}; +use bytesize::ByteSize; use futures_util::Future; use serde::{Deserialize, Serialize}; use tycho_network::{InboundRequestMeta, Response, Service, ServiceRequest}; use tycho_storage::{BlockConnection, KeyBlocksDirection, Storage}; use tycho_util::futures::BoxFutureOrNoop; -use crate::blockchain_rpc::INTERNAL_ERROR_CODE; +use crate::blockchain_rpc::{BAD_REQUEST_ERROR_CODE, INTERNAL_ERROR_CODE, NOT_FOUND_ERROR_CODE}; use crate::proto::blockchain::*; use crate::proto::overlay; @@ -196,12 +197,20 @@ impl Service for BlockchainRpcService { Some(Response::from_tl(res)) }) }, + rpc::GetPersistentStateInfo as req => { + tracing::debug!(block_id = %req.block_id, "getPersistentStateInfo"); + + let inner = self.inner.clone(); + BoxFutureOrNoop::future(async move { + let res = inner.handle_get_persistent_state_info(&req); + Some(Response::from_tl(res)) + }) + }, rpc::GetPersistentStatePart as req => { tracing::debug!( block_id = %req.block_id, - mc_block_id = %req.mc_block_id, + limit = %req.limit, offset = %req.offset, - max_size = %req.max_size, "getPersistentStatePart" ); @@ -223,8 +232,8 @@ impl Service for BlockchainRpcService { rpc::GetArchiveSlice as req => { tracing::debug!( archive_id = %req.archive_id, + limit = %req.limit, offset = %req.offset, - max_size = %req.max_size, "getArchiveSlice" ); @@ -439,44 +448,6 @@ impl Inner { } } - async fn handle_get_persistent_state_part( - &self, - req: &rpc::GetPersistentStatePart, - ) -> overlay::Response { - const PART_MAX_SIZE: u64 = 1 << 21; - - let persistent_state_storage = self.storage().persistent_state_storage(); - - let persistent_state_request_validation = || { - anyhow::ensure!( - self.config.serve_persistent_states, - "persistent states are disabled" - ); - anyhow::ensure!(req.max_size <= PART_MAX_SIZE, "too large max_size"); - Ok::<_, anyhow::Error>(()) - }; - - if let Err(e) = persistent_state_request_validation() { - tracing::warn!("persistent_state_request_validation failed: {e:?}"); - return overlay::Response::Err(INTERNAL_ERROR_CODE); - } - - if !persistent_state_storage.state_exists(&req.mc_block_id, &req.block_id) { - return overlay::Response::Ok(PersistentStatePart::NotFound); - } - - match persistent_state_storage - .read_state_part(&req.mc_block_id, &req.block_id, req.offset, req.max_size) - .await - { - Ok(data) => overlay::Response::Ok(PersistentStatePart::Found { data }), - Err(e) => { - tracing::debug!("failed to read persistent state part: {e}"); - overlay::Response::Ok(PersistentStatePart::NotFound) - } - } - } - async fn handle_get_archive_info( &self, req: &rpc::GetArchiveInfo, @@ -514,7 +485,7 @@ impl Inner { let Some(archive_slice) = block_storage.get_archive_slice( req.archive_id as u32, req.offset as usize, - req.max_size as usize, + req.limit as usize, )? else { anyhow::bail!("archive not found"); @@ -531,6 +502,60 @@ impl Inner { } } } + + fn handle_get_persistent_state_info( + &self, + req: &rpc::GetPersistentStateInfo, + ) -> overlay::Response { + let persistent_state_storage = self.storage().persistent_state_storage(); + + let res = 'res: { + if self.config.serve_persistent_states { + if let Some(info) = persistent_state_storage.get_state_info(&req.block_id) { + break 'res PersistentStateInfo::Found { + size: info.size as u64, + }; + } + } + PersistentStateInfo::NotFound + }; + + overlay::Response::Ok(res) + } + + async fn handle_get_persistent_state_part( + &self, + req: &rpc::GetPersistentStatePart, + ) -> overlay::Response { + const PART_MAX_SIZE: u64 = ByteSize::mib(2).as_u64(); + + let persistent_state_storage = self.storage().persistent_state_storage(); + + let persistent_state_request_validation = || { + anyhow::ensure!( + self.config.serve_persistent_states, + "persistent states are disabled" + ); + anyhow::ensure!(req.limit as u64 <= PART_MAX_SIZE, "too large max_size"); + Ok::<_, anyhow::Error>(()) + }; + + if let Err(e) = persistent_state_request_validation() { + tracing::debug!("persistent state request validation failed: {e:?}"); + return overlay::Response::Err(BAD_REQUEST_ERROR_CODE); + } + + match persistent_state_storage + .read_state_part(&req.block_id, req.limit, req.offset) + .await + { + Some(data) => overlay::Response::Ok(Data { data: data.into() }), + None => { + tracing::debug!("failed to read persistent state part"); + overlay::Response::Err(NOT_FOUND_ERROR_CODE) + } + } + } } fn try_handle_prefix(req: &ServiceRequest) -> Result<(u32, &[u8]), tl_proto::TlError> { diff --git a/core/src/lib.rs b/core/src/lib.rs index 18b23735c..ae9166cfd 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -3,4 +3,3 @@ pub mod blockchain_rpc; pub mod global_config; pub mod overlay_client; pub mod proto; -pub mod sync; diff --git a/core/src/overlay_client/mod.rs b/core/src/overlay_client/mod.rs index 84a179f4f..e8b188b15 100644 --- a/core/src/overlay_client/mod.rs +++ b/core/src/overlay_client/mod.rs @@ -99,6 +99,18 @@ impl PublicOverlayClient { { self.inner.query(data).await } + + #[inline] + pub async fn query_raw( + &self, + neighbour: Neighbour, + req: Request, + ) -> Result, Error> + where + for<'a> A: tl_proto::TlRead<'a, Repr = tl_proto::Boxed>, + { + self.inner.query_impl(neighbour, req).await?.parse() + } } #[derive(thiserror::Error, Debug)] @@ -111,6 +123,8 @@ pub enum Error { InvalidResponse(#[source] tl_proto::TlError), #[error("request failed with code: {0}")] RequestFailed(u32), + #[error("internal error: {0}")] + Internal(#[source] anyhow::Error), } struct Inner { @@ -240,27 +254,9 @@ impl Inner { return Err(Error::NoNeighbours); }; - let res = self.query_impl(neighbour, Request::from_tl(data)).await?; - - let response = match tl_proto::deserialize::>(&res.data) { - Ok(r) => r, - Err(e) => { - res.reject(); - return Err(Error::InvalidResponse(e)); - } - }; - - match response { - overlay::Response::Ok(data) => Ok(QueryResponse { - data, - roundtrip_ms: res.roundtrip_ms, - neighbour: res.neighbour, - }), - overlay::Response::Err(code) => { - res.reject(); - Err(Error::RequestFailed(code)) - } - } + self.query_impl(neighbour, Request::from_tl(data)) + .await? + .parse() } async fn send_impl(&self, neighbour: Neighbour, req: Request) -> Result<(), Error> { @@ -332,6 +328,14 @@ impl QueryResponse { &self.data } + pub fn split(self) -> (QueryResponseHandle, A) { + let handle = QueryResponseHandle { + neighbour: self.neighbour, + roundtrip_ms: self.roundtrip_ms, + }; + (handle, self.data) + } + pub fn accept(self) -> (Neighbour, A) { self.track_request(true); (self.neighbour, self.data) @@ -347,3 +351,52 @@ impl QueryResponse { .track_request(&Duration::from_millis(self.roundtrip_ms), success); } } + +impl QueryResponse { + pub fn parse(self) -> Result, Error> + where + for<'a> A: tl_proto::TlRead<'a, Repr = tl_proto::Boxed>, + { + let response = match tl_proto::deserialize::>(&self.data) { + Ok(r) => r, + Err(e) => { + self.reject(); + return Err(Error::InvalidResponse(e)); + } + }; + + match response { + overlay::Response::Ok(data) => Ok(QueryResponse { + data, + roundtrip_ms: self.roundtrip_ms, + neighbour: self.neighbour, + }), + overlay::Response::Err(code) => { + self.reject(); + Err(Error::RequestFailed(code)) + } + } + } +} + +pub struct QueryResponseHandle { + neighbour: Neighbour, + roundtrip_ms: u64, +} + +impl QueryResponseHandle { + pub fn accept(self) -> Neighbour { + self.track_request(true); + self.neighbour + } + + pub fn reject(self) -> Neighbour { + self.track_request(false); + self.neighbour + } + + fn track_request(&self, success: bool) { + self.neighbour + .track_request(&Duration::from_millis(self.roundtrip_ms), success); + } +} diff --git a/core/src/proto.tl b/core/src/proto.tl index b90f0e53b..acc2fe1cc 100644 --- a/core/src/proto.tl +++ b/core/src/proto.tl @@ -68,27 +68,27 @@ blockchain.blockFull.found block_id:blockchain.blockId proof:bytes block:bytes i */ blockchain.blockFull.empty = blockchain.BlockFull; -/** -* An unsuccessul response for the 'getArchiveInfo' query -*/ -blockchain.archiveNotFound = blockchain.ArchiveInfo; /** * A successul response for the 'getArchiveInfo' query * * @param id archive id */ -blockchain.archiveInfo id:long = blockchain.ArchiveInfo; - +blockchain.archiveInfo.found id:long = blockchain.ArchiveInfo; /** -* An unsuccessul response for the 'getPersistentStatePart' query +* An unsuccessul response for the 'getArchiveInfo' query */ -blockchain.persistentStatePart.notFound = blockchain.PersistentStatePart; +blockchain.archiveInfo.notFound = blockchain.ArchiveInfo; + /** -* A successul response for the 'getPersistentStatePart' query +* Node response when it has the requested persistent state * -* @param data persistent state part +* @param size state file size in bytes +*/ +blockchain.persistentStateInfo.found size:long = blockchain.PersistentStateInfo; +/** +* Node response when it doesn't have the requested state */ -blockchain.persistentStatePart.found data:bytes = blockchain.PersistentStatePart; +blockchain.persistentStateInfo.notFound = blockchain.PersistentStateInfo; /** * Raw data bytes @@ -145,26 +145,33 @@ blockchain.getArchiveInfo * Get archive slice * * @param archive_id archive id (masterchain seqno) +* @param limit max response size in bytes * @param offset part offset in bytes -* @param max_size max response size in bytes */ blockchain.getArchiveSlice archive_id:long + limit:int offset:long - max_size:int = overlay.Response blockchain.Data; +/** +* Get persistent state info +* +* @param block_id requested block id +*/ +blockchain.getPersistentStateInfo + block_id:blockchain.blockId + = overlay.Response blockchain.PersistentStateInfo; + /** * Get persisten state part * * @param block_id requested block id -* @param mc_block_id reference masterchain block id +* @param limit max response size in bytes * @param offset part offset in bytes -* @param max_size max response size in bytes */ blockchain.getPersistentStatePart block_id:blockchain.blockId - mc_block_id:blockchain.blockId + limit:int offset:long - max_size:long - = overlay.Response blockchain.PersistentStatePart; + = overlay.Response blockchain.Data; diff --git a/core/src/proto/blockchain.rs b/core/src/proto/blockchain.rs index 89253e0cf..17fbb3c82 100644 --- a/core/src/proto/blockchain.rs +++ b/core/src/proto/blockchain.rs @@ -42,19 +42,19 @@ pub enum BlockFull { #[derive(Debug, Clone, PartialEq, Eq, TlRead, TlWrite)] #[tl(boxed, scheme = "proto.tl")] -pub enum PersistentStatePart { - #[tl(id = "blockchain.persistentStatePart.found")] - Found { data: Bytes }, - #[tl(id = "blockchain.persistentStatePart.notFound")] +pub enum PersistentStateInfo { + #[tl(id = "blockchain.persistentStateInfo.found")] + Found { size: u64 }, + #[tl(id = "blockchain.persistentStateInfo.notFound")] NotFound, } #[derive(Debug, Clone, PartialEq, Eq, TlRead, TlWrite)] #[tl(boxed, scheme = "proto.tl")] pub enum ArchiveInfo { - #[tl(id = "blockchain.archiveInfo", size_hint = 8)] + #[tl(id = "blockchain.archiveInfo.found", size_hint = 8)] Found { id: u64 }, - #[tl(id = "blockchain.archiveNotFound")] + #[tl(id = "blockchain.archiveInfo.notFound")] NotFound, } @@ -68,7 +68,7 @@ pub struct MessageBroadcastRef<'tl> { pub mod rpc { use super::*; - #[derive(Clone, TlRead, TlWrite)] + #[derive(Debug, Clone, TlRead, TlWrite)] #[tl(boxed, id = "blockchain.getNextKeyBlockIds", scheme = "proto.tl")] pub struct GetNextKeyBlockIds { #[tl(with = "tl_block_id")] @@ -76,32 +76,37 @@ pub mod rpc { pub max_size: u32, } - #[derive(Clone, TlRead, TlWrite)] + #[derive(Debug, Clone, TlRead, TlWrite)] #[tl(boxed, id = "blockchain.getBlockFull", scheme = "proto.tl")] pub struct GetBlockFull { #[tl(with = "tl_block_id")] pub block_id: everscale_types::models::BlockId, } - #[derive(Clone, TlRead, TlWrite)] + #[derive(Debug, Clone, TlRead, TlWrite)] #[tl(boxed, id = "blockchain.getNextBlockFull", scheme = "proto.tl")] pub struct GetNextBlockFull { #[tl(with = "tl_block_id")] pub prev_block_id: everscale_types::models::BlockId, } - #[derive(Clone, TlRead, TlWrite)] + #[derive(Debug, Clone, TlRead, TlWrite)] + #[tl(boxed, id = "blockchain.getPersistentStateInfo", scheme = "proto.tl")] + pub struct GetPersistentStateInfo { + #[tl(with = "tl_block_id")] + pub block_id: everscale_types::models::BlockId, + } + + #[derive(Debug, Clone, TlRead, TlWrite)] #[tl(boxed, id = "blockchain.getPersistentStatePart", scheme = "proto.tl")] pub struct GetPersistentStatePart { #[tl(with = "tl_block_id")] pub block_id: everscale_types::models::BlockId, - #[tl(with = "tl_block_id")] - pub mc_block_id: everscale_types::models::BlockId, + pub limit: u32, pub offset: u64, - pub max_size: u64, } - #[derive(Clone, TlRead, TlWrite)] + #[derive(Debug, Clone, TlRead, TlWrite)] #[tl( boxed, id = "blockchain.getArchiveInfo", @@ -112,7 +117,7 @@ pub mod rpc { pub mc_seqno: u32, } - #[derive(Clone, TlRead, TlWrite)] + #[derive(Debug, Clone, TlRead, TlWrite)] #[tl( boxed, id = "blockchain.getArchiveSlice", @@ -121,7 +126,7 @@ pub mod rpc { )] pub struct GetArchiveSlice { pub archive_id: u64, + pub limit: u32, pub offset: u64, - pub max_size: u32, } } diff --git a/core/src/sync/mod.rs b/core/src/sync/mod.rs deleted file mode 100644 index d3c6f52cd..000000000 --- a/core/src/sync/mod.rs +++ /dev/null @@ -1,101 +0,0 @@ -use std::sync::Arc; - -use bytes::Bytes; -use everscale_types::models::BlockId; -use futures_util::{StreamExt, TryStreamExt}; -use parking_lot::Mutex; -use tycho_storage::Storage; - -use crate::blockchain_rpc::BlockchainRpcClient; -use crate::proto::blockchain::PersistentStatePart; - -const MAX_REQUEST_SIZE: u64 = 1 << 20; - -pub struct StateDownloader { - storage: Storage, - blockchain_rpc_client: BlockchainRpcClient, - parallel_requests: usize, -} - -impl StateDownloader { - pub fn new( - storage: Storage, - blockchain_rpc_client: BlockchainRpcClient, - parallel_requests: usize, - ) -> Self { - Self { - storage, - blockchain_rpc_client, - parallel_requests, - } - } - - pub async fn download_and_save_state( - &self, - block_id: &BlockId, - mc_block_id: &BlockId, - ) -> anyhow::Result<()> { - let stream = - futures_util::stream::iter((0..).step_by(MAX_REQUEST_SIZE as usize)) - .map(|offset| { - let blockchain_rpc_client = self.blockchain_rpc_client.clone(); - async move { - download_part(blockchain_rpc_client, mc_block_id, block_id, offset).await - } - }) - .buffered(self.parallel_requests); - - let state_store_op = self - .storage - .shard_state_storage() - .begin_store_state_raw(block_id)?; - - // you can use &mut state_store_op, but we don't want to block runtime - let state_store_op = Arc::new(Mutex::new(state_store_op)); - - // not using while let because it will make polling switch between download and store futures, - // and we want to do all of this stuff concurrently - let mut stream = stream.try_filter_map(|x| { - let state_store_op = Arc::clone(&state_store_op); - async move { - match x { - Some(bytes) => { - let res = tokio::task::spawn_blocking(move || { - let mut state_store_op = state_store_op.lock(); - state_store_op.process_part(bytes) - }) - .await - .expect("tokio::task::spawn_blocking failed"); - res.map(Some) - } - None => Ok(None), - } - } - }); - - let mut stream = std::pin::pin!(stream); - while stream.next().await.is_some() {} - - Ok(()) - } -} - -async fn download_part( - blockchain_rpc_client: BlockchainRpcClient, - mc_block_id: &BlockId, - block_id: &BlockId, - offset: u64, -) -> anyhow::Result> { - let state_part = blockchain_rpc_client - .get_persistent_state_part(block_id, mc_block_id, offset, MAX_REQUEST_SIZE) - .await?; - - let data = match state_part.data() { - PersistentStatePart::Found { data } => data.clone(), - PersistentStatePart::NotFound => { - return Ok(None); - } - }; - - Ok(Some(data)) -} diff --git a/storage/src/db/file_db/mapped_file.rs b/storage/src/db/file_db/mapped_file.rs index 437bf632e..e2a64a249 100644 --- a/storage/src/db/file_db/mapped_file.rs +++ b/storage/src/db/file_db/mapped_file.rs @@ -5,6 +5,7 @@ use std::path::Path; /// Memory buffer that is mapped to a file pub struct MappedFile { + #[allow(unused)] file: File, length: usize, ptr: *mut libc::c_void, @@ -80,6 +81,11 @@ impl MappedFile { buffer.len(), ); } + + pub fn as_slice(&self) -> &[u8] { + // SAFETY: ptr and length were initialized once on creation + unsafe { std::slice::from_raw_parts(self.ptr.cast::(), self.length) } + } } impl Drop for MappedFile { @@ -89,9 +95,6 @@ impl Drop for MappedFile { // TODO: how to handle this? panic!("failed to unmap file: {}", std::io::Error::last_os_error()); } - - let _ = self.file.set_len(0); - let _ = self.file.sync_all(); } } diff --git a/storage/src/store/persistent_state/mod.rs b/storage/src/store/persistent_state/mod.rs index 4bbe61f7a..0d2967c97 100644 --- a/storage/src/store/persistent_state/mod.rs +++ b/storage/src/store/persistent_state/mod.rs @@ -1,19 +1,19 @@ -use std::io::{BufReader, Read, Seek, SeekFrom}; +use std::collections::BTreeMap; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use anyhow::{Context, Result}; -use bytes::{Bytes, BytesMut}; +use dashmap::DashMap; use everscale_types::cell::HashBytes; use everscale_types::models::BlockId; +use parking_lot::Mutex; use tokio::time::Instant; -use crate::db::BaseDb; +use crate::db::{BaseDb, FileDb, MappedFile}; use crate::store::BlockHandleStorage; -use crate::FileDb; -mod cell_writer; +mod state_writer; const KEY_BLOCK_UTIME_STEP: u32 = 86400; const BASE_DIR: &str = "states"; @@ -22,6 +22,8 @@ pub struct PersistentStateStorage { db: BaseDb, storage_dir: FileDb, block_handle_storage: Arc, + descriptor_cache: Arc>>, + mc_seqno_to_block_ids: Mutex>>, is_cancelled: Arc, } @@ -34,17 +36,58 @@ impl PersistentStateStorage { let storage_dir = files_dir.create_subdir(BASE_DIR)?; let is_cancelled = Arc::new(AtomicBool::new(false)); - Ok(Self { + let res = Self { db, storage_dir, block_handle_storage, + descriptor_cache: Default::default(), + mc_seqno_to_block_ids: Default::default(), is_cancelled, + }; + res.preload_states()?; + + Ok(res) + } + + pub fn state_exists(&self, block_id: &BlockId) -> bool { + self.descriptor_cache.contains_key(block_id) + } + + pub fn get_state_info(&self, block_id: &BlockId) -> Option { + self.descriptor_cache + .get(block_id) + .map(|cached| PersistentStateInfo { + size: cached.file.length(), + }) + } + + pub async fn read_state_part( + &self, + block_id: &BlockId, + limit: u32, + offset: u64, + ) -> Option> { + // NOTE: Should be noop on x64 + let offset = usize::try_from(offset).ok()?; + let limit = limit as usize; + + let cached = self.descriptor_cache.get(block_id)?.clone(); + if offset > cached.file.length() { + return None; + } + + // NOTE: Cached file is a mapped file, therefore it can take a while to read from it + tokio::task::spawn_blocking(move || { + let end = std::cmp::min(offset.saturating_add(limit), cached.file.length()); + cached.file.as_slice()[offset..end].to_vec() }) + .await + .ok() } - pub async fn save_state( + pub async fn store_state( &self, - mc_block_id: &BlockId, + mc_seqno: u32, block_id: &BlockId, root_hash: &HashBytes, ) -> Result<()> { @@ -53,21 +96,16 @@ impl PersistentStateStorage { let is_cancelled = Some(self.is_cancelled.clone()); let db = self.db.clone(); - let states_dir = self.prepare_persistent_states_dir(mc_block_id)?; + let states_dir = self.prepare_persistent_states_dir(mc_seqno)?; tokio::task::spawn_blocking(move || { - let cell_writer = cell_writer::CellWriter::new(&db, &states_dir, &block_id.root_hash); - match cell_writer.write(&root_hash.0, is_cancelled) { - Ok(()) => { - tracing::info!( - block_id = %block_id, - "successfully wrote persistent state to a file", - ); - } + let cell_writer = state_writer::StateWriter::new(&db, &states_dir, &block_id); + match cell_writer.write(&root_hash, is_cancelled.as_deref()) { + Ok(()) => tracing::info!(block_id = %block_id, "persistent state saved"), Err(e) => { tracing::error!( block_id = %block_id, - "writing persistent state failed: {e:?}" + "failed to write persistent state: {e:?}" ); if let Err(e) = cell_writer.remove() { @@ -76,91 +114,20 @@ impl PersistentStateStorage { } } }) - .await - .map_err(From::from) - } - - pub async fn read_state_part( - &self, - mc_block_id: &BlockId, - block_id: &BlockId, - offset: u64, - size: u64, - ) -> Result { - // todo: add validation for offset and size - // so it won't eat all the memory - let mut builder = self - .mc_states_dir(mc_block_id) - .file(block_id.root_hash.to_string()); - let file_path = builder.path().to_path_buf(); - tokio::task::spawn_blocking(move || { - // TODO: cache file handles - let mut file = builder.read(true).open()?; - - file.seek(SeekFrom::Start(offset)).with_context(|| { - format!( - "failed to seek state file offset, path: {}", - file_path.display() - ) - })?; - - let mut buf_reader = BufReader::new(file); - - let mut result = BytesMut::zeroed(size as usize); - let mut result_cursor = 0; - - let now = Instant::now(); - loop { - match buf_reader.read(&mut result[result_cursor..]) { - Ok(bytes_read) => { - tracing::debug!(bytes_read, "reading state file"); - if bytes_read == 0 || bytes_read == size as usize { - break; - } - result_cursor += bytes_read; - } - Err(e) => { - return Err(anyhow::Error::new(e).context(format!( - "failed to read state file. Path: {}", - file_path.display() - ))) - } - } - } - tracing::debug!( - "finished reading buffer after: {} ms", - now.elapsed().as_millis() - ); - - Ok(result.freeze()) - }) - .await - .unwrap() - } + .await?; - pub fn state_exists(&self, mc_block_id: &BlockId, block_id: &BlockId) -> bool { - // TODO: cache file handles - self.mc_states_dir(mc_block_id) - .file_exists(block_id.root_hash.to_string()) + self.cache_state(mc_seqno, &block_id) } - pub fn prepare_persistent_states_dir(&self, mc_block: &BlockId) -> Result { - let states_dir = self.storage_dir.subdir_readonly(mc_block.seqno.to_string()); + pub fn prepare_persistent_states_dir(&self, mc_seqno: u32) -> Result { + let states_dir = self.mc_states_dir(mc_seqno); if !states_dir.path().is_dir() { - tracing::info!(mc_block = %mc_block, "creating persistent state directory"); + tracing::info!(mc_seqno, "creating persistent state directory"); states_dir.create_if_not_exists()?; } Ok(states_dir) } - fn mc_states_dir(&self, mc_block_id: &BlockId) -> FileDb { - FileDb::new_readonly(self.storage_dir.path().join(mc_block_id.seqno.to_string())) - } - - pub fn cancel(&self) { - self.is_cancelled.store(true, Ordering::Release); - } - pub async fn clear_old_persistent_states(&self) -> Result<()> { tracing::info!("started clearing old persistent state directories"); let start = Instant::now(); @@ -190,6 +157,24 @@ impl PersistentStateStorage { } }; + // Remove cached states + { + let recent_mc_seqno = block.id().seqno; + + let mut index = self.mc_seqno_to_block_ids.lock(); + index.retain(|&mc_seqno, block_ids| { + if mc_seqno >= recent_mc_seqno { + return true; + } + + for block_id in block_ids.drain(..) { + self.descriptor_cache.remove(&block_id); + } + false + }); + } + + // Remove files self.clear_outdated_state_entries(block.id())?; tracing::info!( @@ -241,4 +226,106 @@ impl PersistentStateStorage { Ok(()) } + + fn preload_states(&self) -> Result<()> { + // For each mc_seqno directory + let process_states = |path: &PathBuf, mc_seqno: u32| -> Result<()> { + 'outer: for entry in std::fs::read_dir(path)?.flatten() { + let path = entry.path(); + // Skip subdirectories + if path.is_dir() { + tracing::warn!(path = %path.display(), "unexpected directory"); + continue; + } + + 'file: { + // Try to parse the file name as a block_id + let Ok(name) = entry.file_name().into_string() else { + break 'file; + }; + let Ok(block_id) = name.parse::() else { + break 'file; + }; + + // Cache the state + self.cache_state(mc_seqno, &block_id)?; + continue 'outer; + } + tracing::warn!(path = %path.display(), "unexpected file"); + } + Ok(()) + }; + + // For each entry in the storage directory + 'outer: for entry in self.storage_dir.entries()?.flatten() { + let path = entry.path(); + // Skip files + if path.is_file() { + tracing::warn!(path = %path.display(), "unexpected file"); + continue; + } + + 'dir: { + // Try to parse the directory name as an mc_seqno + let Ok(name) = entry.file_name().into_string() else { + break 'dir; + }; + let Ok(mc_seqno) = name.parse::() else { + break 'dir; + }; + + // Try to load files in the directory as persistent states + process_states(&path, mc_seqno)?; + continue 'outer; + } + tracing::warn!(path = %path.display(), "unexpected directory"); + } + Ok(()) + } + + fn cache_state(&self, mc_seqno: u32, block_id: &BlockId) -> Result<()> { + use dashmap::mapref::entry::Entry; + + let states = self.mc_states_dir(mc_seqno); + let mut file = states.file(block_id.to_string()); + + let mut is_new = false; + if let Entry::Vacant(entry) = self.descriptor_cache.entry(*block_id) { + let file = file + .read(true) + .write(true) + .create(false) + .append(false) + .open_as_mapped()?; + + entry.insert(Arc::new(CachedState { file })); + is_new = true; + } + + if is_new { + let mut index = self.mc_seqno_to_block_ids.lock(); + index.entry(mc_seqno).or_default().push(*block_id); + } + + Ok(()) + } + + fn mc_states_dir(&self, mc_seqno: u32) -> FileDb { + FileDb::new_readonly(self.storage_dir.path().join(mc_seqno.to_string())) + } +} + +impl Drop for PersistentStateStorage { + fn drop(&mut self) { + self.is_cancelled.store(true, Ordering::Release); + } +} + +#[derive(Debug, Clone, Copy)] +pub struct PersistentStateInfo { + pub size: usize, +} + +struct CachedState { + file: MappedFile, } diff --git a/storage/src/store/persistent_state/cell_writer.rs b/storage/src/store/persistent_state/state_writer.rs similarity index 96% rename from storage/src/store/persistent_state/cell_writer.rs rename to storage/src/store/persistent_state/state_writer.rs index 3606032c5..d39ae0944 100644 --- a/storage/src/store/persistent_state/cell_writer.rs +++ b/storage/src/store/persistent_state/state_writer.rs @@ -2,37 +2,37 @@ use std::collections::hash_map; use std::io::{Read, Seek, SeekFrom, Write}; use std::path::PathBuf; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; use anyhow::{Context, Result}; use everscale_types::cell::{CellDescriptor, HashBytes}; +use everscale_types::models::*; use smallvec::SmallVec; use tycho_util::FastHashMap; use crate::db::{BaseDb, FileDb, TempFile}; -pub struct CellWriter<'a> { +pub struct StateWriter<'a> { db: &'a BaseDb, states_dir: &'a FileDb, - block_root_hash: &'a HashBytes, + block_id: &'a BlockId, } -impl<'a> CellWriter<'a> { +impl<'a> StateWriter<'a> { #[allow(unused)] - pub fn new(db: &'a BaseDb, states_dir: &'a FileDb, block_root_hash: &'a HashBytes) -> Self { + pub fn new(db: &'a BaseDb, states_dir: &'a FileDb, block_id: &'a BlockId) -> Self { Self { db, states_dir, - block_root_hash, + block_id, } } #[allow(unused)] - pub fn write(&self, root_hash: &[u8; 32], is_cancelled: Option>) -> Result<()> { + pub fn write(&self, root_hash: &HashBytes, is_cancelled: Option<&AtomicBool>) -> Result<()> { // Load cells from db in reverse order into the temp file tracing::info!("started loading cells"); let mut intermediate = self - .write_rev(root_hash, &is_cancelled) + .write_rev(&root_hash.0, is_cancelled) .context("Failed to write reversed cells data")?; tracing::info!("finished loading cells"); let cell_count = intermediate.cell_sizes.len() as u32; @@ -138,7 +138,7 @@ impl<'a> CellWriter<'a> { fn write_rev( &self, root_hash: &[u8; 32], - is_cancelled: &Option>, + is_cancelled: Option<&AtomicBool>, ) -> Result { enum StackItem { New([u8; 32]), @@ -308,7 +308,7 @@ impl<'a> CellWriter<'a> { } fn file_name(&self) -> PathBuf { - PathBuf::from(self.block_root_hash.to_string()) + PathBuf::from(self.block_id.to_string()) } } diff --git a/storage/src/store/shard_state/mod.rs b/storage/src/store/shard_state/mod.rs index 31541dd58..939e45d04 100644 --- a/storage/src/store/shard_state/mod.rs +++ b/storage/src/store/shard_state/mod.rs @@ -151,13 +151,13 @@ impl ShardStateStorage { Ok(updated) } - pub fn begin_store_state_raw(&'_ self, block_id: &BlockId) -> Result { + pub fn begin_store_state_raw(&self, block_id: &BlockId) -> Result { StoreStateRaw::new( block_id, - &self.db, &self.downloads_dir, - &self.cell_storage, - &self.min_ref_mc_state, + self.db.clone(), + self.cell_storage.clone(), + self.min_ref_mc_state.clone(), ) } diff --git a/storage/src/store/shard_state/store_state_raw.rs b/storage/src/store/shard_state/store_state_raw.rs index 06f60aa59..078f9e4f0 100644 --- a/storage/src/store/shard_state/store_state_raw.rs +++ b/storage/src/store/shard_state/store_state_raw.rs @@ -22,9 +22,9 @@ pub const MAX_DEPTH: u16 = u16::MAX - 1; pub struct StoreStateRaw { block_id: BlockId, - db: &'a BaseDb, - cell_storage: &'a Arc, - min_ref_mc_state: &'a MinRefMcStateTracker, + db: BaseDb, + cell_storage: Arc, + min_ref_mc_state: MinRefMcStateTracker, reader: ShardStatePacketReader, header: Option, cells_read: u64, @@ -36,23 +36,23 @@ pub struct StoreStateRaw { impl StoreStateRaw { pub(crate) fn new( block_id: &BlockId, - db: &'a BaseDb, downloads_dir: &FileDb, - cell_storage: &Arc, - min_ref_mc_state: &MinRefMcStateTracker, + db: BaseDb, + cell_storage: Arc, + min_ref_mc_state: MinRefMcStateTracker, ) -> Result { let file_ctx = FilesContext::new(downloads_dir, block_id).context("failed to create files context")?; - let pg = ProgressBar::builder("downloading state") + let pg = ProgressBar::builder() .exact_unit("cells") - .build(); + .build(|msg| tracing::info!("downloading state... {msg}")); Ok(Self { block_id: *block_id, - db: db.clone(), + db, file_ctx, - cell_storage: cell_storage.clone(), - min_ref_mc_state: min_ref_mc_state.clone(), + cell_storage, + min_ref_mc_state, reader: ShardStatePacketReader::new(), header: None, cells_read: 0, @@ -129,9 +129,9 @@ impl StoreStateRaw { const MAX_DATA_SIZE: usize = 128; const CELLS_PER_BATCH: u64 = 1_000_000; - let mut progress_bar = ProgressBar::builder("processing state") + let mut progress_bar = ProgressBar::builder() .with_mapper(|x| bytesize::to_string(x, false)) - .build(); + .build(|msg| tracing::info!("processing state... {msg}")); let header = match &self.header { Some(header) => header, @@ -626,9 +626,14 @@ mod test { let block_id = parse_filename(filename.as_ref()); - let mut store_state = - StoreStateRaw::new(&block_id, base_db, &download_dir, cell_storage, &tracker) - .context("Failed to create ShardStateReplaceTransaction")?; + let mut store_state = StoreStateRaw::new( + &block_id, + &download_dir, + base_db.clone(), + cell_storage.clone(), + tracker.clone(), + ) + .context("Failed to create ShardStateReplaceTransaction")?; let file = File::open(file.path())?; let mut file = BufReader::new(file); diff --git a/storage/tests/mod.rs b/storage/tests/mod.rs index f1b5d3f09..f6a0887a3 100644 --- a/storage/tests/mod.rs +++ b/storage/tests/mod.rs @@ -53,7 +53,7 @@ fn compare_cells(orig_cell: &DynCell, stored_cell: &DynCell) { #[tokio::test] async fn persistent_storage_everscale() -> Result<()> { - tracing_subscriber::fmt::try_init().ok(); + tycho_util::test::init_logger("persistent_storage_everscale", "debug"); let (storage, _tmp_dir) = Storage::new_temp()?; assert!(storage.node_state().load_init_mc_block_id().is_none()); @@ -107,12 +107,12 @@ async fn persistent_storage_everscale() -> Result<()> { storage .persistent_state_storage() - .prepare_persistent_states_dir(zerostate.block_id())?; + .prepare_persistent_states_dir(zerostate.state().seqno)?; storage .persistent_state_storage() - .save_state( - zerostate.block_id(), + .store_state( + zerostate.state().seqno, zerostate.block_id(), zero_state_raw.cell.repr_hash(), ) @@ -121,22 +121,24 @@ async fn persistent_storage_everscale() -> Result<()> { // Check if state exists let exist = storage .persistent_state_storage() - .state_exists(zerostate.block_id(), zerostate.block_id()); + .state_exists(zerostate.block_id()); assert_eq!(exist, true); - // Read persistent state - let offset = 0u64; - let max_size = 1_000_000u64; + // Read persistent state a couple of times to check if it is stateless + for _ in 0..2 { + let limit = bytesize::mb(1u32); + let offset = 0u64; - let persistent_state_storage = storage.persistent_state_storage(); - let persistent_state_data = persistent_state_storage - .read_state_part(zerostate.block_id(), zerostate.block_id(), offset, max_size) - .await - .unwrap(); + let persistent_state_storage = storage.persistent_state_storage(); + let persistent_state_data = persistent_state_storage + .read_state_part(zerostate.block_id(), limit, offset) + .await + .unwrap(); - // Check state - let cell = Boc::decode(&persistent_state_data)?; - assert_eq!(&cell, zerostate.root_cell()); + // Check state + let cell = Boc::decode(&persistent_state_data)?; + assert_eq!(&cell, zerostate.root_cell()); + } Ok(()) } diff --git a/util/src/progress_bar.rs b/util/src/progress_bar.rs index a4d59ed15..7a1c7da67 100644 --- a/util/src/progress_bar.rs +++ b/util/src/progress_bar.rs @@ -1,15 +1,23 @@ pub struct ProgressBar { - name: &'static str, percentage_step: u64, current: u64, total: Option, exact_unit: Option<&'static str>, - mapper: Box String + Send + 'static>, + mapper: Box, + printer: Box, } +type MapperFn = dyn Fn(u64) -> String + Send + 'static; +type PrinterFn = dyn Fn(&dyn std::fmt::Display) + Send + 'static; + impl ProgressBar { - pub fn builder(name: &'static str) -> ProgressBarBuilder { - ProgressBarBuilder::new(name) + pub fn builder() -> ProgressBarBuilder { + ProgressBarBuilder { + percentage_step: PERCENTAGE_STEP, + total: None, + exact_unit: None, + mapper: None, + } } pub fn set_total(&mut self, total: impl Into) { @@ -54,7 +62,7 @@ impl ProgressBar { #[inline(always)] fn message(&self, text: impl std::fmt::Display) { - tracing::info!("{}... {text}", self.name); + (self.printer)(&text); } fn compute_current_progress(&self) -> Option { @@ -66,7 +74,6 @@ impl ProgressBar { } pub struct ProgressBarBuilder { - name: &'static str, percentage_step: u64, total: Option, exact_unit: Option<&'static str>, @@ -74,16 +81,6 @@ pub struct ProgressBarBuilder { } impl ProgressBarBuilder { - pub fn new(name: &'static str) -> Self { - Self { - name, - percentage_step: PERCENTAGE_STEP, - total: None, - exact_unit: None, - mapper: None, - } - } - pub fn with_mapper(mut self, mapper: F) -> Self where F: Fn(u64) -> String + Send + 'static, @@ -107,14 +104,17 @@ impl ProgressBarBuilder { self } - pub fn build(self) -> ProgressBar { + pub fn build(self, printer: F) -> ProgressBar + where + F: Fn(&dyn std::fmt::Display) + Send + 'static, + { let pg = ProgressBar { - name: self.name, percentage_step: self.percentage_step, current: 0, total: self.total, exact_unit: self.exact_unit, mapper: self.mapper.unwrap_or_else(|| Box::new(|x| x.to_string())), + printer: Box::new(printer), }; if self.total.is_some() {