diff --git a/Cargo.lock b/Cargo.lock index 5bc4de7d1..e3b145387 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2190,6 +2190,7 @@ dependencies = [ "serde", "sov-db", "sov-prover-storage-manager", + "sov-rollup-interface", "sov-schema-db", "tempfile", "tokio", diff --git a/bin/citrea/src/main.rs b/bin/citrea/src/main.rs index 7ab639cad..051e9a051 100644 --- a/bin/citrea/src/main.rs +++ b/bin/citrea/src/main.rs @@ -15,6 +15,7 @@ use citrea_common::{from_toml_path, FromEnv, FullNodeConfig}; use citrea_light_client_prover::da_block_handler::StartVariant; use citrea_stf::genesis_config::GenesisPaths; use citrea_stf::runtime::{CitreaRuntime, DefaultContext}; +use citrea_storage_ops::pruning::types::PruningNodeType; use clap::Parser; use metrics_exporter_prometheus::PrometheusBuilder; use metrics_util::MetricKindMask; @@ -370,7 +371,9 @@ where // Spawn pruner if configs are set if let Some(pruner_service) = pruner_service { task_manager.spawn(|cancellation_token| async move { - pruner_service.run(cancellation_token).await + pruner_service + .run(PruningNodeType::FullNode, cancellation_token) + .await }); } diff --git a/bin/citrea/src/rollup/mod.rs b/bin/citrea/src/rollup/mod.rs index f33ee53ad..50ec867d5 100644 --- a/bin/citrea/src/rollup/mod.rs +++ b/bin/citrea/src/rollup/mod.rs @@ -211,7 +211,7 @@ pub trait CitreaRollupBlueprint: RollupBlueprint { ) -> Result<( CitreaFullnode, FullNodeL1BlockHandler, - Option>, + Option, )> { let runner_config = rollup_config.runner.expect("Runner config is missing"); diff --git a/bin/citrea/tests/common/helpers.rs b/bin/citrea/tests/common/helpers.rs index d35c9e2a3..84c66f9cf 100644 --- a/bin/citrea/tests/common/helpers.rs +++ b/bin/citrea/tests/common/helpers.rs @@ -17,6 +17,7 @@ use citrea_common::{ use citrea_light_client_prover::da_block_handler::StartVariant; use citrea_primitives::TEST_PRIVATE_KEY; use citrea_stf::genesis_config::GenesisPaths; +use citrea_storage_ops::pruning::types::PruningNodeType; use citrea_storage_ops::pruning::PruningConfig; use sov_db::ledger_db::SharedLedgerOps; use sov_db::rocks_db_config::RocksdbConfig; @@ -336,8 +337,11 @@ pub async fn start_rollup( // Spawn pruner if configs are set if let Some(pruner) = pruner { - task_manager - .spawn(|cancellation_token| async move { pruner.run(cancellation_token).await }); + task_manager.spawn(|cancellation_token| async move { + pruner + .run(PruningNodeType::FullNode, cancellation_token) + .await + }); } task_manager.spawn(|cancellation_token| async move { diff --git a/bin/cli/src/commands/prune.rs b/bin/cli/src/commands/prune.rs index 965cd7850..fb5abd7c5 100644 --- a/bin/cli/src/commands/prune.rs +++ b/bin/cli/src/commands/prune.rs @@ -1,15 +1,44 @@ use std::path::PathBuf; use std::sync::Arc; +use citrea_storage_ops::pruning::types::PruningNodeType; use citrea_storage_ops::pruning::{Pruner, PruningConfig}; +use clap::ValueEnum; use sov_db::ledger_db::{LedgerDB, SharedLedgerOps}; use sov_db::native_db::NativeDB; use sov_db::rocks_db_config::RocksdbConfig; +use sov_db::schema::tables::{ + BATCH_PROVER_LEDGER_TABLES, FULL_NODE_LEDGER_TABLES, LIGHT_CLIENT_PROVER_LEDGER_TABLES, + SEQUENCER_LEDGER_TABLES, +}; use sov_db::state_db::StateDB; use sov_prover_storage_manager::SnapshotManager; use tracing::{debug, info}; -pub(crate) async fn prune(db_path: PathBuf, distance: u64) -> anyhow::Result<()> { +#[derive(Copy, Clone, ValueEnum)] +pub enum PruningNodeTypeArg { + Sequencer, + FullNode, + BatchProver, + LightClient, +} + +impl From for PruningNodeType { + fn from(value: PruningNodeTypeArg) -> Self { + match value { + PruningNodeTypeArg::Sequencer => PruningNodeType::Sequencer, + PruningNodeTypeArg::FullNode => PruningNodeType::FullNode, + PruningNodeTypeArg::BatchProver => PruningNodeType::BatchProver, + PruningNodeTypeArg::LightClient => PruningNodeType::LightClient, + } + } +} + +pub(crate) async fn prune( + node_type: PruningNodeTypeArg, + db_path: PathBuf, + distance: u64, +) -> anyhow::Result<()> { info!( "Pruning DB at {} with pruning distance of {}", db_path.display(), @@ -17,7 +46,9 @@ pub(crate) async fn prune(db_path: PathBuf, distance: u64) -> anyhow::Result<()> ); let config = PruningConfig { distance }; - let rocksdb_config = RocksdbConfig::new(&db_path, None, None); + let column_families = cfs_from_node_type(node_type); + + let rocksdb_config = RocksdbConfig::new(&db_path, None, Some(column_families.to_vec())); let ledger_db = LedgerDB::with_config(&rocksdb_config)?; let native_db = NativeDB::::setup_schema_db(&rocksdb_config)?; let state_db = StateDB::::setup_schema_db(&rocksdb_config)?; @@ -31,8 +62,26 @@ pub(crate) async fn prune(db_path: PathBuf, distance: u64) -> anyhow::Result<()> soft_confirmation_number, distance ); - let pruner = Pruner::new(config, ledger_db, Arc::new(state_db), Arc::new(native_db)); - pruner.prune(soft_confirmation_number).await; + let pruner = Pruner::new( + config, + ledger_db.inner(), + Arc::new(state_db), + Arc::new(native_db), + ); + pruner + .prune(node_type.into(), soft_confirmation_number) + .await; Ok(()) } + +fn cfs_from_node_type(node_type: PruningNodeTypeArg) -> Vec { + let cfs = match node_type { + PruningNodeTypeArg::Sequencer => SEQUENCER_LEDGER_TABLES, + PruningNodeTypeArg::FullNode => FULL_NODE_LEDGER_TABLES, + PruningNodeTypeArg::BatchProver => BATCH_PROVER_LEDGER_TABLES, + PruningNodeTypeArg::LightClient => LIGHT_CLIENT_PROVER_LEDGER_TABLES, + }; + + cfs.iter().map(|x| x.to_string()).collect::>() +} diff --git a/bin/cli/src/main.rs b/bin/cli/src/main.rs index 6450a3d61..77bdd7d6a 100644 --- a/bin/cli/src/main.rs +++ b/bin/cli/src/main.rs @@ -1,6 +1,7 @@ use std::path::PathBuf; use clap::{Parser, Subcommand, ValueEnum}; +use commands::PruningNodeTypeArg; use tracing_subscriber::fmt; use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::util::SubscriberInitExt; @@ -39,6 +40,8 @@ struct Cli { enum Commands { /// Prune old DB entries Prune { + #[arg(long)] + node_type: PruningNodeTypeArg, /// The path of the database to prune #[arg(long)] db_path: PathBuf, @@ -78,8 +81,12 @@ async fn main() -> anyhow::Result<()> { // You can check for the existence of subcommands, and if found use their // matches just as you would the top level cmd match cli.command { - Commands::Prune { db_path, distance } => { - commands::prune(db_path.clone(), distance).await?; + Commands::Prune { + node_type, + db_path, + distance, + } => { + commands::prune(node_type, db_path.clone(), distance).await?; } Commands::Rollback { db_path: _db_path, diff --git a/crates/fullnode/src/lib.rs b/crates/fullnode/src/lib.rs index 1ad09cd45..91f747a8e 100644 --- a/crates/fullnode/src/lib.rs +++ b/crates/fullnode/src/lib.rs @@ -44,7 +44,7 @@ pub fn build_services( ) -> Result<( CitreaFullnode, L1BlockHandler, - Option>, + Option, )> where Da: DaService, @@ -53,9 +53,9 @@ where { let last_pruned_block = ledger_db.get_last_pruned_l2_height()?.unwrap_or(0); let pruner = runner_config.pruning_config.as_ref().map(|pruning_config| { - let pruner = Pruner::::new( + let pruner = Pruner::new( pruning_config.clone(), - ledger_db.clone(), + ledger_db.inner(), storage_manager.get_state_db_handle(), storage_manager.get_native_db_handle(), ); diff --git a/crates/sovereign-sdk/full-node/db/sov-db/src/ledger_db/mod.rs b/crates/sovereign-sdk/full-node/db/sov-db/src/ledger_db/mod.rs index 3be2dcea5..464ab92b6 100644 --- a/crates/sovereign-sdk/full-node/db/sov-db/src/ledger_db/mod.rs +++ b/crates/sovereign-sdk/full-node/db/sov-db/src/ledger_db/mod.rs @@ -152,6 +152,11 @@ impl SharedLedgerOps for LedgerDB { self.db.path() } + /// Returns the inner DB instance + fn inner(&self) -> Arc { + self.db.clone() + } + #[instrument(level = "trace", skip(self, schema_batch), err, ret)] fn put_soft_confirmation( &self, diff --git a/crates/sovereign-sdk/full-node/db/sov-db/src/ledger_db/traits.rs b/crates/sovereign-sdk/full-node/db/sov-db/src/ledger_db/traits.rs index 7cf3d1890..b1e22a069 100644 --- a/crates/sovereign-sdk/full-node/db/sov-db/src/ledger_db/traits.rs +++ b/crates/sovereign-sdk/full-node/db/sov-db/src/ledger_db/traits.rs @@ -1,4 +1,5 @@ use std::path::Path; +use std::sync::Arc; use anyhow::Result; use serde::de::DeserializeOwned; @@ -20,6 +21,9 @@ pub trait SharedLedgerOps { /// Return DB path fn path(&self) -> &Path; + /// Returns the inner DB instance + fn inner(&self) -> Arc; + /// Put soft confirmation to db fn put_soft_confirmation( &self, diff --git a/crates/sovereign-sdk/full-node/db/sov-db/src/schema/tables.rs b/crates/sovereign-sdk/full-node/db/sov-db/src/schema/tables.rs index 12a9be7bb..6a31e8e46 100644 --- a/crates/sovereign-sdk/full-node/db/sov-db/src/schema/tables.rs +++ b/crates/sovereign-sdk/full-node/db/sov-db/src/schema/tables.rs @@ -48,7 +48,6 @@ pub const STATE_TABLES: &[&str] = &[ /// A list of all tables used by Sequencer LedgerDB pub const SEQUENCER_LEDGER_TABLES: &[&str] = &[ ExecutedMigrations::table_name(), - SlotByHash::table_name(), SoftConfirmationByNumber::table_name(), SoftConfirmationByHash::table_name(), L2RangeByL1Height::table_name(), @@ -58,10 +57,17 @@ pub const SEQUENCER_LEDGER_TABLES: &[&str] = &[ LastSequencerCommitmentSent::table_name(), SoftConfirmationStatus::table_name(), CommitmentsByNumber::table_name(), - VerifiedBatchProofsBySlotNumber::table_name(), - ProverLastScannedSlot::table_name(), MempoolTxs::table_name(), LastPrunedBlock::table_name(), + // ######## + // The following tables exist in the sequencer since they enable + // using the fullnode's backup as a sequencer database without having + // to remove these tables first as demonstrated by the + // `test_sequencer_crash_and_replace_full_node` test. + VerifiedBatchProofsBySlotNumber::table_name(), + ProverLastScannedSlot::table_name(), + SlotByHash::table_name(), + // ######## #[cfg(test)] TestTableOld::table_name(), #[cfg(test)] @@ -77,13 +83,10 @@ pub const FULL_NODE_LEDGER_TABLES: &[&str] = &[ SoftConfirmationByHash::table_name(), L2RangeByL1Height::table_name(), L2GenesisStateRoot::table_name(), - LastStateDiff::table_name(), - PendingSequencerCommitmentL2Range::table_name(), LastSequencerCommitmentSent::table_name(), SoftConfirmationStatus::table_name(), ProverLastScannedSlot::table_name(), CommitmentsByNumber::table_name(), - MempoolTxs::table_name(), LastPrunedBlock::table_name(), VerifiedBatchProofsBySlotNumber::table_name(), #[cfg(test)] @@ -119,9 +122,9 @@ pub const BATCH_PROVER_LEDGER_TABLES: &[&str] = &[ pub const LIGHT_CLIENT_PROVER_LEDGER_TABLES: &[&str] = &[ ExecutedMigrations::table_name(), SlotByHash::table_name(), + SoftConfirmationByNumber::table_name(), LightClientProofBySlotNumber::table_name(), ProverLastScannedSlot::table_name(), - SoftConfirmationByNumber::table_name(), #[cfg(test)] TestTableOld::table_name(), #[cfg(test)] diff --git a/crates/storage-ops/Cargo.toml b/crates/storage-ops/Cargo.toml index 58b61a6ab..706ad7daf 100644 --- a/crates/storage-ops/Cargo.toml +++ b/crates/storage-ops/Cargo.toml @@ -27,4 +27,5 @@ tracing = { workspace = true } [dev-dependencies] sov-prover-storage-manager = { path = "../sovereign-sdk/full-node/sov-prover-storage-manager" } +sov-rollup-interface = { path = "../../crates/sovereign-sdk/rollup-interface", features = ["testing"] } tempfile = { workspace = true } diff --git a/crates/storage-ops/src/pruning/components/ledger_db.rs b/crates/storage-ops/src/pruning/components/ledger_db.rs deleted file mode 100644 index 06da6f26c..000000000 --- a/crates/storage-ops/src/pruning/components/ledger_db.rs +++ /dev/null @@ -1,8 +0,0 @@ -use sov_db::ledger_db::SharedLedgerOps; -use tracing::debug; - -/// Prune ledger -pub(crate) fn prune_ledger(_ledger_db: DB, up_to_block: u64) { - debug!("Pruning Ledger, up to L2 block {}", up_to_block); - // unimplemented!() -} diff --git a/crates/storage-ops/src/pruning/components/ledger_db/mod.rs b/crates/storage-ops/src/pruning/components/ledger_db/mod.rs new file mode 100644 index 000000000..cc309e85f --- /dev/null +++ b/crates/storage-ops/src/pruning/components/ledger_db/mod.rs @@ -0,0 +1,64 @@ +use std::sync::Arc; + +use sov_schema_db::DB; +use tracing::{debug, error}; + +use self::slots::prune_slots; +use self::soft_confirmations::prune_soft_confirmations; +use crate::pruning::types::PruningNodeType; + +mod slots; +mod soft_confirmations; + +macro_rules! log_result_or_error { + ($tables_group:literal, $call:expr) => {{ + match $call { + Ok(result) => { + debug!("Deleted {} records from {} group", result, $tables_group); + } + Err(e) => { + error!( + "Failed to prune ledger's {} table group: {:?}", + $tables_group, e + ); + return; + } + } + }}; +} + +/// Prune ledger +pub(crate) fn prune_ledger(node_type: PruningNodeType, ledger_db: Arc, up_to_block: u64) { + debug!("Pruning Ledger, up to L2 block {}", up_to_block); + + match node_type { + PruningNodeType::Sequencer => { + log_result_or_error!( + "soft_confirmations", + prune_soft_confirmations(node_type, &ledger_db, up_to_block) + ); + log_result_or_error!("slots", prune_slots(node_type, &ledger_db, up_to_block)); + } + PruningNodeType::FullNode => { + log_result_or_error!( + "soft_confirmations", + prune_soft_confirmations(node_type, &ledger_db, up_to_block) + ); + log_result_or_error!("slots", prune_slots(node_type, &ledger_db, up_to_block)); + } + PruningNodeType::BatchProver => { + log_result_or_error!( + "soft_confirmations", + prune_soft_confirmations(node_type, &ledger_db, up_to_block) + ); + log_result_or_error!("slots", prune_slots(node_type, &ledger_db, up_to_block)); + } + PruningNodeType::LightClient => { + log_result_or_error!( + "soft_confirmations", + prune_soft_confirmations(node_type, &ledger_db, up_to_block) + ); + log_result_or_error!("slots", prune_slots(node_type, &ledger_db, up_to_block)); + } + } +} diff --git a/crates/storage-ops/src/pruning/components/ledger_db/slots.rs b/crates/storage-ops/src/pruning/components/ledger_db/slots.rs new file mode 100644 index 000000000..0ab8898cb --- /dev/null +++ b/crates/storage-ops/src/pruning/components/ledger_db/slots.rs @@ -0,0 +1,73 @@ +use sov_db::schema::tables::{ + CommitmentsByNumber, L2RangeByL1Height, LightClientProofBySlotNumber, ProofsBySlotNumber, + ProofsBySlotNumberV2, SlotByHash, VerifiedBatchProofsBySlotNumber, +}; +use sov_db::schema::types::{SlotNumber, SoftConfirmationNumber}; +use sov_schema_db::{ScanDirection, DB}; + +use crate::pruning::types::PruningNodeType; + +pub(crate) fn prune_slots( + node_type: PruningNodeType, + ledger_db: &DB, + up_to_block: u64, +) -> anyhow::Result { + let mut slots_to_l2_range = ledger_db + .iter_with_direction::(Default::default(), ScanDirection::Forward)?; + slots_to_l2_range.seek_to_first(); + + let mut deleted = 0; + for record in slots_to_l2_range { + let Ok(record) = record else { + continue; + }; + + let slot_height = record.key; + let slot_range = record.value; + + if slot_range.1 > SoftConfirmationNumber(up_to_block) { + break; + } + ledger_db.delete::(&slot_height)?; + ledger_db.delete::(&slot_height)?; + + if !matches!(node_type, PruningNodeType::Sequencer) { + prune_slot_by_hash(ledger_db, slot_height)?; + } + + if matches!(node_type, PruningNodeType::FullNode) { + ledger_db.delete::(&slot_height)?; + } + + if matches!(node_type, PruningNodeType::BatchProver) { + ledger_db.delete::(&slot_height)?; + ledger_db.delete::(&slot_height)?; + } + + if matches!(node_type, PruningNodeType::LightClient) { + ledger_db.delete::(&slot_height)?; + } + + deleted += 1; + } + + Ok(deleted) +} + +fn prune_slot_by_hash(ledger_db: &DB, slot_height: SlotNumber) -> anyhow::Result<()> { + let mut slots = + ledger_db.iter_with_direction::(Default::default(), ScanDirection::Forward)?; + slots.seek_to_first(); + + for record in slots { + let Ok(record) = record else { + continue; + }; + + if record.value < slot_height { + ledger_db.delete::(&record.key)?; + } + } + + Ok(()) +} diff --git a/crates/storage-ops/src/pruning/components/ledger_db/soft_confirmations.rs b/crates/storage-ops/src/pruning/components/ledger_db/soft_confirmations.rs new file mode 100644 index 000000000..a2e1bf51b --- /dev/null +++ b/crates/storage-ops/src/pruning/components/ledger_db/soft_confirmations.rs @@ -0,0 +1,51 @@ +use sov_db::schema::tables::{ + L2Witness, ProverStateDiffs, SoftConfirmationByHash, SoftConfirmationByNumber, + SoftConfirmationStatus, +}; +use sov_db::schema::types::SoftConfirmationNumber; +use sov_schema_db::{ScanDirection, DB}; + +use crate::pruning::types::PruningNodeType; + +pub(crate) fn prune_soft_confirmations( + node_type: PruningNodeType, + ledger_db: &DB, + up_to_block: u64, +) -> anyhow::Result { + let mut soft_confirmations = ledger_db.iter_with_direction::( + Default::default(), + ScanDirection::Forward, + )?; + soft_confirmations.seek_to_first(); + + let mut deleted = 0; + for record in soft_confirmations { + let Ok(record) = record else { + continue; + }; + + let soft_confirmation_number = record.key; + + if soft_confirmation_number > SoftConfirmationNumber(up_to_block) { + break; + } + ledger_db.delete::(&soft_confirmation_number)?; + + if matches!(node_type, PruningNodeType::LightClient) { + continue; + } + + let soft_confirmation = record.value; + ledger_db.delete::(&soft_confirmation.hash)?; + ledger_db.delete::(&soft_confirmation_number)?; + + if matches!(node_type, PruningNodeType::BatchProver) { + ledger_db.delete::(&soft_confirmation_number)?; + ledger_db.delete::(&soft_confirmation_number)?; + } + + deleted += 1; + } + + Ok(deleted) +} diff --git a/crates/storage-ops/src/pruning/components/state_db.rs b/crates/storage-ops/src/pruning/components/state_db.rs index 4c5614a1a..38b0ddf1e 100644 --- a/crates/storage-ops/src/pruning/components/state_db.rs +++ b/crates/storage-ops/src/pruning/components/state_db.rs @@ -12,7 +12,9 @@ pub(crate) fn prune_state_db(state_db: Arc, to_block: u64) { let to_version = to_block + 1; - let mut indices = state_db.iter::().expect("should get iter"); + let mut indices = state_db + .iter::() + .expect("Tried to prune state DB but could not obtain an iterator"); indices.seek_to_first(); diff --git a/crates/storage-ops/src/pruning/mod.rs b/crates/storage-ops/src/pruning/mod.rs index 543cbfeb3..69343ab94 100644 --- a/crates/storage-ops/src/pruning/mod.rs +++ b/crates/storage-ops/src/pruning/mod.rs @@ -2,8 +2,9 @@ use std::sync::Arc; use futures::future; use serde::{Deserialize, Serialize}; -use sov_db::ledger_db::SharedLedgerOps; +use sov_db::schema::tables::LastPrunedBlock; use tracing::info; +use types::PruningNodeType; use self::components::{prune_ledger, prune_native_db}; use self::criteria::{Criteria, DistanceCriteria}; @@ -12,6 +13,7 @@ pub use self::service::*; pub(crate) mod components; pub(crate) mod criteria; pub(crate) mod service; +pub mod types; /// A configuration type to define the behaviour of the pruner. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] @@ -26,12 +28,9 @@ impl Default for PruningConfig { } } -pub struct Pruner -where - DB: SharedLedgerOps, -{ +pub struct Pruner { /// Access to ledger tables. - ledger_db: DB, + ledger_db: Arc, /// Access to native DB. native_db: Arc, /// Access to state DB. @@ -41,13 +40,10 @@ where criteria: Box, } -impl Pruner -where - DB: SharedLedgerOps + Send + Sync + Clone + 'static, -{ +impl Pruner { pub fn new( config: PruningConfig, - ledger_db: DB, + ledger_db: Arc, state_db: Arc, native_db: Arc, ) -> Self { @@ -65,7 +61,7 @@ where pub fn store_last_pruned_l2_height(&self, last_pruned_l2_height: u64) -> anyhow::Result<()> { self.ledger_db - .set_last_pruned_l2_height(last_pruned_l2_height) + .put::(&(), &last_pruned_l2_height) } pub(crate) fn should_prune( @@ -78,7 +74,7 @@ where } /// Prune everything - pub async fn prune(&self, up_to_block: u64) { + pub async fn prune(&self, node_type: PruningNodeType, up_to_block: u64) { info!("Pruning up to L2 block: {}", up_to_block); let ledger_db = self.ledger_db.clone(); @@ -87,7 +83,7 @@ where // let state_db = self.state_db.clone(); let ledger_pruning_handle = - tokio::task::spawn_blocking(move || prune_ledger(ledger_db, up_to_block)); + tokio::task::spawn_blocking(move || prune_ledger(node_type, ledger_db, up_to_block)); // TODO: Fix me // let state_db_pruning_handle = diff --git a/crates/storage-ops/src/pruning/service.rs b/crates/storage-ops/src/pruning/service.rs index 816415cfc..65aec00c8 100644 --- a/crates/storage-ops/src/pruning/service.rs +++ b/crates/storage-ops/src/pruning/service.rs @@ -1,25 +1,22 @@ -use sov_db::ledger_db::SharedLedgerOps; use tokio::select; use tokio::sync::broadcast; use tokio_util::sync::CancellationToken; use tracing::{debug, error}; +use super::types::PruningNodeType; use super::Pruner; -pub struct PrunerService { - pruner: Pruner, +pub struct PrunerService { + pruner: Pruner, /// The last block number which was pruned. last_pruned_block: u64, /// A channel receiver which gets notified of new L2 blocks. l2_receiver: broadcast::Receiver, } -impl PrunerService -where - DB: SharedLedgerOps + Send + Sync + Clone + 'static, -{ +impl PrunerService { pub fn new( - pruner: Pruner, + pruner: Pruner, last_pruned_block: u64, l2_receiver: broadcast::Receiver, ) -> Self { @@ -30,7 +27,7 @@ where } } - pub async fn run(mut self, cancellation_token: CancellationToken) { + pub async fn run(mut self, node_type: PruningNodeType, cancellation_token: CancellationToken) { loop { select! { biased; @@ -45,7 +42,7 @@ where if let Ok(current_l2_block) = current_l2_block { debug!("Pruner received L2 {}, checking criteria", current_l2_block); if let Some(up_to_block) = self.pruner.should_prune(self.last_pruned_block, current_l2_block) { - self.pruner.prune(up_to_block).await; + self.pruner.prune(node_type, up_to_block).await; self.last_pruned_block = up_to_block; } } diff --git a/crates/storage-ops/src/pruning/types.rs b/crates/storage-ops/src/pruning/types.rs new file mode 100644 index 000000000..1213a4a45 --- /dev/null +++ b/crates/storage-ops/src/pruning/types.rs @@ -0,0 +1,7 @@ +#[derive(Copy, Clone)] +pub enum PruningNodeType { + Sequencer, + FullNode, + BatchProver, + LightClient, +} diff --git a/crates/storage-ops/src/tests.rs b/crates/storage-ops/src/tests.rs index 372856921..36b05ca87 100644 --- a/crates/storage-ops/src/tests.rs +++ b/crates/storage-ops/src/tests.rs @@ -2,19 +2,33 @@ use std::sync::Arc; use std::thread::sleep; use std::time::Duration; -use sov_db::ledger_db::LedgerDB; +use sov_db::ledger_db::{LedgerDB, SharedLedgerOps}; use sov_db::native_db::NativeDB; use sov_db::rocks_db_config::RocksdbConfig; +use sov_db::schema::tables::{ + CommitmentsByNumber, L2RangeByL1Height, L2Witness, LightClientProofBySlotNumber, + ProofsBySlotNumber, ProofsBySlotNumberV2, ProverStateDiffs, SlotByHash, SoftConfirmationByHash, + SoftConfirmationByNumber, SoftConfirmationStatus, VerifiedBatchProofsBySlotNumber, +}; +use sov_db::schema::types::light_client_proof::{ + StoredLatestDaState, StoredLightClientProof, StoredLightClientProofOutput, +}; +use sov_db::schema::types::soft_confirmation::StoredSoftConfirmation; +use sov_db::schema::types::{SlotNumber, SoftConfirmationNumber}; use sov_db::state_db::StateDB; use sov_prover_storage_manager::SnapshotManager; +use sov_rollup_interface::mmr::MMRGuest; +use sov_schema_db::DB; use tokio::sync::broadcast; use tokio_util::sync::CancellationToken; +use crate::pruning::components::prune_ledger; use crate::pruning::criteria::{Criteria, DistanceCriteria}; +use crate::pruning::types::PruningNodeType; use crate::pruning::{Pruner, PrunerService, PruningConfig}; #[tokio::test(flavor = "multi_thread")] -async fn test_pruner_simple_run() { +async fn test_pruning_simple_run() { let tmpdir = tempfile::tempdir().unwrap(); let (sender, receiver) = broadcast::channel(1); let cancellation_token = CancellationToken::new(); @@ -26,13 +40,13 @@ async fn test_pruner_simple_run() { let pruner = Pruner::new( PruningConfig { distance: 5 }, - ledger_db, + ledger_db.inner(), Arc::new(state_db), Arc::new(native_db), ); let pruner_service = PrunerService::new(pruner, 0, receiver); - tokio::spawn(pruner_service.run(cancellation_token.clone())); + tokio::spawn(pruner_service.run(PruningNodeType::Sequencer, cancellation_token.clone())); sleep(Duration::from_secs(1)); @@ -46,7 +60,7 @@ async fn test_pruner_simple_run() { } #[test] -pub fn test_should_prune() { +pub fn test_pruning_should_prune() { let criteria = DistanceCriteria { distance: 1000 }; assert_eq!(criteria.should_prune(0, 1000), None); assert_eq!(criteria.should_prune(0, 1999), None); @@ -56,3 +70,689 @@ pub fn test_should_prune() { assert_eq!(criteria.should_prune(1000, 2999), None); assert_eq!(criteria.should_prune(1000, 3000), Some(2000)); } + +#[test] +pub fn test_pruning_ledger_db_soft_confirmations() { + let tmpdir = tempfile::tempdir().unwrap(); + let rocksdb_config = RocksdbConfig::new(tmpdir.path(), None, None); + let ledger_db = LedgerDB::with_config(&rocksdb_config).unwrap().inner(); + + let mut da_slot_height = 1; + + for i in 1u64..=20 { + let soft_confirmation = StoredSoftConfirmation { + l2_height: i, + da_slot_height, + da_slot_hash: [i as u8; 32], + da_slot_txs_commitment: [i as u8; 32], + hash: [i as u8; 32], + prev_hash: [(i as u8) - 1; 32], + txs: vec![], + deposit_data: vec![], + state_root: vec![i as u8; 32], + soft_confirmation_signature: vec![], + pub_key: vec![0; 32], + l1_fee_rate: 0, + timestamp: i, + }; + + ledger_db + .put::(&SoftConfirmationNumber(i), &soft_confirmation) + .unwrap(); + ledger_db + .put::(&[i as u8; 32], &SoftConfirmationNumber(i)) + .unwrap(); + ledger_db + .put::( + &SoftConfirmationNumber(i), + &sov_rollup_interface::rpc::SoftConfirmationStatus::Finalized, + ) + .unwrap(); + ledger_db + .put::(&SoftConfirmationNumber(i), &(vec![5; 32], vec![6; 32])) + .unwrap(); + ledger_db + .put::(&SoftConfirmationNumber(i), &vec![]) + .unwrap(); + + da_slot_height += 1; + } + + assert!(ledger_db + .get::(&SoftConfirmationNumber(1)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SoftConfirmationNumber(10)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SoftConfirmationNumber(20)) + .unwrap() + .is_some()); + + assert!(ledger_db + .get::(&[1; 32]) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&[10; 32]) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&[20; 32]) + .unwrap() + .is_some()); + + assert!(ledger_db + .get::(&SoftConfirmationNumber(1)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SoftConfirmationNumber(10)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SoftConfirmationNumber(20)) + .unwrap() + .is_some()); + + prune_ledger(PruningNodeType::Sequencer, ledger_db.clone(), 10); + + // Pruned + assert!(ledger_db + .get::(&SoftConfirmationNumber(1)) + .unwrap() + .is_none()); + // Pruned + assert!(ledger_db + .get::(&SoftConfirmationNumber(10)) + .unwrap() + .is_none()); + // NOT Pruned + assert!(ledger_db + .get::(&SoftConfirmationNumber(20)) + .unwrap() + .is_some()); + + // Pruned + assert!(ledger_db + .get::(&[1; 32]) + .unwrap() + .is_none()); + // Pruned + assert!(ledger_db + .get::(&[10; 32]) + .unwrap() + .is_none()); + // NOT Pruned + assert!(ledger_db + .get::(&[20; 32]) + .unwrap() + .is_some()); + + assert!(ledger_db + .get::(&SoftConfirmationNumber(1)) + .unwrap() + .is_none()); + assert!(ledger_db + .get::(&SoftConfirmationNumber(10)) + .unwrap() + .is_none()); + assert!(ledger_db + .get::(&SoftConfirmationNumber(20)) + .unwrap() + .is_some()); +} + +#[test] +pub fn test_pruning_ledger_db_batch_prover_soft_confirmations() { + let tmpdir = tempfile::tempdir().unwrap(); + let rocksdb_config = RocksdbConfig::new(tmpdir.path(), None, None); + let ledger_db = LedgerDB::with_config(&rocksdb_config).unwrap().inner(); + + let mut da_slot_height = 1; + + for i in 1u64..=20 { + let soft_confirmation = StoredSoftConfirmation { + l2_height: i, + da_slot_height, + da_slot_hash: [i as u8; 32], + da_slot_txs_commitment: [i as u8; 32], + hash: [i as u8; 32], + prev_hash: [(i as u8) - 1; 32], + txs: vec![], + deposit_data: vec![], + state_root: vec![i as u8; 32], + soft_confirmation_signature: vec![], + pub_key: vec![0; 32], + l1_fee_rate: 0, + timestamp: i, + }; + + ledger_db + .put::(&SoftConfirmationNumber(i), &soft_confirmation) + .unwrap(); + ledger_db + .put::(&SoftConfirmationNumber(i), &(vec![5; 32], vec![6; 32])) + .unwrap(); + ledger_db + .put::(&SoftConfirmationNumber(i), &vec![]) + .unwrap(); + + da_slot_height += 1; + } + + assert!(ledger_db + .get::(&SoftConfirmationNumber(1)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SoftConfirmationNumber(10)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SoftConfirmationNumber(20)) + .unwrap() + .is_some()); + + assert!(ledger_db + .get::(&SoftConfirmationNumber(1)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SoftConfirmationNumber(10)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SoftConfirmationNumber(20)) + .unwrap() + .is_some()); + + assert!(ledger_db + .get::(&SoftConfirmationNumber(1)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SoftConfirmationNumber(10)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SoftConfirmationNumber(20)) + .unwrap() + .is_some()); + + assert!(ledger_db + .get::(&SoftConfirmationNumber(1)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SoftConfirmationNumber(10)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SoftConfirmationNumber(20)) + .unwrap() + .is_some()); + + assert!(ledger_db + .get::(&SoftConfirmationNumber(1)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SoftConfirmationNumber(10)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SoftConfirmationNumber(20)) + .unwrap() + .is_some()); + + prune_ledger(PruningNodeType::BatchProver, ledger_db.clone(), 10); + + // Pruned + assert!(ledger_db + .get::(&SoftConfirmationNumber(1)) + .unwrap() + .is_none()); + // Pruned + assert!(ledger_db + .get::(&SoftConfirmationNumber(10)) + .unwrap() + .is_none()); + // NOT Pruned + assert!(ledger_db + .get::(&SoftConfirmationNumber(20)) + .unwrap() + .is_some()); + + assert!(ledger_db + .get::(&SoftConfirmationNumber(1)) + .unwrap() + .is_none()); + assert!(ledger_db + .get::(&SoftConfirmationNumber(10)) + .unwrap() + .is_none()); + assert!(ledger_db + .get::(&SoftConfirmationNumber(20)) + .unwrap() + .is_some()); + + assert!(ledger_db + .get::(&SoftConfirmationNumber(1)) + .unwrap() + .is_none()); + assert!(ledger_db + .get::(&SoftConfirmationNumber(10)) + .unwrap() + .is_none()); + assert!(ledger_db + .get::(&SoftConfirmationNumber(20)) + .unwrap() + .is_some()); +} + +fn prepare_slots_data(ledger_db: &DB) { + for da_slot_height in 2u64..=20 { + ledger_db + .put::( + &SlotNumber(da_slot_height), + &( + SoftConfirmationNumber(da_slot_height - 1), + SoftConfirmationNumber(da_slot_height), + ), + ) + .unwrap(); + ledger_db + .put::(&SlotNumber(da_slot_height), &vec![]) + .unwrap(); + ledger_db + .put::(&SlotNumber(da_slot_height), &vec![]) + .unwrap(); + ledger_db + .put::(&SlotNumber(da_slot_height), &vec![]) + .unwrap(); + ledger_db + .put::(&SlotNumber(da_slot_height), &vec![]) + .unwrap(); + ledger_db + .put::( + &SlotNumber(da_slot_height), + &StoredLightClientProof { + proof: vec![1; 32], + light_client_proof_output: StoredLightClientProofOutput { + state_root: [0u8; 32], + light_client_proof_method_id: [1u32; 8], + latest_da_state: StoredLatestDaState { + block_hash: [0; 32], + block_height: da_slot_height, + total_work: [0; 32], + current_target_bits: 0, + epoch_start_time: 0, + prev_11_timestamps: [0; 11], + }, + unchained_batch_proofs_info: vec![], + last_l2_height: da_slot_height, + batch_proof_method_ids: vec![], + mmr_guest: MMRGuest::new(), + }, + }, + ) + .unwrap(); + ledger_db + .put::(&[da_slot_height as u8; 32], &SlotNumber(da_slot_height)) + .unwrap(); + } + + assert!(ledger_db + .get::(&SlotNumber(2)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SlotNumber(10)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SlotNumber(20)) + .unwrap() + .is_some()); + + assert!(ledger_db + .get::(&SlotNumber(2)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SlotNumber(10)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SlotNumber(20)) + .unwrap() + .is_some()); + + assert!(ledger_db + .get::(&SlotNumber(2)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SlotNumber(10)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SlotNumber(20)) + .unwrap() + .is_some()); + + assert!(ledger_db + .get::(&SlotNumber(2)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SlotNumber(10)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SlotNumber(20)) + .unwrap() + .is_some()); + + assert!(ledger_db + .get::(&SlotNumber(2)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SlotNumber(10)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SlotNumber(20)) + .unwrap() + .is_some()); + + assert!(ledger_db + .get::(&SlotNumber(2)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SlotNumber(10)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SlotNumber(20)) + .unwrap() + .is_some()); +} + +#[test] +pub fn test_pruning_ledger_db_fullnode_slots() { + let tmpdir = tempfile::tempdir().unwrap(); + let rocksdb_config = RocksdbConfig::new(tmpdir.path(), None, None); + let ledger_db = LedgerDB::with_config(&rocksdb_config).unwrap().inner(); + + prepare_slots_data(&ledger_db); + + prune_ledger(PruningNodeType::FullNode, ledger_db.clone(), 10); + + // SHOULD NOT CHANGE + assert!(ledger_db + .get::(&SlotNumber(2)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SlotNumber(10)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SlotNumber(20)) + .unwrap() + .is_some()); + + assert!(ledger_db + .get::(&SlotNumber(2)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SlotNumber(10)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SlotNumber(20)) + .unwrap() + .is_some()); + + assert!(ledger_db + .get::(&SlotNumber(2)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SlotNumber(10)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SlotNumber(20)) + .unwrap() + .is_some()); + + // SHOULD BE PRUNED UP TO 10 + assert!(ledger_db + .get::(&SlotNumber(2)) + .unwrap() + .is_none()); + assert!(ledger_db + .get::(&SlotNumber(10)) + .unwrap() + .is_none()); + assert!(ledger_db + .get::(&SlotNumber(20)) + .unwrap() + .is_some()); + + assert!(ledger_db + .get::(&SlotNumber(2)) + .unwrap() + .is_none()); + assert!(ledger_db + .get::(&SlotNumber(10)) + .unwrap() + .is_none()); + assert!(ledger_db + .get::(&SlotNumber(20)) + .unwrap() + .is_some()); + + assert!(ledger_db + .get::(&SlotNumber(2)) + .unwrap() + .is_none()); + assert!(ledger_db + .get::(&SlotNumber(10)) + .unwrap() + .is_none()); + assert!(ledger_db + .get::(&SlotNumber(20)) + .unwrap() + .is_some()); +} + +#[test] +pub fn test_pruning_ledger_db_light_client_slots() { + let tmpdir = tempfile::tempdir().unwrap(); + let rocksdb_config = RocksdbConfig::new(tmpdir.path(), None, None); + let ledger_db = LedgerDB::with_config(&rocksdb_config).unwrap().inner(); + + prepare_slots_data(&ledger_db); + + prune_ledger(PruningNodeType::LightClient, ledger_db.clone(), 10); + + // SHOULD NOT CHANGE + assert!(ledger_db + .get::(&SlotNumber(2)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SlotNumber(10)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SlotNumber(20)) + .unwrap() + .is_some()); + + assert!(ledger_db + .get::(&SlotNumber(2)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SlotNumber(10)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SlotNumber(20)) + .unwrap() + .is_some()); + + assert!(ledger_db + .get::(&SlotNumber(2)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SlotNumber(10)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SlotNumber(20)) + .unwrap() + .is_some()); + + // SHOULD BE PRUNED UP TO 10 + assert!(ledger_db + .get::(&SlotNumber(2)) + .unwrap() + .is_none()); + assert!(ledger_db + .get::(&SlotNumber(10)) + .unwrap() + .is_none()); + assert!(ledger_db + .get::(&SlotNumber(20)) + .unwrap() + .is_some()); + + assert!(ledger_db + .get::(&SlotNumber(2)) + .unwrap() + .is_none()); + assert!(ledger_db + .get::(&SlotNumber(10)) + .unwrap() + .is_none()); + assert!(ledger_db + .get::(&SlotNumber(20)) + .unwrap() + .is_some()); + + assert!(ledger_db + .get::(&SlotNumber(2)) + .unwrap() + .is_none()); + assert!(ledger_db + .get::(&SlotNumber(10)) + .unwrap() + .is_none()); + assert!(ledger_db + .get::(&SlotNumber(20)) + .unwrap() + .is_some()); +} + +#[test] +pub fn test_pruning_ledger_db_batch_prover_slots() { + let tmpdir = tempfile::tempdir().unwrap(); + let rocksdb_config = RocksdbConfig::new(tmpdir.path(), None, None); + let ledger_db = LedgerDB::with_config(&rocksdb_config).unwrap().inner(); + + prepare_slots_data(&ledger_db); + + prune_ledger(PruningNodeType::BatchProver, ledger_db.clone(), 10); + + // SHOULD NOT CHANGE + assert!(ledger_db + .get::(&SlotNumber(2)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SlotNumber(10)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SlotNumber(20)) + .unwrap() + .is_some()); + + assert!(ledger_db + .get::(&SlotNumber(2)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SlotNumber(10)) + .unwrap() + .is_some()); + assert!(ledger_db + .get::(&SlotNumber(20)) + .unwrap() + .is_some()); + + // SHOULD BE PRUNED UP TO 10 + assert!(ledger_db + .get::(&SlotNumber(2)) + .unwrap() + .is_none()); + assert!(ledger_db + .get::(&SlotNumber(10)) + .unwrap() + .is_none()); + assert!(ledger_db + .get::(&SlotNumber(20)) + .unwrap() + .is_some()); + + assert!(ledger_db + .get::(&SlotNumber(2)) + .unwrap() + .is_none()); + assert!(ledger_db + .get::(&SlotNumber(10)) + .unwrap() + .is_none()); + assert!(ledger_db + .get::(&SlotNumber(20)) + .unwrap() + .is_some()); + + assert!(ledger_db + .get::(&SlotNumber(2)) + .unwrap() + .is_none()); + assert!(ledger_db + .get::(&SlotNumber(10)) + .unwrap() + .is_none()); + assert!(ledger_db + .get::(&SlotNumber(20)) + .unwrap() + .is_some()); + + assert!(ledger_db + .get::(&SlotNumber(2)) + .unwrap() + .is_none()); + assert!(ledger_db + .get::(&SlotNumber(10)) + .unwrap() + .is_none()); + assert!(ledger_db + .get::(&SlotNumber(20)) + .unwrap() + .is_some()); +}