diff --git a/storage/src/db/file_db/mod.rs b/storage/src/db/file_db/mod.rs index 53eecdea8..c4f73b1f0 100644 --- a/storage/src/db/file_db/mod.rs +++ b/storage/src/db/file_db/mod.rs @@ -1,5 +1,5 @@ use std::fs::File; -use std::io::Write; +use std::io::{Read, Seek, SeekFrom, Write}; use std::path::{Path, PathBuf}; use anyhow::{Context, Result}; @@ -10,8 +10,8 @@ pub use mapped_file::MappedFile; mod mapped_file; pub struct FileDb { - pub file: File, - pub path: PathBuf, + file: File, + path: PathBuf, } impl FileDb { @@ -25,7 +25,7 @@ impl FileDb { .truncate(true) .read(true) .open(&path) - .context("Failed to create file")?; + .context(format!("Failed to create file {:?}", path.as_ref()))?; Ok(Self { file, @@ -47,4 +47,24 @@ impl FileDb { self.file.flush()?; Ok(()) } + + pub fn seek(&mut self, pos: SeekFrom) -> Result<()> { + self.file.seek(pos)?; + Ok(()) + } + + pub fn file(&self) -> &File { + &self.file + } + + pub fn read(&mut self, buf: &mut [u8]) -> Result { + let bytes = self.file.read(buf)?; + Ok(bytes) + } +} + +impl Into for FileDb { + fn into(self) -> File { + self.file + } } diff --git a/storage/src/store/persistent_state/cell_writer.rs b/storage/src/store/persistent_state/cell_writer.rs deleted file mode 100644 index 3c2501279..000000000 --- a/storage/src/store/persistent_state/cell_writer.rs +++ /dev/null @@ -1,425 +0,0 @@ -use std::collections::hash_map; -use std::fs; -use std::fs::File; -use std::io::{Read, Seek, SeekFrom, Write}; -use std::os::unix::io::AsRawFd; -use std::path::{Path, PathBuf}; -use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::Arc; -use std::time::Instant; - -use anyhow::{Context, Result}; -use everscale_types::cell::HashBytes; -use everscale_types::models::BlockId; -use num_traits::ToPrimitive; -use smallvec::SmallVec; -use tycho_util::byte_reader::ByteOrderRead; -use tycho_util::FastHashMap; - -use crate::db::Db; -use crate::{db, FileDb}; - -pub struct CellWriter<'a> { - db: &'a Db, - base_path: &'a Path, -} - -impl<'a> CellWriter<'a> { - pub fn clear_temp(base_path: &Path, master_block_id: &BlockId, block_id: &BlockId) { - tracing::info!("Cleaning temporary persistent state files"); - - let file_path = Self::make_pss_path(base_path, master_block_id, block_id); - let int_file_path = Self::make_rev_pss_path(&file_path); - let temp_file_path = Self::make_temp_pss_path(&file_path); - - let _ = fs::remove_file(int_file_path); - let _ = fs::remove_file(temp_file_path); - } - - pub fn make_pss_path(base_path: &Path, mc_block_id: &BlockId, block_id: &BlockId) -> PathBuf { - let dir_path = mc_block_id.seqno.to_string(); - let file_name = block_id.root_hash.to_string(); - base_path.join(dir_path).join(file_name) - } - - pub fn make_temp_pss_path(file_path: &Path) -> PathBuf { - file_path.with_extension("temp") - } - - pub fn make_rev_pss_path(file_path: &Path) -> PathBuf { - file_path.with_extension("rev") - } - - #[allow(unused)] - pub fn new(db: &'a Db, base_path: &'a Path) -> Self { - Self { db, base_path } - } - - pub fn write( - &self, - master_block_id: &BlockId, - block_id: &BlockId, - state_root_hash: &HashBytes, - is_cancelled: Arc, - ) -> Result { - let file_path = Self::make_pss_path(self.base_path, master_block_id, block_id); - - // Load cells from db in reverse order into the temp file - tracing::info!(block = %block_id, "Started loading cells"); - let now = Instant::now(); - let mut intermediate = write_rev_cells( - self.db, - Self::make_rev_pss_path(&file_path), - state_root_hash.as_array(), - is_cancelled.clone(), - ) - .map_err(|e| { - anyhow::Error::msg(format!("Failed to write reversed cells data. Inner: {e:?}")) - })?; - - let temp_file_path = Self::make_temp_pss_path(&file_path); - - tracing::info!(block = %block_id, "Creating intermediate file {:?}", file_path); - - let file = fs::OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(&temp_file_path) - .context("Failed to create target file")?; - - let cell_count = intermediate.cell_sizes.len() as u32; - tracing::info!( - elapsed = %humantime::format_duration(now.elapsed()), - cell_count, - block = %block_id, - "Finished loading cells" - ); - - // Compute offset type size (usually 4 bytes) - let offset_size = - std::cmp::min(number_of_bytes_to_fit(intermediate.total_size), 8) as usize; - - // Reserve space for the file - alloc_file( - &file, - 22 + offset_size * (1 + cell_count as usize) + (intermediate.total_size as usize), - )?; - - // Write cells data in BOC format - let mut buffer = std::io::BufWriter::with_capacity(FILE_BUFFER_LEN / 2, file); - - // Header | current len: 0 - let flags = 0b1000_0000u8 | (REF_SIZE as u8); - buffer.write_all(&[0xb5, 0xee, 0x9c, 0x72, flags, offset_size as u8])?; - - // Unique cell count | current len: 6 - buffer.write_all(&cell_count.to_be_bytes())?; - - // Root count | current len: 10 - buffer.write_all(&1u32.to_be_bytes())?; - - // Absent cell count | current len: 14 - buffer.write_all(&[0, 0, 0, 0])?; - - // Total cell size | current len: 18 - buffer.write_all(&intermediate.total_size.to_be_bytes()[(8 - offset_size)..8])?; - - // Root index | current len: 18 + offset_size - buffer.write_all(&[0, 0, 0, 0])?; - - // Cells index | current len: 22 + offset_size - tracing::info!(block = %block_id, "Started building index"); - { - let mut next_offset = 0; - for &cell_size in intermediate.cell_sizes.iter().rev() { - next_offset += cell_size as u64; - buffer.write_all(&next_offset.to_be_bytes()[(8 - offset_size)..8])?; - } - } - tracing::info!(block = %block_id, "Finished building index"); - - // Cells | current len: 22 + offset_size * (1 + cell_sizes.len()) - let mut cell_buffer = [0; 2 + 128 + 4 * REF_SIZE]; - for (i, &cell_size) in intermediate.cell_sizes.iter().rev().enumerate() { - if i % 1000 == 0 && is_cancelled.load(Ordering::Relaxed) { - anyhow::bail!("Persistent state writing cancelled.") - } - intermediate.total_size -= cell_size as u64; - intermediate - .file_db - .file - .seek(SeekFrom::Start(intermediate.total_size))?; - intermediate - .file_db - .file - .read_exact(&mut cell_buffer[..cell_size as usize])?; - - let d1 = cell_buffer[0]; - let d2 = cell_buffer[1]; - let ref_count = (d1 & 7) as usize; - let data_size = ((d2 >> 1) + (d2 & 1 != 0) as u8) as usize; - - let ref_offset = 2 + data_size; - for r in 0..ref_count { - let ref_offset = ref_offset + r * REF_SIZE; - let slice = &mut cell_buffer[ref_offset..ref_offset + REF_SIZE]; - - let index = u32::from_be_bytes(slice.try_into().unwrap()); - slice.copy_from_slice(&(cell_count - index - 1).to_be_bytes()); - } - - buffer.write_all(&cell_buffer[..cell_size as usize])?; - } - - buffer.flush()?; - std::fs::rename(&temp_file_path, &file_path)?; - - Ok(file_path) - } -} - -struct IntermediateState { - file_db: FileDb, - cell_sizes: Vec, - total_size: u64, - _remove_on_drop: RemoveOnDrop, -} - -fn write_rev_cells( - db: &Db, - file_path: PathBuf, - state_root_hash: &[u8; 32], - is_cancelled: Arc, -) -> Result { - todo!() - - /*enum StackItem { - New([u8; 32]), - Loaded(LoadedCell), - } - - struct LoadedCell { - hash: [u8; 32], - d1: u8, - d2: u8, - data: SmallVec<[u8; 128]>, - indices: SmallVec<[u32; 4]>, - } - - tracing::info!("Creating rev file {:?}", file_path); - let file = std::fs::OpenOptions::new() - .read(true) - .write(true) - .create(true) - .truncate(true) - .open(&file_path) - .context("Failed to write rev file")?; - let remove_on_drop = RemoveOnDrop(file_path); - - let raw = db.raw().as_ref(); - let read_options = db.cells.read_config(); - let cf = db.cells.cf(); - - let mut references_buffer = SmallVec::<[[u8; 32]; 4]>::with_capacity(4); - - let mut indices = FastHashMap::default(); - let mut remap = FastHashMap::default(); - let mut cell_sizes = Vec::::with_capacity(FILE_BUFFER_LEN); - let mut stack = Vec::with_capacity(32); - - let mut total_size = 0u64; - let mut iteration = 0u32; - let mut remap_index = 0u32; - - stack.push((iteration, StackItem::New(*state_root_hash))); - indices.insert(*state_root_hash, (iteration, false)); - - let mut temp_file_buffer = std::io::BufWriter::with_capacity(FILE_BUFFER_LEN, file); - - while let Some((index, data)) = stack.pop() { - if iteration % 1000 == 0 && is_cancelled.load(Ordering::Relaxed) { - anyhow::bail!("Persistent state writing cancelled.") - } - - match data { - StackItem::New(hash) => { - let value = raw - .get_pinned_cf_opt(&cf, hash, read_options)? - .ok_or(CellWriterError::CellNotFound)?; - - let value = value.as_ref(); - - let mut value = match db::refcount::strip_refcount(value) { - Some(bytes) => bytes, - None => return Err(CellWriterError::CellNotFound.into()), - }; - if value.is_empty() { - return Err(CellWriterError::InvalidCell.into()); - } - - let cell_data = ton_types::CellData::deserialize(&mut value)?; - let bit_length = cell_data.bit_length(); - let d2 = (((bit_length >> 2) as u8) & !0b1) | ((bit_length % 8 != 0) as u8); - - let references_count = cell_data.references_count(); - let cell_type = cell_data - .cell_type() - .to_u8() - .ok_or(CellWriterError::InvalidCell)?; - - let level_mask = cell_data.level_mask().mask(); - let d1 = - references_count as u8 | (((cell_type != 0x01) as u8) << 3) | (level_mask << 5); - let data = cell_data.data(); - - for _ in 0..references_count { - let hash = HashBytes::from(value.read_u256()?); - references_buffer.push(hash.inner()); - } - - let mut reference_indices = SmallVec::with_capacity(references_buffer.len()); - - let mut indices_buffer = [0; 4]; - let mut keys = [std::ptr::null(); 4]; - let mut preload_count = 0; - - for hash in &references_buffer { - let index = match indices.entry(*hash) { - hash_map::Entry::Vacant(entry) => { - remap_index += 1; - - entry.insert((remap_index, false)); - - indices_buffer[preload_count] = remap_index; - keys[preload_count] = hash.as_ptr(); - preload_count += 1; - - remap_index - } - hash_map::Entry::Occupied(entry) => { - let (remap_index, written) = *entry.get(); - if !written { - indices_buffer[preload_count] = remap_index; - keys[preload_count] = hash.as_ptr(); - preload_count += 1; - } - remap_index - } - }; - - reference_indices.push(index); - } - - stack.push(( - index, - StackItem::Loaded(LoadedCell { - hash, - d1, - d2, - data: SmallVec::from_slice(data), - indices: reference_indices, - }), - )); - - if preload_count > 0 { - indices_buffer[..preload_count].reverse(); - keys[..preload_count].reverse(); - - for i in 0..preload_count { - let index = indices_buffer[i]; - let hash = unsafe { *(keys[i] as *const [u8; 32]) }; - stack.push((index, StackItem::New(hash))); - } - } - - references_buffer.clear(); - } - StackItem::Loaded(loaded) => { - match remap.entry(index) { - hash_map::Entry::Vacant(entry) => { - entry.insert(iteration.to_be_bytes()); - } - hash_map::Entry::Occupied(_) => continue, - }; - - if let Some((_, written)) = indices.get_mut(&loaded.hash) { - *written = true; - } - - iteration += 1; - if iteration % 100000 == 0 { - tracing::info!(iteration); - } - - let cell_size = 2 + loaded.data.len() + loaded.indices.len() * REF_SIZE; - cell_sizes.push(cell_size as u8); - total_size += cell_size as u64; - - temp_file_buffer.write_all(&[loaded.d1, loaded.d2])?; - temp_file_buffer.write_all(&loaded.data)?; - for index in loaded.indices { - let index = remap.get(&index).with_context(|| { - format!("Child not found. Iteration {iteration}. Child {index}") - })?; - temp_file_buffer.write_all(index)?; - } - } - } - } - - let mut file = temp_file_buffer.into_inner()?; - file.flush()?; - - Ok(IntermediateState { - file, - cell_sizes, - total_size, - _remove_on_drop: remove_on_drop, - })*/ -} - -#[cfg(not(target_os = "macos"))] -fn alloc_file(file: &File, len: usize) -> std::io::Result<()> { - let res = unsafe { libc::posix_fallocate(file.as_raw_fd(), 0, len as i64) }; - if res == 0 { - Ok(()) - } else { - Err(std::io::Error::last_os_error()) - } -} - -#[cfg(target_os = "macos")] -pub fn alloc_file(file: &File, len: usize) -> std::io::Result<()> { - let res = unsafe { libc::ftruncate(file.as_raw_fd(), len as i64) }; - if res < 0 { - Err(std::io::Error::last_os_error()) - } else { - Ok(()) - } -} - -fn number_of_bytes_to_fit(l: u64) -> u32 { - 8 - l.leading_zeros() / 8 -} - -struct RemoveOnDrop(PathBuf); - -impl Drop for RemoveOnDrop { - fn drop(&mut self) { - if let Err(e) = std::fs::remove_file(&self.0) { - tracing::error!(path = %self.0.display(), "failed to remove file: {e:?}"); - } - } -} - -const REF_SIZE: usize = std::mem::size_of::(); -const FILE_BUFFER_LEN: usize = 128 * 1024 * 1024; // 128 MB - -#[derive(thiserror::Error, Debug)] -enum CellWriterError { - #[error("Cell not found in cell db")] - CellNotFound, - #[error("Invalid cell")] - InvalidCell, -} diff --git a/storage/src/store/persistent_state/mod.rs b/storage/src/store/persistent_state/mod.rs index 56f6141bc..7c1d43da2 100644 --- a/storage/src/store/persistent_state/mod.rs +++ b/storage/src/store/persistent_state/mod.rs @@ -1,5 +1,6 @@ use std::fs; -use std::path::PathBuf; +use std::io::SeekFrom; +use std::path::{Path, PathBuf}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -11,10 +12,8 @@ use tokio::time::Instant; use crate::db::Db; use crate::store::BlockHandleStorage; - -use self::cell_writer::*; - -mod cell_writer; +use crate::utils::CellWriter; +use crate::FileDb; const KEY_BLOCK_UTIME_STEP: u32 = 86400; @@ -26,13 +25,13 @@ pub struct PersistentStateStorage { } impl PersistentStateStorage { - pub async fn new( + pub fn new( file_db_path: PathBuf, db: Arc, block_handle_storage: Arc, ) -> Result { let dir = file_db_path.join("states"); - tokio::fs::create_dir_all(&dir).await?; + fs::create_dir_all(&dir)?; let is_cancelled = Arc::new(Default::default()); Ok(Self { @@ -45,34 +44,34 @@ impl PersistentStateStorage { pub async fn save_state( &self, + mc_block_id: &BlockId, block_id: &BlockId, - master_block_id: &BlockId, - state_root_hash: &HashBytes, + root_hash: &HashBytes, ) -> Result<()> { let block_id = block_id.clone(); - let master_block_id = master_block_id.clone(); - let state_root_hash = *state_root_hash; + let root_hash = *root_hash; let db = self.db.clone(); - let base_path = self.storage_path.clone(); - let is_cancelled = self.is_cancelled.clone(); + let is_cancelled = Some(self.is_cancelled.clone()); + let base_path = self.get_state_file_path(&mc_block_id, &block_id); tokio::task::spawn_blocking(move || { let cell_writer = CellWriter::new(&db, &base_path); - match cell_writer.write(&master_block_id, &block_id, &state_root_hash, is_cancelled) { - Ok(path) => { + match cell_writer.write(&root_hash.0, is_cancelled) { + Ok(()) => { tracing::info!( block_id = %block_id, - path = %path.display(), - "Successfully wrote persistent state to a file", + "successfully wrote persistent state to a file", ); } Err(e) => { tracing::error!( block_id = %block_id, - "Writing persistent state failed. Err: {e:?}" + "writing persistent state failed: {e:?}" ); - CellWriter::clear_temp(&base_path, &master_block_id, &block_id); + if let Err(e) = cell_writer.remove(&root_hash.0) { + tracing::error!(%block_id, "{e}") + } } } }) @@ -87,15 +86,11 @@ impl PersistentStateStorage { offset: u64, size: u64, ) -> Option> { - use tokio::io::{AsyncReadExt, AsyncSeekExt, SeekFrom}; - // TODO: cache file handles - let mut file = tokio::fs::File::open(self.get_state_file_path(mc_block_id, block_id)) - .await - .ok()?; + let mut file_db = FileDb::open(self.get_state_file_path(mc_block_id, block_id)).ok()?; - if let Err(e) = file.seek(SeekFrom::Start(offset)).await { - tracing::error!("Failed to seek state file offset. Err: {e:?}"); + if let Err(e) = file_db.seek(SeekFrom::Start(offset)) { + tracing::error!("failed to seek state file offset: {e:?}"); return None; } @@ -103,15 +98,15 @@ impl PersistentStateStorage { let mut result = BytesMut::with_capacity(size as usize); let now = Instant::now(); loop { - match file.read_buf(&mut result).await { + match file_db.read(&mut result) { Ok(bytes_read) => { - tracing::debug!("Reading state file. Bytes read: {}", bytes_read); + tracing::debug!(bytes_read, "reading state file"); if bytes_read == 0 || bytes_read == size as usize { break; } } Err(e) => { - tracing::error!("Failed to read state file. Err: {e:?}"); + tracing::error!("failed to read state file. Err: {e:?}"); return None; } } @@ -134,14 +129,17 @@ impl PersistentStateStorage { let dir_path = mc_block.seqno.to_string(); let path = self.storage_path.join(dir_path); if !path.exists() { - tracing::info!(mc_block = %mc_block, "Creating persistent state directory"); + tracing::info!(mc_block = %mc_block, "creating persistent state directory"); fs::create_dir(path)?; } Ok(()) } fn get_state_file_path(&self, mc_block_id: &BlockId, block_id: &BlockId) -> PathBuf { - CellWriter::make_pss_path(&self.storage_path, mc_block_id, block_id) + self.storage_path + .clone() + .join(mc_block_id.seqno.to_string()) + .join(block_id.root_hash.to_string()) } pub fn cancel(&self) { @@ -149,7 +147,7 @@ impl PersistentStateStorage { } pub async fn clear_old_persistent_states(&self) -> Result<()> { - tracing::info!("Started clearing old persistent state directories"); + tracing::info!("started clearing old persistent state directories"); let start = Instant::now(); // Keep 2 days of states + 1 state before @@ -178,7 +176,7 @@ impl PersistentStateStorage { tracing::info!( elapsed = %humantime::format_duration(start.elapsed()), - "Clearing old persistent state directories completed" + "clearing old persistent state directories completed" ); Ok(()) @@ -210,16 +208,16 @@ impl PersistentStateStorage { } for dir in directories_to_remove { - tracing::info!(dir = %dir.display(), "Removing an old persistent state directory"); + tracing::info!(dir = %dir.display(), "removing an old persistent state directory"); if let Err(e) = fs::remove_dir_all(&dir) { - tracing::error!(dir = %dir.display(), "Failed to remove an old persistent state: {e:?}"); + tracing::error!(dir = %dir.display(), "failed to remove an old persistent state: {e:?}"); } } for file in files_to_remove { - tracing::info!(file = %file.display(), "Removing file"); + tracing::info!(file = %file.display(), "removing file"); if let Err(e) = fs::remove_file(&file) { - tracing::error!(file = %file.display(), "Failed to remove file: {e:?}"); + tracing::error!(file = %file.display(), "failed to remove file: {e:?}"); } } diff --git a/storage/src/store/shard_state/mod.rs b/storage/src/store/shard_state/mod.rs index ee189fafa..cde8dedda 100644 --- a/storage/src/store/shard_state/mod.rs +++ b/storage/src/store/shard_state/mod.rs @@ -19,7 +19,6 @@ use tycho_block_util::block::*; use tycho_block_util::state::*; mod cell_storage; -mod cell_writer; mod entries_buffer; mod replace_transaction; mod shard_state_reader; diff --git a/storage/src/store/shard_state/cell_writer.rs b/storage/src/utils/cell_writer.rs similarity index 89% rename from storage/src/store/shard_state/cell_writer.rs rename to storage/src/utils/cell_writer.rs index e04eeb60a..f51468419 100644 --- a/storage/src/store/shard_state/cell_writer.rs +++ b/storage/src/utils/cell_writer.rs @@ -1,15 +1,20 @@ use std::collections::hash_map; +use std::fs; use std::fs::File; use std::io::{Read, Seek, SeekFrom, Write}; use std::os::unix::io::AsRawFd; use std::path::{Path, PathBuf}; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; use anyhow::{Context, Result}; use smallvec::SmallVec; -use crate::db::Db; use tycho_util::FastHashMap; +use crate::db::Db; +use crate::FileDb; + pub struct CellWriter<'a> { db: &'a Db, base_path: &'a Path, @@ -22,19 +27,14 @@ impl<'a> CellWriter<'a> { } #[allow(unused)] - pub fn write(&self, root_hash: &[u8; 32]) -> Result<()> { + pub fn write(&self, root_hash: &[u8; 32], is_cancelled: Option>) -> Result<()> { // Open target file in advance to get the error immediately (if any) let file_path = self.base_path.join(hex::encode(root_hash)); - let file = std::fs::OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(file_path) - .context("Failed to create target file")?; + let file_db = FileDb::open(file_path)?; // Load cells from db in reverse order into the temp file tracing::info!("started loading cells"); - let mut intermediate = write_rev_cells(self.db, self.base_path, root_hash) + let mut intermediate = write_rev_cells(self.db, self.base_path, root_hash, &is_cancelled) .context("Failed to write reversed cells data")?; tracing::info!("finished loading cells"); let cell_count = intermediate.cell_sizes.len() as u32; @@ -45,12 +45,12 @@ impl<'a> CellWriter<'a> { // Reserve space for the file alloc_file( - &file, + file_db.file(), 22 + offset_size * (1 + cell_count as usize) + (intermediate.total_size as usize), )?; // Write cells data in BOC format - let mut buffer = std::io::BufWriter::with_capacity(FILE_BUFFER_LEN / 2, file); + let mut buffer = std::io::BufWriter::with_capacity(FILE_BUFFER_LEN / 2, file_db.file()); // Header | current len: 0 let flags = 0b1000_0000u8 | (REF_SIZE as u8); @@ -84,7 +84,13 @@ impl<'a> CellWriter<'a> { // Cells | current len: 22 + offset_size * (1 + cell_sizes.len()) let mut cell_buffer = [0; 2 + 128 + 4 * REF_SIZE]; - for &cell_size in intermediate.cell_sizes.iter().rev() { + for (i, &cell_size) in intermediate.cell_sizes.iter().rev().enumerate() { + if let Some(is_cancelled) = is_cancelled.as_ref() { + if i % 1000 == 0 && is_cancelled.load(Ordering::Relaxed) { + anyhow::bail!("Cell writing cancelled.") + } + } + intermediate.total_size -= cell_size as u64; intermediate .file @@ -114,6 +120,14 @@ impl<'a> CellWriter<'a> { Ok(()) } + + pub fn remove(&self, root_hash: &[u8; 32]) -> Result<()> { + let file_path = self.base_path.join(hex::encode(root_hash)); + fs::remove_file(&file_path).context(format!( + "Failed to remove persistent state file {:?}", + file_path + )) + } } struct IntermediateState { @@ -127,6 +141,7 @@ fn write_rev_cells>( db: &Db, base_path: P, root_hash: &[u8; 32], + is_cancelled: &Option>, ) -> Result { enum StackItem { New([u8; 32]), @@ -146,13 +161,7 @@ fn write_rev_cells>( .join(hex::encode(root_hash)) .with_extension("temp"); - let file = std::fs::OpenOptions::new() - .read(true) - .write(true) - .create(true) - .truncate(true) - .open(&file_path) - .context("Failed to create temp file")?; + let file_db = FileDb::open(&file_path)?; let remove_on_drop = RemoveOnDrop(file_path); let raw = db.raw().as_ref(); @@ -173,9 +182,15 @@ fn write_rev_cells>( stack.push((iteration, StackItem::New(*root_hash))); indices.insert(*root_hash, (iteration, false)); - let mut temp_file_buffer = std::io::BufWriter::with_capacity(FILE_BUFFER_LEN, file); + let mut temp_file_buffer = std::io::BufWriter::with_capacity(FILE_BUFFER_LEN, file_db.into()); while let Some((index, data)) = stack.pop() { + if let Some(is_cancelled) = is_cancelled { + if iteration % 1000 == 0 && is_cancelled.load(Ordering::Relaxed) { + anyhow::bail!("Persistent state writing cancelled.") + } + } + match data { StackItem::New(hash) => { let value = raw @@ -280,7 +295,7 @@ fn write_rev_cells>( } } - let mut file = temp_file_buffer.into_inner()?; + let mut file: File = temp_file_buffer.into_inner()?; file.flush()?; Ok(IntermediateState { @@ -382,7 +397,7 @@ struct RemoveOnDrop(PathBuf); impl Drop for RemoveOnDrop { fn drop(&mut self) { - if let Err(e) = std::fs::remove_file(&self.0) { + if let Err(e) = fs::remove_file(&self.0) { tracing::error!(path = %self.0.display(), "failed to remove file: {e:?}"); } } diff --git a/storage/src/utils/mod.rs b/storage/src/utils/mod.rs index d28b57e54..359217fe1 100644 --- a/storage/src/utils/mod.rs +++ b/storage/src/utils/mod.rs @@ -1,3 +1,5 @@ +pub use self::cell_writer::*; pub use self::stored_value::*; +mod cell_writer; mod stored_value;