From 43c6fe5caffd35103b20ec11d7ae2f35b1af98b9 Mon Sep 17 00:00:00 2001 From: Simon Paitrault Date: Fri, 19 Jan 2024 20:54:27 +0100 Subject: [PATCH] fix: remove an unused channel that was locking the broadcast (#433) Signed-off-by: Simon Paitrault Co-authored-by: Bastian Gruber --- crates/topos-tce-broadcast/src/double_echo/mod.rs | 9 ++------- crates/topos-tce-broadcast/src/task_manager/mod.rs | 4 ---- crates/topos-tce-broadcast/src/tests/task_manager.rs | 2 -- 3 files changed, 2 insertions(+), 13 deletions(-) diff --git a/crates/topos-tce-broadcast/src/double_echo/mod.rs b/crates/topos-tce-broadcast/src/double_echo/mod.rs index 85efed9fa..ef3daea49 100644 --- a/crates/topos-tce-broadcast/src/double_echo/mod.rs +++ b/crates/topos-tce-broadcast/src/double_echo/mod.rs @@ -96,12 +96,9 @@ impl DoubleEcho { pub fn spawn_task_manager( &mut self, task_manager_message_receiver: mpsc::Receiver, - ) -> mpsc::Receiver<(CertificateId, TaskStatus)> { - let (task_completion_sender, task_completion_receiver) = mpsc::channel(2048); - + ) { let task_manager = crate::task_manager::TaskManager::new( task_manager_message_receiver, - task_completion_sender, self.subscriptions.clone(), self.event_sender.clone(), self.validator_id, @@ -112,8 +109,6 @@ impl DoubleEcho { ); tokio::spawn(task_manager.run(self.task_manager_cancellation.child_token())); - - task_completion_receiver } /// DoubleEcho main loop @@ -127,7 +122,7 @@ impl DoubleEcho { mut self, task_manager_message_receiver: mpsc::Receiver, ) { - let mut task_completion = self.spawn_task_manager(task_manager_message_receiver); + self.spawn_task_manager(task_manager_message_receiver); info!("DoubleEcho started"); diff --git a/crates/topos-tce-broadcast/src/task_manager/mod.rs b/crates/topos-tce-broadcast/src/task_manager/mod.rs index 2df66f276..10265d374 100644 --- a/crates/topos-tce-broadcast/src/task_manager/mod.rs +++ b/crates/topos-tce-broadcast/src/task_manager/mod.rs @@ -43,7 +43,6 @@ type RunningTasks = /// or existing tasks will receive the messages. pub struct TaskManager { pub message_receiver: mpsc::Receiver, - pub task_completion_sender: mpsc::Sender<(CertificateId, TaskStatus)>, pub subscriptions: SubscriptionsView, pub event_sender: mpsc::Sender, pub tasks: HashMap, @@ -63,7 +62,6 @@ impl TaskManager { #[allow(clippy::too_many_arguments)] pub fn new( message_receiver: mpsc::Receiver, - task_completion_sender: mpsc::Sender<(CertificateId, TaskStatus)>, subscriptions: SubscriptionsView, event_sender: mpsc::Sender, validator_id: ValidatorId, @@ -74,7 +72,6 @@ impl TaskManager { ) -> Self { Self { message_receiver, - task_completion_sender, subscriptions, event_sender, tasks: HashMap::new(), @@ -138,7 +135,6 @@ impl TaskManager { debug!("Task for certificate {} finished successfully", certificate_id); self.tasks.remove(&certificate_id); DOUBLE_ECHO_ACTIVE_TASKS_COUNT.dec(); - let _ = self.task_completion_sender.send((certificate_id, status)).await; } else { debug!("Task for certificate {} finished unsuccessfully", certificate_id); } diff --git a/crates/topos-tce-broadcast/src/tests/task_manager.rs b/crates/topos-tce-broadcast/src/tests/task_manager.rs index dd50bac3d..ed7a5a112 100644 --- a/crates/topos-tce-broadcast/src/tests/task_manager.rs +++ b/crates/topos-tce-broadcast/src/tests/task_manager.rs @@ -21,7 +21,6 @@ use crate::{sampler::SubscriptionsView, task_manager::TaskManager}; async fn can_start(#[future] create_validator_store: Arc) { let validator_store = create_validator_store.await; let (message_sender, message_receiver) = mpsc::channel(1); - let (task_completion_sender, _) = mpsc::channel(1); let (event_sender, _) = mpsc::channel(1); let (broadcast_sender, _) = broadcast::channel(1); let shutdown = CancellationToken::new(); @@ -39,7 +38,6 @@ async fn can_start(#[future] create_validator_store: Arc) { let mut manager = TaskManager::new( message_receiver, - task_completion_sender, SubscriptionsView::default(), event_sender, validator_id,