Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelise block/transaction changes creation in Importer #2618

Open
wants to merge 19 commits into
base: take_vec_changes_rocksdb
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
5c822da
Parallelise encoding to changes of the block and transaction data
AurelienFT Jan 22, 2025
aeb0f7b
Add CHANGELOG.md
AurelienFT Jan 22, 2025
9c08308
Update order check error in commit importer
AurelienFT Jan 22, 2025
e5d4ef9
Merge branch 'take_vec_changes_rocksdb' into parallelise_importer
AurelienFT Jan 23, 2025
50a672b
Use new list of changes feature in importer
AurelienFT Jan 23, 2025
55edadd
add not unique check
AurelienFT Jan 24, 2025
91f9e2e
Merge branch 'take_vec_changes_rocksdb' into parallelise_importer
AurelienFT Jan 24, 2025
f78d496
Merge branch 'take_vec_changes_rocksdb' into parallelise_importer
AurelienFT Jan 24, 2025
ab75ebb
Update importer to use the correct commit function
AurelienFT Jan 24, 2025
6a95719
Merge branch 'take_vec_changes_rocksdb' into parallelise_importer
AurelienFT Jan 24, 2025
6f50c51
Merge branch 'take_vec_changes_rocksdb' into parallelise_importer
AurelienFT Jan 24, 2025
aba1fec
Merge branch 'take_vec_changes_rocksdb' into parallelise_importer
AurelienFT Jan 24, 2025
60c5e03
Merge branch 'take_vec_changes_rocksdb' into parallelise_importer
AurelienFT Jan 24, 2025
f8e39c4
Merge branch 'take_vec_changes_rocksdb' into parallelise_importer
AurelienFT Jan 24, 2025
ed3d6cd
fix importer mock
AurelienFT Jan 24, 2025
6ac67f1
fix importer tests
AurelienFT Jan 24, 2025
6f7f24b
Fix clippy
AurelienFT Jan 24, 2025
7c37e56
update changes returned by Importer
AurelienFT Jan 27, 2025
4e69661
Merge branch 'take_vec_changes_rocksdb' into parallelise_importer
AurelienFT Jan 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/).

## [Unreleased]

