Skip to content

Commit

Permalink
refactor(papyrus_storage): add revert_events function
Browse files Browse the repository at this point in the history
  • Loading branch information
AlonLStarkWare committed Dec 25, 2024
1 parent f1696a7 commit 9ac0fe0
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 1 deletion.
93 changes: 92 additions & 1 deletion crates/papyrus_storage/src/body/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ use starknet_api::transaction::{
TransactionOffsetInBlock,
TransactionOutput,
};
use tracing::debug;

use super::{update_marker, TransactionMetadataTable};
use super::{update_marker, MarkerKind, TransactionMetadataTable};
use crate::body::{AddressToTransactionIndexTableKey, TransactionIndex};
use crate::db::serialization::{NoVersionValueWrapper, VersionZeroWrapper};
use crate::db::table_types::{CommonPrefix, DbCursor, DbCursorTrait, NoValue, SimpleTable, Table};
Expand Down Expand Up @@ -381,6 +382,41 @@ type AddressToTransactionIndexTableCursor<'txn> = DbCursor<
type TransactionMetadataTableCursor<'txn> =
DbCursor<'txn, RO, TransactionIndex, VersionZeroWrapper<TransactionMetadata>, SimpleTable>;

/// Interface for reading the events from the storage.
pub trait EventStorageReader {
/// The first block number that isn't written in the events table.
fn get_events_marker(&self) -> StorageResult<BlockNumber>;

/// Returns the events of a specific block.
fn get_block_events(&self, block_number: BlockNumber)
-> StorageResult<Option<Vec<Vec<Event>>>>;
}

impl EventStorageReader for StorageTxn<'_, RW> {
fn get_events_marker(&self) -> StorageResult<BlockNumber> {
let markers_table = self.open_table(&self.tables.markers)?;
Ok(markers_table.get(&self.txn, &MarkerKind::Event)?.unwrap_or_default())
}
fn get_block_events(
&self,
block_number: BlockNumber,
) -> StorageResult<Option<Vec<Vec<Event>>>> {
let events_table = self.open_table(&self.tables.events)?;
let mut cursor = events_table.cursor(&self.txn)?;
let mut current =
cursor.lower_bound(&TransactionIndex(block_number, TransactionOffsetInBlock(0)))?;
let mut res = Vec::new();
while let Some((tx_index, events_location)) = current {
if tx_index.0 != block_number {
break;
}
res.push(self.file_handlers.get_events_unchecked(events_location)?);
current = cursor.next()?;
}
if res.is_empty() { Ok(None) } else { Ok(Some(res)) }
}
}

/// Interface for updating the events in the storage.
pub trait EventStorageWriter
where
Expand All @@ -393,6 +429,13 @@ where
block_number: BlockNumber,
block_events: &[&[Event]],
) -> StorageResult<Self>;

/// Removes the events of an entire block from the storage.
// To enforce that no commit happen after a failure, we consume and return Self on success.
fn revert_events(
self,
block_number: BlockNumber,
) -> StorageResult<(Self, Option<Vec<Vec<Event>>>)>;
}

impl EventStorageWriter for StorageTxn<'_, RW> {
Expand Down Expand Up @@ -432,4 +475,52 @@ impl EventStorageWriter for StorageTxn<'_, RW> {
}
Ok(self)
}

fn revert_events(
self,
block_number: BlockNumber,
) -> StorageResult<(Self, Option<Vec<Vec<Event>>>)> {
let markers_table = self.open_table(&self.tables.markers)?;
// Assert that body marker equals the reverted block number + 1
let current_header_marker = self.get_events_marker()?;
if block_number
.next()
.filter(|next_block_number| current_header_marker == *next_block_number)
.is_none()
{
debug!(
"Attempt to revert a non-existing / old block {}. Returning without an action.",
block_number
);
return Ok((self, None));
}
let reverted_block_events = 'reverted_block_events: {
if self.scope == StorageScope::StateOnly {
break 'reverted_block_events None;
} else {
let events_table = self.open_table(&self.tables.events)?;
let address_to_tx_index_table =
self.open_table(&self.tables.address_to_transaction_index)?;

let block_events = self.get_block_events(block_number)?.unwrap_or_else(|| {
panic!("Block events not found for block number: {block_number:?}")
});
// Assuming theres a vector of events for every transaction (even if empty), so
// the index of each item of this enumerate is the transaction index in the
// block.
for (index, transaction_events) in block_events.iter().enumerate() {
let transaction_index =
TransactionIndex(block_number, TransactionOffsetInBlock(index));
events_table.delete(&self.txn, &transaction_index)?;
for event in transaction_events {
address_to_tx_index_table
.delete(&self.txn, &(event.from_address, transaction_index))?;
}
}
Some(block_events)
}
};
markers_table.upsert(&self.txn, &MarkerKind::Event, &block_number)?;
Ok((self, reverted_block_events))
}
}
7 changes: 7 additions & 0 deletions crates/papyrus_storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -807,6 +807,13 @@ impl<Mode: TransactionKind> FileHandlers<Mode> {
msg: format!("Transaction at location {:?} not found.", location),
})
}

// Returns the events at the given location or an error in case it doesn't exist.
fn get_events_unchecked(&self, location: LocationInFile) -> StorageResult<Vec<Event>> {
self.event.get(location)?.ok_or(StorageError::DBInconsistency {
msg: format!("Events at location {:?} not found.", location),
})
}
}

fn open_storage_files(
Expand Down

0 comments on commit 9ac0fe0

Please sign in to comment.