Skip to content

Parallelise block/transaction changes creation in Importer #2618

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 60 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
5c822da
Parallelise encoding to changes of the block and transaction data
AurelienFT Jan 22, 2025
aeb0f7b
Add CHANGELOG.md
AurelienFT Jan 22, 2025
9c08308
Update order check error in commit importer
AurelienFT Jan 22, 2025
b854452
Add possibility to submit a list of changes to rocksdb
AurelienFT Jan 22, 2025
3134fb0
Special case history rocksdb and changelog
AurelienFT Jan 23, 2025
8db1be9
Merge branch 'master' into take_vec_changes_rocksdb
AurelienFT Jan 23, 2025
7b06b0d
Make sure we don't create a transaction if not needed & Clippy
AurelienFT Jan 23, 2025
e5d4ef9
Merge branch 'take_vec_changes_rocksdb' into parallelise_importer
AurelienFT Jan 23, 2025
50a672b
Use new list of changes feature in importer
AurelienFT Jan 23, 2025
55edadd
add not unique check
AurelienFT Jan 24, 2025
f8e001f
Change `commit_changes_with_height_update` to work with StorageChanges
AurelienFT Jan 24, 2025
91f9e2e
Merge branch 'take_vec_changes_rocksdb' into parallelise_importer
AurelienFT Jan 24, 2025
b92cbd8
fix compilation relayer feature
AurelienFT Jan 24, 2025
f78d496
Merge branch 'take_vec_changes_rocksdb' into parallelise_importer
AurelienFT Jan 24, 2025
ab75ebb
Update importer to use the correct commit function
AurelienFT Jan 24, 2025
fb5c157
Merge branch 'master' into take_vec_changes_rocksdb
AurelienFT Jan 24, 2025
6a95719
Merge branch 'take_vec_changes_rocksdb' into parallelise_importer
AurelienFT Jan 24, 2025
81e7c12
Fix changelog
AurelienFT Jan 24, 2025
6f50c51
Merge branch 'take_vec_changes_rocksdb' into parallelise_importer
AurelienFT Jan 24, 2025
46c83e4
changelog
AurelienFT Jan 24, 2025
aba1fec
Merge branch 'take_vec_changes_rocksdb' into parallelise_importer
AurelienFT Jan 24, 2025
3a67be5
Fix test compilation and clippy
AurelienFT Jan 24, 2025
60c5e03
Merge branch 'take_vec_changes_rocksdb' into parallelise_importer
AurelienFT Jan 24, 2025
9171473
fmt
AurelienFT Jan 24, 2025
f8e39c4
Merge branch 'take_vec_changes_rocksdb' into parallelise_importer
AurelienFT Jan 24, 2025
ed3d6cd
fix importer mock
AurelienFT Jan 24, 2025
6ac67f1
fix importer tests
AurelienFT Jan 24, 2025
6f7f24b
Fix clippy
AurelienFT Jan 24, 2025
7c37e56
update changes returned by Importer
AurelienFT Jan 27, 2025
742a0e3
Merge branch 'master' into take_vec_changes_rocksdb
AurelienFT Jan 27, 2025
4e69661
Merge branch 'take_vec_changes_rocksdb' into parallelise_importer
AurelienFT Jan 27, 2025
1dfaa47
Refactor `iter_store_keys` (#2651)
MitchTurner Jan 29, 2025
d5b5265
Merge branch 'master' into take_vec_changes_rocksdb
AurelienFT Jan 30, 2025
965d208
Merge branch 'master' into take_vec_changes_rocksdb
MitchTurner Jan 31, 2025
ca80eea
Merge branch 'master' into take_vec_changes_rocksdb
AurelienFT Feb 3, 2025
eaf2c57
Merge branch 'master' into take_vec_changes_rocksdb
AurelienFT Feb 3, 2025
f89556c
Merge branch 'master' into take_vec_changes_rocksdb
AurelienFT Feb 11, 2025
fe267c5
Update CHANGELOG.md
AurelienFT Feb 11, 2025
b6eeb6a
Merge branch 'master' into take_vec_changes_rocksdb
AurelienFT Feb 19, 2025
07c6855
Merge branch 'take_vec_changes_rocksdb' into parallelise_importer
AurelienFT Feb 19, 2025
f0e47f4
Add changelog
AurelienFT Feb 19, 2025
6453efe
Handle conflicts in the case of multiple changes
xgreenx Feb 20, 2025
a2db262
Small things from the final review
xgreenx Feb 20, 2025
b74c791
Merge branch 'master' into take_vec_changes_rocksdb
xgreenx Feb 20, 2025
fc49831
Merge branch 'take_vec_changes_rocksdb' into parallelise_importer
xgreenx Feb 20, 2025
c70e803
Merge branch 'master' into take_vec_changes_rocksdb
AurelienFT Feb 20, 2025
42cf2b8
Merge branch 'master' into take_vec_changes_rocksdb
AurelienFT Feb 20, 2025
234fedc
Merge branch 'take_vec_changes_rocksdb' into parallelise_importer
AurelienFT Feb 20, 2025
766edce
Merge branch 'master' into take_vec_changes_rocksdb
AurelienFT Feb 22, 2025
bd319f1
Dedicated threads for the block importer (#2750)
xgreenx Feb 24, 2025
e738d0f
Merge branch 'master' into take_vec_changes_rocksdb
AurelienFT Feb 24, 2025
509aef3
Merge branch 'take_vec_changes_rocksdb' into parallelise_importer
AurelienFT Feb 24, 2025
b54ba66
Merge branch 'master' into parallelise_importer
AurelienFT Feb 24, 2025
c733269
Merge branch 'master' into parallelise_importer
xgreenx Feb 24, 2025
615ecc5
Merge branch 'master' into parallelise_importer
xgreenx Feb 24, 2025
64e338a
Merge branch 'master' into parallelise_importer
AurelienFT Feb 25, 2025
030cab6
Update crates/services/importer/src/local_runner.rs
AurelienFT Feb 25, 2025
1985aa6
Update crates/services/importer/src/local_runner.rs
AurelienFT Feb 25, 2025
6322689
Update crates/services/importer/src/local_runner.rs
AurelienFT Feb 25, 2025
55b96fc
fmt
AurelienFT Feb 25, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .changes/changed/2618.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Parallelize block/transaction changes creation in Importer
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions crates/fuel-core/src/service/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,7 @@ pub struct BlockProducerAdapter {

#[derive(Clone)]
pub struct BlockImporterAdapter {
pub block_importer:
Arc<fuel_core_importer::Importer<Database, ExecutorAdapter, VerifierAdapter>>,
pub block_importer: Arc<fuel_core_importer::Importer>,
}

impl BlockImporterAdapter {
Expand Down
22 changes: 16 additions & 6 deletions crates/fuel-core/src/service/adapters/block_importer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use crate::{
database::Database,
database::{
commit_changes_with_height_update,
Database,
},
service::adapters::{
BlockImporterAdapter,
ExecutorAdapter,
Expand Down Expand Up @@ -27,7 +30,10 @@ use fuel_core_storage::{
},
FuelBlocks,
},
transactional::Changes,
transactional::{
Changes,
StorageChanges,
},
MerkleRoot,
Result as StorageResult,
StorageAsRef,
Expand All @@ -52,6 +58,7 @@ use fuel_core_types::{
UncommittedValidationResult,
},
};
use itertools::Itertools;
use std::sync::Arc;

impl BlockImporterAdapter {
Expand All @@ -62,11 +69,7 @@ impl BlockImporterAdapter {
executor: ExecutorAdapter,
verifier: VerifierAdapter,
) -> Self {
let metrics = config.metrics;
let importer = Importer::new(chain_id, config, database, executor, verifier);
if metrics {
importer.init_metrics();
}
Self {
block_importer: Arc::new(importer),
}
Expand Down Expand Up @@ -104,6 +107,13 @@ impl ImporterDatabase for Database {
.get(&DenseMetadataKey::Latest)?
.map(|cow| *cow.root()))
}

fn commit_changes(&mut self, changes: StorageChanges) -> StorageResult<()> {
commit_changes_with_height_update(self, changes, |iter| {
iter.iter_all_keys::<FuelBlocks>(Some(IterDirection::Reverse))
.try_collect()
})
}
}

impl Validator for ExecutorAdapter {
Expand Down
9 changes: 7 additions & 2 deletions crates/fuel-core/src/service/genesis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ pub async fn execute_and_commit_genesis_block(
config: &Config,
db: &CombinedDatabase,
) -> anyhow::Result<()> {
use fuel_core_importer::ports::{
MockBlockVerifier,
MockValidator,
};

let result = execute_genesis_block(StateWatcher::default(), config, db).await?;
let importer = fuel_core_importer::Importer::new(
config
Expand All @@ -215,8 +220,8 @@ pub async fn execute_and_commit_genesis_block(
.chain_id(),
config.block_importer.clone(),
db.on_chain().clone(),
(),
(),
MockValidator::default(),
MockBlockVerifier::default(),
);
importer.commit_result(result).await?;
Ok(())
Expand Down
4 changes: 2 additions & 2 deletions crates/services/importer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,19 @@ derive_more = { workspace = true }
fuel-core-metrics = { workspace = true }
fuel-core-storage = { workspace = true, features = ["std"] }
fuel-core-types = { workspace = true, features = ["std"] }
parking_lot = { workspace = true }
mockall = { workspace = true, optional = true }
rayon = { workspace = true }
tokio = { workspace = true, features = ["full"] }
tracing = { workspace = true }

[dev-dependencies]
fuel-core-trace = { path = "./../../trace" }
fuel-core-types = { path = "./../../types", features = ["test-helpers"] }
mockall = { workspace = true }
test-case = { workspace = true }

[features]
test-helpers = [
"dep:mockall",
"fuel-core-types/test-helpers",
"fuel-core-storage/test-helpers",
]
Expand Down
81 changes: 81 additions & 0 deletions crates/services/importer/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use fuel_core_storage::{
Error as StorageError,
MerkleRoot,
};
use fuel_core_types::{
blockchain::primitives::BlockId,
fuel_types::BlockHeight,
services::executor,
};
use tokio::sync::{
mpsc,
oneshot,
TryAcquireError,
};

#[derive(Debug, derive_more::Display, derive_more::From)]
pub enum Error {
#[display(fmt = "The commit is already in the progress: {_0}.")]
Semaphore(TryAcquireError),
#[display(
fmt = "The wrong state of database during insertion of the genesis block."
)]
InvalidUnderlyingDatabaseGenesisState,
#[display(fmt = "The wrong state of storage after execution of the block.\
The actual root is {_1:?}, when the expected root is {_0:?}.")]
InvalidDatabaseStateAfterExecution(Option<MerkleRoot>, Option<MerkleRoot>),
#[display(fmt = "Got overflow during increasing the height.")]
Overflow,
#[display(fmt = "The non-generic block can't have zero height.")]
ZeroNonGenericHeight,
#[display(fmt = "The actual height is {_1}, when the next expected height is {_0}.")]
IncorrectBlockHeight(BlockHeight, BlockHeight),
#[display(
fmt = "Got another block id after validation of the block. Expected {_0} != Actual {_1}"
)]
BlockIdMismatch(BlockId, BlockId),
#[display(fmt = "Some of the block fields are not valid: {_0}.")]
FailedVerification(anyhow::Error),
#[display(fmt = "The execution of the block failed: {_0}.")]
FailedExecution(executor::Error),
#[display(fmt = "It is not possible to execute the genesis block.")]
ExecuteGenesis,
#[display(fmt = "The database already contains the data at the height {_0}.")]
NotUnique(BlockHeight),
#[display(fmt = "The previous block processing is not finished yet.")]
PreviousBlockProcessingNotFinished,
#[display(fmt = "The send command to the inner task failed.")]
SendCommandToInnerTaskFailed,
#[display(fmt = "The inner import task is not running.")]
InnerTaskIsNotRunning,
#[from]
Storage(StorageError),
UnsupportedConsensusVariant(String),
ActiveBlockResultsSemaphoreClosed(tokio::sync::AcquireError),
RayonTaskWasCanceled,
}

impl From<Error> for anyhow::Error {
fn from(error: Error) -> Self {
anyhow::Error::msg(error)
}
}

impl From<oneshot::error::RecvError> for Error {
fn from(_: oneshot::error::RecvError) -> Self {
Error::InnerTaskIsNotRunning
}
}

impl<T> From<mpsc::error::SendError<T>> for Error {
fn from(_: mpsc::error::SendError<T>) -> Self {
Error::SendCommandToInnerTaskFailed
}
}

#[cfg(test)]
impl PartialEq for Error {
fn eq(&self, other: &Self) -> bool {
format!("{self}") == format!("{other}")
}
}
Loading
Loading