diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d896f7a46c..a23bcb450d1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/). ## [Unreleased] +### Changed +- [2618](https://github.com/FuelLabs/fuel-core/pull/2618): Parallelize block/transaction changes creation in Importer + ### Added - [2619](https://github.com/FuelLabs/fuel-core/pull/2691): Add possibility to submit list of changes to rocksdb. diff --git a/crates/fuel-core/src/service/adapters/block_importer.rs b/crates/fuel-core/src/service/adapters/block_importer.rs index 5b8a06e7f1e..ca0b9d8d186 100644 --- a/crates/fuel-core/src/service/adapters/block_importer.rs +++ b/crates/fuel-core/src/service/adapters/block_importer.rs @@ -1,5 +1,8 @@ use crate::{ - database::Database, + database::{ + commit_changes_with_height_update, + Database, + }, service::adapters::{ BlockImporterAdapter, ExecutorAdapter, @@ -27,7 +30,10 @@ use fuel_core_storage::{ }, FuelBlocks, }, - transactional::Changes, + transactional::{ + Changes, + StorageChanges, + }, MerkleRoot, Result as StorageResult, StorageAsRef, @@ -52,6 +58,7 @@ use fuel_core_types::{ UncommittedValidationResult, }, }; +use itertools::Itertools; use std::sync::Arc; impl BlockImporterAdapter { @@ -104,6 +111,13 @@ impl ImporterDatabase for Database { .get(&DenseMetadataKey::Latest)? .map(|cow| *cow.root())) } + + fn commit_changes(&mut self, changes: StorageChanges) -> StorageResult<()> { + commit_changes_with_height_update(self, changes, |iter| { + iter.iter_all_keys::(Some(IterDirection::Reverse)) + .try_collect() + }) + } } impl Validator for ExecutorAdapter { diff --git a/crates/services/importer/src/importer.rs b/crates/services/importer/src/importer.rs index 4479c4efd37..6dd1bb08ee1 100644 --- a/crates/services/importer/src/importer.rs +++ b/crates/services/importer/src/importer.rs @@ -12,7 +12,10 @@ use crate::{ use fuel_core_metrics::importer::importer_metrics; use fuel_core_storage::{ not_found, - transactional::Changes, + transactional::{ + Changes, + StorageChanges, + }, Error as StorageError, MerkleRoot, }; @@ -105,6 +108,7 @@ pub enum Error { UnsupportedConsensusVariant(String), ActiveBlockResultsSemaphoreClosed(tokio::sync::AcquireError), RayonTaskWasCanceled, + TaskError(String), } impl From for anyhow::Error { @@ -121,7 +125,7 @@ impl PartialEq for Error { } pub struct Importer { - database: Mutex, + database: Arc>, executor: Arc, verifier: Arc, chain_id: ChainId, @@ -156,7 +160,7 @@ impl Importer { .expect("Failed to create a thread pool for the block processing"); Self { - database: Mutex::new(database), + database: Arc::new(Mutex::new(database)), executor: Arc::new(executor), verifier: Arc::new(verifier), chain_id, @@ -261,7 +265,12 @@ where .try_lock() .expect("Semaphore prevents concurrent access to the database"); let database = guard.deref_mut(); - self._commit_result(result, permit, database) + let block_changes = create_block_changes( + &self.chain_id, + &result.result().sealed_block, + database, + )?; + self._commit_result(result, block_changes, permit, database) }) .await? } @@ -279,77 +288,19 @@ where fn _commit_result( &self, result: UncommittedResult, + block_changes: Changes, permit: OwnedSemaphorePermit, database: &mut D, ) -> Result<(), Error> { let (result, changes) = result.into(); let block = &result.sealed_block.entity; - let consensus = &result.sealed_block.consensus; let actual_next_height = *block.header().height(); - // During importing of the genesis block, the database should not be initialized - // and the genesis block defines the next height. - // During the production of the non-genesis block, the next height should be underlying - // database height + 1. - let expected_next_height = match consensus { - Consensus::Genesis(_) => { - let result = database.latest_block_height()?; - let found = result.is_some(); - // Because the genesis block is not committed, it should return `None`. - // If we find the latest height, something is wrong with the state of the database. - if found { - return Err(Error::InvalidUnderlyingDatabaseGenesisState) - } - actual_next_height - } - Consensus::PoA(_) => { - if actual_next_height == BlockHeight::from(0u32) { - return Err(Error::ZeroNonGenericHeight) - } - - let last_db_height = database - .latest_block_height()? - .ok_or(not_found!("Latest block height"))?; - last_db_height - .checked_add(1u32) - .ok_or(Error::Overflow)? - .into() - } - _ => { - return Err(Error::UnsupportedConsensusVariant(format!( - "{:?}", - consensus - ))) - } - }; - - if expected_next_height != actual_next_height { - return Err(Error::IncorrectBlockHeight( - expected_next_height, - actual_next_height, - )) - } - - // Importer expects that `UncommittedResult` contains the result of block - // execution without block itself. - let expected_block_root = database.latest_block_root()?; - #[cfg(feature = "test-helpers")] let changes_clone = changes.clone(); - let mut db_after_execution = database.storage_transaction(changes); - let actual_block_root = db_after_execution.latest_block_root()?; - if actual_block_root != expected_block_root { - return Err(Error::InvalidDatabaseStateAfterExecution( - expected_block_root, - actual_block_root, - )) - } - - if !db_after_execution.store_new_block(&self.chain_id, &result.sealed_block)? { - return Err(Error::NotUnique(expected_next_height)) - } - db_after_execution.commit()?; + database + .commit_changes(StorageChanges::ChangesList(vec![block_changes, changes]))?; if self.metrics { Self::update_metrics(&result, &actual_next_height); @@ -525,6 +476,19 @@ where ) -> Result<(), Error> { let _guard = self.lock()?; + let block_changes = std::thread::spawn({ + let sealed_block = sealed_block.clone(); + let database = self.database.clone(); + let chain_id = self.chain_id; + move || { + let mut guard = database + .try_lock() + .expect("Semaphore prevents concurrent access to the database"); + let database = guard.deref_mut(); + create_block_changes(&chain_id, &sealed_block, database) + } + }); + let executor = self.executor.clone(); let verifier = self.verifier.clone(); let (result, execute_time) = self @@ -541,6 +505,9 @@ where .await?; let result = result?; + let block_changes = block_changes.join().map_err(|e| { + Error::TaskError(format!("Error while waiting for block changes: {:?}", e)) + })??; // Await until all receivers of the notification process the result. const TIMEOUT: u64 = 20; @@ -568,7 +535,8 @@ where let database = guard.deref_mut(); let start = Instant::now(); - self._commit_result(result, permit, database).map(|_| start) + self._commit_result(result, block_changes, permit, database) + .map(|_| start) }) .await?; @@ -607,3 +575,75 @@ impl Awaiter { } } } + +fn create_block_changes( + chain_id: &ChainId, + sealed_block: &SealedBlock, + database: &mut D, +) -> Result { + let consensus = &sealed_block.consensus; + let actual_next_height = *sealed_block.entity.header().height(); + + // During importing of the genesis block, the database should not be initialized + // and the genesis block defines the next height. + // During the production of the non-genesis block, the next height should be underlying + // database height + 1. + let expected_next_height = match consensus { + Consensus::Genesis(_) => { + let result = database.latest_block_height()?; + let found = result.is_some(); + // Because the genesis block is not committed, it should return `None`. + // If we find the latest height, something is wrong with the state of the database. + if found { + return Err(Error::InvalidUnderlyingDatabaseGenesisState) + } + actual_next_height + } + Consensus::PoA(_) => { + if actual_next_height == BlockHeight::from(0u32) { + return Err(Error::ZeroNonGenericHeight) + } + + let last_db_height = database + .latest_block_height()? + .ok_or(not_found!("Latest block height"))?; + last_db_height + .checked_add(1u32) + .ok_or(Error::Overflow)? + .into() + } + _ => { + return Err(Error::UnsupportedConsensusVariant(format!( + "{:?}", + consensus + ))) + } + }; + + if expected_next_height != actual_next_height { + return Err(Error::IncorrectBlockHeight( + expected_next_height, + actual_next_height, + )) + } + + // Importer expects that `UncommittedResult` contains the result of block + // execution without block itself. + let expected_block_root = database.latest_block_root()?; + + let mut db_after_execution = database.storage_transaction(Default::default()); + let actual_block_root = db_after_execution.latest_block_root()?; + + if actual_block_root != expected_block_root { + return Err(Error::InvalidDatabaseStateAfterExecution( + expected_block_root, + actual_block_root, + )) + } + + if !db_after_execution.store_new_block(chain_id, sealed_block)? { + return Err(Error::NotUnique(actual_next_height)) + } + + Ok(db_after_execution.into_changes()) +} diff --git a/crates/services/importer/src/importer/test.rs b/crates/services/importer/src/importer/test.rs index 3b07d82ebbe..4160111000a 100644 --- a/crates/services/importer/src/importer/test.rs +++ b/crates/services/importer/src/importer/test.rs @@ -11,7 +11,10 @@ use crate::{ }; use anyhow::anyhow; use fuel_core_storage::{ - transactional::Changes, + transactional::{ + Changes, + StorageChanges, + }, Error as StorageError, MerkleRoot, Result as StorageResult, @@ -58,6 +61,11 @@ mockall::mock! { fn latest_block_height(&self) -> StorageResult>; fn latest_block_root(&self) -> StorageResult>; + + fn commit_changes( + &mut self, + changes: StorageChanges, + ) -> StorageResult<()>; } } @@ -89,7 +97,7 @@ fn poa_block(height: u32) -> SealedBlock { } } -fn underlying_db(result: R) -> impl Fn() -> MockDatabase +fn underlying_db(result: R, commits: usize) -> impl Fn() -> MockDatabase where R: Fn() -> StorageResult> + Send + Clone + 'static, { @@ -101,15 +109,14 @@ where .returning(move || result_height().map(|v| v.map(Into::into))); db.expect_latest_block_root() .returning(move || result_root().map(|v| v.map(u32_to_merkle_root))); + db.expect_commit_changes() + .times(commits) + .returning(|_| Ok(())); db } } -fn db_transaction( - height: H, - store_block: B, - commits: usize, -) -> impl Fn() -> MockDatabaseTransaction +fn db_transaction(height: H, store_block: B) -> impl Fn() -> MockDatabaseTransaction where H: Fn() -> StorageResult> + Send + Clone + 'static, B: Fn() -> StorageResult + Send + Clone + 'static, @@ -122,7 +129,7 @@ where .returning(move || height().map(|v| v.map(u32_to_merkle_root))); db.expect_store_new_block() .returning(move |_, _| store_block()); - db.expect_commit().times(commits).returning(|| Ok(())); + db.expect_into_changes().returning(Changes::default); db } } @@ -191,43 +198,43 @@ where //////////////// //////////// Genesis Block /////////// //////////////// #[test_case( genesis(0), - underlying_db(ok(None)), - db_transaction(ok(None), ok(true), 1) + underlying_db(ok(None), 1), + db_transaction(ok(None), ok(true)) => Ok(()); "successfully imports genesis block when latest block not found" )] #[test_case( genesis(113), - underlying_db(ok(None)), - db_transaction(ok(None), ok(true), 1) + underlying_db(ok(None), 1), + db_transaction(ok(None), ok(true)) => Ok(()); "successfully imports block at arbitrary height when executor db expects it and last block not found" )] #[test_case( genesis(0), - underlying_db(storage_failure), - db_transaction(ok(Some(0)), ok(true), 0) + underlying_db(storage_failure, 0), + db_transaction(ok(Some(0)), ok(true)) => Err(storage_failure_error()); "fails to import genesis when underlying database fails" )] #[test_case( genesis(0), - underlying_db(ok(Some(0))), - db_transaction(ok(Some(0)), ok(true), 0) + underlying_db(ok(Some(0)), 0), + db_transaction(ok(Some(0)), ok(true)) => Err(Error::InvalidUnderlyingDatabaseGenesisState); "fails to import genesis block when already exists" )] #[test_case( genesis(1), - underlying_db(ok(None)), - db_transaction(ok(Some(0)), ok(true), 0) + underlying_db(ok(None), 0), + db_transaction(ok(Some(0)), ok(true)) => Err(Error::InvalidDatabaseStateAfterExecution(None, Some(u32_to_merkle_root(0)))); "fails to import genesis block when next height is not 0" )] #[test_case( genesis(0), - underlying_db(ok(None)), - db_transaction(ok(None), ok(false), 0) + underlying_db(ok(None), 0), + db_transaction(ok(None), ok(false)) => Err(Error::NotUnique(0u32.into())); "fails to import genesis block when block exists for height 0" )] @@ -243,64 +250,64 @@ async fn commit_result_genesis( //////////////////////////// PoA Block //////////////////////////// #[test_case( poa_block(1), - underlying_db(ok(Some(0))), - db_transaction(ok(Some(0)), ok(true), 1) + underlying_db(ok(Some(0)), 1), + db_transaction(ok(Some(0)), ok(true)) => Ok(()); "successfully imports block at height 1 when latest block is genesis" )] #[test_case( poa_block(113), - underlying_db(ok(Some(112))), - db_transaction(ok(Some(112)), ok(true), 1) + underlying_db(ok(Some(112)), 1), + db_transaction(ok(Some(112)), ok(true)) => Ok(()); "successfully imports block at arbitrary height when latest block height is one fewer and executor db expects it" )] #[test_case( poa_block(0), - underlying_db(ok(Some(0))), - db_transaction(ok(Some(1)), ok(true), 0) + underlying_db(ok(Some(0)), 0), + db_transaction(ok(Some(1)), ok(true)) => Err(Error::ZeroNonGenericHeight); "fails to import PoA block with height 0" )] #[test_case( poa_block(113), - underlying_db(ok(Some(111))), - db_transaction(ok(Some(113)), ok(true), 0) + underlying_db(ok(Some(111)), 0), + db_transaction(ok(Some(113)), ok(true)) => Err(Error::IncorrectBlockHeight(112u32.into(), 113u32.into())); "fails to import block at height 113 when latest block height is 111" )] #[test_case( poa_block(113), - underlying_db(ok(Some(114))), - db_transaction(ok(Some(113)), ok(true), 0) + underlying_db(ok(Some(114)), 0), + db_transaction(ok(Some(113)), ok(true)) => Err(Error::IncorrectBlockHeight(115u32.into(), 113u32.into())); "fails to import block at height 113 when latest block height is 114" )] #[test_case( poa_block(113), - underlying_db(ok(Some(112))), - db_transaction(ok(Some(114)), ok(true), 0) + underlying_db(ok(Some(112)), 0), + db_transaction(ok(Some(114)), ok(true)) => Err(Error::InvalidDatabaseStateAfterExecution(Some(u32_to_merkle_root(112u32)), Some(u32_to_merkle_root(114u32)))); "fails to import block 113 when executor db expects height 114" )] #[test_case( poa_block(113), - underlying_db(ok(Some(112))), - db_transaction(storage_failure, ok(true), 0) + underlying_db(ok(Some(112)), 0), + db_transaction(storage_failure, ok(true)) => Err(storage_failure_error()); "fails to import block when executor db fails to find latest block" )] #[test_case( poa_block(113), - underlying_db(ok(Some(112))), - db_transaction(ok(Some(112)), ok(false), 0) + underlying_db(ok(Some(112)), 0), + db_transaction(ok(Some(112)), ok(false)) => Err(Error::NotUnique(113u32.into())); "fails to import block when block exists" )] #[test_case( poa_block(113), - underlying_db(ok(Some(112))), - db_transaction(ok(Some(112)), storage_failure, 0) + underlying_db(ok(Some(112)), 0), + db_transaction(ok(Some(112)), storage_failure) => Err(storage_failure_error()); "fails to import block when executor db fails to find block" )] @@ -481,10 +488,9 @@ where // databases to always pass the committing part. let expected_height: u32 = (*sealed_block.entity.header().height()).into(); let previous_height = expected_height.checked_sub(1).unwrap_or_default(); - let mut db = underlying_db(ok(Some(previous_height)))(); - db.expect_storage_transaction().return_once(move |_| { - db_transaction(ok(Some(previous_height)), ok(true), commits)() - }); + let mut db = underlying_db(ok(Some(previous_height)), commits)(); + db.expect_storage_transaction() + .return_once(move |_| db_transaction(ok(Some(previous_height)), ok(true))()); let execute_and_commit_result = execute_and_commit_assert( sealed_block, db, diff --git a/crates/services/importer/src/ports.rs b/crates/services/importer/src/ports.rs index a98f88739f0..63469eff64b 100644 --- a/crates/services/importer/src/ports.rs +++ b/crates/services/importer/src/ports.rs @@ -14,6 +14,7 @@ use fuel_core_storage::{ Changes, ConflictPolicy, Modifiable, + StorageChanges, StorageTransaction, WriteTransaction, }, @@ -68,6 +69,9 @@ pub trait ImporterDatabase: Send + Sync { /// Returns the latest block root. fn latest_block_root(&self) -> StorageResult>; + + /// Commit changes + fn commit_changes(&mut self, changes: StorageChanges) -> StorageResult<()>; } /// The port of the storage transaction required by the importer. @@ -89,6 +93,9 @@ pub trait DatabaseTransaction { /// Commits the changes to the underlying storage. fn commit(self) -> StorageResult<()>; + + /// Returns the changes of the transaction. + fn into_changes(self) -> Changes; } #[cfg_attr(test, mockall::automock)] @@ -161,4 +168,8 @@ where self.commit()?; Ok(()) } + + fn into_changes(self) -> Changes { + self.into_changes() + } }