Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(papyrus_storage): add append_events function that writes the events of a block to storage #2900

Open
wants to merge 3 commits into
base: alonl/event_storage_split
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 64 additions & 3 deletions crates/papyrus_storage/src/body/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,23 @@ use starknet_api::transaction::{
Event,
EventContent,
EventIndexInTransactionOutput,
TransactionOffsetInBlock,
TransactionOutput,
};

use super::TransactionMetadataTable;
use super::{update_marker, TransactionMetadataTable};
use crate::body::{AddressToTransactionIndexTableKey, TransactionIndex};
use crate::db::serialization::{NoVersionValueWrapper, VersionZeroWrapper};
use crate::db::table_types::{CommonPrefix, DbCursor, DbCursorTrait, NoValue, SimpleTable, Table};
use crate::db::{DbTransaction, RO};
use crate::{FileHandlers, StorageResult, StorageTxn, TransactionMetadata};
use crate::db::{DbTransaction, RO, RW};
use crate::{
FileHandlers,
OffsetKind,
StorageResult,
StorageScope,
StorageTxn,
TransactionMetadata,
};

/// An identifier of an event.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Deserialize, Serialize, PartialOrd, Ord)]
Expand Down Expand Up @@ -372,3 +380,56 @@ type AddressToTransactionIndexTableCursor<'txn> = DbCursor<
/// A cursor of the transaction outputs table.
type TransactionMetadataTableCursor<'txn> =
DbCursor<'txn, RO, TransactionIndex, VersionZeroWrapper<TransactionMetadata>, SimpleTable>;

/// Interface for updating the events in the storage.
pub trait EventStorageWriter
where
Self: Sized,
{
/// Appends the events of an entire block to the storage.
// To enforce that no commit happen after a failure, we consume and return Self on success.
fn append_events(
self,
block_number: BlockNumber,
block_events: &[&[Event]],
) -> StorageResult<Self>;
}

impl EventStorageWriter for StorageTxn<'_, RW> {
fn append_events(
self,
block_number: BlockNumber,
block_events: &[&[Event]],
) -> StorageResult<Self> {
let markers_table = self.open_table(&self.tables.markers)?;
update_marker(&self.txn, &markers_table, block_number)?;
if self.scope != StorageScope::StateOnly {
let events_table = self.open_table(&self.tables.events)?;
let file_offset_table = self.open_table(&self.tables.file_offsets)?;
let address_to_transaction_index =
self.open_table(&self.tables.address_to_transaction_index)?;

for (index, &transaction_events) in block_events.iter().enumerate() {
let transaction_index =
TransactionIndex(block_number, TransactionOffsetInBlock(index));
let event_offset = self.file_handlers.append_events(transaction_events);
events_table.append(&self.txn, &transaction_index, &event_offset)?;
for event in transaction_events {
address_to_transaction_index.insert(
&self.txn,
&(event.from_address, transaction_index),
&NoValue,
)?;
}
if index == block_events.len() - 1 {
file_offset_table.upsert(
&self.txn,
&OffsetKind::Event,
&event_offset.next_offset(),
)?;
}
}
}
Ok(self)
}
}
7 changes: 7 additions & 0 deletions crates/papyrus_storage/src/body/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ use tracing::debug;
use crate::db::serialization::{NoVersionValueWrapper, VersionZeroWrapper};
use crate::db::table_types::{CommonPrefix, DbCursorTrait, NoValue, SimpleTable, Table};
use crate::db::{DbTransaction, TableHandle, TransactionKind, RW};
use crate::mmap_file::LocationInFile;
use crate::{
FileHandlers,
MarkerKind,
Expand All @@ -85,6 +86,12 @@ type AddressToTransactionIndexTable<'env> = TableHandle<
NoVersionValueWrapper<NoValue>,
CommonPrefix,
>;
// TODO: remove the dead code attribute.
#[allow(dead_code)]
type EventsTableKey = TransactionIndex;
#[allow(dead_code)]
type EventsTable<'env> =
TableHandle<'env, EventsTableKey, VersionZeroWrapper<LocationInFile>, SimpleTable>;

/// The index of a transaction in a block.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Deserialize, Serialize, PartialOrd, Ord)]
Expand Down
16 changes: 8 additions & 8 deletions crates/papyrus_storage/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -721,8 +721,8 @@ impl FileHandlers<RW> {
}

// Appends an event to the corresponding file and returns its location.
fn append_events(&self, events: &Vec<Event>) -> LocationInFile {
self.clone().event.append(events)
fn append_events(&self, events: &[Event]) -> LocationInFile {
self.clone().event.append(&events.to_vec())
}

// TODO(dan): Consider 1. flushing only the relevant files, 2. flushing concurrently.
Expand All @@ -749,7 +749,7 @@ impl<Mode: TransactionKind> FileHandlers<Mode> {
("deprecated_contract_class".to_string(), self.deprecated_contract_class.stats()),
("transaction_output".to_string(), self.transaction_output.stats()),
("transaction".to_string(), self.transaction.stats()),
("events".to_string(), self.event.stats()),
("event".to_string(), self.event.stats()),
])
}

Expand Down Expand Up @@ -865,8 +865,8 @@ fn open_storage_files(
)?;

let event_offset = table.get(&db_transaction, &OffsetKind::Event)?.unwrap_or_default();
let (events_writer, events_reader) =
open_file(mmap_file_config, db_config.path().join("events.dat"), event_offset)?;
let (event_writer, event_reader) =
open_file(mmap_file_config, db_config.path().join("event.dat"), event_offset)?;

Ok((
FileHandlers {
Expand All @@ -876,7 +876,7 @@ fn open_storage_files(
deprecated_contract_class: deprecated_contract_class_writer,
transaction_output: transaction_output_writer,
transaction: transaction_writer,
event: events_writer,
event: event_writer,
},
FileHandlers {
thin_state_diff: thin_state_diff_reader,
Expand All @@ -885,7 +885,7 @@ fn open_storage_files(
deprecated_contract_class: deprecated_contract_class_reader,
transaction_output: transaction_output_reader,
transaction: transaction_reader,
event: events_reader,
event: event_reader,
},
))
}
Expand All @@ -905,7 +905,7 @@ pub enum OffsetKind {
TransactionOutput,
/// A transaction file.
Transaction,
/// An events file.
/// An event file.
Event,
}

Expand Down
1 change: 1 addition & 0 deletions crates/starknet_api/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -948,4 +948,5 @@ pub struct TransactionOffsetInBlock(pub usize);
#[derive(
Debug, Default, Copy, Clone, Eq, PartialEq, Hash, Deserialize, Serialize, PartialOrd, Ord,
)]
// TODO: Rename to EventIndexInTransaction.
pub struct EventIndexInTransactionOutput(pub usize);
Loading