diff --git a/full-node/db/sov-db/src/ledger_db/mod.rs b/full-node/db/sov-db/src/ledger_db/mod.rs index cd67a350b..ac5e48c93 100644 --- a/full-node/db/sov-db/src/ledger_db/mod.rs +++ b/full-node/db/sov-db/src/ledger_db/mod.rs @@ -9,7 +9,7 @@ use sov_rollup_interface::{ services::da::SlotData, stf::{BatchReceipt, Event}, }; -use sov_schema_db::{Schema, DB}; +use sov_schema_db::{Schema, SchemaBatch, DB}; use crate::{ rocks_db_config::gen_rocksdb_options, @@ -158,27 +158,34 @@ impl LedgerDB { Ok(out) } - fn put_slot(&self, slot: &StoredSlot, slot_number: &SlotNumber) -> Result<(), anyhow::Error> { - self.db.put::(slot_number, slot)?; - self.db.put::(&slot.hash, slot_number) + fn put_slot( + &self, + slot: &StoredSlot, + slot_number: &SlotNumber, + schema_batch: &mut SchemaBatch, + ) -> Result<(), anyhow::Error> { + schema_batch.put::(slot_number, slot)?; + schema_batch.put::(&slot.hash, slot_number) } fn put_batch( &self, batch: &StoredBatch, batch_number: &BatchNumber, + schema_batch: &mut SchemaBatch, ) -> Result<(), anyhow::Error> { - self.db.put::(batch_number, batch)?; - self.db.put::(&batch.hash, batch_number) + schema_batch.put::(batch_number, batch)?; + schema_batch.put::(&batch.hash, batch_number) } fn put_transaction( &self, tx: &StoredTransaction, tx_number: &TxNumber, + schema_batch: &mut SchemaBatch, ) -> Result<(), anyhow::Error> { - self.db.put::(tx_number, tx)?; - self.db.put::(&tx.hash, tx_number) + schema_batch.put::(tx_number, tx)?; + schema_batch.put::(&tx.hash, tx_number) } fn put_event( @@ -186,10 +193,10 @@ impl LedgerDB { event: &Event, event_number: &EventNumber, tx_number: TxNumber, + schema_batch: &mut SchemaBatch, ) -> Result<(), anyhow::Error> { - self.db.put::(event_number, event)?; - self.db - .put::(&(event.key().clone(), tx_number, *event_number), &()) + schema_batch.put::(event_number, event)?; + schema_batch.put::(&(event.key().clone(), tx_number, *event_number), &()) } /// Commits a slot to the database by inserting its events, transactions, and batches before @@ -198,7 +205,6 @@ impl LedgerDB { &self, data_to_commit: SlotCommit, ) -> Result<(), anyhow::Error> { - // TODO: expose a batch API on the db to commit everything atomically // Create a scope to ensure that the lock is released before we commit to the db let mut current_item_numbers = { let mut next_item_numbers = self.next_item_numbers.lock().unwrap(); @@ -211,6 +217,8 @@ impl LedgerDB { // The lock is released here }; + let mut schema_batch = SchemaBatch::new(); + let first_batch_number = current_item_numbers.batch_number; let last_batch_number = first_batch_number + data_to_commit.batch_receipts.len() as u64; // Insert data from "bottom up" to ensure consistency if the application crashes during insertion @@ -226,10 +234,15 @@ impl LedgerDB { &event, &EventNumber(current_item_numbers.event_number), TxNumber(current_item_numbers.tx_number), + &mut schema_batch, )?; current_item_numbers.event_number += 1; } - self.put_transaction(&tx_to_store, &TxNumber(current_item_numbers.tx_number))?; + self.put_transaction( + &tx_to_store, + &TxNumber(current_item_numbers.tx_number), + &mut schema_batch, + )?; current_item_numbers.tx_number += 1; } @@ -244,6 +257,7 @@ impl LedgerDB { self.put_batch( &batch_to_store, &BatchNumber(current_item_numbers.batch_number), + &mut schema_batch, )?; current_item_numbers.batch_number += 1; } @@ -258,8 +272,11 @@ impl LedgerDB { self.put_slot( &slot_to_store, &SlotNumber(current_item_numbers.slot_number), + &mut schema_batch, )?; + self.db.write_schemas(schema_batch)?; + Ok(()) }