Skip to content

Commit

Permalink
fix(storage): only delete static file if last_block is on a previo…
Browse files Browse the repository at this point in the history
…us static file (#11029)

Co-authored-by: Alexgao001 <[email protected]>
  • Loading branch information
joshieDo and alexgao001 authored Sep 25, 2024
1 parent 1d56382 commit 4070498
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 33 deletions.
10 changes: 8 additions & 2 deletions crates/static-file/types/src/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ impl StaticFileSegment {
pub const fn is_receipts(&self) -> bool {
matches!(self, Self::Receipts)
}

/// Returns `true` if the segment is `StaticFileSegment::Receipts` or
/// `StaticFileSegment::Transactions`.
pub const fn is_tx_based(&self) -> bool {
matches!(self, Self::Receipts | Self::Transactions)
}
}

/// A segment header that contains information common to all segments. Used for storage.
Expand Down Expand Up @@ -239,7 +245,7 @@ impl SegmentHeader {
match self.segment {
StaticFileSegment::Headers => {
if let Some(range) = &mut self.block_range {
if num > range.end {
if num > range.end - range.start {
self.block_range = None;
} else {
range.end = range.end.saturating_sub(num);
Expand All @@ -248,7 +254,7 @@ impl SegmentHeader {
}
StaticFileSegment::Transactions | StaticFileSegment::Receipts => {
if let Some(range) = &mut self.tx_range {
if num > range.end {
if num > range.end - range.start {
self.tx_range = None;
} else {
range.end = range.end.saturating_sub(num);
Expand Down
27 changes: 17 additions & 10 deletions crates/storage/provider/src/providers/static_file/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -533,15 +533,16 @@ impl StaticFileProvider {
})
.or_insert_with(|| BTreeMap::from([(tx_end, current_block_range)]));
}
} else if tx_index.get(&segment).map(|index| index.len()) == Some(1) {
// Only happens if we unwind all the txs/receipts from the first static file.
// Should only happen in test scenarios.
if jar.user_header().expected_block_start() == 0 &&
matches!(
segment,
StaticFileSegment::Receipts | StaticFileSegment::Transactions
)
{
} else if segment.is_tx_based() {
// The unwinded file has no more transactions/receipts. However, the highest
// block is within this files' block range. We only retain
// entries with block ranges before the current one.
tx_index.entry(segment).and_modify(|index| {
index.retain(|_, block_range| block_range.start() < fixed_range.start());
});

// If the index is empty, just remove it.
if tx_index.get(&segment).is_some_and(|index| index.is_empty()) {
tx_index.remove(&segment);
}
}
Expand Down Expand Up @@ -1145,11 +1146,17 @@ impl StaticFileProvider {
Ok(data)
}

#[cfg(any(test, feature = "test-utils"))]
/// Returns `static_files` directory
#[cfg(any(test, feature = "test-utils"))]
pub fn path(&self) -> &Path {
&self.path
}

/// Returns `static_files` transaction index
#[cfg(any(test, feature = "test-utils"))]
pub fn tx_index(&self) -> &RwLock<SegmentRanges> {
&self.static_files_tx_index
}
}

/// Helper trait to manage different [`StaticFileProviderRW`] of an `Arc<StaticFileProvider`
Expand Down
107 changes: 88 additions & 19 deletions crates/storage/provider/src/providers/static_file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,9 @@ mod tests {
use reth_db_api::transaction::DbTxMut;
use reth_primitives::{
static_file::{find_fixed_range, SegmentRangeInclusive, DEFAULT_BLOCKS_PER_STATIC_FILE},
BlockHash, Header, Receipt, TransactionSignedNoHash,
BlockHash, Header, Receipt, TransactionSignedNoHash, TxNumber,
};
use reth_storage_api::{ReceiptProvider, TransactionsProvider};
use reth_testing_utils::generators::{self, random_header_range};
use std::{fmt::Debug, fs, ops::Range, path::Path};

Expand Down Expand Up @@ -204,6 +205,14 @@ mod tests {
"block mismatch",
)?;

if let Some(id) = expected_tip {
assert_eyre(
sf_rw.header_by_number(id)?.map(|h| h.number),
expected_tip,
"header mismatch",
)?;
}

// Validate the number of files remaining in the directory
assert_eyre(
fs::read_dir(static_dir)?.count(),
Expand Down Expand Up @@ -304,17 +313,22 @@ mod tests {
mut tx_count: u64,
next_tx_num: &mut u64,
) {
let mut receipt = Receipt::default();
let mut tx = TransactionSignedNoHash::default();

for block in block_range.clone() {
writer.increment_block(block).unwrap();

// Append transaction/receipt if there's still a transaction count to append
if tx_count > 0 {
if segment.is_receipts() {
writer.append_receipt(*next_tx_num, &Receipt::default()).unwrap();
// Used as ID for validation
receipt.cumulative_gas_used = *next_tx_num;
writer.append_receipt(*next_tx_num, &receipt).unwrap();
} else {
writer
.append_transaction(*next_tx_num, &TransactionSignedNoHash::default())
.unwrap();
// Used as ID for validation
tx.transaction.set_nonce(*next_tx_num);
writer.append_transaction(*next_tx_num, &tx).unwrap();
}
*next_tx_num += 1;
tx_count -= 1;
Expand Down Expand Up @@ -376,25 +390,36 @@ mod tests {
expected_tx_range.as_ref()
);
});

// Ensure transaction index
let tx_index = sf_rw.tx_index().read();
let expected_tx_index =
vec![(8, SegmentRangeInclusive::new(0, 9)), (9, SegmentRangeInclusive::new(20, 29))];
assert_eq!(
tx_index.get(&segment).map(|index| index.iter().map(|(k, v)| (*k, *v)).collect()),
(!expected_tx_index.is_empty()).then_some(expected_tx_index),
"tx index mismatch",
);
}

#[test]
#[ignore]
fn test_tx_based_truncation() {
let segments = [StaticFileSegment::Transactions, StaticFileSegment::Receipts];
let blocks_per_file = 10; // Number of blocks per file
let files_per_range = 3; // Number of files per range (data/conf/offset files)
let file_set_count = 3; // Number of sets of files to create
let initial_file_count = files_per_range * file_set_count + 1; // Includes lockfile

#[allow(clippy::too_many_arguments)]
fn prune_and_validate(
sf_rw: &StaticFileProvider,
static_dir: impl AsRef<Path>,
segment: StaticFileSegment,
prune_count: u64,
last_block: u64,
expected_tx_tip: u64,
expected_tx_tip: Option<u64>,
expected_file_count: i32,
expected_tx_index: Vec<(TxNumber, SegmentRangeInclusive)>,
) -> eyre::Result<()> {
let mut writer = sf_rw.latest_writer(segment)?;

Expand All @@ -412,18 +437,41 @@ mod tests {
Some(last_block),
"block mismatch",
)?;
assert_eyre(
sf_rw.get_highest_static_file_tx(segment),
Some(expected_tx_tip),
"tx mismatch",
)?;
assert_eyre(sf_rw.get_highest_static_file_tx(segment), expected_tx_tip, "tx mismatch")?;

// Verify that transactions and receipts are returned correctly. Uses
// cumulative_gas_used & nonce as ids.
if let Some(id) = expected_tx_tip {
if segment.is_receipts() {
assert_eyre(
expected_tx_tip,
sf_rw.receipt(id)?.map(|r| r.cumulative_gas_used),
"tx mismatch",
)?;
} else {
assert_eyre(
expected_tx_tip,
sf_rw.transaction_by_id(id)?.map(|t| t.nonce()),
"tx mismatch",
)?;
}
}

// Ensure the file count has reduced as expected
assert_eyre(
fs::read_dir(static_dir)?.count(),
expected_file_count as usize,
"file count mismatch",
)?;

// Ensure that the inner tx index (max_tx -> block range) is as expected
let tx_index = sf_rw.tx_index().read();
assert_eyre(
tx_index.get(&segment).map(|index| index.iter().map(|(k, v)| (*k, *v)).collect()),
(!expected_tx_index.is_empty()).then_some(expected_tx_index),
"tx index mismatch",
)?;

Ok(())
}

Expand All @@ -442,26 +490,46 @@ mod tests {
let highest_tx = sf_rw.get_highest_static_file_tx(segment).unwrap();

// Test cases
// [prune_count, last_block, expected_tx_tip, expected_file_count)
// [prune_count, last_block, expected_tx_tip, expected_file_count, expected_tx_index)
let test_cases = vec![
// Case 0: 20..=29 has only one tx. Prune the only tx of the block range.
// It ensures that the file is not deleted even though there are no rows, since the
// `last_block` which is passed to the prune method is the first
// block of the range.
(1, blocks_per_file * 2, highest_tx - 1, initial_file_count),
(
1,
blocks_per_file * 2,
Some(highest_tx - 1),
initial_file_count,
vec![(highest_tx - 1, SegmentRangeInclusive::new(0, 9))],
),
// Case 1: 10..=19 has no txs. There are no txes in the whole block range, but want
// to unwind to block 9. Ensures that the 20..=29 and 10..=19 files
// are deleted.
(0, blocks_per_file - 1, highest_tx - 1, files_per_range + 1), // includes lockfile
(
0,
blocks_per_file - 1,
Some(highest_tx - 1),
files_per_range + 1, // includes lockfile
vec![(highest_tx - 1, SegmentRangeInclusive::new(0, 9))],
),
// Case 2: Prune most txs up to block 1.
(7, 1, 1, files_per_range + 1),
(
highest_tx - 1,
1,
Some(0),
files_per_range + 1,
vec![(0, SegmentRangeInclusive::new(0, 1))],
),
// Case 3: Prune remaining tx and ensure that file is not deleted.
(1, 0, 0, files_per_range + 1),
(1, 0, None, files_per_range + 1, vec![]),
];

// Loop through test cases
for (case, (prune_count, last_block, expected_tx_tip, expected_file_count)) in
test_cases.into_iter().enumerate()
for (
case,
(prune_count, last_block, expected_tx_tip, expected_file_count, expected_tx_index),
) in test_cases.into_iter().enumerate()
{
prune_and_validate(
&sf_rw,
Expand All @@ -471,6 +539,7 @@ mod tests {
last_block,
expected_tx_tip,
expected_file_count,
expected_tx_index,
)
.map_err(|err| eyre::eyre!("Test case {case}: {err}"))
.unwrap();
Expand Down
12 changes: 10 additions & 2 deletions crates/storage/provider/src/providers/static_file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,8 +381,9 @@ impl StaticFileProviderRW {
/// Commits to the configuration file at the end.
fn truncate(&mut self, num_rows: u64, last_block: Option<u64>) -> ProviderResult<()> {
let mut remaining_rows = num_rows;
let segment = self.writer.user_header().segment();
while remaining_rows > 0 {
let len = match self.writer.user_header().segment() {
let len = match segment {
StaticFileSegment::Headers => {
self.writer.user_header().block_len().unwrap_or_default()
}
Expand All @@ -396,7 +397,14 @@ impl StaticFileProviderRW {
// delete the whole file and go to the next static file
let block_start = self.writer.user_header().expected_block_start();

if block_start != 0 {
// We only delete the file if it's NOT the first static file AND:
// * it's a Header segment OR
// * it's a tx-based segment AND `last_block` is lower than the first block of this
// file's block range. Otherwise, having no rows simply means that this block
// range has no transactions, but the file should remain.
if block_start != 0 &&
(segment.is_headers() || last_block.is_some_and(|b| b < block_start))
{
self.delete_current_and_open_previous()?;
} else {
// Update `SegmentHeader`
Expand Down

0 comments on commit 4070498

Please sign in to comment.