Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sync issue related changes #6

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions substrate/client/network/sync/src/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,8 @@ impl<B: BlockT> BlockCollection<B> {
};
// crop to peers best
if range.start > peer_best {
trace!(target: "sync", "Out of range for peer {} ({} vs {})", who, range.start, peer_best);
trace!(target: "sync", "Out of range for peer {who}, peer_common = {common}, \
peer_best = {peer_best}, range = {range:?}");
return None
}
range.end = cmp::min(peer_best + One::one(), range.end);
Expand All @@ -157,7 +158,8 @@ impl<B: BlockT> BlockCollection<B> {
.next()
.map_or(false, |(n, _)| range.start > *n + max_ahead.into())
{
trace!(target: "sync", "Too far ahead for peer {} ({})", who, range.start);
trace!(target: "sync", "Too far ahead for peer {who}, peer_common = {common}, \
peer_best = {peer_best}, range = {range:?}, max_ahead = {max_ahead}");
return None
}

Expand Down
88 changes: 70 additions & 18 deletions substrate/client/network/sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1094,7 +1094,22 @@ where
return
}

let mut is_reorg = false;
if is_best {
if peer.best_number == number && peer.best_hash != hash {
trace!(
target: "sync",
"Reorg detected on block announce from {}: {}, hash {} -> {}, \
known_parent = {}, {:?}",
who,
number,
peer.best_hash,
hash,
known_parent,
announce.summary(),
);
is_reorg = true;
}
// update their best block
peer.best_number = number;
peer.best_hash = hash;
Expand All @@ -1107,7 +1122,11 @@ where
// If the announced block is the best they have and is not ahead of us, our common number
// is either one further ahead or it's the one they just announced, if we know about it.
if is_best {
if known && self.best_queued_number >= number {
if is_reorg && known_parent {
// If the peer announced different hash for same block number,
// fall back to the parent as the common ancestor.
peer.common_number = number.saturating_sub(One::one());
} else if known && self.best_queued_number >= number {
self.update_peer_common_number(&who, number);
} else if announce.header.parent_hash() == &self.best_queued_hash ||
known_parent && self.best_queued_number >= number
Expand Down Expand Up @@ -1561,12 +1580,27 @@ where
// the start block has a parent on chain.
let parent_on_chain =
self.blocks.first_ready_block_header(start_block).map_or(false, |hdr| {
std::matches!(
self.block_status(hdr.parent_hash()).unwrap_or(BlockStatus::Unknown),
let block_status = self.block_status(hdr.parent_hash());
let parent_on_chain = std::matches!(
*block_status.as_ref().unwrap_or(&BlockStatus::Unknown),
BlockStatus::InChainWithState |
BlockStatus::InChainPruned |
BlockStatus::Queued
)
);
if !parent_on_chain {
trace!(
target: LOG_TARGET,
"Ready block parent not on chain: block_status={:?}, best_queued={}/{}, \
first_ready={}/{}, first_ready_parent={}",
block_status,
self.best_queued_number,
self.best_queued_hash,
*hdr.number(),
hdr.hash(),
hdr.parent_hash()
);
}
parent_on_chain
});

if !parent_on_chain {
Expand Down Expand Up @@ -3881,6 +3915,7 @@ mod test {
let non_canonical_chain_length = common_ancestor + 3;
let canonical_chain_length = common_ancestor + max_blocks_per_request + 10;

let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new());
let (_chain_sync_network_provider, chain_sync_network_handle) =
NetworkServiceProvider::new();
let mut client = Arc::new(TestClientBuilder::new().build());
Expand Down Expand Up @@ -3908,22 +3943,30 @@ mod test {
.collect::<Vec<_>>()
};

let mut sync = ChainSync::new(
SyncMode::Full,
let (mut sync, _) = ChainSync::new(
Arc::new(Atomic::new(SyncMode::Full)),
client.clone(),
ProtocolName::from("test-block-announce-protocol"),
ProtocolId::from("test-protocol-name"),
&Some(String::from("test-fork-id")),
Roles::from(&Role::Full),
1,
max_blocks_per_request,
None,
None,
chain_sync_network_handle,
import_queue,
Arc::new(MockBlockDownloader::new()),
ProtocolName::from("state-request"),
None,
true,
)
.unwrap();

// Connect the node we will sync from
let peer_id = PeerId::random();
let canonical_tip = canonical_blocks.last().unwrap().clone();
let mut request = sync
.new_peer(peer_id, canonical_tip.hash(), *canonical_tip.header().number())
.new_peer(peer_id, canonical_tip.hash(), *canonical_tip.header().number(), true)
.unwrap()
.unwrap();
assert_eq!(FromBlock::Number(client.info().best_number), request.from);
Expand Down Expand Up @@ -3968,7 +4011,7 @@ mod test {
let res = sync.on_block_data(&peer_id, Some(request), response).unwrap();
assert!(matches!(
res,
OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.is_empty()
OnBlockData::Import(_, blocks) if blocks.is_empty()
),);

// Gap filled, expect max_blocks_per_request being imported now.
Expand All @@ -3978,7 +4021,7 @@ mod test {
let response = create_block_response(resp_blocks.clone());
let res = sync.on_block_data(&peer_id, Some(request), response).unwrap();
let to_import: Vec<_> = match &res {
OnBlockData::Import(ImportBlocksAction { origin: _, blocks }) => {
OnBlockData::Import(_, blocks) => {
assert_eq!(blocks.len(), sync.max_blocks_per_request as usize);
blocks
.iter()
Expand Down Expand Up @@ -4029,7 +4072,7 @@ mod test {
let res = sync.on_block_data(&peer_id, Some(request), response).unwrap();
assert!(matches!(
res,
OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == 10 as usize
OnBlockData::Import(_, blocks) if blocks.len() == 10 as usize
),);
let _ = sync.on_blocks_processed(
max_blocks_per_request as usize,
Expand Down Expand Up @@ -4069,6 +4112,7 @@ mod test {
let non_canonical_chain_length = common_ancestor + 3;
let canonical_chain_length = common_ancestor + max_blocks_per_request + 10;

let import_queue = Box::new(sc_consensus::import_queue::mock::MockImportQueueHandle::new());
let (_chain_sync_network_provider, chain_sync_network_handle) =
NetworkServiceProvider::new();
let mut client = Arc::new(TestClientBuilder::new().build());
Expand Down Expand Up @@ -4096,22 +4140,30 @@ mod test {
.collect::<Vec<_>>()
};

let mut sync = ChainSync::new(
SyncMode::Full,
let (mut sync, _) = ChainSync::new(
Arc::new(Atomic::new(SyncMode::Full)),
client.clone(),
ProtocolName::from("test-block-announce-protocol"),
ProtocolId::from("test-protocol-name"),
&Some(String::from("test-fork-id")),
Roles::from(&Role::Full),
1,
max_blocks_per_request,
None,
None,
chain_sync_network_handle,
import_queue,
Arc::new(MockBlockDownloader::new()),
ProtocolName::from("state-request"),
None,
true,
)
.unwrap();

// Connect the node we will sync from
let peer_id = PeerId::random();
let canonical_tip = canonical_blocks.last().unwrap().clone();
let mut request = sync
.new_peer(peer_id, canonical_tip.hash(), *canonical_tip.header().number())
.new_peer(peer_id, canonical_tip.hash(), *canonical_tip.header().number(), true)
.unwrap()
.unwrap();
assert_eq!(FromBlock::Number(client.info().best_number), request.from);
Expand Down Expand Up @@ -4156,7 +4208,7 @@ mod test {
let res = sync.on_block_data(&peer_id, Some(request), response).unwrap();
assert!(matches!(
res,
OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.is_empty()
OnBlockData::Import(_, blocks) if blocks.is_empty()
),);

// Gap filled, expect max_blocks_per_request being imported now.
Expand All @@ -4166,7 +4218,7 @@ mod test {
let response = create_block_response(resp_blocks.clone());
let res = sync.on_block_data(&peer_id, Some(request), response).unwrap();
let to_import: Vec<_> = match &res {
OnBlockData::Import(ImportBlocksAction { origin: _, blocks }) => {
OnBlockData::Import(_, blocks) => {
assert_eq!(blocks.len(), sync.max_blocks_per_request as usize);
blocks
.iter()
Expand Down Expand Up @@ -4217,7 +4269,7 @@ mod test {
let res = sync.on_block_data(&peer_id, Some(request), response).unwrap();
assert!(matches!(
res,
OnBlockData::Import(ImportBlocksAction{ origin: _, blocks }) if blocks.len() == 10 as usize
OnBlockData::Import(_, blocks) if blocks.len() == 10 as usize
),);
let _ = sync.on_blocks_processed(
max_blocks_per_request as usize,
Expand Down
Loading