### Changed
- [2618](https://github.com/FuelLabs/fuel-core/pull/2618): Parallelize block/transaction changes creation in Importer

### Added
- [2619](https://github.com/FuelLabs/fuel-core/pull/2691): Add possibility to submit list of changes to rocksdb.

Expand Down
18 changes: 16 additions & 2 deletions crates/fuel-core/src/service/adapters/block_importer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use crate::{
database::Database,
database::{
commit_changes_with_height_update,
Database,
},
service::adapters::{
BlockImporterAdapter,
ExecutorAdapter,
Expand Down Expand Up @@ -27,7 +30,10 @@ use fuel_core_storage::{
},
FuelBlocks,
},
transactional::Changes,
transactional::{
Changes,
StorageChanges,
},
MerkleRoot,
Result as StorageResult,
StorageAsRef,
Expand All @@ -52,6 +58,7 @@ use fuel_core_types::{
UncommittedValidationResult,
},
};
use itertools::Itertools;
use std::sync::Arc;

impl BlockImporterAdapter {
Expand Down Expand Up @@ -104,6 +111,13 @@ impl ImporterDatabase for Database {
.get(&DenseMetadataKey::Latest)?
.map(|cow| *cow.root()))
}

fn commit_changes(&mut self, changes: StorageChanges) -> StorageResult<()> {
commit_changes_with_height_update(self, changes, |iter| {
iter.iter_all_keys::<FuelBlocks>(Some(IterDirection::Reverse))
.try_collect()
})
}
}

impl Validator for ExecutorAdapter {
Expand Down
172 changes: 106 additions & 66 deletions crates/services/importer/src/importer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,10 @@ use crate::{
use fuel_core_metrics::importer::importer_metrics;
use fuel_core_storage::{
not_found,
transactional::Changes,
transactional::{
Changes,
StorageChanges,
},
Error as StorageError,
MerkleRoot,
};
Expand Down Expand Up @@ -105,6 +108,7 @@ pub enum Error {
UnsupportedConsensusVariant(String),
ActiveBlockResultsSemaphoreClosed(tokio::sync::AcquireError),
RayonTaskWasCanceled,
TaskError(String),
}

impl From<Error> for anyhow::Error {
Expand All @@ -121,7 +125,7 @@ impl PartialEq for Error {
}

pub struct Importer<D, E, V> {
database: Mutex<D>,
database: Arc<Mutex<D>>,
executor: Arc<E>,
verifier: Arc<V>,
chain_id: ChainId,
Expand Down Expand Up @@ -156,7 +160,7 @@ impl<D, E, V> Importer<D, E, V> {
.expect("Failed to create a thread pool for the block processing");

Self {
database: Mutex::new(database),
database: Arc::new(Mutex::new(database)),
executor: Arc::new(executor),
verifier: Arc::new(verifier),
chain_id,
Expand Down Expand Up @@ -261,7 +265,12 @@ where
.try_lock()
.expect("Semaphore prevents concurrent access to the database");
let database = guard.deref_mut();
self._commit_result(result, permit, database)
let block_changes = create_block_changes(
&self.chain_id,
&result.result().sealed_block,
database,
)?;
self._commit_result(result, block_changes, permit, database)
})
.await?
}
Expand All @@ -279,77 +288,19 @@ where
fn _commit_result(
&self,
result: UncommittedResult<Changes>,
block_changes: Changes,
permit: OwnedSemaphorePermit,
database: &mut D,
) -> Result<(), Error> {
let (result, changes) = result.into();
let block = &result.sealed_block.entity;
let consensus = &result.sealed_block.consensus;
let actual_next_height = *block.header().height();

// During importing of the genesis block, the database should not be initialized
// and the genesis block defines the next height.
// During the production of the non-genesis block, the next height should be underlying
// database height + 1.
let expected_next_height = match consensus {
Consensus::Genesis(_) => {
let result = database.latest_block_height()?;
let found = result.is_some();
// Because the genesis block is not committed, it should return `None`.
// If we find the latest height, something is wrong with the state of the database.
if found {
return Err(Error::InvalidUnderlyingDatabaseGenesisState)
}
actual_next_height
}
Consensus::PoA(_) => {
if actual_next_height == BlockHeight::from(0u32) {
return Err(Error::ZeroNonGenericHeight)
}

let last_db_height = database
.latest_block_height()?
.ok_or(not_found!("Latest block height"))?;
last_db_height
.checked_add(1u32)
.ok_or(Error::Overflow)?
.into()
}
_ => {
return Err(Error::UnsupportedConsensusVariant(format!(
"{:?}",
consensus
)))
}
};

if expected_next_height != actual_next_height {
return Err(Error::IncorrectBlockHeight(
expected_next_height,
actual_next_height,
))
}

// Importer expects that `UncommittedResult` contains the result of block
// execution without block itself.
let expected_block_root = database.latest_block_root()?;

#[cfg(feature = "test-helpers")]
let changes_clone = changes.clone();
let mut db_after_execution = database.storage_transaction(changes);
let actual_block_root = db_after_execution.latest_block_root()?;
if actual_block_root != expected_block_root {
return Err(Error::InvalidDatabaseStateAfterExecution(
expected_block_root,
actual_block_root,
))
}

if !db_after_execution.store_new_block(&self.chain_id, &result.sealed_block)? {
return Err(Error::NotUnique(expected_next_height))
}

db_after_execution.commit()?;
database
.commit_changes(StorageChanges::ChangesList(vec![block_changes, changes]))?;

if self.metrics {
Self::update_metrics(&result, &actual_next_height);
Expand Down Expand Up @@ -525,6 +476,19 @@ where
) -> Result<(), Error> {
let _guard = self.lock()?;

let block_changes = std::thread::spawn({
let sealed_block = sealed_block.clone();
let database = self.database.clone();
let chain_id = self.chain_id;
move || {
let mut guard = database
.try_lock()
.expect("Semaphore prevents concurrent access to the database");
let database = guard.deref_mut();
create_block_changes(&chain_id, &sealed_block, database)
}
});

let executor = self.executor.clone();
let verifier = self.verifier.clone();
let (result, execute_time) = self
Expand All @@ -541,6 +505,9 @@ where
.await?;

let result = result?;
let block_changes = block_changes.join().map_err(|e| {
Error::TaskError(format!("Error while waiting for block changes: {:?}", e))
})??;

// Await until all receivers of the notification process the result.
const TIMEOUT: u64 = 20;
Expand Down Expand Up @@ -568,7 +535,8 @@ where
let database = guard.deref_mut();

let start = Instant::now();
self._commit_result(result, permit, database).map(|_| start)
self._commit_result(result, block_changes, permit, database)
.map(|_| start)
})
.await?;

Expand Down Expand Up @@ -607,3 +575,75 @@ impl Awaiter {
}
}
}

fn create_block_changes<D: ImporterDatabase + Transactional>(
chain_id: &ChainId,
sealed_block: &SealedBlock,
database: &mut D,
) -> Result<Changes, Error> {
let consensus = &sealed_block.consensus;
let actual_next_height = *sealed_block.entity.header().height();

// During importing of the genesis block, the database should not be initialized
// and the genesis block defines the next height.
// During the production of the non-genesis block, the next height should be underlying
// database height + 1.
let expected_next_height = match consensus {
Consensus::Genesis(_) => {
let result = database.latest_block_height()?;
let found = result.is_some();
// Because the genesis block is not committed, it should return `None`.
// If we find the latest height, something is wrong with the state of the database.
if found {
return Err(Error::InvalidUnderlyingDatabaseGenesisState)
}
actual_next_height
}
Consensus::PoA(_) => {
if actual_next_height == BlockHeight::from(0u32) {
return Err(Error::ZeroNonGenericHeight)
}

let last_db_height = database
.latest_block_height()?
.ok_or(not_found!("Latest block height"))?;
last_db_height
.checked_add(1u32)
.ok_or(Error::Overflow)?
.into()
}
_ => {
return Err(Error::UnsupportedConsensusVariant(format!(
"{:?}",
consensus
)))
}
};

if expected_next_height != actual_next_height {
return Err(Error::IncorrectBlockHeight(
expected_next_height,
actual_next_height,
))
}

// Importer expects that `UncommittedResult` contains the result of block
// execution without block itself.
let expected_block_root = database.latest_block_root()?;

let mut db_after_execution = database.storage_transaction(Default::default());
let actual_block_root = db_after_execution.latest_block_root()?;

if actual_block_root != expected_block_root {
return Err(Error::InvalidDatabaseStateAfterExecution(
expected_block_root,
actual_block_root,
))
}

if !db_after_execution.store_new_block(chain_id, sealed_block)? {
return Err(Error::NotUnique(actual_next_height))
}

Ok(db_after_execution.into_changes())
}
Loading
Loading