Skip to content

Commit

Permalink
Write slot as atomic batch (#422)
Browse files Browse the repository at this point in the history
* Write slot as atomic batch

* Remove TODO
  • Loading branch information
preston-evans98 authored Jun 21, 2023
1 parent 758bb4e commit f54d926
Showing 1 changed file with 30 additions and 13 deletions.
43 changes: 30 additions & 13 deletions full-node/db/sov-db/src/ledger_db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use sov_rollup_interface::{
services::da::SlotData,
stf::{BatchReceipt, Event},
};
use sov_schema_db::{Schema, DB};
use sov_schema_db::{Schema, SchemaBatch, DB};

use crate::{
rocks_db_config::gen_rocksdb_options,
Expand Down Expand Up @@ -158,38 +158,45 @@ impl LedgerDB {
Ok(out)
}

fn put_slot(&self, slot: &StoredSlot, slot_number: &SlotNumber) -> Result<(), anyhow::Error> {
self.db.put::<SlotByNumber>(slot_number, slot)?;
self.db.put::<SlotByHash>(&slot.hash, slot_number)
fn put_slot(
&self,
slot: &StoredSlot,
slot_number: &SlotNumber,
schema_batch: &mut SchemaBatch,
) -> Result<(), anyhow::Error> {
schema_batch.put::<SlotByNumber>(slot_number, slot)?;
schema_batch.put::<SlotByHash>(&slot.hash, slot_number)
}

fn put_batch(
&self,
batch: &StoredBatch,
batch_number: &BatchNumber,
schema_batch: &mut SchemaBatch,
) -> Result<(), anyhow::Error> {
self.db.put::<BatchByNumber>(batch_number, batch)?;
self.db.put::<BatchByHash>(&batch.hash, batch_number)
schema_batch.put::<BatchByNumber>(batch_number, batch)?;
schema_batch.put::<BatchByHash>(&batch.hash, batch_number)
}

fn put_transaction(
&self,
tx: &StoredTransaction,
tx_number: &TxNumber,
schema_batch: &mut SchemaBatch,
) -> Result<(), anyhow::Error> {
self.db.put::<TxByNumber>(tx_number, tx)?;
self.db.put::<TxByHash>(&tx.hash, tx_number)
schema_batch.put::<TxByNumber>(tx_number, tx)?;
schema_batch.put::<TxByHash>(&tx.hash, tx_number)
}

fn put_event(
&self,
event: &Event,
event_number: &EventNumber,
tx_number: TxNumber,
schema_batch: &mut SchemaBatch,
) -> Result<(), anyhow::Error> {
self.db.put::<EventByNumber>(event_number, event)?;
self.db
.put::<EventByKey>(&(event.key().clone(), tx_number, *event_number), &())
schema_batch.put::<EventByNumber>(event_number, event)?;
schema_batch.put::<EventByKey>(&(event.key().clone(), tx_number, *event_number), &())
}

/// Commits a slot to the database by inserting its events, transactions, and batches before
Expand All @@ -198,7 +205,6 @@ impl LedgerDB {
&self,
data_to_commit: SlotCommit<S, B, T>,
) -> Result<(), anyhow::Error> {
// TODO: expose a batch API on the db to commit everything atomically
// Create a scope to ensure that the lock is released before we commit to the db
let mut current_item_numbers = {
let mut next_item_numbers = self.next_item_numbers.lock().unwrap();
Expand All @@ -211,6 +217,8 @@ impl LedgerDB {
// The lock is released here
};

let mut schema_batch = SchemaBatch::new();

let first_batch_number = current_item_numbers.batch_number;
let last_batch_number = first_batch_number + data_to_commit.batch_receipts.len() as u64;
// Insert data from "bottom up" to ensure consistency if the application crashes during insertion
Expand All @@ -226,10 +234,15 @@ impl LedgerDB {
&event,
&EventNumber(current_item_numbers.event_number),
TxNumber(current_item_numbers.tx_number),
&mut schema_batch,
)?;
current_item_numbers.event_number += 1;
}
self.put_transaction(&tx_to_store, &TxNumber(current_item_numbers.tx_number))?;
self.put_transaction(
&tx_to_store,
&TxNumber(current_item_numbers.tx_number),
&mut schema_batch,
)?;
current_item_numbers.tx_number += 1;
}

Expand All @@ -244,6 +257,7 @@ impl LedgerDB {
self.put_batch(
&batch_to_store,
&BatchNumber(current_item_numbers.batch_number),
&mut schema_batch,
)?;
current_item_numbers.batch_number += 1;
}
Expand All @@ -258,8 +272,11 @@ impl LedgerDB {
self.put_slot(
&slot_to_store,
&SlotNumber(current_item_numbers.slot_number),
&mut schema_batch,
)?;

self.db.write_schemas(schema_batch)?;

Ok(())
}

Expand Down

0 comments on commit f54d926

Please sign in to comment.