From d58cf538500288af27e7e8d0303be7dbd56aff39 Mon Sep 17 00:00:00 2001 From: joshieDo <93316087+joshieDo@users.noreply.github.com> Date: Mon, 2 Sep 2024 17:17:08 +0100 Subject: [PATCH] fix(provider): move consistency check methods from `NippyJarWriter` to `NippyJarChecker` (#10633) --- crates/stages/stages/src/stages/mod.rs | 16 +- crates/storage/nippy-jar/src/consistency.rs | 187 ++++++++++++++++++ crates/storage/nippy-jar/src/error.rs | 3 + crates/storage/nippy-jar/src/lib.rs | 25 +-- crates/storage/nippy-jar/src/writer.rs | 168 +++------------- .../src/providers/static_file/manager.rs | 51 +++-- .../src/providers/static_file/writer.rs | 57 ++---- 7 files changed, 284 insertions(+), 223 deletions(-) create mode 100644 crates/storage/nippy-jar/src/consistency.rs diff --git a/crates/stages/stages/src/stages/mod.rs b/crates/stages/stages/src/stages/mod.rs index 569dc8cc384e..eca259585dd1 100644 --- a/crates/stages/stages/src/stages/mod.rs +++ b/crates/stages/stages/src/stages/mod.rs @@ -62,9 +62,9 @@ mod tests { StaticFileSegment, B256, U256, }; use reth_provider::{ - providers::StaticFileWriter, AccountExtReader, BlockReader, DatabaseProviderFactory, - ProviderFactory, ProviderResult, ReceiptProvider, StageCheckpointWriter, - StaticFileProviderFactory, StorageReader, + providers::{StaticFileProvider, StaticFileWriter}, + AccountExtReader, BlockReader, DatabaseProviderFactory, ProviderFactory, ProviderResult, + ReceiptProvider, StageCheckpointWriter, StaticFileProviderFactory, StorageReader, }; use reth_prune_types::{PruneMode, PruneModes}; use reth_stages_api::{ @@ -297,7 +297,10 @@ mod tests { is_full_node: bool, expected: Option, ) { - let static_file_provider = db.factory.static_file_provider(); + // We recreate the static file provider, since consistency heals are done on fetching the + // writer for the first time. 
+ let static_file_provider = + StaticFileProvider::read_write(db.factory.static_file_provider().path()).unwrap(); // Simulate corruption by removing `prune_count` rows from the data file without updating // its offset list and configuration. @@ -312,8 +315,11 @@ mod tests { data_file.get_ref().sync_all().unwrap(); } + // We recreate the static file provider, since consistency heals are done on fetching the + // writer for the first time. assert_eq!( - static_file_provider + StaticFileProvider::read_write(db.factory.static_file_provider().path()) + .unwrap() .check_consistency(&db.factory.database_provider_ro().unwrap(), is_full_node,), Ok(expected) ); diff --git a/crates/storage/nippy-jar/src/consistency.rs b/crates/storage/nippy-jar/src/consistency.rs new file mode 100644 index 000000000000..c0345657e844 --- /dev/null +++ b/crates/storage/nippy-jar/src/consistency.rs @@ -0,0 +1,187 @@ +use crate::{writer::OFFSET_SIZE_BYTES, NippyJar, NippyJarError, NippyJarHeader}; +use std::{ + cmp::Ordering, + fs::{File, OpenOptions}, + io::{BufWriter, Seek, SeekFrom}, + path::Path, +}; + +/// Performs consistency checks or heals on the [`NippyJar`] file +/// * Is the offsets file size expected? +/// * Is the data file size expected? +/// +/// This is based on the assumption that [`NippyJar`] configuration is **always** the last one +/// to be updated when something is written, as by the `NippyJarWriter::commit()` function shows. +/// +/// **For checks (read-only) use `check_consistency` method.** +/// +/// **For heals (read-write) use `ensure_consistency` method.** +#[derive(Debug)] +pub struct NippyJarChecker { + /// Associated [`NippyJar`], containing all necessary configurations for data + /// handling. + pub(crate) jar: NippyJar, + /// File handle to where the data is stored. + pub(crate) data_file: Option>, + /// File handle to where the offsets are stored. 
+ pub(crate) offsets_file: Option>, +} + +impl NippyJarChecker { + pub const fn new(jar: NippyJar) -> Self { + Self { jar, data_file: None, offsets_file: None } + } + + /// It will throw an error if the [`NippyJar`] is in an inconsistent state. + pub fn check_consistency(&mut self) -> Result<(), NippyJarError> { + self.handle_consistency(ConsistencyFailStrategy::ThrowError) + } + + /// It will attempt to heal if the [`NippyJar`] is in an inconsistent state. + /// + /// **ATTENTION**: disk commit should be handled externally by consuming `Self` + pub fn ensure_consistency(&mut self) -> Result<(), NippyJarError> { + self.handle_consistency(ConsistencyFailStrategy::Heal) + } + + fn handle_consistency(&mut self, mode: ConsistencyFailStrategy) -> Result<(), NippyJarError> { + self.load_files(mode)?; + let reader = self.jar.open_data_reader()?; + + // When an offset size is smaller than the initial (8), we are dealing with immutable + // data. + if reader.offset_size() != OFFSET_SIZE_BYTES { + return Err(NippyJarError::FrozenJar) + } + + let expected_offsets_file_size: u64 = (1 + // first byte is the size of one offset + OFFSET_SIZE_BYTES as usize* self.jar.rows * self.jar.columns + // `offset size * num rows * num columns` + OFFSET_SIZE_BYTES as usize) as u64; // expected size of the data file + let actual_offsets_file_size = self.offsets_file().get_ref().metadata()?.len(); + + if mode.should_err() && + expected_offsets_file_size.cmp(&actual_offsets_file_size) != Ordering::Equal + { + return Err(NippyJarError::InconsistentState) + } + + // Offsets configuration wasn't properly committed + match expected_offsets_file_size.cmp(&actual_offsets_file_size) { + Ordering::Less => { + // Happened during an appending job + // TODO: ideally we could truncate until the last offset of the last column of the + // last row inserted + self.offsets_file().get_mut().set_len(expected_offsets_file_size)?; + } + Ordering::Greater => { + // Happened during a pruning job + // `num rows = (file
size - 1 - size of one offset) / num columns` + self.jar.rows = ((actual_offsets_file_size. + saturating_sub(1). // first byte is the size of one offset + saturating_sub(OFFSET_SIZE_BYTES as u64) / // expected size of the data file + (self.jar.columns as u64)) / + OFFSET_SIZE_BYTES as u64) as usize; + + // Freeze row count changed + self.jar.freeze_config()?; + } + Ordering::Equal => {} + } + + // last offset should match the data_file_len + let last_offset = reader.reverse_offset(0)?; + let data_file_len = self.data_file().get_ref().metadata()?.len(); + + if mode.should_err() && last_offset.cmp(&data_file_len) != Ordering::Equal { + return Err(NippyJarError::InconsistentState) + } + + // Offset list wasn't properly committed + match last_offset.cmp(&data_file_len) { + Ordering::Less => { + // Happened during an appending job, so we need to truncate the data, since there's + // no way to recover it. + self.data_file().get_mut().set_len(last_offset)?; + } + Ordering::Greater => { + // Happened during a pruning job, so we need to reverse iterate offsets until we + // find the matching one. + for index in 0..reader.offsets_count()? { + let offset = reader.reverse_offset(index + 1)?; + // It would only be equal if the previous row was fully pruned. + if offset <= data_file_len { + let new_len = self + .offsets_file() + .get_ref() + .metadata()? + .len() + .saturating_sub(OFFSET_SIZE_BYTES as u64 * (index as u64 + 1)); + self.offsets_file().get_mut().set_len(new_len)?; + + drop(reader); + + // Since we decrease the offset list, we need to check the consistency of + // `self.jar.rows` again + self.handle_consistency(ConsistencyFailStrategy::Heal)?; + break + } + } + } + Ordering::Equal => {} + } + + self.offsets_file().seek(SeekFrom::End(0))?; + self.data_file().seek(SeekFrom::End(0))?; + + Ok(()) + } + + /// Loads data and offsets files. 
+ fn load_files(&mut self, mode: ConsistencyFailStrategy) -> Result<(), NippyJarError> { + let load_file = |path: &Path| -> Result, NippyJarError> { + let path = path + .exists() + .then_some(path) + .ok_or_else(|| NippyJarError::MissingFile(path.to_path_buf()))?; + Ok(BufWriter::new(OpenOptions::new().read(true).write(mode.should_heal()).open(path)?)) + }; + self.data_file = Some(load_file(self.jar.data_path())?); + self.offsets_file = Some(load_file(&self.jar.offsets_path())?); + Ok(()) + } + + /// Returns a mutable reference to offsets file. + /// + /// **Panics** if it does not exist. + fn offsets_file(&mut self) -> &mut BufWriter { + self.offsets_file.as_mut().expect("should exist") + } + + /// Returns a mutable reference to data file. + /// + /// **Panics** if it does not exist. + fn data_file(&mut self) -> &mut BufWriter { + self.data_file.as_mut().expect("should exist") + } +} + +/// Strategy on encountering an inconsistent state on [`NippyJarChecker`]. +#[derive(Debug, Copy, Clone)] +enum ConsistencyFailStrategy { + /// Writer should heal. + Heal, + /// Writer should throw an error. + ThrowError, +} + +impl ConsistencyFailStrategy { + /// Whether writer should heal. + const fn should_heal(&self) -> bool { + matches!(self, Self::Heal) + } + + /// Whether writer should throw an error. + const fn should_err(&self) -> bool { + matches!(self, Self::ThrowError) + } +} diff --git a/crates/storage/nippy-jar/src/error.rs b/crates/storage/nippy-jar/src/error.rs index 6a5714e1e4ed..706d49a177b7 100644 --- a/crates/storage/nippy-jar/src/error.rs +++ b/crates/storage/nippy-jar/src/error.rs @@ -1,3 +1,4 @@ +use std::path::PathBuf; use thiserror::Error; /// Errors associated with [`crate::NippyJar`]. 
@@ -60,4 +61,6 @@ pub enum NippyJarError { FrozenJar, #[error("File is in an inconsistent state.")] InconsistentState, + #[error("Missing file: {0}.")] + MissingFile(PathBuf), } diff --git a/crates/storage/nippy-jar/src/lib.rs b/crates/storage/nippy-jar/src/lib.rs index 60ed57346163..bc4cd06fe7a2 100644 --- a/crates/storage/nippy-jar/src/lib.rs +++ b/crates/storage/nippy-jar/src/lib.rs @@ -44,7 +44,10 @@ mod cursor; pub use cursor::NippyJarCursor; mod writer; -pub use writer::{ConsistencyFailStrategy, NippyJarWriter}; +pub use writer::NippyJarWriter; + +mod consistency; +pub use consistency::NippyJarChecker; const NIPPY_JAR_VERSION: usize = 1; @@ -346,7 +349,7 @@ impl NippyJar { self.freeze_filters()?; // Creates the writer, data and offsets file - let mut writer = NippyJarWriter::new(self, ConsistencyFailStrategy::Heal)?; + let mut writer = NippyJarWriter::new(self)?; // Append rows to file while holding offsets in memory writer.append_rows(columns, total_rows)?; @@ -959,7 +962,7 @@ mod tests { assert!(initial_offset_size > 0); // Appends a third row - let mut writer = NippyJarWriter::new(nippy, ConsistencyFailStrategy::Heal).unwrap(); + let mut writer = NippyJarWriter::new(nippy).unwrap(); writer.append_column(Some(Ok(&col1[2]))).unwrap(); writer.append_column(Some(Ok(&col2[2]))).unwrap(); @@ -990,7 +993,7 @@ mod tests { // Writer will execute a consistency check and verify first that the offset list on disk // doesn't match the nippy.rows, and prune it. Then, it will prune the data file // accordingly as well. - let writer = NippyJarWriter::new(nippy, ConsistencyFailStrategy::Heal).unwrap(); + let writer = NippyJarWriter::new(nippy).unwrap(); assert_eq!(initial_rows, writer.rows()); assert_eq!( initial_offset_size, @@ -1016,7 +1019,7 @@ mod tests { // Appends a third row, so we have an offset list in memory, which is not flushed to disk, // while the data has been. 
- let mut writer = NippyJarWriter::new(nippy, ConsistencyFailStrategy::Heal).unwrap(); + let mut writer = NippyJarWriter::new(nippy).unwrap(); writer.append_column(Some(Ok(&col1[2]))).unwrap(); writer.append_column(Some(Ok(&col2[2]))).unwrap(); @@ -1039,7 +1042,7 @@ mod tests { // Writer will execute a consistency check and verify that the data file has more data than // it should, and resets it to the last offset of the list (on disk here) - let writer = NippyJarWriter::new(nippy, ConsistencyFailStrategy::Heal).unwrap(); + let writer = NippyJarWriter::new(nippy).unwrap(); assert_eq!(initial_rows, writer.rows()); assert_eq!( initial_data_size, @@ -1055,7 +1058,7 @@ mod tests { assert_eq!(nippy.max_row_size, 0); assert_eq!(nippy.rows, 0); - let mut writer = NippyJarWriter::new(nippy, ConsistencyFailStrategy::Heal).unwrap(); + let mut writer = NippyJarWriter::new(nippy).unwrap(); assert_eq!(writer.column(), 0); writer.append_column(Some(Ok(&col1[0]))).unwrap(); @@ -1093,7 +1096,7 @@ mod tests { assert_eq!(nippy.max_row_size, col1[0].len() + col2[0].len()); assert_eq!(nippy.rows, 1); - let mut writer = NippyJarWriter::new(nippy, ConsistencyFailStrategy::Heal).unwrap(); + let mut writer = NippyJarWriter::new(nippy).unwrap(); assert_eq!(writer.column(), 0); writer.append_column(Some(Ok(&col1[1]))).unwrap(); @@ -1124,7 +1127,7 @@ mod tests { fn prune_rows(num_columns: usize, file_path: &Path, col1: &[Vec], col2: &[Vec]) { let nippy = NippyJar::load_without_header(file_path).unwrap(); - let mut writer = NippyJarWriter::new(nippy, ConsistencyFailStrategy::Heal).unwrap(); + let mut writer = NippyJarWriter::new(nippy).unwrap(); // Appends a third row, so we have an offset list in memory, which is not flushed to disk writer.append_column(Some(Ok(&col1[2]))).unwrap(); @@ -1155,7 +1158,7 @@ mod tests { } // This should prune from the ondisk offset list and clear the jar. 
- let mut writer = NippyJarWriter::new(nippy, ConsistencyFailStrategy::Heal).unwrap(); + let mut writer = NippyJarWriter::new(nippy).unwrap(); writer.prune_rows(1).unwrap(); assert!(writer.is_dirty()); @@ -1196,6 +1199,6 @@ mod tests { data_file.set_len(data_len - 32 * missing_offsets).unwrap(); // runs the consistency check. - let _ = NippyJarWriter::new(nippy, ConsistencyFailStrategy::Heal).unwrap(); + let _ = NippyJarWriter::new(nippy).unwrap(); } } diff --git a/crates/storage/nippy-jar/src/writer.rs b/crates/storage/nippy-jar/src/writer.rs index 695fd6642e54..34272a9abd04 100644 --- a/crates/storage/nippy-jar/src/writer.rs +++ b/crates/storage/nippy-jar/src/writer.rs @@ -1,13 +1,15 @@ -use crate::{compression::Compression, ColumnResult, NippyJar, NippyJarError, NippyJarHeader}; +use crate::{ + compression::Compression, ColumnResult, NippyJar, NippyJarChecker, NippyJarError, + NippyJarHeader, +}; use std::{ - cmp::Ordering, fs::{File, OpenOptions}, io::{BufWriter, Read, Seek, SeekFrom, Write}, path::Path, }; /// Size of one offset in bytes. -const OFFSET_SIZE_BYTES: u8 = 8; +pub(crate) const OFFSET_SIZE_BYTES: u8 = 8; /// Writer of [`NippyJar`]. Handles table data and offsets only. /// @@ -46,22 +48,32 @@ pub struct NippyJarWriter { impl NippyJarWriter { /// Creates a [`NippyJarWriter`] from [`NippyJar`]. /// - /// If `read_only` is set to `true`, any inconsistency issue won't be healed, and will return - /// [`NippyJarError::InconsistentState`] instead. - pub fn new( - jar: NippyJar, - check_mode: ConsistencyFailStrategy, - ) -> Result { + /// It will **always** attempt to heal any inconsistent state when called.
+ pub fn new(jar: NippyJar) -> Result { let (data_file, offsets_file, is_created) = Self::create_or_open_files(jar.data_path(), &jar.offsets_path())?; - // Makes sure we don't have dangling data and offset files - jar.freeze_config()?; + let (jar, data_file, offsets_file) = if is_created { + // Makes sure we don't have dangling data and offset files when we just created the file + jar.freeze_config()?; + + (jar, BufWriter::new(data_file), BufWriter::new(offsets_file)) + } else { + // If we are opening a previously created jar, we need to check its consistency, and + // make changes if necessary. + let mut checker = NippyJarChecker::new(jar); + checker.ensure_consistency()?; + + let NippyJarChecker { jar, data_file, offsets_file } = checker; + + // Calling ensure_consistency, will fill data_file and offsets_file + (jar, data_file.expect("qed"), offsets_file.expect("qed")) + }; let mut writer = Self { jar, - data_file: BufWriter::new(data_file), - offsets_file: BufWriter::new(offsets_file), + data_file, + offsets_file, tmp_buf: Vec::with_capacity(1_000_000), uncompressed_row_size: 0, offsets: Vec::with_capacity(1_000_000), @@ -69,13 +81,9 @@ impl NippyJarWriter { dirty: false, }; - // If we are opening a previously created jar, we need to check its consistency, and make - // changes if necessary. if !is_created { - writer.ensure_file_consistency(check_mode)?; - if check_mode.should_heal() { - writer.commit()?; - } + // Commit any potential heals done above. + writer.commit()?; } Ok(writer) @@ -147,107 +155,6 @@ impl NippyJarWriter { Ok((data_file, offsets_file, is_created)) } - /// Performs consistency checks on the [`NippyJar`] file and might self-heal or throw an error - /// according to [`ConsistencyFailStrategy`]. - /// * Is the offsets file size expected? - /// * Is the data file size expected? 
- /// - /// This is based on the assumption that [`NippyJar`] configuration is **always** the last one - /// to be updated when something is written, as by the `commit()` function shows. - pub fn ensure_file_consistency( - &mut self, - check_mode: ConsistencyFailStrategy, - ) -> Result<(), NippyJarError> { - let reader = self.jar.open_data_reader()?; - - // When an offset size is smaller than the initial (8), we are dealing with immutable - // data. - if reader.offset_size() != OFFSET_SIZE_BYTES { - return Err(NippyJarError::FrozenJar) - } - - let expected_offsets_file_size: u64 = (1 + // first byte is the size of one offset - OFFSET_SIZE_BYTES as usize* self.jar.rows * self.jar.columns + // `offset size * num rows * num columns` - OFFSET_SIZE_BYTES as usize) as u64; // expected size of the data file - let actual_offsets_file_size = self.offsets_file.get_ref().metadata()?.len(); - - if check_mode.should_err() && - expected_offsets_file_size.cmp(&actual_offsets_file_size) != Ordering::Equal - { - return Err(NippyJarError::InconsistentState) - } - - // Offsets configuration wasn't properly committed - match expected_offsets_file_size.cmp(&actual_offsets_file_size) { - Ordering::Less => { - // Happened during an appending job - // TODO: ideally we could truncate until the last offset of the last column of the - // last row inserted - self.offsets_file.get_mut().set_len(expected_offsets_file_size)?; - } - Ordering::Greater => { - // Happened during a pruning job - // `num rows = (file size - 1 - size of one offset) / num columns` - self.jar.rows = ((actual_offsets_file_size. - saturating_sub(1). 
// first byte is the size of one offset - saturating_sub(OFFSET_SIZE_BYTES as u64) / // expected size of the data file - (self.jar.columns as u64)) / - OFFSET_SIZE_BYTES as u64) as usize; - - // Freeze row count changed - self.jar.freeze_config()?; - } - Ordering::Equal => {} - } - - // last offset should match the data_file_len - let last_offset = reader.reverse_offset(0)?; - let data_file_len = self.data_file.get_ref().metadata()?.len(); - - if check_mode.should_err() && last_offset.cmp(&data_file_len) != Ordering::Equal { - return Err(NippyJarError::InconsistentState) - } - - // Offset list wasn't properly committed - match last_offset.cmp(&data_file_len) { - Ordering::Less => { - // Happened during an appending job, so we need to truncate the data, since there's - // no way to recover it. - self.data_file.get_mut().set_len(last_offset)?; - } - Ordering::Greater => { - // Happened during a pruning job, so we need to reverse iterate offsets until we - // find the matching one. - for index in 0..reader.offsets_count()? { - let offset = reader.reverse_offset(index + 1)?; - // It would only be equal if the previous row was fully pruned. - if offset <= data_file_len { - let new_len = self - .offsets_file - .get_ref() - .metadata()? - .len() - .saturating_sub(OFFSET_SIZE_BYTES as u64 * (index as u64 + 1)); - self.offsets_file.get_mut().set_len(new_len)?; - - drop(reader); - - // Since we decrease the offset list, we need to check the consistency of - // `self.jar.rows` again - self.ensure_file_consistency(ConsistencyFailStrategy::Heal)?; - break - } - } - } - Ordering::Equal => {} - } - - self.offsets_file.seek(SeekFrom::End(0))?; - self.data_file.seek(SeekFrom::End(0))?; - - Ok(()) - } - /// Appends rows to data file. `fn commit()` should be called to flush offsets and config to /// disk. /// @@ -540,24 +447,3 @@ impl NippyJarWriter { &self.jar } } - -/// Strategy on encountering an inconsistent state when creating a [`NippyJarWriter`]. 
-#[derive(Debug, Copy, Clone)] -pub enum ConsistencyFailStrategy { - /// Writer should heal. - Heal, - /// Writer should throw an error. - ThrowError, -} - -impl ConsistencyFailStrategy { - /// Whether writer should heal. - const fn should_heal(&self) -> bool { - matches!(self, Self::Heal) - } - - /// Whether writer should throw an error. - const fn should_err(&self) -> bool { - matches!(self, Self::ThrowError) - } -} diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index 523fdf18fa7a..a25dbd2b2adf 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -22,7 +22,7 @@ use reth_db_api::{ table::Table, transaction::DbTx, }; -use reth_nippy_jar::NippyJar; +use reth_nippy_jar::{NippyJar, NippyJarChecker}; use reth_primitives::{ keccak256, static_file::{find_fixed_range, HighestStaticFiles, SegmentHeader, SegmentRangeInclusive}, @@ -573,7 +573,12 @@ impl StaticFileProvider { // * pruning data was interrupted before a config commit, then we have deleted data that // we are expected to still have. We need to check the Database and unwind everything // accordingly. - self.ensure_file_consistency(segment)?; + if self.access.is_read_only() { + self.check_segment_consistency(segment)?; + } else { + // Fetching the writer will attempt to heal any file level inconsistency. + self.latest_writer(segment)?; + } // Only applies to block-based static files. (Headers) // @@ -655,6 +660,23 @@ impl StaticFileProvider { Ok(unwind_target.map(PipelineTarget::Unwind)) } + /// Checks consistency of the latest static file segment and throws an error if at fault. + /// Read-only. 
+ pub fn check_segment_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> { + if let Some(latest_block) = self.get_highest_static_file_block(segment) { + let file_path = + self.directory().join(segment.filename(&find_fixed_range(latest_block))); + + let jar = NippyJar::::load(&file_path) + .map_err(|e| ProviderError::NippyJar(e.to_string()))?; + + NippyJarChecker::new(jar) + .check_consistency() + .map_err(|e| ProviderError::NippyJar(e.to_string()))?; + } + Ok(()) + } + /// Check invariants for each corresponding table and static file segment: /// /// * the corresponding database table should overlap or have continuity in their keys @@ -1040,9 +1062,6 @@ pub trait StaticFileWriter { /// Commits all changes of all [`StaticFileProviderRW`] of all [`StaticFileSegment`]. fn commit(&self) -> ProviderResult<()>; - - /// Checks consistency of the segment latest file and heals if possible. - fn ensure_file_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()>; } impl StaticFileWriter for StaticFileProvider { @@ -1071,28 +1090,6 @@ impl StaticFileWriter for StaticFileProvider { fn commit(&self) -> ProviderResult<()> { self.writers.commit() } - - fn ensure_file_consistency(&self, segment: StaticFileSegment) -> ProviderResult<()> { - match self.access { - StaticFileAccess::RO => { - let latest_block = self.get_highest_static_file_block(segment).unwrap_or_default(); - - let mut writer = StaticFileProviderRW::new( - segment, - latest_block, - Arc::downgrade(&self.0), - self.metrics.clone(), - )?; - - writer.ensure_file_consistency(self.access.is_read_only())?; - } - StaticFileAccess::RW => { - self.latest_writer(segment)?.ensure_file_consistency(self.access.is_read_only())?; - } - } - - Ok(()) - } } impl HeaderProvider for StaticFileProvider { diff --git a/crates/storage/provider/src/providers/static_file/writer.rs b/crates/storage/provider/src/providers/static_file/writer.rs index 6df36ab45fdb..6084c929e85f 100644 --- 
a/crates/storage/provider/src/providers/static_file/writer.rs +++ b/crates/storage/provider/src/providers/static_file/writer.rs @@ -5,7 +5,7 @@ use crate::providers::static_file::metrics::StaticFileProviderOperation; use parking_lot::{lock_api::RwLockWriteGuard, RawRwLock, RwLock}; use reth_codecs::Compact; use reth_db_api::models::CompactU256; -use reth_nippy_jar::{ConsistencyFailStrategy, NippyJar, NippyJarError, NippyJarWriter}; +use reth_nippy_jar::{NippyJar, NippyJarError, NippyJarWriter}; use reth_primitives::{ static_file::{find_fixed_range, SegmentHeader, SegmentRangeInclusive}, BlockHash, BlockNumber, Header, Receipt, StaticFileSegment, TransactionSignedNoHash, TxNumber, @@ -105,6 +105,9 @@ pub struct StaticFileProviderRW { impl StaticFileProviderRW { /// Creates a new [`StaticFileProviderRW`] for a [`StaticFileSegment`]. + /// + /// Before use, transaction based segments should ensure the block end range is the expected + /// one, and heal if not. For more check `Self::ensure_end_range_consistency`. 
pub fn new( segment: StaticFileSegment, block: BlockNumber, @@ -112,14 +115,18 @@ impl StaticFileProviderRW { metrics: Option>, ) -> ProviderResult { let (writer, data_path) = Self::open(segment, block, reader.clone(), metrics.clone())?; - Ok(Self { + let mut writer = Self { writer, data_path, buf: Vec::with_capacity(100), reader, metrics, prune_on_commit: None, - }) + }; + + writer.ensure_end_range_consistency()?; + + Ok(writer) } fn open( @@ -150,14 +157,7 @@ impl StaticFileProviderRW { Err(err) => return Err(err), }; - let reader = Self::upgrade_provider_to_strong_reference(&reader); - let access = if reader.is_read_only() { - ConsistencyFailStrategy::ThrowError - } else { - ConsistencyFailStrategy::Heal - }; - - let result = match NippyJarWriter::new(jar, access) { + let result = match NippyJarWriter::new(jar) { Ok(writer) => Ok((writer, path)), Err(NippyJarError::FrozenJar) => { // This static file has been frozen, so we should @@ -177,33 +177,15 @@ impl StaticFileProviderRW { Ok(result) } - /// Checks the consistency of the file and heals it if necessary and `read_only` is set to - /// false. If the check fails, it will return an error. + /// If a file level healing happens, we need to update the end range on the + /// [`SegmentHeader`]. /// - /// If healing does happen, it will update the end range on the [`SegmentHeader`]. However, for - /// transaction based segments, the block end range has to be found and healed externally. + /// However, for transaction based segments, the block end range has to be found and healed + /// externally. /// - /// Check [`NippyJarWriter::ensure_file_consistency`] for more on healing. - pub fn ensure_file_consistency(&mut self, read_only: bool) -> ProviderResult<()> { - let inconsistent_error = || { - ProviderError::NippyJar( - "Inconsistent state found. 
Restart the node to heal.".to_string(), - ) - }; - - let check_mode = if read_only { - ConsistencyFailStrategy::ThrowError - } else { - ConsistencyFailStrategy::Heal - }; - - self.writer.ensure_file_consistency(check_mode).map_err(|error| { - if matches!(error, NippyJarError::InconsistentState) { - return inconsistent_error() - } - ProviderError::NippyJar(error.to_string()) - })?; - + /// Check [`reth_nippy_jar::NippyJarChecker`] & + /// [`NippyJarWriter`] for more on healing. + fn ensure_end_range_consistency(&mut self) -> ProviderResult<()> { // If we have lost rows (in this run or previous), we need to update the [SegmentHeader]. let expected_rows = if self.user_header().segment().is_headers() { self.user_header().block_len().unwrap_or_default() @@ -212,9 +194,6 @@ impl StaticFileProviderRW { }; let pruned_rows = expected_rows - self.writer.rows() as u64; if pruned_rows > 0 { - if read_only { - return Err(inconsistent_error()) - } self.user_header_mut().prune(pruned_rows); }