Skip to content

Commit

Permalink
Block import cleanups (paritytech#4842)
Browse files Browse the repository at this point in the history
I carried these things in a fork for a long time, I think wouldn't hurt
to have it upstream.

Originally submitted as part of
paritytech#1598 that went nowhere.

---------

Co-authored-by: Bastian Köcher <[email protected]>
  • Loading branch information
2 people authored and TarekkMA committed Aug 2, 2024
1 parent 5a6cb86 commit 69ca09b
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 26 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion substrate/client/consensus/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ targets = ["x86_64-unknown-linux-gnu"]
[dependencies]
async-trait = "0.1.79"
futures = { version = "0.3.30", features = ["thread-pool"] }
futures-timer = "3.0.1"
log = { workspace = true, default-features = true }
mockall = "0.11.3"
parking_lot = "0.12.1"
Expand Down
3 changes: 2 additions & 1 deletion substrate/client/consensus/common/src/import_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ pub trait Verifier<B: BlockT>: Send {
///
/// The `import_*` methods can be called in order to send elements for the import queue to verify.
pub trait ImportQueueService<B: BlockT>: Send {
/// Import bunch of blocks.
/// Import bunch of blocks, every next block must be an ancestor of the previous block in the
/// list.
fn import_blocks(&mut self, origin: BlockOrigin, blocks: Vec<IncomingBlock<B>>);

/// Import block justifications.
Expand Down
30 changes: 7 additions & 23 deletions substrate/client/consensus/common/src/import_queue/basic_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use futures::{
prelude::*,
task::{Context, Poll},
};
use futures_timer::Delay;
use log::{debug, trace};
use prometheus_endpoint::Registry;
use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender};
Expand All @@ -28,7 +27,7 @@ use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT, NumberFor},
Justification, Justifications,
};
use std::{pin::Pin, time::Duration};
use std::pin::Pin;

use crate::{
import_queue::{
Expand Down Expand Up @@ -224,7 +223,6 @@ async fn block_import_process<B: BlockT>(
mut result_sender: BufferedLinkSender<B>,
mut block_import_receiver: TracingUnboundedReceiver<worker_messages::ImportBlocks<B>>,
metrics: Option<Metrics>,
delay_between_blocks: Duration,
) {
loop {
let worker_messages::ImportBlocks(origin, blocks) = match block_import_receiver.next().await
Expand All @@ -239,15 +237,9 @@ async fn block_import_process<B: BlockT>(
},
};

let res = import_many_blocks(
&mut block_import,
origin,
blocks,
&mut verifier,
delay_between_blocks,
metrics.clone(),
)
.await;
let res =
import_many_blocks(&mut block_import, origin, blocks, &mut verifier, metrics.clone())
.await;

result_sender.blocks_processed(res.imported, res.block_count, res.results);
}
Expand Down Expand Up @@ -276,13 +268,11 @@ impl<B: BlockT> BlockImportWorker<B> {
let (justification_sender, mut justification_port) =
tracing_unbounded("mpsc_import_queue_worker_justification", 100_000);

let (block_import_sender, block_import_port) =
let (block_import_sender, block_import_receiver) =
tracing_unbounded("mpsc_import_queue_worker_blocks", 100_000);

let mut worker = BlockImportWorker { result_sender, justification_import, metrics };

let delay_between_blocks = Duration::default();

let future = async move {
// Let's initialize `justification_import`
if let Some(justification_import) = worker.justification_import.as_mut() {
Expand All @@ -295,9 +285,8 @@ impl<B: BlockT> BlockImportWorker<B> {
block_import,
verifier,
worker.result_sender.clone(),
block_import_port,
block_import_receiver,
worker.metrics.clone(),
delay_between_blocks,
);
futures::pin_mut!(block_import_process);

Expand Down Expand Up @@ -394,7 +383,6 @@ async fn import_many_blocks<B: BlockT, V: Verifier<B>>(
blocks_origin: BlockOrigin,
blocks: Vec<IncomingBlock<B>>,
verifier: &mut V,
delay_between_blocks: Duration,
metrics: Option<Metrics>,
) -> ImportManyBlocksResult<B> {
let count = blocks.len();
Expand Down Expand Up @@ -460,11 +448,7 @@ async fn import_many_blocks<B: BlockT, V: Verifier<B>>(

results.push((import_result, block_hash));

if delay_between_blocks != Duration::default() && !has_error {
Delay::new(delay_between_blocks).await;
} else {
Yield::new().await
}
Yield::new().await
}
}

Expand Down

0 comments on commit 69ca09b

Please sign in to comment.