Skip to content

Commit

Permalink
feat(core): check archive size from validator before start downloading
Browse files Browse the repository at this point in the history
  • Loading branch information
pashinov committed Feb 17, 2025
1 parent ec56ab8 commit fd50eae
Showing 1 changed file with 85 additions and 22 deletions.
107 changes: 85 additions & 22 deletions core/src/blockchain_rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use bytesize::ByteSize;
use everscale_types::models::BlockId;
use futures_util::stream::{FuturesUnordered, StreamExt};
use parking_lot::Mutex;
use rand::prelude::SliceRandom;
use scopeguard::ScopeGuard;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
Expand Down Expand Up @@ -480,22 +481,26 @@ impl BlockchainRpcClient {
pub async fn find_archive(&self, mc_seqno: u32) -> Result<PendingArchiveResponse, Error> {
const NEIGHBOUR_COUNT: usize = 10;

// Get reliable neighbours with higher weight
let neighbours = self
.overlay_client()
.neighbours()
.choose_multiple(NEIGHBOUR_COUNT, NeighbourType::Reliable);
let req = Request::from_tl(rpc::GetArchiveInfo { mc_seqno });

// Find a neighbour which has the requested archive
let pending_archive = 'info: {
let req = Request::from_tl(rpc::GetArchiveInfo { mc_seqno });
// Request validators for reliable archive size
let expected_size = 'size: {
// Get random {NEIGHBOUR_COUNT} validators
let default_roundtrip = self.overlay_client().config().neighbors.default_roundtrip;

let validators = self
.overlay_client()
.get_broadcast_targets()
.choose_multiple(&mut rand::thread_rng(), NEIGHBOUR_COUNT)
.map(|x| Neighbour::new(x.peer_id(), u32::MAX, &default_roundtrip))
.collect::<Vec<_>>();

// Number of ArchiveInfo::TooNew responses
let mut new_archive_count = 0usize;

let mut futures = FuturesUnordered::new();
for neighbour in neighbours {
futures.push(self.overlay_client().query_raw(neighbour, req.clone()));
for validator in validators {
futures.push(self.overlay_client().query_raw(validator, req.clone()));
}

let mut err = None;
Expand All @@ -509,18 +514,7 @@ impl BlockchainRpcClient {
};

match info {
ArchiveInfo::Found {
id,
size,
chunk_size,
} => {
break 'info PendingArchive {
id,
size,
chunk_size,
neighbour: handle.accept(),
}
}
ArchiveInfo::Found { size, .. } => break 'size size,
ArchiveInfo::TooNew => {
new_archive_count += 1;

Expand All @@ -545,13 +539,82 @@ impl BlockchainRpcClient {
};
};

let is_reliable_size = |left: u64, right: u64| -> bool {
// Allow 5% deviation
const RATIO: f64 = 0.05;

let max = left.max(right);
let min = left.min(right);

let difference = max - min;
let threshold = (max as f64 * RATIO) as u64;

threshold > difference
};

// Find a neighbour which has the requested archive
let pending_archive = 'info: {
// Get reliable neighbours with higher weight
let neighbours = self
.overlay_client()
.neighbours()
.choose_multiple(NEIGHBOUR_COUNT, NeighbourType::Reliable);

let mut futures = FuturesUnordered::new();
for neighbour in neighbours {
futures.push(self.overlay_client().query_raw(neighbour, req.clone()));
}

let mut err = None;
while let Some(info) = futures.next().await {
let (handle, info) = match info {
Ok(res) => res.split(),
Err(e) => {
err = Some(e);
continue;
}
};

match info {
ArchiveInfo::Found {
id,
size,
chunk_size,
} => match is_reliable_size(expected_size.get(), size.get()) {
true => {
break 'info PendingArchive {
id,
size,
chunk_size,
neighbour: handle.accept(),
}
}
false => {
handle.reject();
continue;
}
},
ArchiveInfo::TooNew | ArchiveInfo::NotFound => {
handle.accept();
continue;
}
}
}

return match err {
None => Err(Error::NotFound),
Some(err) => Err(err),
};
};

tracing::info!(
peer_id = %pending_archive.neighbour.peer_id(),
archive_id = pending_archive.id,
archive_size = %ByteSize(pending_archive.size.get()),
archuve_chunk_size = %ByteSize(pending_archive.chunk_size.get() as _),
"found archive",
);

Ok(PendingArchiveResponse::Found(pending_archive))
}

Expand Down

0 comments on commit fd50eae

Please sign in to comment.