Skip to content

Commit

Permalink
fix(provider): move consistency check methods from NippyJarWriter to `NippyJarChecker` (#10633)
Browse files Browse the repository at this point in the history
  • Loading branch information
joshieDo committed Sep 2, 2024
1 parent 2c0170a commit d58cf53
Show file tree
Hide file tree
Showing 7 changed files with 284 additions and 223 deletions.
16 changes: 11 additions & 5 deletions crates/stages/stages/src/stages/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,9 @@ mod tests {
StaticFileSegment, B256, U256,
};
use reth_provider::{
providers::StaticFileWriter, AccountExtReader, BlockReader, DatabaseProviderFactory,
ProviderFactory, ProviderResult, ReceiptProvider, StageCheckpointWriter,
StaticFileProviderFactory, StorageReader,
providers::{StaticFileProvider, StaticFileWriter},
AccountExtReader, BlockReader, DatabaseProviderFactory, ProviderFactory, ProviderResult,
ReceiptProvider, StageCheckpointWriter, StaticFileProviderFactory, StorageReader,
};
use reth_prune_types::{PruneMode, PruneModes};
use reth_stages_api::{
Expand Down Expand Up @@ -297,7 +297,10 @@ mod tests {
is_full_node: bool,
expected: Option<PipelineTarget>,
) {
let static_file_provider = db.factory.static_file_provider();
// We recreate the static file provider, since consistency heals are done on fetching the
// writer for the first time.
let static_file_provider =
StaticFileProvider::read_write(db.factory.static_file_provider().path()).unwrap();

// Simulate corruption by removing `prune_count` rows from the data file without updating
// its offset list and configuration.
Expand All @@ -312,8 +315,11 @@ mod tests {
data_file.get_ref().sync_all().unwrap();
}

// We recreate the static file provider, since consistency heals are done on fetching the
// writer for the first time.
assert_eq!(
static_file_provider
StaticFileProvider::read_write(db.factory.static_file_provider().path())
.unwrap()
.check_consistency(&db.factory.database_provider_ro().unwrap(), is_full_node,),
Ok(expected)
);
Expand Down
187 changes: 187 additions & 0 deletions crates/storage/nippy-jar/src/consistency.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
use crate::{writer::OFFSET_SIZE_BYTES, NippyJar, NippyJarError, NippyJarHeader};
use std::{
cmp::Ordering,
fs::{File, OpenOptions},
io::{BufWriter, Seek, SeekFrom},
path::Path,
};

/// Performs consistency checks or heals on the [`NippyJar`] file:
/// * Is the offsets file size the expected one?
/// * Is the data file size the expected one?
///
/// This is based on the assumption that the [`NippyJar`] configuration is **always** the last
/// thing to be updated when something is written, as the `NippyJarWriter::commit()` function
/// shows.
///
/// **For checks (read-only) use the `check_consistency` method.**
///
/// **For heals (read-write) use the `ensure_consistency` method.**
#[derive(Debug)]
pub struct NippyJarChecker<H: NippyJarHeader = ()> {
    /// Associated [`NippyJar`], containing all necessary configurations for data
    /// handling.
    pub(crate) jar: NippyJar<H>,
    /// File handle to where the data is stored. `None` until files are loaded by
    /// the consistency routine.
    pub(crate) data_file: Option<BufWriter<File>>,
    /// File handle to where the offsets are stored. `None` until files are loaded by
    /// the consistency routine.
    pub(crate) offsets_file: Option<BufWriter<File>>,
}

impl<H: NippyJarHeader> NippyJarChecker<H> {
    /// Creates a checker over `jar`. File handles are opened lazily by the
    /// consistency routines, so this constructor performs no I/O.
    pub const fn new(jar: NippyJar<H>) -> Self {
        Self { jar, data_file: None, offsets_file: None }
    }

    /// It will throw an error if the [`NippyJar`] is in an inconsistent state.
    ///
    /// Read-only: no repairs are made (files are opened without write access).
    pub fn check_consistency(&mut self) -> Result<(), NippyJarError> {
        self.handle_consistency(ConsistencyFailStrategy::ThrowError)
    }

    /// It will attempt to heal if the [`NippyJar`] is in an inconsistent state.
    ///
    /// **ATTENTION**: disk commit should be handled externally by consuming `Self`
    pub fn ensure_consistency(&mut self) -> Result<(), NippyJarError> {
        self.handle_consistency(ConsistencyFailStrategy::Heal)
    }

    /// Core check/heal routine shared by [`Self::check_consistency`] and
    /// [`Self::ensure_consistency`].
    ///
    /// Validates two invariants, repairing each in `Heal` mode:
    /// 1. the offsets file size matches the committed `rows * columns` configuration;
    /// 2. the last stored offset matches the data file length.
    fn handle_consistency(&mut self, mode: ConsistencyFailStrategy) -> Result<(), NippyJarError> {
        self.load_files(mode)?;
        let reader = self.jar.open_data_reader()?;

        // When an offset size is smaller than the initial (8), we are dealing with immutable
        // data.
        if reader.offset_size() != OFFSET_SIZE_BYTES {
            return Err(NippyJarError::FrozenJar)
        }

        let expected_offsets_file_size: u64 = (1 + // first byte is the size of one offset
            OFFSET_SIZE_BYTES as usize * self.jar.rows * self.jar.columns + // `offset size * num rows * num columns`
            OFFSET_SIZE_BYTES as usize) as u64; // last offset: stores the expected size of the data file
        let actual_offsets_file_size = self.offsets_file().get_ref().metadata()?.len();

        // In check-only mode any size mismatch is fatal; no repair is attempted.
        if mode.should_err() &&
            expected_offsets_file_size.cmp(&actual_offsets_file_size) != Ordering::Equal
        {
            return Err(NippyJarError::InconsistentState)
        }

        // Offsets configuration wasn't properly committed
        match expected_offsets_file_size.cmp(&actual_offsets_file_size) {
            Ordering::Less => {
                // Happened during an appending job: drop the uncommitted tail of the
                // offsets file back to the configured size.
                // TODO: ideally we could truncate until the last offset of the last column of the
                // last row inserted
                self.offsets_file().get_mut().set_len(expected_offsets_file_size)?;
            }
            Ordering::Greater => {
                // Happened during a pruning job: the on-disk offsets are the source of
                // truth, so recompute the row count from the actual file size.
                // `num rows = (file size - 1 - size of one offset) / num columns`
                self.jar.rows = ((actual_offsets_file_size.
                    saturating_sub(1). // first byte is the size of one offset
                    saturating_sub(OFFSET_SIZE_BYTES as u64) / // expected size of the data file
                    (self.jar.columns as u64)) /
                    OFFSET_SIZE_BYTES as u64) as usize;

                // Freeze row count changed
                self.jar.freeze_config()?;
            }
            Ordering::Equal => {}
        }

        // last offset should match the data_file_len
        let last_offset = reader.reverse_offset(0)?;
        let data_file_len = self.data_file().get_ref().metadata()?.len();

        if mode.should_err() && last_offset.cmp(&data_file_len) != Ordering::Equal {
            return Err(NippyJarError::InconsistentState)
        }

        // Offset list wasn't properly committed
        match last_offset.cmp(&data_file_len) {
            Ordering::Less => {
                // Happened during an appending job, so we need to truncate the data, since there's
                // no way to recover it.
                self.data_file().get_mut().set_len(last_offset)?;
            }
            Ordering::Greater => {
                // Happened during a pruning job, so we need to reverse iterate offsets until we
                // find the matching one.
                for index in 0..reader.offsets_count()? {
                    let offset = reader.reverse_offset(index + 1)?;
                    // It would only be equal if the previous row was fully pruned.
                    if offset <= data_file_len {
                        // Shrink the offsets file so its last offset matches the data file.
                        let new_len = self
                            .offsets_file()
                            .get_ref()
                            .metadata()?
                            .len()
                            .saturating_sub(OFFSET_SIZE_BYTES as u64 * (index as u64 + 1));
                        self.offsets_file().get_mut().set_len(new_len)?;

                        // NOTE(review): reader presumably holds a handle/mapping over the
                        // files — it must be dropped before the recursive call reopens them.
                        drop(reader);

                        // Since we decrease the offset list, we need to check the consistency of
                        // `self.jar.rows` again
                        self.handle_consistency(ConsistencyFailStrategy::Heal)?;
                        break
                    }
                }
            }
            Ordering::Equal => {}
        }

        // Leave both cursors at EOF so a subsequent writer appends instead of overwriting.
        self.offsets_file().seek(SeekFrom::End(0))?;
        self.data_file().seek(SeekFrom::End(0))?;

        Ok(())
    }

    /// Loads data and offsets files.
    ///
    /// Files are opened read-only unless `mode` allows healing; a missing file is
    /// reported as [`NippyJarError::MissingFile`] rather than created.
    fn load_files(&mut self, mode: ConsistencyFailStrategy) -> Result<(), NippyJarError> {
        let load_file = |path: &Path| -> Result<BufWriter<File>, NippyJarError> {
            let path = path
                .exists()
                .then_some(path)
                .ok_or_else(|| NippyJarError::MissingFile(path.to_path_buf()))?;
            Ok(BufWriter::new(OpenOptions::new().read(true).write(mode.should_heal()).open(path)?))
        };
        self.data_file = Some(load_file(self.jar.data_path())?);
        self.offsets_file = Some(load_file(&self.jar.offsets_path())?);
        Ok(())
    }

    /// Returns a mutable reference to offsets file.
    ///
    /// **Panics** if it does not exist, i.e. if `load_files` has not run yet.
    fn offsets_file(&mut self) -> &mut BufWriter<File> {
        self.offsets_file.as_mut().expect("should exist")
    }

    /// Returns a mutable reference to data file.
    ///
    /// **Panics** if it does not exist, i.e. if `load_files` has not run yet.
    fn data_file(&mut self) -> &mut BufWriter<File> {
        self.data_file.as_mut().expect("should exist")
    }
}

/// What [`NippyJarChecker`] does when it finds the files in an inconsistent state.
#[derive(Debug, Copy, Clone)]
enum ConsistencyFailStrategy {
    /// Writer should heal.
    Heal,
    /// Writer should throw an error.
    ThrowError,
}

impl ConsistencyFailStrategy {
    /// Returns `true` when the checker is allowed to repair the files in place.
    const fn should_heal(&self) -> bool {
        match self {
            Self::Heal => true,
            Self::ThrowError => false,
        }
    }

    /// Returns `true` when any inconsistency must surface as an error instead of a repair.
    const fn should_err(&self) -> bool {
        match self {
            Self::Heal => false,
            Self::ThrowError => true,
        }
    }
}
3 changes: 3 additions & 0 deletions crates/storage/nippy-jar/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::path::PathBuf;
use thiserror::Error;

/// Errors associated with [`crate::NippyJar`].
Expand Down Expand Up @@ -60,4 +61,6 @@ pub enum NippyJarError {
FrozenJar,
#[error("File is in an inconsistent state.")]
InconsistentState,
#[error("Missing file: {0}.")]
MissingFile(PathBuf),
}
25 changes: 14 additions & 11 deletions crates/storage/nippy-jar/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ mod cursor;
pub use cursor::NippyJarCursor;

mod writer;
pub use writer::{ConsistencyFailStrategy, NippyJarWriter};
pub use writer::NippyJarWriter;

mod consistency;
pub use consistency::NippyJarChecker;

const NIPPY_JAR_VERSION: usize = 1;

Expand Down Expand Up @@ -346,7 +349,7 @@ impl<H: NippyJarHeader> NippyJar<H> {
self.freeze_filters()?;

// Creates the writer, data and offsets file
let mut writer = NippyJarWriter::new(self, ConsistencyFailStrategy::Heal)?;
let mut writer = NippyJarWriter::new(self)?;

// Append rows to file while holding offsets in memory
writer.append_rows(columns, total_rows)?;
Expand Down Expand Up @@ -959,7 +962,7 @@ mod tests {
assert!(initial_offset_size > 0);

// Appends a third row
let mut writer = NippyJarWriter::new(nippy, ConsistencyFailStrategy::Heal).unwrap();
let mut writer = NippyJarWriter::new(nippy).unwrap();
writer.append_column(Some(Ok(&col1[2]))).unwrap();
writer.append_column(Some(Ok(&col2[2]))).unwrap();

Expand Down Expand Up @@ -990,7 +993,7 @@ mod tests {
// Writer will execute a consistency check and verify first that the offset list on disk
// doesn't match the nippy.rows, and prune it. Then, it will prune the data file
// accordingly as well.
let writer = NippyJarWriter::new(nippy, ConsistencyFailStrategy::Heal).unwrap();
let writer = NippyJarWriter::new(nippy).unwrap();
assert_eq!(initial_rows, writer.rows());
assert_eq!(
initial_offset_size,
Expand All @@ -1016,7 +1019,7 @@ mod tests {

// Appends a third row, so we have an offset list in memory, which is not flushed to disk,
// while the data has been.
let mut writer = NippyJarWriter::new(nippy, ConsistencyFailStrategy::Heal).unwrap();
let mut writer = NippyJarWriter::new(nippy).unwrap();
writer.append_column(Some(Ok(&col1[2]))).unwrap();
writer.append_column(Some(Ok(&col2[2]))).unwrap();

Expand All @@ -1039,7 +1042,7 @@ mod tests {

// Writer will execute a consistency check and verify that the data file has more data than
// it should, and resets it to the last offset of the list (on disk here)
let writer = NippyJarWriter::new(nippy, ConsistencyFailStrategy::Heal).unwrap();
let writer = NippyJarWriter::new(nippy).unwrap();
assert_eq!(initial_rows, writer.rows());
assert_eq!(
initial_data_size,
Expand All @@ -1055,7 +1058,7 @@ mod tests {
assert_eq!(nippy.max_row_size, 0);
assert_eq!(nippy.rows, 0);

let mut writer = NippyJarWriter::new(nippy, ConsistencyFailStrategy::Heal).unwrap();
let mut writer = NippyJarWriter::new(nippy).unwrap();
assert_eq!(writer.column(), 0);

writer.append_column(Some(Ok(&col1[0]))).unwrap();
Expand Down Expand Up @@ -1093,7 +1096,7 @@ mod tests {
assert_eq!(nippy.max_row_size, col1[0].len() + col2[0].len());
assert_eq!(nippy.rows, 1);

let mut writer = NippyJarWriter::new(nippy, ConsistencyFailStrategy::Heal).unwrap();
let mut writer = NippyJarWriter::new(nippy).unwrap();
assert_eq!(writer.column(), 0);

writer.append_column(Some(Ok(&col1[1]))).unwrap();
Expand Down Expand Up @@ -1124,7 +1127,7 @@ mod tests {

fn prune_rows(num_columns: usize, file_path: &Path, col1: &[Vec<u8>], col2: &[Vec<u8>]) {
let nippy = NippyJar::load_without_header(file_path).unwrap();
let mut writer = NippyJarWriter::new(nippy, ConsistencyFailStrategy::Heal).unwrap();
let mut writer = NippyJarWriter::new(nippy).unwrap();

// Appends a third row, so we have an offset list in memory, which is not flushed to disk
writer.append_column(Some(Ok(&col1[2]))).unwrap();
Expand Down Expand Up @@ -1155,7 +1158,7 @@ mod tests {
}

// This should prune from the ondisk offset list and clear the jar.
let mut writer = NippyJarWriter::new(nippy, ConsistencyFailStrategy::Heal).unwrap();
let mut writer = NippyJarWriter::new(nippy).unwrap();
writer.prune_rows(1).unwrap();
assert!(writer.is_dirty());

Expand Down Expand Up @@ -1196,6 +1199,6 @@ mod tests {
data_file.set_len(data_len - 32 * missing_offsets).unwrap();

// runs the consistency check.
let _ = NippyJarWriter::new(nippy, ConsistencyFailStrategy::Heal).unwrap();
let _ = NippyJarWriter::new(nippy).unwrap();
}
}
Loading

0 comments on commit d58cf53

Please sign in to comment.