Skip to content

Commit

Permalink
Add FetchToTarget(BlockNumber) to enum Canstart
Browse files Browse the repository at this point in the history
Signed-off-by: Eval EXEC <[email protected]>
  • Loading branch information
eval-exec committed Dec 21, 2024
1 parent 8508499 commit c146d24
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 25 deletions.
8 changes: 6 additions & 2 deletions sync/src/synchronizer/block_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use ckb_network::PeerIndex;
use ckb_shared::block_status::BlockStatus;
use ckb_shared::types::{HeaderIndex, HeaderIndexView};
use ckb_systemtime::unix_time_as_millis;
use ckb_types::core::BlockNumber;
use ckb_types::packed;
use ckb_types::BlockNumberAndHash;
use std::cmp::min;
Expand Down Expand Up @@ -91,7 +92,7 @@ impl BlockFetcher {
Some(last_common)
}

pub fn fetch(self) -> Option<Vec<Vec<packed::Byte32>>> {
pub fn fetch(self, fetch_end: BlockNumber) -> Option<Vec<Vec<packed::Byte32>>> {
let _trace_timecost: Option<HistogramTimer> = {
ckb_metrics::handle().map(|handle| handle.ckb_sync_block_fetch_duration.start_timer())
};
Expand Down Expand Up @@ -186,7 +187,10 @@ impl BlockFetcher {
IBDState::Out => last_common.number() + 1,
}
};
let mut end = min(best_known.number(), start + BLOCK_DOWNLOAD_WINDOW);
let mut end = min(
fetch_end,
min(best_known.number(), start + BLOCK_DOWNLOAD_WINDOW),
);
let n_fetch = min(
end.saturating_sub(start) as usize + 1,
state.read_inflight_blocks().peer_can_fetch_count(self.peer),
Expand Down
74 changes: 51 additions & 23 deletions sync/src/synchronizer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ const NOT_IBD_BLOCK_FETCH_INTERVAL: Duration = Duration::from_millis(200);

#[derive(Copy, Clone)]
enum CanStart {
FetchToTarget(BlockNumber),
Ready,
MinWorkNotReach,
AssumeValidNotFound,
Expand All @@ -90,25 +91,34 @@ impl BlockFetchCMD {
fn process_fetch_cmd(&mut self, cmd: FetchCMD) {
let FetchCMD { peers, ibd_state }: FetchCMD = cmd;

match self.can_start() {
CanStart::Ready => {
for peer in peers {
if ckb_stop_handler::has_received_stop_signal() {
return;
}
let fetch_blocks_fn = |cmd: &mut BlockFetchCMD, assume_target: BlockNumber| {
for peer in peers {
if ckb_stop_handler::has_received_stop_signal() {
return;
}

if let Some(fetch) =
BlockFetcher::new(Arc::clone(&self.sync_shared), peer, ibd_state).fetch()
{
for item in fetch {
if ckb_stop_handler::has_received_stop_signal() {
return;
}
BlockFetchCMD::send_getblocks(item, &self.p2p_control, peer);
let mut fetch_end: BlockNumber = u64::MAX;
if assume_target != 0 {
fetch_end = assume_target
}

if let Some(fetch) =
BlockFetcher::new(Arc::clone(&cmd.sync_shared), peer, ibd_state)
.fetch(fetch_end)
{
for item in fetch {
if ckb_stop_handler::has_received_stop_signal() {
return;
}
BlockFetchCMD::send_getblocks(item, &cmd.p2p_control, peer);
}
}
}
};

match self.can_start() {
CanStart::FetchToTarget(assume_target) => fetch_blocks_fn(self, assume_target),
CanStart::Ready => fetch_blocks_fn(self, BlockNumber::MAX),
CanStart::MinWorkNotReach => {
let best_known = self.sync_shared.state().shared_best_header_ref();
let number = best_known.number();
Expand All @@ -129,8 +139,9 @@ impl BlockFetchCMD {
let best_known = state.shared_best_header_ref();
let number = best_known.number();
let assume_valid_target: Byte32 = shared
.assume_valid_target()
.assume_valid_targets()
.as_ref()
.and_then(|targets| targets.first())
.map(Pack::pack)
.expect("assume valid target must exist");

Expand Down Expand Up @@ -234,15 +245,28 @@ impl BlockFetchCMD {
};

let assume_valid_target_find = |flag: &mut CanStart| {
let mut assume_valid_target = shared.assume_valid_target();
if let Some(ref target) = *assume_valid_target {
match shared.header_map().get(&target.pack()) {
let mut assume_valid_targets = shared.assume_valid_targets();
if let Some(ref targets) = *assume_valid_targets {
if targets.is_empty() {
assume_valid_targets.take();
*flag = CanStart::Ready;
return;
}
let first_target = targets
.first()
.expect("has checked targets is not empty, assume valid target must exist");
match shared.header_map().get(&first_target.pack()) {
Some(header) => {
*flag = CanStart::Ready;
info!("assume valid target found in header_map; CKB will start fetch blocks now");
if matches!(*flag, CanStart::FetchToTarget(fetch_target) if fetch_target == header.number())
{
// BlockFetchCMD has set the fetch target, no need to set it again
} else {
*flag = CanStart::FetchToTarget(header.number());
info!("assume valid target found in header_map; CKB will start fetch blocks to {:?} now", header.number_and_hash());
}
// Blocks that are no longer in the scope of ibd must be forced to verify
if unix_time_as_millis().saturating_sub(header.timestamp()) < MAX_TIP_AGE {
assume_valid_target.take();
assume_valid_targets.take();
warn!("the duration gap between 'assume valid target' and 'now' is less than 24h; CKB will ignore the specified assume valid target and do full verification from now on");
}
}
Expand All @@ -254,7 +278,7 @@ impl BlockFetchCMD {
{
warn!("the duration gap between 'shared_best_header' and 'now' is less than 24h, but CKB haven't found the assume valid target in header_map; CKB will ignore the specified assume valid target and do full verification from now on");
*flag = CanStart::Ready;
assume_valid_target.take();
assume_valid_targets.take();
}
}
}
Expand All @@ -264,6 +288,10 @@ impl BlockFetchCMD {
};

match self.can_start {
CanStart::FetchToTarget(_) => {
assume_valid_target_find(&mut self.can_start);
self.can_start
}
CanStart::Ready => self.can_start,
CanStart::MinWorkNotReach => {
min_work_reach(&mut self.can_start);
Expand Down Expand Up @@ -453,7 +481,7 @@ impl Synchronizer {
peer: PeerIndex,
ibd: IBDState,
) -> Option<Vec<Vec<packed::Byte32>>> {
BlockFetcher::new(Arc::clone(&self.shared), peer, ibd).fetch()
BlockFetcher::new(Arc::clone(&self.shared), peer, ibd).fetch(BlockNumber::MAX)
}

pub(crate) fn on_connected(&self, nc: &dyn CKBProtocolContext, peer: PeerIndex) {
Expand Down

0 comments on commit c146d24

Please sign in to comment.