refactor(consensus): remove last await from tokio::select!()
Mododo committed Jun 15, 2024
1 parent 1f15777 commit 395737d
Showing 3 changed files with 19 additions and 16 deletions.
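The gist of the change: the broadcaster-ready signal moves from a bounded `mpsc` channel to a `oneshot` channel. `mpsc::Sender::send` is async and had to be awaited (including inside a `tokio::select!` arm handler, where the await holds up the next iteration of the loop), while `oneshot::Sender::send` consumes the sender and returns synchronously, which also fits a signal that is sent at most once per round. A minimal standalone sketch of the difference, assuming `tokio` with the `full` feature set and a simplified `BroadcasterSignal` stand-in rather than the repository's types:

```rust
use tokio::sync::{mpsc, oneshot};

enum BroadcasterSignal {
    Ok,
}

#[tokio::main]
async fn main() {
    // Before: a bounded mpsc channel; sending is async and must be awaited.
    let (mpsc_tx, mut mpsc_rx) = mpsc::channel::<BroadcasterSignal>(1);
    mpsc_tx.send(BroadcasterSignal::Ok).await.ok();
    assert!(matches!(mpsc_rx.recv().await, Some(BroadcasterSignal::Ok)));

    // After: a oneshot channel; `send` consumes the sender and returns
    // immediately, so no `.await` is needed at the call site.
    let (oneshot_tx, oneshot_rx) = oneshot::channel::<BroadcasterSignal>();
    oneshot_tx.send(BroadcasterSignal::Ok).ok();
    assert!(matches!(oneshot_rx.await, Ok(BroadcasterSignal::Ok)));
}
```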
4 changes: 2 additions & 2 deletions consensus/src/engine/engine.rs
@@ -147,7 +147,7 @@ impl Engine {
.dag
.top(current_dag_round.round().next(), &self.peer_schedule);

- let (bcaster_ready_tx, bcaster_ready_rx) = mpsc::channel(1);
+ let (bcaster_ready_tx, bcaster_ready_rx) = oneshot::channel();
// let this channel unbounded - there won't be many items, but every of them is essential
let (collector_signal_tx, mut collector_signal_rx) = mpsc::unbounded_channel();
let (own_point_state_tx, own_point_state_rx) = oneshot::channel();
@@ -205,7 +205,7 @@ impl Engine {
(broadcaster, Some(Arc::new(prev_point)))
} else {
collector_signal_rx.close();
- bcaster_ready_tx.send(BroadcasterSignal::Ok).await.ok();
+ bcaster_ready_tx.send(BroadcasterSignal::Ok).ok();
(broadcaster, None)
}
}
20 changes: 12 additions & 8 deletions consensus/src/intercom/broadcast/broadcaster.rs
@@ -3,7 +3,7 @@ use std::time::Duration;

use anyhow::Result;
use tokio::sync::broadcast::error::RecvError;
- use tokio::sync::{broadcast, mpsc};
+ use tokio::sync::{broadcast, mpsc, oneshot};
use tycho_network::PeerId;
use tycho_util::{FastHashMap, FastHashSet};

@@ -40,7 +40,7 @@ impl Broadcaster {
round_effects: &Effects<CurrentRoundContext>,
point: &Point,
peer_schedule: &PeerSchedule,
- bcaster_signal: mpsc::Sender<BroadcasterSignal>,
+ bcaster_signal: oneshot::Sender<BroadcasterSignal>,
collector_signal: mpsc::UnboundedReceiver<CollectorSignal>,
) -> FastHashMap<PeerId, Signature> {
let signers = peer_schedule
@@ -65,7 +65,7 @@
dispatcher: self.dispatcher.clone(),
current_round: point.body().location.round,
point_digest: point.digest().clone(),
- bcaster_signal,
+ bcaster_signal: Some(bcaster_signal),
collector_signal,

peer_updates: peer_schedule.updates(),
@@ -99,7 +99,7 @@ struct BroadcasterTask {
current_round: Round,
point_digest: Digest,
/// Receiver may be closed (collector finished), so do not require `Ok` on send
- bcaster_signal: mpsc::Sender<BroadcasterSignal>,
+ bcaster_signal: Option<oneshot::Sender<BroadcasterSignal>>,
collector_signal: mpsc::UnboundedReceiver<CollectorSignal>,

peer_updates: broadcast::Receiver<(PeerId, PeerState)>,
@@ -164,7 +164,7 @@ impl BroadcasterTask {
self.match_signature_result(&peer_id, result);
},
Some(collector_signal) = self.collector_signal.recv() => {
- if self.should_finish(collector_signal).await {
+ if self.should_finish(collector_signal) {
break;
}
}
@@ -179,17 +179,21 @@
}
}

- async fn should_finish(&mut self, collector_signal: CollectorSignal) -> bool {
+ fn should_finish(&mut self, collector_signal: CollectorSignal) -> bool {
let result = match collector_signal {
// though we return successful result, it will be discarded on Err
CollectorSignal::Finish | CollectorSignal::Err => true,
CollectorSignal::Retry => {
if self.rejections.len() >= self.signers_count.reliable_minority() {
- _ = self.bcaster_signal.send(BroadcasterSignal::Err).await;
+ if let Some(sender) = mem::take(&mut self.bcaster_signal) {
+     _ = sender.send(BroadcasterSignal::Err);
+ };
true
} else {
if self.signatures.len() >= self.signers_count.majority_of_others() {
- _ = self.bcaster_signal.send(BroadcasterSignal::Ok).await;
+ if let Some(sender) = mem::take(&mut self.bcaster_signal) {
+     _ = sender.send(BroadcasterSignal::Ok);
+ };
}
false
}
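Because a `oneshot::Sender` is consumed by `send`, the task now keeps it as an `Option` and moves it out with `mem::take` the first time a signal goes out; any later attempt finds `None` and is a no-op. A standalone sketch of that pattern with simplified types (it assumes `std::mem` is already in scope in the real module, which this hunk does not show):

```rust
use std::mem;
use tokio::sync::oneshot;

enum BroadcasterSignal {
    Ok,
    Err,
}

struct BroadcasterTask {
    // Becomes `None` once the signal has been sent: a oneshot sender is consumed by `send`.
    bcaster_signal: Option<oneshot::Sender<BroadcasterSignal>>,
}

impl BroadcasterTask {
    fn notify(&mut self, signal: BroadcasterSignal) {
        // Move the sender out of `&mut self`; any later call finds `None` and does nothing.
        if let Some(sender) = mem::take(&mut self.bcaster_signal) {
            // The receiver may already be dropped, so the send error is ignored.
            _ = sender.send(signal);
        }
    }
}

fn main() {
    let (tx, mut rx) = oneshot::channel();
    let mut task = BroadcasterTask { bcaster_signal: Some(tx) };
    task.notify(BroadcasterSignal::Ok);
    task.notify(BroadcasterSignal::Err); // ignored: the sender was already taken
    assert!(matches!(rx.try_recv(), Ok(BroadcasterSignal::Ok)));
}
```

`mem::take` on an `Option` is equivalent to `Option::take`: either way the sender can be used only once, and repeated signals are silently dropped.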
11 changes: 5 additions & 6 deletions consensus/src/intercom/broadcast/collector.rs
@@ -58,7 +58,7 @@ impl Collector {
next_dag_round: DagRound, // r+1
own_point_state: oneshot::Receiver<InclusionState>,
collector_signal: mpsc::UnboundedSender<CollectorSignal>,
- bcaster_signal: mpsc::Receiver<BroadcasterSignal>,
+ bcaster_signal: oneshot::Receiver<BroadcasterSignal>,
) -> Self {
let effects = Effects::<CollectorContext>::new(&round_effects);
let span_guard = effects.span().clone().entered();
@@ -104,12 +104,11 @@
next_includes: FuturesUnordered::new(),

collector_signal,
- bcaster_signal,
is_bcaster_ready_ok: false,
};

drop(span_guard);
- let result = task.run(&mut self.from_bcast_filter).await;
+ let result = task.run(&mut self.from_bcast_filter, bcaster_signal).await;
match result {
Ok(includes) => self.next_includes = includes,
Err(round) => self.next_round = round,
@@ -139,7 +138,6 @@ struct CollectorTask {
next_includes: FuturesUnordered<BoxFuture<'static, InclusionState>>,
/// Receiver may be closed (bcaster finished), so do not require `Ok` on send
collector_signal: mpsc::UnboundedSender<CollectorSignal>,
- bcaster_signal: mpsc::Receiver<BroadcasterSignal>,
is_bcaster_ready_ok: bool,
}

@@ -150,11 +148,13 @@ impl CollectorTask {
async fn run(
mut self,
from_bcast_filter: &mut mpsc::UnboundedReceiver<ConsensusEvent>,
+ bcaster_signal: oneshot::Receiver<BroadcasterSignal>,
) -> Result<FuturesUnordered<BoxFuture<'static, InclusionState>>, Round> {
let mut retry_interval = tokio::time::interval(MempoolConfig::RETRY_INTERVAL);
+ let mut bcaster_signal = std::pin::pin!(bcaster_signal);
loop {
tokio::select! {
- Some(bcaster_signal) = self.bcaster_signal.recv() => {
+ Ok(bcaster_signal) = bcaster_signal.as_mut(), if !self.is_bcaster_ready_ok => {
if self.should_fail(bcaster_signal) {
// has to jump over one round
return Err(self.next_dag_round.round().next())
@@ -198,7 +198,6 @@
let result = match signal {
BroadcasterSignal::Ok => {
self.is_bcaster_ready_ok = true;
- self.bcaster_signal.close();
false
}
BroadcasterSignal::Err => true,
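On the collector side, the `oneshot::Receiver` is no longer stored in the struct and `recv().await`-ed; it is passed into `run`, pinned once with `std::pin::pin!`, and polled by mutable reference in the `tokio::select!` loop. The `if !self.is_bcaster_ready_ok` guard disables that branch after the signal arrives, so the already-completed receiver is never polled again. A standalone sketch of the same shape, with a dummy event channel and simplified types rather than the repository's code:

```rust
use tokio::sync::{mpsc, oneshot};

#[allow(dead_code)] // `Err` is only matched, never constructed, in this sketch
enum BroadcasterSignal {
    Ok,
    Err,
}

#[tokio::main]
async fn main() {
    let (bcaster_tx, bcaster_rx) = oneshot::channel::<BroadcasterSignal>();
    let (event_tx, mut event_rx) = mpsc::unbounded_channel::<u32>();

    // Stand-in for the broadcaster: emit one event, then report readiness.
    let _producer = tokio::spawn(async move {
        event_tx.send(1).ok();
        bcaster_tx.send(BroadcasterSignal::Ok).ok();
    });

    // Pin the receiver once so it can be polled by `&mut` on every loop iteration
    // without being consumed.
    let mut bcaster_rx = std::pin::pin!(bcaster_rx);
    let mut is_bcaster_ready_ok = false;

    loop {
        tokio::select! {
            // The guard disables this branch once the signal has arrived,
            // so the completed oneshot receiver is not polled again.
            Ok(signal) = bcaster_rx.as_mut(), if !is_bcaster_ready_ok => {
                match signal {
                    BroadcasterSignal::Ok => is_bcaster_ready_ok = true,
                    BroadcasterSignal::Err => break,
                }
            }
            Some(event) = event_rx.recv() => {
                println!("event: {event}");
                if is_bcaster_ready_ok {
                    break;
                }
            }
            // All branches may end up disabled (events drained, signal received).
            else => break,
        }
    }
}
```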
