Skip to content

Commit

Permalink
Merge pull request input-output-hk#1748 from input-output-hk/ensemble…
Browse files Browse the repository at this point in the history
…/1724/handle_rollbacks

Handle rollbacks when reading blocks from the chain
  • Loading branch information
Alenar authored Jun 13, 2024
2 parents 5c692e2 + f7da0e6 commit 8fa65dc
Show file tree
Hide file tree
Showing 28 changed files with 1,245 additions and 166 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ As a minor extension, we have adopted a slightly different versioning convention

- **UNSTABLE** Cardano transactions certification:
- Optimize the performances of the computation of the proof with a Merkle map.
- Handle rollback events from the Cardano chain by removing stale data.

- Crates versions:

Expand Down
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/mithril-persistence/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mithril-persistence"
version = "0.2.5"
version = "0.2.6"
description = "Common types, interfaces, and utilities to persist data for Mithril nodes."
authors = { workspace = true }
edition = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
use anyhow::Context;
use sqlite::Value;

use mithril_common::entities::{BlockNumber, BlockRange};
use mithril_common::StdResult;

use crate::database::record::BlockRangeRootRecord;
use crate::sqlite::{Query, SourceAlias, SqLiteEntity, WhereCondition};

/// Query to delete old [BlockRangeRootRecord] from the sqlite database
pub struct DeleteBlockRangeRootQuery {
condition: WhereCondition,
}

impl Query for DeleteBlockRangeRootQuery {
type Entity = BlockRangeRootRecord;

fn filters(&self) -> WhereCondition {
self.condition.clone()
}

fn get_definition(&self, condition: &str) -> String {
// it is important to alias the fields with the same name as the table
// since the table cannot be aliased in a RETURNING statement in SQLite.
let aliases = SourceAlias::new(&[("{:block_range_root:}", "block_range_root")]);
let projection = Self::Entity::get_projection().expand(aliases);

format!("delete from block_range_root where {condition} returning {projection}")
}
}

impl DeleteBlockRangeRootQuery {
pub fn contains_or_above_block_number_threshold(
block_number_threshold: BlockNumber,
) -> StdResult<Self> {
let block_range = BlockRange::from_block_number(block_number_threshold);
let threshold = Value::Integer(block_range.start.try_into().with_context(|| {
format!("Failed to convert threshold `{block_number_threshold}` to i64")
})?);

Ok(Self {
condition: WhereCondition::new("start >= ?*", vec![threshold]),
})
}
}

#[cfg(test)]
mod tests {
use mithril_common::crypto_helper::MKTreeNode;
use mithril_common::entities::BlockRange;

use crate::database::query::{GetBlockRangeRootQuery, InsertBlockRangeRootQuery};
use crate::database::test_helper::cardano_tx_db_connection;
use crate::sqlite::{ConnectionExtensions, SqliteConnection};

use super::*;

fn insert_block_range_roots(connection: &SqliteConnection, records: Vec<BlockRangeRootRecord>) {
connection
.fetch_first(InsertBlockRangeRootQuery::insert_many(records).unwrap())
.unwrap();
}

fn block_range_root_dataset() -> Vec<BlockRangeRootRecord> {
[
(
BlockRange::from_block_number(BlockRange::LENGTH),
MKTreeNode::from_hex("AAAA").unwrap(),
),
(
BlockRange::from_block_number(BlockRange::LENGTH * 2),
MKTreeNode::from_hex("BBBB").unwrap(),
),
(
BlockRange::from_block_number(BlockRange::LENGTH * 3),
MKTreeNode::from_hex("CCCC").unwrap(),
),
]
.into_iter()
.map(BlockRangeRootRecord::from)
.collect()
}

#[test]
fn test_prune_work_even_without_block_range_root_in_db() {
let connection = cardano_tx_db_connection().unwrap();

let cursor = connection
.fetch(
DeleteBlockRangeRootQuery::contains_or_above_block_number_threshold(100).unwrap(),
)
.expect("pruning shouldn't crash without block range root stored");
assert_eq!(0, cursor.count());
}

#[test]
fn test_prune_all_data_if_given_block_number_is_lower_than_stored_number_of_block() {
parameterized_test_prune_block_range(0, block_range_root_dataset().len());
}

#[test]
fn test_prune_keep_all_block_range_root_if_given_number_of_block_is_greater_than_the_highest_one(
) {
parameterized_test_prune_block_range(100_000, 0);
}

#[test]
fn test_prune_block_range_when_block_number_is_block_range_start() {
parameterized_test_prune_block_range(BlockRange::LENGTH * 2, 2);
}

#[test]
fn test_prune_block_range_when_block_number_is_in_block_range() {
parameterized_test_prune_block_range(BlockRange::LENGTH * 2 + 1, 2);
}

#[test]
fn test_keep_block_range_when_block_number_is_just_before_range_start() {
parameterized_test_prune_block_range(BlockRange::LENGTH * 2 - 1, 3);
}

fn parameterized_test_prune_block_range(
block_threshold: BlockNumber,
delete_record_number: usize,
) {
let connection = cardano_tx_db_connection().unwrap();
let dataset = block_range_root_dataset();
insert_block_range_roots(&connection, dataset.clone());

let query =
DeleteBlockRangeRootQuery::contains_or_above_block_number_threshold(block_threshold)
.unwrap();
let cursor = connection.fetch(query).unwrap();
assert_eq!(delete_record_number, cursor.count());

let cursor = connection.fetch(GetBlockRangeRootQuery::all()).unwrap();
assert_eq!(dataset.len() - delete_record_number, cursor.count());
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
mod delete_block_range_root;
mod get_block_range_root;
mod get_interval_without_block_range;
mod insert_block_range;

pub use delete_block_range_root::*;
pub use get_block_range_root::*;
pub use get_interval_without_block_range::*;
pub use insert_block_range::*;
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,16 @@ impl DeleteCardanoTransactionQuery {
condition: WhereCondition::new("block_number < ?*", vec![threshold]),
})
}

