diff --git a/Cargo.lock b/Cargo.lock
index d8fd4c94c..a2f770e6c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3227,11 +3227,13 @@ dependencies = [
  "bytes",
  "everscale-types",
  "hex",
+ "libc",
  "parking_lot",
  "rand",
  "sha2",
  "smallvec",
  "thiserror",
+ "tracing",
  "tycho-util",
 ]
 
diff --git a/block-util/Cargo.toml b/block-util/Cargo.toml
index eebf88908..350ad93bf 100644
--- a/block-util/Cargo.toml
+++ b/block-util/Cargo.toml
@@ -15,10 +15,12 @@ arc-swap = { workspace = true }
 bytes = { workspace = true }
 everscale-types = { workspace = true }
 hex = { workspace = true }
+libc = { workspace = true }
 parking_lot = { workspace = true }
 sha2 = { workspace = true }
 smallvec = { workspace = true }
 thiserror = { workspace = true }
+tracing = { workspace = true }
 
 # local deps
 tycho-util = { workspace = true }
diff --git a/block-util/src/archive/mod.rs b/block-util/src/archive/mod.rs
index c4a739389..4d6cc0cd6 100644
--- a/block-util/src/archive/mod.rs
+++ b/block-util/src/archive/mod.rs
@@ -1,15 +1,92 @@
+use std::collections::BTreeMap;
+
 use bytes::Bytes;
+use everscale_types::models::{Block, BlockId, BlockProof};
 
 pub use self::entry_id::{ArchiveEntryId, GetFileName};
 pub use self::reader::{ArchiveEntry, ArchiveReader, ArchiveReaderError, ArchiveVerifier};
+use crate::block::{BlockProofStuff, BlockStuff};
 
 mod entry_id;
 mod reader;
+mod writer;
 
 pub const ARCHIVE_PREFIX: [u8; 4] = u32::to_le_bytes(0xae8fdd01);
 pub const ARCHIVE_ENTRY_PREFIX: [u8; 2] = u16::to_le_bytes(0x1e8b);
 pub const ARCHIVE_ENTRY_HEADER_LEN: usize = ARCHIVE_ENTRY_PREFIX.len() + 2 + 4; // magic + filename len + data len
 
+pub struct Archive {
+    pub mc_block_ids: BTreeMap<u32, BlockId>,
+    pub blocks: BTreeMap<BlockId, ArchiveDataEntry>,
+}
+
+impl Archive {
+    pub fn new(data: &[u8]) -> anyhow::Result<Self> {
+        let reader = ArchiveReader::new(data)?;
+
+        let mut res = Archive {
+            mc_block_ids: Default::default(),
+            blocks: Default::default(),
+        };
+
+        for entry_data in reader {
+            let entry = entry_data?;
+            match ArchiveEntryId::from_filename(entry.name)? {
+                ArchiveEntryId::Block(id) => {
+                    let block = BlockStuff::deserialize_checked(&id, entry.data)?.into_block();
+
+                    res.blocks.entry(id).or_default().block =
+                        Some(WithArchiveData::new(block, entry.data.to_vec()));
+
+                    if id.shard.is_masterchain() {
+                        res.mc_block_ids.insert(id.seqno, id);
+                    }
+                }
+                ArchiveEntryId::Proof(id) if id.shard.workchain() == -1 => {
+                    let proof = BlockProofStuff::deserialize(&id, entry.data, false)?
+                        .proof()
+                        .clone();
+
+                    res.blocks.entry(id).or_default().proof =
+                        Some(WithArchiveData::new(proof, entry.data.to_vec()));
+                    res.mc_block_ids.insert(id.seqno, id);
+                }
+                ArchiveEntryId::ProofLink(id) if id.shard.workchain() != -1 => {
+                    let proof = BlockProofStuff::deserialize(&id, entry.data, true)?
+                        .proof()
+                        .clone();
+
+                    res.blocks.entry(id).or_default().proof =
+                        Some(WithArchiveData::new(proof, entry.data.to_vec()));
+                }
+                _ => continue,
+            }
+        }
+
+        Ok(res)
+    }
+
+    pub fn lowest_mc_id(&self) -> Option<&BlockId> {
+        self.mc_block_ids.values().next()
+    }
+
+    pub fn highest_mc_id(&self) -> Option<&BlockId> {
+        self.mc_block_ids.values().next_back()
+    }
+
+    pub fn get_block_by_id(&self, id: &BlockId) -> Option<Block> {
+        self.blocks
+            .get(id)
+            .and_then(|entry| entry.block.as_ref().map(|x| x.data.clone()))
+    }
+}
+
+#[derive(Default)]
+pub struct ArchiveDataEntry {
+    pub block: Option<WithArchiveData<Block>>,
+    pub proof: Option<WithArchiveData<BlockProof>>,
+}
+
 #[derive(Clone)]
 pub enum ArchiveData {
     /// The raw data is known.
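The new `Archive` type gives downstream code an indexed, in-memory view of a parsed archive. A minimal usage sketch (illustrative only, not part of the diff; `archive_bytes` and `inspect_archive` are hypothetical names assumed to hold a complete archive blob):

```rust
use tycho_block_util::archive::Archive;

// Sketch: parse an archive blob and walk its masterchain blocks in seqno order.
fn inspect_archive(archive_bytes: &[u8]) -> anyhow::Result<()> {
    let archive = Archive::new(archive_bytes)?;

    // `mc_block_ids` is keyed by seqno, so the BTreeMap iterates in order.
    if let (Some(lo), Some(hi)) = (archive.lowest_mc_id(), archive.highest_mc_id()) {
        println!("archive covers masterchain blocks {}..={}", lo.seqno, hi.seqno);
    }

    for (seqno, id) in &archive.mc_block_ids {
        // Each entry may carry block data, a proof, or both.
        let has_block = archive.get_block_by_id(id).is_some();
        println!("mc block {seqno}: block data present: {has_block}");
    }

    Ok(())
}
```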
diff --git a/block-util/src/archive/writer.rs b/block-util/src/archive/writer.rs
new file mode 100644
index 000000000..ce8c59be1
--- /dev/null
+++ b/block-util/src/archive/writer.rs
@@ -0,0 +1,194 @@
+#![allow(clippy::disallowed_types)]
+use std::fs::File;
+use std::io::{IoSlice, Write};
+use std::path::PathBuf;
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+
+use anyhow::{Context, Result};
+use parking_lot::Mutex;
+
+use crate::archive::Archive;
+
+pub struct ArchiveWriter {
+    pool_state: Arc<ArchiveWritersPoolState>,
+    state: ArchiveWriterState,
+}
+
+impl ArchiveWriter {
+    pub fn parse_archive(&self) -> Result<Archive> {
+        match &self.state {
+            ArchiveWriterState::InMemory(buffer) => Archive::new(buffer),
+            ArchiveWriterState::File { file, .. } => {
+                let mapped_file =
+                    FileWriter::new(file).context("Failed to map temp archive file")?;
+
+                Archive::new(mapped_file.as_slice())
+            }
+        }
+    }
+
+    fn acquire_memory(&mut self, additional: usize) -> std::io::Result<()> {
+        if let ArchiveWriterState::InMemory(buffer) = &self.state {
+            let move_to_file = {
+                let mut acquired_memory = self.pool_state.acquired_memory.lock();
+                if *acquired_memory + additional > self.pool_state.save_to_disk_threshold {
+                    *acquired_memory -= buffer.len();
+                    true
+                } else {
+                    *acquired_memory += additional;
+                    false
+                }
+            };
+
+            if move_to_file {
+                let (path, mut file) = self.pool_state.acquire_file()?;
+                file.write_all(buffer)?;
+                self.state = ArchiveWriterState::File { path, file };
+            }
+        }
+
+        Ok(())
+    }
+}
+
+impl Drop for ArchiveWriter {
+    fn drop(&mut self) {
+        match &self.state {
+            ArchiveWriterState::InMemory(buffer) => {
+                *self.pool_state.acquired_memory.lock() -= buffer.len();
+            }
+            ArchiveWriterState::File { path, .. } => {
+                if let Err(e) = std::fs::remove_file(path) {
+                    tracing::error!(
+                        target: "sync",
+                        path = %path.display(),
+                        "failed to remove temp archive file: {e:?}"
+                    );
+                }
+            }
+        }
+    }
+}
+
+impl Write for ArchiveWriter {
+    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+        self.acquire_memory(buf.len())?;
+
+        match &mut self.state {
+            ArchiveWriterState::InMemory(buffer) => buffer.write(buf),
+            ArchiveWriterState::File { file, .. } => file.write(buf),
+        }
+    }
+
+    fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> std::io::Result<usize> {
+        let len = bufs.iter().map(|b| b.len()).sum();
+
+        self.acquire_memory(len)?;
+
+        match &mut self.state {
+            ArchiveWriterState::InMemory(buffer) => {
+                buffer.reserve(len);
+                for buf in bufs {
+                    buffer.extend_from_slice(buf);
+                }
+                Ok(len)
+            }
+            ArchiveWriterState::File { file, .. } => file.write_vectored(bufs),
+        }
+    }
+
+    #[inline(always)]
+    fn flush(&mut self) -> std::io::Result<()> {
+        match &mut self.state {
+            ArchiveWriterState::InMemory(_) => Ok(()),
+            ArchiveWriterState::File { file, .. } => file.flush(),
+        }
+    }
+}
+
+struct ArchiveWritersPoolState {
+    save_to_disk_threshold: usize,
+    // NOTE: `AtomicUsize` is not used here because there is a complex
+    // InMemory-to-File transition
+    acquired_memory: Mutex<usize>,
+    temp_file_index: AtomicUsize,
+    base_path: PathBuf,
+}
+
+impl ArchiveWritersPoolState {
+    fn acquire_file(&self) -> std::io::Result<(PathBuf, File)> {
+        let temp_file_index = self.temp_file_index.fetch_add(1, Ordering::AcqRel);
+        let path = self
+            .base_path
+            .join(format!("temp_archive{temp_file_index:04}"));
+
+        let file = std::fs::OpenOptions::new()
+            .write(true)
+            .read(true)
+            .create(true)
+            .truncate(true)
+            .open(&path)?;
+
+        Ok((path, file))
+    }
+}
+
+enum ArchiveWriterState {
+    InMemory(Vec<u8>),
+    File { path: PathBuf, file: File },
+}
+
+struct FileWriter<'a> {
+    _file: &'a File,
+    length: usize,
+    ptr: *mut libc::c_void,
+}
+
+impl<'a> FileWriter<'a> {
+    fn new(file: &'a File) -> std::io::Result<Self> {
+        use std::os::unix::io::AsRawFd;
+
+        let length = file.metadata()?.len() as usize;
+
+        // SAFETY: File was opened successfully, file mode is R, offset is aligned
+        let ptr = unsafe {
+            libc::mmap(
+                std::ptr::null_mut(),
+                length,
+                libc::PROT_READ,
+                libc::MAP_SHARED,
+                file.as_raw_fd(),
+                0,
+            )
+        };
+        if ptr == libc::MAP_FAILED {
+            return Err(std::io::Error::last_os_error());
+        }
+
+        if unsafe { libc::madvise(ptr, length, libc::MADV_SEQUENTIAL) } != 0 {
+            return Err(std::io::Error::last_os_error());
+        }
+
+        Ok(Self {
+            _file: file,
+            length,
+            ptr,
+        })
+    }
+
+    fn as_slice(&self) -> &[u8] {
+        unsafe { std::slice::from_raw_parts(self.ptr as *const u8, self.length) }
+    }
+}
+
+impl Drop for FileWriter<'_> {
+    fn drop(&mut self) {
+        // SAFETY: File still exists, ptr and length were initialized once on creation
+        if unsafe { libc::munmap(self.ptr, self.length) } != 0 {
+            // TODO: how to handle this?
+            let e = std::io::Error::last_os_error();
+            panic!("failed to unmap temp archive file: {e:?}");
+        }
+    }
+}
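`ArchiveWriter` buffers archive bytes in memory and transparently migrates them to a temp file once the pool-wide `save_to_disk_threshold` would be exceeded. The same spill-to-disk pattern in self-contained form (a sketch using only `std` plus the `tempfile` crate; `SpillWriter` and its fixed per-writer threshold are illustrative simplifications — the PR tracks memory across a whole pool of writers):

```rust
use std::fs::File;
use std::io::{self, Write};

enum State {
    InMemory(Vec<u8>),
    File(File),
}

struct SpillWriter {
    threshold: usize,
    state: State,
}

impl Write for SpillWriter {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // On crossing the threshold, move the buffered bytes to a temp file
        // and let all subsequent writes go to disk.
        if let State::InMemory(buffer) = &mut self.state {
            if buffer.len() + buf.len() > self.threshold {
                let mut file = tempfile::tempfile()?;
                file.write_all(buffer)?;
                self.state = State::File(file);
            }
        }
        match &mut self.state {
            State::InMemory(buffer) => buffer.write(buf),
            State::File(file) => file.write(buf),
        }
    }

    fn flush(&mut self) -> io::Result<()> {
        match &mut self.state {
            State::InMemory(_) => Ok(()),
            State::File(file) => file.flush(),
        }
    }
}
```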
diff --git a/cli/src/node/mod.rs b/cli/src/node/mod.rs
index 38f42c456..d8719603b 100644
--- a/cli/src/node/mod.rs
+++ b/cli/src/node/mod.rs
@@ -437,6 +437,10 @@ impl Node {
             }
         };
 
+        if !self.is_synced()? {
+            // start normal sync
+        }
+
         Ok(last_key_block_id)
     }
 
@@ -585,6 +589,10 @@ impl Node {
 
         Ok(())
     }
+
+    pub fn is_synced(&self) -> Result<bool> {
+        todo!()
+    }
 }
 
 struct CollatorStateSubscriber {
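`is_synced` lands here as a `todo!()` stub. One plausible way to fill it in later is a staleness check on the newest applied masterchain block; everything in this sketch (`last_applied_mc_block_utime`, the 600 s threshold) is an assumption for illustration, not an API from this PR:

```rust
// Illustrative sketch only; `last_applied_mc_block_utime` is a hypothetical accessor.
pub fn is_synced(&self) -> Result<bool> {
    const SYNC_THRESHOLD_SEC: u64 = 600;

    let last_utime = self.last_applied_mc_block_utime()?;
    let now = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .expect("system time is before the unix epoch")
        .as_secs();

    // Synced if the newest applied masterchain block is recent enough.
    Ok(now.saturating_sub(last_utime) <= SYNC_THRESHOLD_SEC)
}
```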
diff --git a/core/src/block_strider/provider/archive_provider.rs b/core/src/block_strider/provider/archive_provider.rs
index 65b45dd9c..3c285bd39 100644
--- a/core/src/block_strider/provider/archive_provider.rs
+++ b/core/src/block_strider/provider/archive_provider.rs
@@ -1,78 +1,14 @@
 #![allow(clippy::map_err_ignore)]
 
-use std::collections::BTreeMap;
-
-use anyhow::Result;
-use everscale_types::cell::Load;
-use everscale_types::models::{Block, BlockId, BlockIdShort, BlockProof};
+use everscale_types::models::{BlockId, BlockIdShort};
 use futures_util::future::BoxFuture;
 use futures_util::FutureExt;
-use sha2::Digest;
-use tycho_block_util::archive::{ArchiveEntryId, ArchiveReader};
+use tycho_block_util::archive::Archive;
 use tycho_block_util::block::{BlockStuff, BlockStuffAug};
 
 use crate::block_strider::provider::{BlockProvider, OptionalBlockStuff};
 
-pub struct ArchiveBlockProvider {
-    pub mc_block_ids: BTreeMap<u32, BlockId>,
-    pub blocks: BTreeMap<BlockId, ArchiveDataEntry>,
-}
-
-impl ArchiveBlockProvider {
-    pub fn new(data: &[u8]) -> Result<Self> {
-        let reader = ArchiveReader::new(data)?;
-
-        let mut res = ArchiveBlockProvider {
-            mc_block_ids: Default::default(),
-            blocks: Default::default(),
-        };
-
-        for data in reader {
-            let entry = data?;
-            match ArchiveEntryId::from_filename(entry.name)? {
-                ArchiveEntryId::Block(id) => {
-                    let block = deserialize_block(&id, entry.data)?;
-
-                    res.blocks.entry(id).or_default().block = Some(block);
-                    if id.shard.workchain() == -1 {
-                        // todo: add is_masterchain() method
-                        res.mc_block_ids.insert(id.seqno, id);
-                    }
-                }
-                ArchiveEntryId::Proof(id) if id.shard.workchain() == -1 => {
-                    let proof = deserialize_block_proof(&id, entry.data, false)?;
-
-                    res.blocks.entry(id).or_default().proof = Some(proof);
-                    res.mc_block_ids.insert(id.seqno, id);
-                }
-                ArchiveEntryId::ProofLink(id) if id.shard.workchain() != -1 => {
-                    let proof = deserialize_block_proof(&id, entry.data, true)?;
-
-                    res.blocks.entry(id).or_default().proof = Some(proof);
-                }
-                _ => continue,
-            }
-        }
-        Ok(res)
-    }
-
-    pub fn lowest_mc_id(&self) -> Option<&BlockId> {
-        self.mc_block_ids.values().next()
-    }
-
-    pub fn highest_mc_id(&self) -> Option<&BlockId> {
-        self.mc_block_ids.values().next_back()
-    }
-
-    pub fn get_block_by_id(&self, id: &BlockId) -> Option<Block> {
-        self.blocks
-            .get(id)
-            .map(|entry| entry.block.as_ref().unwrap())
-            .cloned()
-    }
-}
-
-impl BlockProvider for ArchiveBlockProvider {
+impl BlockProvider for Archive {
     type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
     type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
 
@@ -96,48 +32,6 @@ impl BlockProvider for ArchiveBlockProvider {
     }
 }
 
-#[derive(Default)]
-pub struct ArchiveDataEntry {
-    pub block: Option<Block>,
-    pub proof: Option<BlockProof>,
-}
-
-pub(crate) fn deserialize_block(id: &BlockId, data: &[u8]) -> Result<Block, ArchiveDataError> {
-    let file_hash = sha2::Sha256::digest(data);
-    if id.file_hash.as_slice() != file_hash.as_slice() {
-        Err(ArchiveDataError::InvalidFileHash(id.as_short_id()))
-    } else {
-        let root = everscale_types::boc::Boc::decode(data)
-            .map_err(|_| ArchiveDataError::InvalidBlockData)?;
-        if &id.root_hash != root.repr_hash() {
-            return Err(ArchiveDataError::InvalidRootHash);
-        }
-
-        Block::load_from(&mut root.as_slice()?).map_err(|_| ArchiveDataError::InvalidBlockData)
-    }
-}
-
-pub(crate) fn deserialize_block_proof(
-    block_id: &BlockId,
-    data: &[u8],
-    is_link: bool,
-) -> Result<BlockProof, ArchiveDataError> {
-    let root =
-        everscale_types::boc::Boc::decode(data).map_err(|_| ArchiveDataError::InvalidBlockProof)?;
-    let proof = everscale_types::models::BlockProof::load_from(&mut root.as_slice()?)
-        .map_err(|_| ArchiveDataError::InvalidBlockProof)?;
-
-    if &proof.proof_for != block_id {
-        return Err(ArchiveDataError::ProofForAnotherBlock);
-    }
-
-    if !block_id.shard.workchain() == -1 && !is_link {
-        Err(ArchiveDataError::ProofForNonMasterchainBlock)
-    } else {
-        Ok(proof)
-    }
-}
-
 #[derive(thiserror::Error, Debug)]
 pub(crate) enum ArchiveDataError {
     #[error("Invalid file hash {0}")]
diff --git a/core/src/blockchain_rpc/client.rs b/core/src/blockchain_rpc/client.rs
index 73cec1ed3..43107e15d 100644
--- a/core/src/blockchain_rpc/client.rs
+++ b/core/src/blockchain_rpc/client.rs
@@ -1,11 +1,11 @@
+use std::io::Write;
 use std::sync::Arc;
 
 use anyhow::Result;
 use bytes::Bytes;
 use everscale_types::models::BlockId;
 use futures_util::stream::{FuturesUnordered, StreamExt};
-use tycho_block_util::archive::{ArchiveData, WithArchiveData};
-use tycho_block_util::block::{BlockProofStuff, BlockProofStuffAug};
+use tycho_block_util::archive::ArchiveVerifier;
 use tycho_block_util::state::ShardStateStuff;
 use tycho_network::{PublicOverlay, Request};
 use tycho_storage::Storage;
@@ -363,4 +363,100 @@ impl BlockchainRpcClient {
             "downloaded incomplete state"
         )))
     }
+
+    pub async fn download_archive(
+        &self,
+        mc_seqno: u32,
+        output: &mut (dyn Write + Send),
+    ) -> Result<usize, Error> {
+        const CHUNK_SIZE: u32 = 2 << 20; // 2 MiB
+
+        // TODO: Iterate through all known (or unknown) neighbours
+        const NEIGHBOUR_COUNT: usize = 10;
+        let neighbours = self
+            .overlay_client()
+            .neighbours()
+            .choose_multiple(NEIGHBOUR_COUNT)
+            .await;
+
+        // Find a neighbour which has the requested archive
+        let (neighbour, archive_id) = 'info: {
+            let req = Request::from_tl(rpc::GetArchiveInfo { mc_seqno });
+
+            let mut futures = FuturesUnordered::new();
+            for neighbour in neighbours {
+                futures.push(self.overlay_client().query_raw(neighbour, req.clone()));
+            }
+
+            let mut err = None;
+            while let Some(info) = futures.next().await {
+                let (handle, info) = match info {
+                    Ok(res) => res.split(),
+                    Err(e) => {
+                        err = Some(e);
+                        continue;
+                    }
+                };
+
+                match info {
+                    ArchiveInfo::Found { id } => break 'info (handle.accept(), id),
+                    ArchiveInfo::NotFound => continue,
+                }
+            }
+
+            return match err {
+                None => Err(Error::Internal(anyhow::anyhow!(
+                    "no neighbour has the requested archive"
+                ))),
+                Some(err) => Err(err),
+            };
+        };
+
+        let mut verifier = ArchiveVerifier::default();
+
+        // TODO: add retry count to interrupt infinite loop
+        let mut offset = 0;
+        loop {
+            let req = Request::from_tl(rpc::GetArchiveSlice {
+                archive_id,
+                offset,
+                limit: CHUNK_SIZE,
+            });
+
+            let res = self
+                .overlay_client()
+                .query_raw::<Data>(neighbour.clone(), req)
+                .await;
+
+            match res {
+                Ok(res) => {
+                    let chunk = &res.data().data;
+
+                    verifier.write_verify(chunk).map_err(|e| {
+                        Error::Internal(anyhow::anyhow!("Received invalid archive chunk: {e}"))
+                    })?;
+
+                    let is_last = chunk.len() < CHUNK_SIZE as usize;
+                    if is_last {
+                        verifier.final_check().map_err(|e| {
+                            Error::Internal(anyhow::anyhow!("Received invalid archive: {e}"))
+                        })?;
+                    }
+
+                    output.write_all(chunk).map_err(|e| {
+                        Error::Internal(anyhow::anyhow!("Failed to write archive chunk: {e}"))
+                    })?;
+
+                    offset += chunk.len() as u64;
+
+                    if is_last {
+                        return Ok(offset as usize);
+                    }
+                }
+                Err(e) => {
+                    tracing::error!(archive_id, offset, "Failed to download archive: {e:?}");
+                }
+            }
+        }
+    }
 }
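Putting the pieces together: `download_archive` streams verified chunks into any `Write + Send` sink, so the result can be parsed directly with `Archive::new`. A hypothetical end-to-end sketch (error plumbing simplified; a plain `Vec<u8>` stands in for an `ArchiveWriter` when spill-to-disk behaviour is not needed):

```rust
use tycho_block_util::archive::Archive;

// Sketch: fetch the archive covering `mc_seqno` and parse it in memory.
async fn fetch_archive(client: &BlockchainRpcClient, mc_seqno: u32) -> anyhow::Result<Archive> {
    let mut buffer = Vec::new();
    let bytes_written = client.download_archive(mc_seqno, &mut buffer).await?;
    tracing::debug!(mc_seqno, bytes_written, "archive downloaded");

    Archive::new(&buffer)
}
```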