From 8ce3b5633c2722ed8f87f6b4aaaa4b1c12f51007 Mon Sep 17 00:00:00 2001 From: Ivan Kalinin Date: Mon, 8 Jul 2024 19:14:49 +0200 Subject: [PATCH] refactor: simplify archives --- Cargo.lock | 2 +- block-util/Cargo.toml | 2 +- block-util/src/archive/mod.rs | 150 ++++++------ block-util/src/archive/reader.rs | 5 + block-util/src/archive/writer.rs | 219 ------------------ cli/src/node/boot/cold_boot.rs | 65 +++--- cli/src/node/config.rs | 5 +- cli/src/node/mod.rs | 11 +- collator/src/state_node.rs | 3 +- collator/src/validator/state.rs | 2 +- collator/src/validator/types.rs | 27 +-- collator/src/validator/validator.rs | 2 +- core/src/block_strider/mod.rs | 6 +- .../provider/archive_provider.rs | 210 ++++++----------- .../provider/blockchain_provider.rs | 27 +-- core/src/block_strider/provider/mod.rs | 72 +++--- core/src/block_strider/state_applier.rs | 2 +- core/src/blockchain_rpc/client.rs | 1 + storage/src/models/block_meta.rs | 26 --- storage/src/models/mod.rs | 2 +- storage/src/store/block/mod.rs | 2 + 21 files changed, 235 insertions(+), 606 deletions(-) delete mode 100644 block-util/src/archive/writer.rs diff --git a/Cargo.lock b/Cargo.lock index 09a789b18..8a426bf59 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3277,9 +3277,9 @@ dependencies = [ "anyhow", "arc-swap", "bytes", + "bytesize", "everscale-types", "hex", - "libc", "parking_lot", "rand", "sha2", diff --git a/block-util/Cargo.toml b/block-util/Cargo.toml index 350ad93bf..e583439fe 100644 --- a/block-util/Cargo.toml +++ b/block-util/Cargo.toml @@ -13,9 +13,9 @@ license.workspace = true anyhow = { workspace = true } arc-swap = { workspace = true } bytes = { workspace = true } +bytesize = { workspace = true } everscale-types = { workspace = true } hex = { workspace = true } -libc = { workspace = true } parking_lot = { workspace = true } sha2 = { workspace = true } smallvec = { workspace = true } diff --git a/block-util/src/archive/mod.rs b/block-util/src/archive/mod.rs index 39371a708..1b2e25801 100644 --- a/block-util/src/archive/mod.rs +++ b/block-util/src/archive/mod.rs @@ -1,32 +1,36 @@ use std::collections::BTreeMap; +use anyhow::Result; use bytes::Bytes; -use everscale_types::models::{Block, BlockId, BlockProof}; +use everscale_types::models::BlockId; +use tycho_util::FastHashMap; pub use self::entry_id::{ArchiveEntryId, ArchiveEntryIdKind, GetFileName}; pub use self::reader::{ArchiveEntry, ArchiveReader, ArchiveReaderError, ArchiveVerifier}; -pub use self::writer::ArchiveWritersPool; -use crate::block::{BlockProofStuff, BlockStuff, BlockStuffAug}; +use crate::block::{BlockProofStuff, BlockProofStuffAug, BlockStuff, BlockStuffAug}; mod entry_id; mod reader; -mod writer; pub const ARCHIVE_PREFIX: [u8; 4] = u32::to_le_bytes(0xae8fdd01); pub const ARCHIVE_ENTRY_PREFIX: [u8; 2] = u16::to_le_bytes(0x1e8b); pub const ARCHIVE_ENTRY_HEADER_LEN: usize = ARCHIVE_ENTRY_PREFIX.len() + 2 + 4; // magic + filename len + data len pub struct Archive { - pub block_ids: BTreeMap, - pub blocks: BTreeMap, + pub mc_block_ids: BTreeMap, + pub blocks: FastHashMap, } impl Archive { - pub fn new(data: &[u8]) -> anyhow::Result { - let reader = ArchiveReader::new(data)?; + pub fn new(data: T) -> Result + where + Bytes: From, + { + let data = Bytes::from(data); + let reader = ArchiveReader::new(&data)?; let mut res = Archive { - block_ids: Default::default(), + mc_block_ids: Default::default(), blocks: Default::default(), }; @@ -34,93 +38,69 @@ impl Archive { let entry = entry_data?; match ArchiveEntryId::from_filename(entry.name)? { ArchiveEntryId::Block(id) => { - let block = BlockStuff::deserialize_checked(&id, entry.data)?.into_block(); - - res.block_ids.insert(id.seqno, id); - - res.blocks.entry(id).or_default().block = - Some(WithArchiveData::new(block, entry.data.to_vec())); + let block = BlockStuff::deserialize_checked(&id, entry.data)?; + + if id.shard.is_masterchain() { + res.mc_block_ids.insert(id.seqno, id); + } + + let parsed = res.blocks.entry(id).or_default(); + anyhow::ensure!(parsed.block.is_none(), "duplicate block data for: {id}"); + parsed.block = Some(WithArchiveData::new::( + block, + data.slice_ref(entry.data), + )); } - ArchiveEntryId::Proof(id) if id.shard.is_masterchain() => { - let proof = BlockProofStuff::deserialize(&id, entry.data, false)? - .proof() - .clone(); + ArchiveEntryId::Proof(id) => { + let proof = BlockProofStuff::deserialize(&id, entry.data, false)?; - res.block_ids.insert(id.seqno, id); + res.mc_block_ids.insert(id.seqno, id); - res.blocks.entry(id).or_default().proof = - Some(WithArchiveData::new(proof, entry.data.to_vec())); + let parsed = res.blocks.entry(id).or_default(); + anyhow::ensure!(parsed.proof.is_none(), "duplicate block proof for: {id}"); + parsed.proof = Some(WithArchiveData::new::( + proof, + data.slice_ref(entry.data), + )); } - ArchiveEntryId::ProofLink(id) if !id.shard.is_masterchain() => { - let proof = BlockProofStuff::deserialize(&id, entry.data, true)? - .proof() - .clone(); - - res.block_ids.insert(id.seqno, id); - - res.blocks.entry(id).or_default().proof = - Some(WithArchiveData::new(proof, entry.data.to_vec())); + ArchiveEntryId::ProofLink(id) => { + let proof = BlockProofStuff::deserialize(&id, entry.data, true)?; + + let parsed = res.blocks.entry(id).or_default(); + anyhow::ensure!(parsed.proof.is_none(), "duplicate block proof for: {id}"); + parsed.proof = Some(WithArchiveData::new::( + proof, + data.slice_ref(entry.data), + )); } - _ => continue, } } Ok(res) } - pub fn get_block_with_archive(&self, id: &BlockId) -> anyhow::Result { - let archive_data = self.blocks.get(id).ok_or(ArchiveError::WrongArchive)?; - - let block = archive_data - .block - .as_ref() - .ok_or(ArchiveError::BlockNotFound)?; - - let data = everscale_types::boc::BocRepr::encode(block.data.clone())?; - - Ok(BlockStuffAug::new( - BlockStuff::with_block(*id, block.data.clone()), - data, - )) + pub fn get_block_by_id(&self, id: &BlockId) -> Result<&BlockStuffAug, ArchiveError> { + let entry = self.blocks.get(id).ok_or(ArchiveError::OutOfRange)?; + entry.block.as_ref().ok_or(ArchiveError::BlockNotFound) } - pub fn get_block_by_id(&self, id: &BlockId) -> anyhow::Result { - let archive_data = self.blocks.get(id).ok_or(ArchiveError::WrongArchive)?; - - let block = archive_data - .block - .as_ref() - .ok_or(ArchiveError::BlockNotFound)?; - - Ok(BlockStuff::with_block(*id, block.data.clone())) + pub fn get_proof_by_id(&self, id: &BlockId) -> Result<&BlockProofStuffAug, ArchiveError> { + let entry = self.blocks.get(id).ok_or(ArchiveError::OutOfRange)?; + entry.proof.as_ref().ok_or(ArchiveError::ProofNotFound) } - pub fn get_proof_by_id(&self, id: &BlockId) -> anyhow::Result { - let archive_data = self.blocks.get(id).ok_or(ArchiveError::WrongArchive)?; - - let proof = archive_data - .proof - .as_ref() - .ok_or(ArchiveError::ProofNotFound)?; - - let is_link = !proof.proof_for.is_masterchain(); - let proof = BlockProofStuff::from_proof(Box::new(proof.data.clone()), is_link)?; - - Ok(proof) - } - - pub fn get_block_by_seqno(&self, seqno: u32) -> anyhow::Result { + pub fn get_mc_block_by_seqno(&self, seqno: u32) -> Result<&BlockStuffAug, ArchiveError> { let id = self - .block_ids + .mc_block_ids .get(&seqno) .ok_or(ArchiveError::BlockNotFound)?; self.get_block_by_id(id) } - pub fn get_proof_by_seqno(&self, seqno: u32) -> anyhow::Result { + pub fn get_mc_proof_by_seqno(&self, seqno: u32) -> Result<&BlockProofStuffAug, ArchiveError> { let id = self - .block_ids + .mc_block_ids .get(&seqno) .ok_or(ArchiveError::BlockNotFound)?; @@ -130,8 +110,8 @@ impl Archive { #[derive(Default)] pub struct ArchiveDataEntry { - pub block: Option>, - pub proof: Option>, + pub block: Option, + pub proof: Option, } #[derive(Clone)] @@ -204,6 +184,16 @@ impl std::ops::Deref for WithArchiveData { #[error("archive data not loaded")] pub struct WithArchiveDataError; +#[derive(thiserror::Error, Debug)] +pub enum ArchiveError { + #[error("mc block seqno out of range")] + OutOfRange, + #[error("block not found")] + BlockNotFound, + #[error("proof not found")] + ProofNotFound, +} + /// Encodes archive package segment. pub fn make_archive_entry(filename: &str, data: &[u8]) -> Vec { let mut vec = Vec::with_capacity(2 + 2 + 4 + filename.len() + data.len()); @@ -215,16 +205,6 @@ pub fn make_archive_entry(filename: &str, data: &[u8]) -> Vec { vec } -#[derive(thiserror::Error, Debug)] -enum ArchiveError { - #[error("Block not found in archive")] - WrongArchive, - #[error("Block not found")] - BlockNotFound, - #[error("Proof not found")] - ProofNotFound, -} - #[cfg(test)] mod tests { use super::*; diff --git a/block-util/src/archive/reader.rs b/block-util/src/archive/reader.rs index ee53d06ed..e3ccda53f 100644 --- a/block-util/src/archive/reader.rs +++ b/block-util/src/archive/reader.rs @@ -13,6 +13,11 @@ impl<'a> ArchiveReader<'a> { read_package_header(data, &mut offset)?; Ok(Self { data, offset }) } + + #[inline] + pub fn offset(&self) -> usize { + self.offset + } } impl<'a> Iterator for ArchiveReader<'a> { diff --git a/block-util/src/archive/writer.rs b/block-util/src/archive/writer.rs deleted file mode 100644 index c12cfe1ba..000000000 --- a/block-util/src/archive/writer.rs +++ /dev/null @@ -1,219 +0,0 @@ -#![allow(clippy::disallowed_types)] -use std::fs::File; -use std::io::{IoSlice, Write}; -use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::sync::Arc; - -use anyhow::{Context, Result}; -use parking_lot::Mutex; - -use crate::archive::Archive; - -pub struct ArchiveWriter { - pool_state: Arc, - state: ArchiveWriterState, -} - -impl ArchiveWriter { - pub fn parse_archive(&self) -> Result { - match &self.state { - ArchiveWriterState::InMemory(buffer) => Archive::new(buffer), - ArchiveWriterState::File { file, .. } => { - let mapped_file = - FileWriter::new(file).context("Failed to map temp archive file")?; - - Archive::new(mapped_file.as_slice()) - } - } - } - - fn acquire_memory(&mut self, additional: usize) -> std::io::Result<()> { - if let ArchiveWriterState::InMemory(buffer) = &self.state { - let move_to_file = { - let mut acquired_memory = self.pool_state.acquired_memory.lock(); - if *acquired_memory + additional > self.pool_state.save_to_disk_threshold { - *acquired_memory -= buffer.len(); - true - } else { - *acquired_memory += additional; - false - } - }; - - if move_to_file { - let (path, mut file) = self.pool_state.acquire_file()?; - file.write_all(buffer)?; - self.state = ArchiveWriterState::File { path, file }; - } - } - - Ok(()) - } -} - -impl Drop for ArchiveWriter { - fn drop(&mut self) { - match &self.state { - ArchiveWriterState::InMemory(buffer) => { - *self.pool_state.acquired_memory.lock() -= buffer.len(); - } - ArchiveWriterState::File { path, .. } => { - if let Err(e) = std::fs::remove_file(path) { - tracing::error!( - target: "sync", - path = %path.display(), - "failed to remove temp archive file: {e:?}" - ); - } - } - } - } -} - -impl Write for ArchiveWriter { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - self.acquire_memory(buf.len())?; - - match &mut self.state { - ArchiveWriterState::InMemory(buffer) => buffer.write(buf), - ArchiveWriterState::File { file, .. } => file.write(buf), - } - } - - fn write_vectored(&mut self, bufs: &[IoSlice<'_>]) -> std::io::Result { - let len = bufs.iter().map(|b| b.len()).sum(); - - self.acquire_memory(len)?; - - match &mut self.state { - ArchiveWriterState::InMemory(buffer) => { - buffer.reserve(len); - for buf in bufs { - buffer.extend_from_slice(buf); - } - Ok(len) - } - ArchiveWriterState::File { file, .. } => file.write_vectored(bufs), - } - } - - #[inline(always)] - fn flush(&mut self) -> std::io::Result<()> { - match &mut self.state { - ArchiveWriterState::InMemory(_) => Ok(()), - ArchiveWriterState::File { file, .. } => file.flush(), - } - } -} - -#[derive(Clone)] -pub struct ArchiveWritersPool { - state: Arc, -} - -impl ArchiveWritersPool { - pub fn new(base_path: impl AsRef, save_to_disk_threshold: usize) -> Self { - Self { - state: Arc::new(ArchiveWritersPoolState { - save_to_disk_threshold, - acquired_memory: Default::default(), - temp_file_index: Default::default(), - base_path: base_path.as_ref().to_path_buf(), - }), - } - } - - pub fn acquire(&self) -> ArchiveWriter { - ArchiveWriter { - pool_state: self.state.clone(), - state: ArchiveWriterState::InMemory(Vec::new()), - } - } -} - -struct ArchiveWritersPoolState { - save_to_disk_threshold: usize, - // NOTE: `AtomicUsize` is not used here because there is a complex - // InMemory-to-File transition - acquired_memory: Mutex, - temp_file_index: AtomicUsize, - base_path: PathBuf, -} - -impl ArchiveWritersPoolState { - fn acquire_file(&self) -> std::io::Result<(PathBuf, File)> { - let temp_file_index = self.temp_file_index.fetch_add(1, Ordering::AcqRel); - let path = self - .base_path - .join(format!("temp_archive{temp_file_index:04}")); - - let file = std::fs::OpenOptions::new() - .write(true) - .read(true) - .create(true) - .truncate(true) - .open(&path)?; - - Ok((path, file)) - } -} - -enum ArchiveWriterState { - InMemory(Vec), - File { path: PathBuf, file: File }, -} - -struct FileWriter<'a> { - _file: &'a File, - length: usize, - ptr: *mut libc::c_void, -} - -impl<'a> FileWriter<'a> { - fn new(file: &'a File) -> std::io::Result { - use std::os::unix::io::AsRawFd; - - let length = file.metadata()?.len() as usize; - - // SAFETY: File was opened successfully, file mode is R, offset is aligned - let ptr = unsafe { - libc::mmap( - std::ptr::null_mut(), - length, - libc::PROT_READ, - libc::MAP_SHARED, - file.as_raw_fd(), - 0, - ) - }; - if ptr == libc::MAP_FAILED { - return Err(std::io::Error::last_os_error()); - } - - if unsafe { libc::madvise(ptr, length, libc::MADV_SEQUENTIAL) } != 0 { - return Err(std::io::Error::last_os_error()); - } - - Ok(Self { - _file: file, - length, - ptr, - }) - } - - fn as_slice(&self) -> &[u8] { - unsafe { std::slice::from_raw_parts(self.ptr as *const u8, self.length) } - } -} - -impl Drop for FileWriter<'_> { - fn drop(&mut self) { - // SAFETY: File still exists, ptr and length were initialized once on creation - if unsafe { libc::munmap(self.ptr, self.length) } != 0 { - // TODO: how to handle this? - let e = std::io::Error::last_os_error(); - panic!("failed to unmap temp archive file: {e:?}"); - } - } -} diff --git a/cli/src/node/boot/cold_boot.rs b/cli/src/node/boot/cold_boot.rs index 612988c92..4d4a253e5 100644 --- a/cli/src/node/boot/cold_boot.rs +++ b/cli/src/node/boot/cold_boot.rs @@ -1,10 +1,9 @@ use std::path::PathBuf; use std::sync::Arc; -use anyhow::Context; -use everscale_types::boc::Boc; -use everscale_types::cell::CellBuilder; -use everscale_types::models::{BlockId, ShardIdent, ShardStateUnsplit}; +use anyhow::{Context, Result}; +use everscale_types::models::*; +use everscale_types::prelude::*; use futures_util::StreamExt; use tokio::sync::mpsc; use tycho_block_util::archive::WithArchiveData; @@ -12,7 +11,7 @@ use tycho_block_util::block::{BlockProofStuff, BlockStuff}; use tycho_block_util::state::{MinRefMcStateTracker, ShardStateStuff}; use tycho_core::proto::blockchain::BlockFull; use tycho_storage::{ - BlockHandle, BlockMetaData, BriefBlockInfo, KeyBlocksDirection, KEY_BLOCK_UTIME_STEP, + BlockHandle, BlockMetaData, BlockProofHandle, KeyBlocksDirection, KEY_BLOCK_UTIME_STEP, }; use tycho_util::futures::JoinTask; use tycho_util::time::now_sec; @@ -24,7 +23,7 @@ use crate::util::error::ResultExt; /// Boot type when the node has not yet started syncing /// /// Returns last masterchain key block id -pub async fn run(node: &Arc, zerostates: Option>) -> anyhow::Result { +pub async fn run(node: &Arc, zerostates: Option>) -> Result { tracing::info!("starting cold boot"); // Find the last known key block (or zerostate) @@ -51,7 +50,7 @@ pub async fn run(node: &Arc, zerostates: Option>) -> anyhow:: async fn prepare_prev_key_block( node: &Arc, zerostates: Option>, -) -> anyhow::Result { +) -> Result { let block_id = node .storage .node_state() @@ -99,10 +98,7 @@ async fn prepare_prev_key_block( Ok(prev_key_block) } -async fn download_key_blocks( - node: &Arc, - mut prev_key_block: PrevKeyBlock, -) -> anyhow::Result<()> { +async fn download_key_blocks(node: &Arc, mut prev_key_block: PrevKeyBlock) -> Result<()> { const BLOCKS_PER_BATCH: u32 = 10; const PARALLEL_REQUESTS: usize = 10; @@ -203,14 +199,12 @@ async fn download_key_blocks( for (index, proof) in proofs.into_iter().enumerate() { // Verify block proof match prev_key_block.check_next_proof(&proof.data) { - Ok(info) => { + Ok(meta) => { // Save block proof - let block_id = proof.id(); - let handle = node .storage .block_storage() - .store_block_proof(&proof, info.with_mc_seq_no(block_id.seqno).into()) + .store_block_proof(&proof, BlockProofHandle::New(meta)) .await? .handle; @@ -266,7 +260,7 @@ async fn download_key_blocks( } /// Select the latest suitable key block with persistent state -fn choose_key_block(node: &Node) -> anyhow::Result { +fn choose_key_block(node: &Node) -> Result { let block_handle_storage = node.storage.block_handle_storage(); let mut key_blocks = block_handle_storage @@ -333,7 +327,7 @@ fn choose_key_block(node: &Node) -> anyhow::Result { async fn import_zerostates( node: &Arc, paths: Vec, -) -> anyhow::Result<(BlockHandle, ShardStateStuff)> { +) -> Result<(BlockHandle, ShardStateStuff)> { // Use a separate tracker for zerostates let tracker = MinRefMcStateTracker::default(); @@ -410,7 +404,7 @@ async fn import_zerostates( handle_storage.create_or_load_handle(state.block_id(), BlockMetaData { is_key_block: state.block_id().is_masterchain(), gen_utime, - mc_ref_seqno: 0, + mc_ref_seqno: Some(0), }); let stored = state_storage @@ -446,7 +440,7 @@ async fn import_zerostates( Ok((handle, state)) } -async fn download_zerostates(node: &Arc) -> anyhow::Result<(BlockHandle, ShardStateStuff)> { +async fn download_zerostates(node: &Arc) -> Result<(BlockHandle, ShardStateStuff)> { let zerostate_id = node.zerostate.as_block_id(); let (handle, state) = load_or_download_state(node, &zerostate_id).await?; @@ -463,7 +457,7 @@ async fn download_zerostates(node: &Arc) -> anyhow::Result<(BlockHandle, S async fn load_or_download_state( node: &Arc, block_id: &BlockId, -) -> anyhow::Result<(BlockHandle, ShardStateStuff)> { +) -> Result<(BlockHandle, ShardStateStuff)> { let storage = &node.storage; let blockchain_rpc_client = &node.blockchain_rpc_client; @@ -514,10 +508,7 @@ async fn load_or_download_state( Ok((handle, state)) } -async fn download_start_blocks_and_states( - node: &Arc, - mc_block_id: &BlockId, -) -> anyhow::Result<()> { +async fn download_start_blocks_and_states(node: &Arc, mc_block_id: &BlockId) -> Result<()> { // Download and save masterchain block and state let (_, init_mc_block) = download_block_with_state(node, *mc_block_id, *mc_block_id).await?; @@ -538,7 +529,7 @@ async fn download_block_with_state( node: &Arc, mc_block_id: BlockId, block_id: BlockId, -) -> anyhow::Result<(BlockHandle, BlockStuff)> { +) -> Result<(BlockHandle, BlockStuff)> { let block_storage = node.storage.block_storage(); let block_handle_storage = node.storage.block_handle_storage(); @@ -596,10 +587,11 @@ async fn download_block_with_state( match proof.pre_check_block_proof() { Ok((_, block_info)) => { - let meta_data = BriefBlockInfo::from(&block_info) - .with_mc_seq_no(mc_seqno); - - break (block, proof, meta_data); + break (block, proof, BlockMetaData { + is_key_block: block_info.key_block, + gen_utime: block_info.gen_utime, + mc_ref_seqno: Some(mc_seqno), + }); } Err(e) => { tracing::error!("received invalid block: {e:?}"); @@ -653,10 +645,7 @@ async fn download_block_with_state( Ok((handle, block)) } -fn load_zerostate( - tracker: &MinRefMcStateTracker, - path: &PathBuf, -) -> anyhow::Result { +fn load_zerostate(tracker: &MinRefMcStateTracker, path: &PathBuf) -> Result { let data = std::fs::read(path).wrap_err("failed to read file")?; let file_hash = Boc::file_hash(&data); @@ -684,7 +673,7 @@ fn make_shard_state( global_id: i32, shard_ident: ShardIdent, now: u32, -) -> anyhow::Result { +) -> Result { let state = ShardStateUnsplit { global_id, shard_ident, @@ -726,12 +715,16 @@ impl PrevKeyBlock { } } - fn check_next_proof(&self, next_proof: &BlockProofStuff) -> anyhow::Result { + fn check_next_proof(&self, next_proof: &BlockProofStuff) -> Result { let (virt_block, virt_block_info) = next_proof .pre_check_block_proof() .context("Failed to pre check block proof")?; - let res = BriefBlockInfo::from(&virt_block_info); + let res = BlockMetaData { + is_key_block: virt_block_info.key_block, + gen_utime: virt_block_info.gen_utime, + mc_ref_seqno: Some(next_proof.proof().proof_for.seqno), + }; match self { // Check block proof with zero state diff --git a/cli/src/node/config.rs b/cli/src/node/config.rs index 5506b3404..f627cd7dd 100644 --- a/cli/src/node/config.rs +++ b/cli/src/node/config.rs @@ -6,7 +6,7 @@ use everscale_crypto::ed25519; use everscale_types::cell::HashBytes; use serde::{Deserialize, Serialize}; use tycho_collator::types::CollationConfig; -use tycho_core::block_strider::{ArchiveBlockProviderConfig, BlockchainBlockProviderConfig}; +use tycho_core::block_strider::BlockchainBlockProviderConfig; use tycho_core::blockchain_rpc::BlockchainRpcServiceConfig; use tycho_core::overlay_client::PublicOverlayClientConfig; use tycho_network::{DhtConfig, NetworkConfig, OverlayConfig, PeerResolverConfig}; @@ -58,8 +58,6 @@ pub struct NodeConfig { pub blockchain_rpc_service: BlockchainRpcServiceConfig, - pub archive_block_provider: ArchiveBlockProviderConfig, - pub blockchain_block_provider: BlockchainBlockProviderConfig, pub collator: CollationConfig, @@ -86,7 +84,6 @@ impl Default for NodeConfig { public_overlay_client: PublicOverlayClientConfig::default(), storage: StorageConfig::default(), blockchain_rpc_service: BlockchainRpcServiceConfig::default(), - archive_block_provider: ArchiveBlockProviderConfig::default(), blockchain_block_provider: BlockchainBlockProviderConfig::default(), collator: CollationConfig::default(), rpc: Some(RpcConfig::default()), diff --git a/cli/src/node/mod.rs b/cli/src/node/mod.rs index 25a0c3a12..24807149a 100644 --- a/cli/src/node/mod.rs +++ b/cli/src/node/mod.rs @@ -24,7 +24,7 @@ use tycho_collator::validator::client::retry::BackoffConfig; use tycho_collator::validator::config::ValidatorConfig; use tycho_collator::validator::validator::ValidatorStdImplFactory; use tycho_core::block_strider::{ - BlockProvider, BlockStrider, BlockSubscriberExt, BlockchainBlockProvider, + ArchiveBlockProvider, BlockProvider, BlockStrider, BlockSubscriberExt, BlockchainBlockProvider, BlockchainBlockProviderConfig, GcSubscriber, MetricsSubscriber, OptionalBlockStuff, PersistentBlockStriderState, ShardStateApplier, StateSubscriber, StateSubscriberContext, StorageBlockProvider, @@ -299,7 +299,6 @@ pub struct Node { state_tracker: MinRefMcStateTracker, rpc_config: Option, - archive_block_provider_config: ArchiveBlockProviderConfig, blockchain_block_provider_config: BlockchainBlockProviderConfig, collation_config: CollationConfig, @@ -420,7 +419,6 @@ impl Node { blockchain_rpc_client, state_tracker, rpc_config: node_config.rpc, - archive_block_provider_config: node_config.archive_block_provider, blockchain_block_provider_config: node_config.blockchain_block_provider, collation_config: node_config.collator, }) @@ -578,11 +576,8 @@ impl Node { PersistentBlockStriderState::new(self.zerostate.as_block_id(), self.storage.clone()); // TODO: add to block_strider later - let _archive_block_provider = ArchiveBlockProvider::new( - self.blockchain_rpc_client.clone(), - self.storage.clone(), - self.archive_block_provider_config.clone(), - ); + let _archive_block_provider = + ArchiveBlockProvider::new(self.blockchain_rpc_client.clone(), self.storage.clone()); let block_strider = BlockStrider::builder() .with_provider(( diff --git a/collator/src/state_node.rs b/collator/src/state_node.rs index 431bf2f83..34d4ae7bd 100644 --- a/collator/src/state_node.rs +++ b/collator/src/state_node.rs @@ -379,7 +379,8 @@ impl StateNodeAdapterStdImpl { (i as u16, BlockSignature { node_id_short: tl_proto::hash(everscale_crypto::tl::PublicKey::Ed25519 { key: &key.as_array(), - }), + }) + .into(), signature: *value, }) }), diff --git a/collator/src/validator/state.rs b/collator/src/validator/state.rs index ed5f48a5c..b2f55cfc6 100644 --- a/collator/src/validator/state.rs +++ b/collator/src/validator/state.rs @@ -10,7 +10,7 @@ use crate::tracing_targets; use crate::types::{BlockSignatures, OnValidatedBlockEvent}; use crate::validator::client::retry::RetryClient; use crate::validator::client::ValidatorClient; -use crate::validator::types::{BlockValidationCandidate, ValidationStatus, ValidatorInfo}; +use crate::validator::types::{ValidationStatus, ValidatorInfo}; use crate::validator::ValidatorEventListener; #[derive(Eq, PartialEq)] diff --git a/collator/src/validator/types.rs b/collator/src/validator/types.rs index fb88d525c..6aba157b9 100644 --- a/collator/src/validator/types.rs +++ b/collator/src/validator/types.rs @@ -2,7 +2,7 @@ use std::convert::TryFrom; use everscale_crypto::ed25519::PublicKey; use everscale_types::cell::HashBytes; -use everscale_types::models::{BlockId, BlockIdShort, ShardIdent, ValidatorDescription}; +use everscale_types::models::{BlockIdShort, ShardIdent, ValidatorDescription}; use tl_proto::{TlRead, TlWrite}; #[derive(Clone)] @@ -26,31 +26,6 @@ impl TryFrom<&ValidatorDescription> for ValidatorInfo { } } -/// Block candidate for validation -#[derive(Debug, Default, Clone, Copy, Eq, Hash, PartialEq, Ord, PartialOrd, TlRead, TlWrite)] -pub(crate) struct BlockValidationCandidate { - pub root_hash: [u8; 32], - pub file_hash: [u8; 32], -} - -impl From for BlockValidationCandidate { - fn from(block_id: BlockId) -> Self { - Self { - root_hash: block_id.root_hash.0, - file_hash: block_id.file_hash.0, - } - } -} - -impl BlockValidationCandidate { - pub fn as_bytes(&self) -> [u8; 64] { - let mut bytes = [0u8; 64]; - bytes[..32].copy_from_slice(&self.root_hash); - bytes[32..].copy_from_slice(&self.file_hash); - bytes - } -} - #[derive(Clone, Debug)] pub enum StopValidationCommand { ByTopShardBlock(BlockIdShort), diff --git a/collator/src/validator/validator.rs b/collator/src/validator/validator.rs index 346cb6c8d..a0cb2b03e 100644 --- a/collator/src/validator/validator.rs +++ b/collator/src/validator/validator.rs @@ -26,7 +26,7 @@ use crate::validator::state::{ NotificationStatus, SessionInfo, ValidationState, ValidationStateStdImpl, }; use crate::validator::types::{ - BlockValidationCandidate, OverlayNumber, StopValidationCommand, ValidationStatus, ValidatorInfo, + OverlayNumber, StopValidationCommand, ValidationStatus, ValidatorInfo, }; // FACTORY diff --git a/core/src/block_strider/mod.rs b/core/src/block_strider/mod.rs index a32b9e007..5f4b9cda9 100644 --- a/core/src/block_strider/mod.rs +++ b/core/src/block_strider/mod.rs @@ -14,9 +14,9 @@ use tycho_util::metrics::HistogramGuard; use tycho_util::FastHashMap; pub use self::provider::{ - ArchiveBlockProvider, ArchiveBlockProviderConfig, BlockProvider, BlockProviderExt, - BlockchainBlockProvider, BlockchainBlockProviderConfig, ChainBlockProvider, EmptyBlockProvider, - OptionalBlockStuff, StorageBlockProvider, + ArchiveBlockProvider, BlockProvider, BlockProviderExt, BlockchainBlockProvider, + BlockchainBlockProviderConfig, ChainBlockProvider, EmptyBlockProvider, OptionalBlockStuff, + StorageBlockProvider, }; pub use self::state::{BlockStriderState, PersistentBlockStriderState, TempBlockStriderState}; pub use self::state_applier::ShardStateApplier; diff --git a/core/src/block_strider/provider/archive_provider.rs b/core/src/block_strider/provider/archive_provider.rs index 0cf9819ad..9d59fc1a3 100644 --- a/core/src/block_strider/provider/archive_provider.rs +++ b/core/src/block_strider/provider/archive_provider.rs @@ -1,178 +1,126 @@ -#![allow(clippy::map_err_ignore)] - use std::sync::Arc; +use std::time::Duration; +use anyhow::Result; use arc_swap::ArcSwapOption; +use bytes::{BufMut, BytesMut}; use everscale_types::models::BlockId; use futures_util::future::BoxFuture; -use serde::{Deserialize, Serialize}; -use tycho_block_util::archive::{Archive, ArchiveWritersPool}; -use tycho_block_util::block::{BlockStuff, BlockStuffAug}; -use tycho_storage::Storage; +use tycho_block_util::archive::Archive; +use tycho_storage::{BlockMetaData, Storage}; use tycho_util::time::now_sec; use crate::block_strider::provider::{BlockProvider, OptionalBlockStuff, ProofChecker}; use crate::blockchain_rpc::BlockchainRpcClient; -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(default)] -#[non_exhaustive] -pub struct ArchiveBlockProviderConfig { - /// Default: 1073741824 (1 GB) - pub save_to_disk_threshold: usize, -} - -impl Default for ArchiveBlockProviderConfig { - fn default() -> Self { - Self { - save_to_disk_threshold: 1024 * 1024 * 1024, - } - } -} - +#[derive(Clone)] +#[repr(transparent)] pub struct ArchiveBlockProvider { - client: BlockchainRpcClient, - writers_pool: ArchiveWritersPool, - proof_checker: ProofChecker, - last_known_archive: ArcSwapOption, + inner: Arc, } impl ArchiveBlockProvider { - pub fn new( - client: BlockchainRpcClient, - storage: Storage, - config: ArchiveBlockProviderConfig, - ) -> Self { - let writers_pool = - ArchiveWritersPool::new(storage.root().path(), config.save_to_disk_threshold); + pub fn new(client: BlockchainRpcClient, storage: Storage) -> Self { let proof_checker = ProofChecker::new(storage); Self { - client, - writers_pool, - proof_checker, - last_known_archive: Default::default(), + inner: Arc::new(Inner { + client, + proof_checker, + last_known_archive: ArcSwapOption::empty(), + }), } } - async fn get_block_impl(&self, block_id: &BlockId) -> OptionalBlockStuff { - let res = self.last_known_archive.load_full().map(|archive| { - ( - archive.get_block_by_id(block_id), - archive.get_proof_by_id(block_id), - ) - }); - - let (block, proof) = match res { - Some((Ok(block), Ok(proof))) => (block, proof), - _ => match self.download_archive(block_id.seqno).await { - Ok(archive) => { - let block = archive.get_block_by_id(block_id); - let proof = archive.get_proof_by_id(block_id); - - match (block, proof) { - (Ok(block), Ok(proof)) => { - self.last_known_archive.store(Some(Arc::new(archive))); - (block, proof) - } - _ => return Some(Err(ArchiveProviderError::InvalidArchive.into())), - } + async fn get_next_block_impl(&self, block_id: &BlockId) -> OptionalBlockStuff { + let this = self.inner.as_ref(); + + let next_block_seqno = block_id.seqno + 1; + + let block_id; + let archive = loop { + if let Some(archive) = this.last_known_archive.load_full() { + if let Some(mc_block_id) = archive.mc_block_ids.get(&next_block_seqno) { + block_id = *mc_block_id; + break archive; } + } + + // TODO: Impl parallel download + match self.download_archive(next_block_seqno).await { + Ok(archive) => this.last_known_archive.store(Some(Arc::new(archive))), Err(e) => return Some(Err(e)), - }, + } }; - if let Err(e) = self.proof_checker.check_proof(&block, &proof).await { - return Some(Err(e)); - } + let (block, proof) = match ( + archive.get_block_by_id(&block_id), + archive.get_proof_by_id(&block_id), + ) { + (Ok(block), Ok(proof)) => (block, proof), + (Err(e), _) | (_, Err(e)) => return Some(Err(e.into())), + }; - match Self::is_sync(&block) { - Ok(true) => return None, - Ok(false) => { /* do nothing */ } - Err(e) => return Some(Err(e)), + match this.proof_checker.check_proof(block, proof, true).await { + Ok(meta) if is_block_recent(&meta) => Some(Ok(block.clone())), + Ok(_) => None, + Err(e) => Some(Err(e)), } - - Some(Self::construct_block(block)) } - async fn get_next_block_impl(&self, block_id: &BlockId) -> OptionalBlockStuff { - let next_block_seqno = block_id.seqno + 1; + async fn get_block_impl(&self, block_id: &BlockId) -> OptionalBlockStuff { + let this = self.inner.as_ref(); - let res = self.last_known_archive.load_full().map(|archive| { - ( - archive.get_block_by_seqno(next_block_seqno), - archive.get_proof_by_seqno(next_block_seqno), - ) - }); - - let (block, proof) = match res { - Some((Ok(block), Ok(proof))) => (block, proof), - _ => match self.download_archive(next_block_seqno).await { - Ok(archive) => { - let block = archive.get_block_by_seqno(next_block_seqno); - let proof = archive.get_proof_by_seqno(next_block_seqno); - - match (block, proof) { - (Ok(block), Ok(proof)) => { - self.last_known_archive.store(Some(Arc::new(archive))); - (block, proof) - } - (Err(e), _) | (_, Err(e)) => return Some(Err(e)), - } - } - Err(e) => return Some(Err(e)), - }, + let Some(archive) = this.last_known_archive.load_full() else { + return Some(Err(anyhow::anyhow!("no archive available"))); }; - if let Err(e) = self.proof_checker.check_proof(&block, &proof).await { - return Some(Err(e)); - } + let (block, proof) = match ( + archive.get_block_by_id(block_id), + archive.get_proof_by_id(block_id), + ) { + (Ok(block), Ok(proof)) => (block, proof), + (Err(e), _) | (_, Err(e)) => return Some(Err(e.into())), + }; - match Self::is_sync(&block) { - Ok(true) => return None, - Ok(false) => { /* do nothing */ } - Err(e) => return Some(Err(e)), + if let Err(e) = this.proof_checker.check_proof(block, proof, true).await { + return Some(Err(e)); } - Some(Self::construct_block(block)) + // NOTE: Always return the block by id even if it's not recent + Some(Ok(block.clone())) } - async fn download_archive(&self, seqno: u32) -> anyhow::Result { - let writers_pool = self.writers_pool.clone(); - let blockchain_rpc_client = self.client.clone(); + async fn download_archive(&self, seqno: u32) -> Result { + let client = &self.inner.client; loop { - let mut writer = writers_pool.acquire(); - - blockchain_rpc_client - .download_archive(seqno, &mut writer) - .await?; + let mut archive_data = BytesMut::new().writer(); + client.download_archive(seqno, &mut archive_data).await?; + let archive_data = archive_data.into_inner().freeze(); - let archive = match writer.parse_archive() { - Ok(archive) => archive, + match Archive::new(archive_data) { + Ok(archive) => return Ok(archive), Err(e) => { tracing::error!(seqno, "failed to parse downloaded archive: {e}"); + + // TODO: backoff + tokio::time::sleep(Duration::from_millis(100)).await; continue; } - }; - - return Ok(archive); + } } } +} - fn is_sync(block: &BlockStuff) -> anyhow::Result { - Ok(block.load_info()?.gen_utime + 600 > now_sec()) - } +fn is_block_recent(meta: &BlockMetaData) -> bool { + meta.gen_utime + 600 > now_sec() +} - fn construct_block(block: BlockStuff) -> anyhow::Result { - match everscale_types::boc::BocRepr::encode(block.block().clone()) { - Ok(archive_data) => Ok(BlockStuffAug::new( - BlockStuff::with_block(*block.id(), block.into_block()), - archive_data, - )), - Err(e) => Err(e.into()), - } - } +struct Inner { + client: BlockchainRpcClient, + proof_checker: ProofChecker, + last_known_archive: ArcSwapOption, } impl BlockProvider for ArchiveBlockProvider { @@ -187,9 +135,3 @@ impl BlockProvider for ArchiveBlockProvider { Box::pin(self.get_block_impl(block_id)) } } - -#[derive(thiserror::Error, Debug)] -pub(crate) enum ArchiveProviderError { - #[error("Invalid archive")] - InvalidArchive, -} diff --git a/core/src/block_strider/provider/blockchain_provider.rs b/core/src/block_strider/provider/blockchain_provider.rs index e0bc0c52e..5714405e9 100644 --- a/core/src/block_strider/provider/blockchain_provider.rs +++ b/core/src/block_strider/provider/blockchain_provider.rs @@ -3,6 +3,7 @@ use std::time::Duration; use everscale_types::models::*; use futures_util::future::BoxFuture; use serde::{Deserialize, Serialize}; +use tycho_block_util::archive::WithArchiveData; use tycho_block_util::block::{BlockProofStuff, BlockStuff}; use tycho_storage::Storage; use tycho_util::serde_helpers; @@ -91,22 +92,13 @@ impl BlockchainBlockProvider { BlockProofStuff::deserialize(&block_id, &proof_data, is_link), ) { (Ok(block), Ok(proof)) => { - if let Err(e) = self.proof_checker.check_proof(&block, &proof).await { + let proof = WithArchiveData::new(proof, proof_data); + if let Err(e) = self.proof_checker.check_proof(&block, &proof, true).await { handle.reject(); tracing::error!("got invalid mc block proof: {e}"); break 'res; } - if let Err(e) = self - .proof_checker - .store_block_proof(&block, proof, proof_data.into()) - .await - { - handle.reject(); - tracing::error!("failed to store block proof: {e}"); - break 'res; - } - handle.accept(); return Some(Ok(block.with_archive_data(block_data))); } @@ -152,22 +144,13 @@ impl BlockchainBlockProvider { BlockProofStuff::deserialize(&block_id, &proof_data, is_link), ) { (Ok(block), Ok(proof)) => { - if let Err(e) = self.proof_checker.check_proof(&block, &proof).await { + let proof = WithArchiveData::new(proof, proof_data); + if let Err(e) = self.proof_checker.check_proof(&block, &proof, true).await { handle.reject(); tracing::error!("got invalid shard block proof: {e}"); break 'res; } - if let Err(e) = self - .proof_checker - .store_block_proof(&block, proof, proof_data.into()) - .await - { - handle.reject(); - tracing::error!("failed to store block proof: {e}"); - break 'res; - } - handle.accept(); return Some(Ok(block.with_archive_data(block_data))); } diff --git a/core/src/block_strider/provider/mod.rs b/core/src/block_strider/provider/mod.rs index 67d544f53..4469b6204 100644 --- a/core/src/block_strider/provider/mod.rs +++ b/core/src/block_strider/provider/mod.rs @@ -3,19 +3,19 @@ use std::pin::pin; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; -use anyhow::Context; +use anyhow::{Context, Result}; use arc_swap::ArcSwapAny; use everscale_types::models::BlockId; use futures_util::future::{self, BoxFuture}; use tycho_block_util::block::{ - check_with_master_state, check_with_prev_key_block_proof, BlockProofStuff, BlockStuff, - BlockStuffAug, + check_with_master_state, check_with_prev_key_block_proof, BlockProofStuff, BlockProofStuffAug, + BlockStuff, BlockStuffAug, }; use tycho_block_util::state::ShardStateStuff; -use tycho_storage::{BriefBlockInfo, Storage}; +use tycho_storage::{BlockMetaData, BlockProofHandle, Storage}; use tycho_util::metrics::HistogramGuard; -pub use self::archive_provider::{ArchiveBlockProvider, ArchiveBlockProviderConfig}; +pub use self::archive_provider::ArchiveBlockProvider; pub use self::blockchain_provider::{BlockchainBlockProvider, BlockchainBlockProviderConfig}; pub use self::storage_provider::StorageBlockProvider; @@ -184,8 +184,9 @@ impl ProofChecker { pub async fn check_proof( &self, block: &BlockStuff, - proof: &BlockProofStuff, - ) -> anyhow::Result<()> { + proof: &BlockProofStuffAug, + store_proof_on_success: bool, + ) -> Result { // TODO: Add labels with shard? let _histogram = HistogramGuard::begin("tycho_core_check_block_proof_time"); @@ -200,16 +201,27 @@ impl ProofChecker { anyhow::ensure!(is_masterchain ^ proof.is_link(), "unexpected proof type"); let (virt_block, virt_block_info) = proof.pre_check_block_proof()?; + let meta = BlockMetaData { + is_key_block: virt_block_info.key_block, + gen_utime: virt_block_info.gen_utime, + mc_ref_seqno: is_masterchain.then(|| block.id().seqno), + }; + if !is_masterchain { - return Ok(()); + if store_proof_on_success { + // Store proof link + self.storage + .block_storage() + .store_block_proof(proof, BlockProofHandle::New(meta)) + .await?; + } + return Ok(meta); } - let handle = { - let block_handles = self.storage.block_handle_storage(); - block_handles - .load_key_block_handle(virt_block_info.prev_key_block_seqno) - .context("failed to load prev key block handle")? - }; + let block_handles = self.storage.block_handle_storage(); + let handle = block_handles + .load_key_block_handle(virt_block_info.prev_key_block_seqno) + .context("failed to load prev key block handle")?; if handle.id().seqno == 0 { let zerostate = 'zerostate: { @@ -228,7 +240,7 @@ impl ProofChecker { zerostate }; - check_with_master_state(proof, &zerostate, &virt_block, &virt_block_info) + check_with_master_state(proof, &zerostate, &virt_block, &virt_block_info)?; } else { let prev_key_block_proof = 'prev_proof: { if let Some(prev_proof) = self.cached_prev_key_block_proof.load_full() { @@ -256,29 +268,17 @@ impl ProofChecker { &prev_key_block_proof, &virt_block, &virt_block_info, - ) + )?; } - } - async fn store_block_proof( - &self, - block: &BlockStuff, - proof: BlockProofStuff, - proof_data: Vec, - ) -> anyhow::Result<()> { - let block_info = block.load_info()?; - let block_meta = BriefBlockInfo::from(&block_info); - - let proof_handle = block_meta - .with_mc_seq_no(block_info.min_ref_mc_seqno) - .into(); - - self.storage - .block_storage() - .store_block_proof(&proof.with_archive_data(proof_data), proof_handle) - .await?; - - Ok(()) + if store_proof_on_success { + // Store proof + self.storage + .block_storage() + .store_block_proof(proof, BlockProofHandle::New(meta)) + .await?; + } + Ok(meta) } } diff --git a/core/src/block_strider/state_applier.rs b/core/src/block_strider/state_applier.rs index d729b59bb..4c0e5ba40 100644 --- a/core/src/block_strider/state_applier.rs +++ b/core/src/block_strider/state_applier.rs @@ -96,7 +96,7 @@ where let (state, handles) = if handle.meta().has_state() { // Fast path when state is already applied - let state = state_storage.load_state(handle.id()); + let state = state_storage.load_state(handle.id()).await?; (state, RefMcStateHandles::Skip) } else { // Load previous states diff --git a/core/src/blockchain_rpc/client.rs b/core/src/blockchain_rpc/client.rs index 93da4bca6..fe056d2a2 100644 --- a/core/src/blockchain_rpc/client.rs +++ b/core/src/blockchain_rpc/client.rs @@ -364,6 +364,7 @@ impl BlockchainRpcClient { ))) } + // TODO: Split into `find_` and `download_` methods to not spam the network. pub async fn download_archive( &self, mc_seqno: u32, diff --git a/storage/src/models/block_meta.rs b/storage/src/models/block_meta.rs index 72122d8e5..21d599a78 100644 --- a/storage/src/models/block_meta.rs +++ b/storage/src/models/block_meta.rs @@ -21,32 +21,6 @@ impl BlockMetaData { } } -impl BriefBlockInfo { - pub fn with_mc_seq_no(self, mc_seq_no: u32) -> BlockMetaData { - BlockMetaData { - is_key_block: self.is_key_block, - gen_utime: self.gen_utime, - mc_ref_seqno: mc_seq_no, - } - } -} - -#[derive(Debug, Copy, Clone)] -pub struct BriefBlockInfo { - pub is_key_block: bool, - pub gen_utime: u32, - pub after_split: bool, -} - -impl From<&BlockInfo> for BriefBlockInfo { - fn from(info: &BlockInfo) -> Self { - Self { - is_key_block: info.key_block, - gen_utime: info.gen_utime, - after_split: info.after_split, - } - } -} #[derive(Debug, Default)] pub struct BlockMeta { flags: AtomicU64, diff --git a/storage/src/models/mod.rs b/storage/src/models/mod.rs index 3459a4583..3d4579231 100644 --- a/storage/src/models/mod.rs +++ b/storage/src/models/mod.rs @@ -1,5 +1,5 @@ pub use block_handle::{BlockHandle, BlockHandleCache, WeakBlockHandle}; -pub use block_meta::{BlockMeta, BlockMetaData, BriefBlockInfo, BriefBlockMeta}; +pub use block_meta::{BlockMeta, BlockMetaData, BriefBlockMeta}; mod block_handle; mod block_meta; diff --git a/storage/src/store/block/mod.rs b/storage/src/store/block/mod.rs index df3848240..1a8270254 100644 --- a/storage/src/store/block/mod.rs +++ b/storage/src/store/block/mod.rs @@ -738,6 +738,8 @@ impl BlockStorage { } } + // TODO: A single condition `prev_id.is_none()` might be enough, + // but what if we started right in the middle of the archive? let is_first_archive = archive_id == 0 && prev_id.is_none(); if is_first_archive || mc_seqno.saturating_sub(archive_id) >= ARCHIVE_PACKAGE_SIZE { self.archive_ids.write().insert(mc_seqno);