pub fn above_block_number_threshold(block_number_threshold: BlockNumber) -> StdResult<Self> {
let threshold = Value::Integer(block_number_threshold.try_into().with_context(|| {
format!("Failed to convert threshold `{block_number_threshold}` to i64")
})?);

Ok(Self {
condition: WhereCondition::new("block_number > ?*", vec![threshold]),
})
}
}

#[cfg(test)]
Expand Down Expand Up @@ -66,52 +76,112 @@ mod tests {
]
}

#[test]
fn test_prune_work_even_without_transactions_in_db() {
let connection = cardano_tx_db_connection().unwrap();

let cursor = connection
.fetch(DeleteCardanoTransactionQuery::below_block_number_threshold(100).unwrap())
.expect("pruning shouldn't crash without transactions stored");
assert_eq!(0, cursor.count());
mod prune_below_threshold_tests {
use super::*;

#[test]
fn test_prune_work_even_without_transactions_in_db() {
let connection = cardano_tx_db_connection().unwrap();

let cursor = connection
.fetch(DeleteCardanoTransactionQuery::below_block_number_threshold(100).unwrap())
.expect("pruning shouldn't crash without transactions stored");
assert_eq!(0, cursor.count());
}

#[test]
fn test_prune_all_data_if_given_block_number_is_larger_than_stored_number_of_block() {
let connection = cardano_tx_db_connection().unwrap();
insert_transactions(&connection, test_transaction_set());

let query =
DeleteCardanoTransactionQuery::below_block_number_threshold(100_000).unwrap();
let cursor = connection.fetch(query).unwrap();
assert_eq!(test_transaction_set().len(), cursor.count());

let cursor = connection.fetch(GetCardanoTransactionQuery::all()).unwrap();
assert_eq!(0, cursor.count());
}

#[test]
fn test_prune_keep_all_tx_of_last_block_if_given_number_of_block_is_zero() {
let connection = cardano_tx_db_connection().unwrap();
insert_transactions(&connection, test_transaction_set());

let query = DeleteCardanoTransactionQuery::below_block_number_threshold(0).unwrap();
let cursor = connection.fetch(query).unwrap();
assert_eq!(0, cursor.count());

let cursor = connection.fetch(GetCardanoTransactionQuery::all()).unwrap();
assert_eq!(test_transaction_set().len(), cursor.count());
}

#[test]
fn test_prune_data_of_below_given_blocks() {
let connection = cardano_tx_db_connection().unwrap();
insert_transactions(&connection, test_transaction_set());

let query = DeleteCardanoTransactionQuery::below_block_number_threshold(12).unwrap();
let cursor = connection.fetch(query).unwrap();
assert_eq!(4, cursor.count());

let cursor = connection.fetch(GetCardanoTransactionQuery::all()).unwrap();
assert_eq!(2, cursor.count());
}
}

#[test]
fn test_prune_all_data_if_given_block_number_is_larger_than_stored_number_of_block() {
let connection = cardano_tx_db_connection().unwrap();
insert_transactions(&connection, test_transaction_set());

let query = DeleteCardanoTransactionQuery::below_block_number_threshold(100_000).unwrap();
let cursor = connection.fetch(query).unwrap();
assert_eq!(test_transaction_set().len(), cursor.count());

let cursor = connection.fetch(GetCardanoTransactionQuery::all()).unwrap();
assert_eq!(0, cursor.count());
}

#[test]
fn test_prune_keep_all_tx_of_last_block_if_given_number_of_block_is_zero() {
let connection = cardano_tx_db_connection().unwrap();
insert_transactions(&connection, test_transaction_set());

let query = DeleteCardanoTransactionQuery::below_block_number_threshold(0).unwrap();
let cursor = connection.fetch(query).unwrap();
assert_eq!(0, cursor.count());

let cursor = connection.fetch(GetCardanoTransactionQuery::all()).unwrap();
assert_eq!(test_transaction_set().len(), cursor.count());
}

#[test]
fn test_prune_data_of_below_given_blocks() {
let connection = cardano_tx_db_connection().unwrap();
insert_transactions(&connection, test_transaction_set());

let query = DeleteCardanoTransactionQuery::below_block_number_threshold(12).unwrap();
let cursor = connection.fetch(query).unwrap();
assert_eq!(4, cursor.count());

let cursor = connection.fetch(GetCardanoTransactionQuery::all()).unwrap();
assert_eq!(2, cursor.count());
mod prune_above_threshold_tests {
use super::*;

#[test]
fn test_prune_work_even_without_transactions_in_db() {
let connection = cardano_tx_db_connection().unwrap();

let cursor = connection
.fetch(DeleteCardanoTransactionQuery::above_block_number_threshold(100).unwrap())
.expect("pruning shouldn't crash without transactions stored");
assert_eq!(0, cursor.count());
}

#[test]
fn test_prune_all_data_if_given_block_number_is_lower_than_stored_number_of_block() {
let connection = cardano_tx_db_connection().unwrap();
insert_transactions(&connection, test_transaction_set());

let query = DeleteCardanoTransactionQuery::above_block_number_threshold(0).unwrap();
let cursor = connection.fetch(query).unwrap();
assert_eq!(test_transaction_set().len(), cursor.count());

let cursor = connection.fetch(GetCardanoTransactionQuery::all()).unwrap();
assert_eq!(0, cursor.count());
}

#[test]
fn test_prune_keep_all_tx_of_last_block_if_given_number_of_block_is_greater_than_the_highest_one(
) {
let connection = cardano_tx_db_connection().unwrap();
insert_transactions(&connection, test_transaction_set());

let query =
DeleteCardanoTransactionQuery::above_block_number_threshold(100_000).unwrap();
let cursor = connection.fetch(query).unwrap();
assert_eq!(0, cursor.count());

let cursor = connection.fetch(GetCardanoTransactionQuery::all()).unwrap();
assert_eq!(test_transaction_set().len(), cursor.count());
}

#[test]
fn test_prune_data_of_above_given_blocks() {
let connection = cardano_tx_db_connection().unwrap();
insert_transactions(&connection, test_transaction_set());

let query = DeleteCardanoTransactionQuery::above_block_number_threshold(10).unwrap();
let cursor = connection.fetch(query).unwrap();
assert_eq!(4, cursor.count());

let cursor = connection.fetch(GetCardanoTransactionQuery::all()).unwrap();
assert_eq!(2, cursor.count());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ impl GetCardanoTransactionQuery {

pub fn by_transaction_hashes(
transactions_hashes: Vec<TransactionHash>,
up_to: BlockNumber,
up_to_or_equal: BlockNumber,
) -> Self {
let hashes_values = transactions_hashes.into_iter().map(Value::String).collect();
let condition = WhereCondition::where_in("transaction_hash", hashes_values).and_where(
WhereCondition::new("block_number <= ?*", vec![Value::Integer(up_to as i64)]),
WhereCondition::new(
"block_number <= ?*",
vec![Value::Integer(up_to_or_equal as i64)],
),
);

Self { condition }
Expand Down
Loading

0 comments on commit 8fa65dc

Please sign in to comment.