Skip to content

Commit 0691c91

Browse files
Move import queue from ChainSync to SyncingEngine (#1736)
This PR is part of [Sync 2.0](#534) refactoring aimed at making `ChainSync` a pure state machine. Resolves #501.
1 parent d8d90a8 commit 0691c91

File tree

4 files changed

+176
-171
lines changed

4 files changed

+176
-171
lines changed

substrate/client/network/common/src/sync.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -116,11 +116,18 @@ impl fmt::Display for BadPeer {
116116

117117
impl std::error::Error for BadPeer {}
118118

119+
/// Action that the parent of [`ChainSync`] should perform if we want to import blocks.
120+
#[derive(Debug, Clone, PartialEq, Eq)]
121+
pub struct ImportBlocksAction<B: BlockT> {
122+
pub origin: BlockOrigin,
123+
pub blocks: Vec<IncomingBlock<B>>,
124+
}
125+
119126
/// Result of [`ChainSync::on_block_data`].
120127
#[derive(Debug, Clone, PartialEq, Eq)]
121128
pub enum OnBlockData<Block: BlockT> {
122129
/// The block should be imported.
123-
Import(BlockOrigin, Vec<IncomingBlock<Block>>),
130+
Import(ImportBlocksAction<Block>),
124131
/// A new block request needs to be made to the given peer.
125132
Request(PeerId, BlockRequest<Block>),
126133
/// Continue processing events.
@@ -134,7 +141,7 @@ pub enum OnBlockJustification<Block: BlockT> {
134141
Nothing,
135142
/// The justification should be imported.
136143
Import {
137-
peer: PeerId,
144+
peer_id: PeerId,
138145
hash: Block::Hash,
139146
number: NumberFor<Block>,
140147
justifications: Justifications,
@@ -309,6 +316,7 @@ pub trait ChainSync<Block: BlockT>: Send {
309316
/// Handle a new connected peer.
310317
///
311318
/// Call this method whenever we connect to a new peer.
319+
#[must_use]
312320
fn new_peer(
313321
&mut self,
314322
who: PeerId,
@@ -340,6 +348,7 @@ pub trait ChainSync<Block: BlockT>: Send {
340348
///
341349
/// If this corresponds to a valid block, this outputs the block that
342350
/// must be imported in the import queue.
351+
#[must_use]
343352
fn on_block_data(
344353
&mut self,
345354
who: &PeerId,
@@ -350,6 +359,7 @@ pub trait ChainSync<Block: BlockT>: Send {
350359
/// Handle a response from the remote to a justification request that we made.
351360
///
352361
/// `request` must be the original request that triggered `response`.
362+
#[must_use]
353363
fn on_block_justification(
354364
&mut self,
355365
who: PeerId,
@@ -379,7 +389,8 @@ pub trait ChainSync<Block: BlockT>: Send {
379389
/// Call when a peer has disconnected.
380390
/// Canceled obsolete block request may result in some blocks being ready for
381391
/// import, so this functions checks for such blocks and returns them.
382-
fn peer_disconnected(&mut self, who: &PeerId);
392+
#[must_use]
393+
fn peer_disconnected(&mut self, who: &PeerId) -> Option<ImportBlocksAction<Block>>;
383394

384395
/// Return some key metrics.
385396
fn metrics(&self) -> Metrics;

substrate/client/network/sync/src/engine.rs

Lines changed: 64 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ use crate::{
2828
schema::v1::{StateRequest, StateResponse},
2929
service::{self, chain_sync::ToServiceCommand},
3030
warp::WarpSyncParams,
31-
BlockRequestEvent, ChainSync, ClientError, SyncingService,
31+
BlockRequestAction, ChainSync, ClientError, ImportBlocksAction, ImportJustificationsAction,
32+
OnBlockResponse, SyncingService,
3233
};
3334

3435
use codec::{Decode, Encode};
@@ -41,7 +42,8 @@ use futures_timer::Delay;
4142
use libp2p::{request_response::OutboundFailure, PeerId};
4243
use log::{debug, trace};
4344
use prometheus_endpoint::{
44-
register, Gauge, GaugeVec, MetricSource, Opts, PrometheusError, Registry, SourcedGauge, U64,
45+
register, Counter, Gauge, GaugeVec, MetricSource, Opts, PrometheusError, Registry,
46+
SourcedGauge, U64,
4547
};
4648
use prost::Message;
4749
use schnellru::{ByLength, LruMap};
@@ -135,6 +137,8 @@ struct Metrics {
135137
queued_blocks: Gauge<U64>,
136138
fork_targets: Gauge<U64>,
137139
justifications: GaugeVec<U64>,
140+
import_queue_blocks_submitted: Counter<U64>,
141+
import_queue_justifications_submitted: Counter<U64>,
138142
}
139143

140144
impl Metrics {
@@ -164,6 +168,20 @@ impl Metrics {
164168
)?;
165169
register(g, r)?
166170
},
171+
import_queue_blocks_submitted: {
172+
let c = Counter::new(
173+
"substrate_sync_import_queue_blocks_submitted",
174+
"Number of blocks submitted to the import queue.",
175+
)?;
176+
register(c, r)?
177+
},
178+
import_queue_justifications_submitted: {
179+
let c = Counter::new(
180+
"substrate_sync_import_queue_justifications_submitted",
181+
"Number of justifications submitted to the import queue.",
182+
)?;
183+
register(c, r)?
184+
},
167185
})
168186
}
169187
}
@@ -311,6 +329,9 @@ pub struct SyncingEngine<B: BlockT, Client> {
311329

312330
/// Protocol name used to send out warp sync requests
313331
warp_sync_protocol_name: Option<ProtocolName>,
332+
333+
/// Handle to import queue.
334+
import_queue: Box<dyn ImportQueueService<B>>,
314335
}
315336

316337
impl<B: BlockT, Client> SyncingEngine<B, Client>
@@ -436,9 +457,7 @@ where
436457
max_parallel_downloads,
437458
max_blocks_per_request,
438459
warp_sync_config,
439-
metrics_registry,
440460
network_service.clone(),
441-
import_queue,
442461
)?;
443462

444463
let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000);
@@ -501,6 +520,7 @@ where
501520
block_downloader,
502521
state_request_protocol_name,
503522
warp_sync_protocol_name,
523+
import_queue,
504524
},
505525
SyncingService::new(tx, num_connected, is_major_syncing),
506526
block_announce_config,
@@ -728,13 +748,13 @@ where
728748
ToServiceCommand::BlocksProcessed(imported, count, results) => {
729749
for result in self.chain_sync.on_blocks_processed(imported, count, results) {
730750
match result {
731-
Ok(event) => match event {
732-
BlockRequestEvent::SendRequest { peer_id, request } => {
751+
Ok(action) => match action {
752+
BlockRequestAction::SendRequest { peer_id, request } => {
733753
// drop obsolete pending response first
734754
self.pending_responses.remove(&peer_id);
735755
self.send_block_request(peer_id, request);
736756
},
737-
BlockRequestEvent::RemoveStale { peer_id } => {
757+
BlockRequestAction::RemoveStale { peer_id } => {
738758
self.pending_responses.remove(&peer_id);
739759
},
740760
},
@@ -922,7 +942,10 @@ where
922942
}
923943
}
924944

925-
self.chain_sync.peer_disconnected(&peer_id);
945+
if let Some(import_blocks_action) = self.chain_sync.peer_disconnected(&peer_id) {
946+
self.import_blocks(import_blocks_action)
947+
}
948+
926949
self.pending_responses.remove(&peer_id);
927950
self.event_streams.retain(|stream| {
928951
stream.unbounded_send(SyncEvent::PeerDisconnected(peer_id)).is_ok()
@@ -1181,10 +1204,14 @@ where
11811204
PeerRequest::Block(req) => {
11821205
match self.block_downloader.block_response_into_blocks(&req, resp) {
11831206
Ok(blocks) => {
1184-
if let Some((peer_id, new_req)) =
1185-
self.chain_sync.on_block_response(peer_id, req, blocks)
1186-
{
1187-
self.send_block_request(peer_id, new_req);
1207+
match self.chain_sync.on_block_response(peer_id, req, blocks) {
1208+
OnBlockResponse::SendBlockRequest { peer_id, request } =>
1209+
self.send_block_request(peer_id, request),
1210+
OnBlockResponse::ImportBlocks(import_blocks_action) =>
1211+
self.import_blocks(import_blocks_action),
1212+
OnBlockResponse::ImportJustifications(action) =>
1213+
self.import_justifications(action),
1214+
OnBlockResponse::Nothing => {},
11881215
}
11891216
},
11901217
Err(BlockResponseError::DecodeFailed(e)) => {
@@ -1230,7 +1257,11 @@ where
12301257
},
12311258
};
12321259

1233-
self.chain_sync.on_state_response(peer_id, response);
1260+
if let Some(import_blocks_action) =
1261+
self.chain_sync.on_state_response(peer_id, response)
1262+
{
1263+
self.import_blocks(import_blocks_action);
1264+
}
12341265
},
12351266
PeerRequest::WarpProof => {
12361267
self.chain_sync.on_warp_sync_response(peer_id, EncodedProof(resp));
@@ -1337,4 +1368,24 @@ where
13371368
},
13381369
}
13391370
}
1371+
1372+
/// Import blocks.
1373+
fn import_blocks(&mut self, ImportBlocksAction { origin, blocks }: ImportBlocksAction<B>) {
1374+
if let Some(metrics) = &self.metrics {
1375+
metrics.import_queue_blocks_submitted.inc();
1376+
}
1377+
1378+
self.import_queue.import_blocks(origin, blocks);
1379+
}
1380+
1381+
/// Import justifications.
1382+
fn import_justifications(&mut self, action: ImportJustificationsAction<B>) {
1383+
if let Some(metrics) = &self.metrics {
1384+
metrics.import_queue_justifications_submitted.inc();
1385+
}
1386+
1387+
let ImportJustificationsAction { peer_id, hash, number, justifications } = action;
1388+
1389+
self.import_queue.import_justifications(peer_id, hash, number, justifications);
1390+
}
13401391
}

0 commit comments

Comments
 (0)