From 395737da7cea6faaeca6f0bb2584f001127d4b16 Mon Sep 17 00:00:00 2001 From: Kirill Mikheev Date: Sun, 16 Jun 2024 00:34:51 +0300 Subject: [PATCH] refactor(consensus): remove last `await` from `tokio::select!()` --- consensus/src/engine/engine.rs | 4 ++-- .../src/intercom/broadcast/broadcaster.rs | 20 +++++++++++-------- consensus/src/intercom/broadcast/collector.rs | 11 +++++----- 3 files changed, 19 insertions(+), 16 deletions(-) diff --git a/consensus/src/engine/engine.rs b/consensus/src/engine/engine.rs index 1efb05149..cf7b109f7 100644 --- a/consensus/src/engine/engine.rs +++ b/consensus/src/engine/engine.rs @@ -147,7 +147,7 @@ impl Engine { .dag .top(current_dag_round.round().next(), &self.peer_schedule); - let (bcaster_ready_tx, bcaster_ready_rx) = mpsc::channel(1); + let (bcaster_ready_tx, bcaster_ready_rx) = oneshot::channel(); // let this channel unbounded - there won't be many items, but every of them is essential let (collector_signal_tx, mut collector_signal_rx) = mpsc::unbounded_channel(); let (own_point_state_tx, own_point_state_rx) = oneshot::channel(); @@ -205,7 +205,7 @@ impl Engine { (broadcaster, Some(Arc::new(prev_point))) } else { collector_signal_rx.close(); - bcaster_ready_tx.send(BroadcasterSignal::Ok).await.ok(); + bcaster_ready_tx.send(BroadcasterSignal::Ok).ok(); (broadcaster, None) } } diff --git a/consensus/src/intercom/broadcast/broadcaster.rs b/consensus/src/intercom/broadcast/broadcaster.rs index ab16e2f07..b1575c3cb 100644 --- a/consensus/src/intercom/broadcast/broadcaster.rs +++ b/consensus/src/intercom/broadcast/broadcaster.rs @@ -3,7 +3,7 @@ use std::time::Duration; use anyhow::Result; use tokio::sync::broadcast::error::RecvError; -use tokio::sync::{broadcast, mpsc}; +use tokio::sync::{broadcast, mpsc, oneshot}; use tycho_network::PeerId; use tycho_util::{FastHashMap, FastHashSet}; @@ -40,7 +40,7 @@ impl Broadcaster { round_effects: &Effects, point: &Point, peer_schedule: &PeerSchedule, - bcaster_signal: mpsc::Sender, + bcaster_signal: oneshot::Sender, collector_signal: mpsc::UnboundedReceiver, ) -> FastHashMap { let signers = peer_schedule @@ -65,7 +65,7 @@ impl Broadcaster { dispatcher: self.dispatcher.clone(), current_round: point.body().location.round, point_digest: point.digest().clone(), - bcaster_signal, + bcaster_signal: Some(bcaster_signal), collector_signal, peer_updates: peer_schedule.updates(), @@ -99,7 +99,7 @@ struct BroadcasterTask { current_round: Round, point_digest: Digest, /// Receiver may be closed (collector finished), so do not require `Ok` on send - bcaster_signal: mpsc::Sender, + bcaster_signal: Option>, collector_signal: mpsc::UnboundedReceiver, peer_updates: broadcast::Receiver<(PeerId, PeerState)>, @@ -164,7 +164,7 @@ impl BroadcasterTask { self.match_signature_result(&peer_id, result); }, Some(collector_signal) = self.collector_signal.recv() => { - if self.should_finish(collector_signal).await { + if self.should_finish(collector_signal) { break; } } @@ -179,17 +179,21 @@ impl BroadcasterTask { } } - async fn should_finish(&mut self, collector_signal: CollectorSignal) -> bool { + fn should_finish(&mut self, collector_signal: CollectorSignal) -> bool { let result = match collector_signal { // though we return successful result, it will be discarded on Err CollectorSignal::Finish | CollectorSignal::Err => true, CollectorSignal::Retry => { if self.rejections.len() >= self.signers_count.reliable_minority() { - _ = self.bcaster_signal.send(BroadcasterSignal::Err).await; + if let Some(sender) = mem::take(&mut self.bcaster_signal) { + _ = sender.send(BroadcasterSignal::Err); + }; true } else { if self.signatures.len() >= self.signers_count.majority_of_others() { - _ = self.bcaster_signal.send(BroadcasterSignal::Ok).await; + if let Some(sender) = mem::take(&mut self.bcaster_signal) { + _ = sender.send(BroadcasterSignal::Ok); + }; } false } diff --git a/consensus/src/intercom/broadcast/collector.rs b/consensus/src/intercom/broadcast/collector.rs index 89ae3aa3c..e1037523d 100644 --- a/consensus/src/intercom/broadcast/collector.rs +++ b/consensus/src/intercom/broadcast/collector.rs @@ -58,7 +58,7 @@ impl Collector { next_dag_round: DagRound, // r+1 own_point_state: oneshot::Receiver, collector_signal: mpsc::UnboundedSender, - bcaster_signal: mpsc::Receiver, + bcaster_signal: oneshot::Receiver, ) -> Self { let effects = Effects::::new(&round_effects); let span_guard = effects.span().clone().entered(); @@ -104,12 +104,11 @@ impl Collector { next_includes: FuturesUnordered::new(), collector_signal, - bcaster_signal, is_bcaster_ready_ok: false, }; drop(span_guard); - let result = task.run(&mut self.from_bcast_filter).await; + let result = task.run(&mut self.from_bcast_filter, bcaster_signal).await; match result { Ok(includes) => self.next_includes = includes, Err(round) => self.next_round = round, @@ -139,7 +138,6 @@ struct CollectorTask { next_includes: FuturesUnordered>, /// Receiver may be closed (bcaster finished), so do not require `Ok` on send collector_signal: mpsc::UnboundedSender, - bcaster_signal: mpsc::Receiver, is_bcaster_ready_ok: bool, } @@ -150,11 +148,13 @@ impl CollectorTask { async fn run( mut self, from_bcast_filter: &mut mpsc::UnboundedReceiver, + bcaster_signal: oneshot::Receiver, ) -> Result>, Round> { let mut retry_interval = tokio::time::interval(MempoolConfig::RETRY_INTERVAL); + let mut bcaster_signal = std::pin::pin!(bcaster_signal); loop { tokio::select! { - Some(bcaster_signal) = self.bcaster_signal.recv() => { + Ok(bcaster_signal) = bcaster_signal.as_mut(), if !self.is_bcaster_ready_ok => { if self.should_fail(bcaster_signal) { // has to jump over one round return Err(self.next_dag_round.round().next()) @@ -198,7 +198,6 @@ impl CollectorTask { let result = match signal { BroadcasterSignal::Ok => { self.is_bcaster_ready_ok = true; - self.bcaster_signal.close(); false } BroadcasterSignal::Err => true,