Skip to content

Commit

Permalink
fix(consensus): exponential download again
Browse files Browse the repository at this point in the history
  • Loading branch information
Mododo committed Jul 16, 2024
1 parent 3b91a1d commit 9d57d7a
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 53 deletions.
10 changes: 5 additions & 5 deletions consensus/src/engine/mempool_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ impl MempoolConfig {
/// hard limit on cached external messages ring buffer, see [`Self::PAYLOAD_BATCH_BYTES`]
pub const PAYLOAD_BUFFER_BYTES: usize = 50 * 1024 * 1024;

/// should not be less than 3 (as in average 1 of 3 is unreliable and another one did not sign);
/// value is multiplied by the current attempt number, until 2F+1 successfully responded
/// or a verifiable point is found (ill-formed or incorrectly signed points are not eligible)
pub const DOWNLOAD_PEERS: u8 = 5;
/// number of random peers to request at each attempt; does not include mandatory peers;
/// value increases exponentially with each attempt, until 2F successfully responded `None`
/// or a verifiable point is found (ill-formed or incorrectly signed points do not count)
pub const DOWNLOAD_PEERS: u8 = 2;

/// [`Downloader`](crate::intercom::Downloader) makes requests in groups: a new group is sent
/// after the previous group completed or this interval elapsed (in order to not wait for some slow responding peer)
Expand All @@ -70,7 +70,7 @@ impl MempoolConfig {
///
/// Notice that reliable peers respond immediately with points they already have
/// validated successfully, or return `None`.
pub const DOWNLOAD_INTERVAL: Duration = Duration::from_millis(30);
pub const DOWNLOAD_INTERVAL: Duration = Duration::from_millis(25);
}

const _: () = assert!(
Expand Down
90 changes: 42 additions & 48 deletions consensus/src/intercom/dependency/downloader.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::hash_map::Entry;
use std::iter;
use std::sync::Arc;

use futures_util::future::BoxFuture;
Expand All @@ -7,6 +7,7 @@ use futures_util::{FutureExt, StreamExt};
use rand::{thread_rng, RngCore};
use tokio::sync::broadcast::error::RecvError;
use tokio::sync::{broadcast, mpsc, oneshot};
use tokio::time::{Interval, MissedTickBehavior};
use tracing::Instrument;
use tycho_network::PeerId;
use tycho_util::metrics::HistogramGuard;
Expand Down Expand Up @@ -84,29 +85,23 @@ impl Downloader {
};
let peer_count = PeerCount::try_from(undone_peers.len())
.expect("validator set is unknown, must keep prev epoch's set for DAG_DEPTH rounds");
let mut undone_peers = undone_peers
let undone_peers = undone_peers
.iter()
// query author no matter if it is scheduled for the next round or not;
// it won't affect 2F reliable `NotFound`s to break the task with `DagPoint::NotExists`:
// author is a depender for its point, so its `NotFound` response is not reliable
.chain(iter::once((&point_id.location.author, &author_state)))
.map(|(peer_id, state)| {
(*peer_id, PeerStatus {
let status = PeerStatus {
state: *state,
failed_queries: 0,
is_depender: peer_id == point_id.location.author,
is_depender: false, // `true` comes from channel to start immediate download
is_in_flight: false,
})
};
(*peer_id, status)
})
.collect::<FastHashMap<_, _>>();
// query author no matter if it is in the next round, but that won't affect 2F "NotFound"
match undone_peers.entry(point_id.location.author) {
Entry::Vacant(vacant) if author_state == PeerState::Resolved => {
vacant.insert(PeerStatus {
state: author_state,
failed_queries: 0,
is_depender: true, // as author is a depender, its 'NotFound' is not reliable
is_in_flight: false,
});
}
_ => {}
};

let point_dag_round = point_dag_round_strong.downgrade();
// do not leak span and strong round ref across await
drop(point_dag_round_strong);
Expand All @@ -124,7 +119,7 @@ impl Downloader {
undone_peers,
downloading: FuturesUnordered::new(),
attempt: 0,
skip_next_attempt: false,
interval: tokio::time::interval(MempoolConfig::DOWNLOAD_INTERVAL),
};
let downloaded = task
.run(verified_broadcast)
Expand Down Expand Up @@ -187,24 +182,25 @@ struct DownloadTask {

attempt: u8,
/// drives time-based attempts; reset when an attempt is initiated by an empty task queue,
/// so that queue-driven attempt postpones the next time-driven one
skip_next_attempt: bool,
interval: Interval,
}

impl DownloadTask {
// point's author is a top priority; fallback priority is (any) dependent point's author
// recursively: every dependency is expected to be signed by 2/3+1
pub async fn run(&mut self, verified_broadcast: oneshot::Receiver<Point>) -> Option<Point> {
self.download_random(true);
let mut interval = tokio::time::interval(MempoolConfig::DOWNLOAD_INTERVAL);
let mut verified_broadcast = std::pin::pin!(verified_broadcast);
pub async fn run(&mut self, mut verified_broadcast: oneshot::Receiver<Point>) -> Option<Point> {
// give equal time to every attempt, ignoring local runtime delays; do not `Burst` requests
self.interval
.set_missed_tick_behavior(MissedTickBehavior::Delay);

loop {
tokio::select! {
biased; // mandatory priority: signals lifecycle, updates, data lifecycle
Ok(point) = &mut verified_broadcast => break Some(point),
Some(depender) = self.dependers.recv() => self.add_depender(&depender),
update = self.updates.recv() => self.match_peer_updates(update),
Some((peer_id, downloaded)) = self.downloading.next() =>
match self.verify(&peer_id, downloaded) {
Some((peer_id, result)) = self.downloading.next() =>
match self.verify(&peer_id, result) {
Some(point) => break Some(point),
None => if self.shall_continue() {
continue
Expand All @@ -213,7 +209,7 @@ impl DownloadTask {
}
},
// most rare arm to make progress despite slow responding peers
_ = interval.tick() => self.download_random(false),
_ = self.interval.tick() => self.download_random(), // first tick fires immediately
}
}
// on exit futures are dropped and receivers are cleaned,
Expand All @@ -222,12 +218,12 @@ impl DownloadTask {

fn add_depender(&mut self, peer_id: &PeerId) {
let is_suitable = match self.undone_peers.get_mut(peer_id) {
Some(state) if !state.is_depender => {
state.is_depender = true;
!state.is_in_flight
&& state.state == PeerState::Resolved
Some(status) if !status.is_depender => {
status.is_depender = true;
!status.is_in_flight
&& status.state == PeerState::Resolved
// do not re-download immediately if already requested
&& state.failed_queries == 0
&& status.failed_queries == 0
}
_ => false, // either already marked or requested and removed, no panic
};
Expand All @@ -237,19 +233,7 @@ impl DownloadTask {
}
}

fn download_random(&mut self, force: bool) {
if self.skip_next_attempt {
// reset `skip_attempt` flag; do nothing, if not forced
self.skip_next_attempt = false;
if !force {
return;
}
}
self.attempt = self.attempt.wrapping_add(1);
let count = (MempoolConfig::DOWNLOAD_PEERS as usize)
.saturating_mul(self.attempt as usize)
.min(self.undone_peers.len());

fn download_random(&mut self) {
let mut filtered = self
.undone_peers
.iter()
Expand All @@ -270,9 +254,17 @@ impl DownloadTask {
.collect::<Vec<_>>();
filtered.sort_unstable_by(|(_, ord_l), (_, ord_r)| ord_l.cmp(ord_r));

let count = (MempoolConfig::DOWNLOAD_PEERS as usize)
.saturating_mul(
(MempoolConfig::DOWNLOAD_PEERS as usize).saturating_pow(self.attempt as u32),
)
.min(filtered.len());

for (peer_id, _) in filtered.iter().take(count) {
self.download_one(peer_id);
}

self.attempt = self.attempt.wrapping_add(1);
}

fn download_one(&mut self, peer_id: &PeerId) {
Expand Down Expand Up @@ -300,16 +292,18 @@ impl DownloadTask {
fn verify(
&mut self,
peer_id: &PeerId,
resolved: anyhow::Result<PointByIdResponse>,
result: anyhow::Result<PointByIdResponse>,
) -> Option<Point> {
let defined_response =
match resolved {
match result {
Ok(PointByIdResponse::Defined(response)) => response,
Ok(PointByIdResponse::TryLater) => {
let status = self.undone_peers.get_mut(peer_id).unwrap_or_else(|| {
panic!("Coding error: peer not in map {}", peer_id.alt())
});
status.is_in_flight = false;
// apply the same retry strategy as for network errors
status.failed_queries = status.failed_queries.saturating_add(1);
tracing::trace!(peer = display(peer_id.alt()), "try later");
return None;
}
Expand Down Expand Up @@ -401,8 +395,8 @@ impl DownloadTask {
false
} else {
if self.downloading.is_empty() {
self.download_random(true);
self.skip_next_attempt = true;
self.interval.reset(); // start new interval at current moment
self.download_random();
}
true
}
Expand Down

0 comments on commit 9d57d7a

Please sign in to comment.