Skip to content

Commit

Permalink
fix: remove an unused channel that was locking the broadcast (#433)
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <[email protected]>
Co-authored-by: Bastian Gruber <[email protected]>
  • Loading branch information
Freyskeyd and gruberb committed Jan 19, 2024
1 parent 342b2c7 commit 43c6fe5
Show file tree
Hide file tree
Showing 3 changed files with 2 additions and 13 deletions.
9 changes: 2 additions & 7 deletions crates/topos-tce-broadcast/src/double_echo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,9 @@ impl DoubleEcho {
pub fn spawn_task_manager(
&mut self,
task_manager_message_receiver: mpsc::Receiver<DoubleEchoCommand>,
) -> mpsc::Receiver<(CertificateId, TaskStatus)> {
let (task_completion_sender, task_completion_receiver) = mpsc::channel(2048);

) {
let task_manager = crate::task_manager::TaskManager::new(
task_manager_message_receiver,
task_completion_sender,
self.subscriptions.clone(),
self.event_sender.clone(),
self.validator_id,
Expand All @@ -112,8 +109,6 @@ impl DoubleEcho {
);

tokio::spawn(task_manager.run(self.task_manager_cancellation.child_token()));

task_completion_receiver
}

/// DoubleEcho main loop
Expand All @@ -127,7 +122,7 @@ impl DoubleEcho {
mut self,
task_manager_message_receiver: mpsc::Receiver<DoubleEchoCommand>,
) {
let mut task_completion = self.spawn_task_manager(task_manager_message_receiver);
self.spawn_task_manager(task_manager_message_receiver);

info!("DoubleEcho started");

Expand Down
4 changes: 0 additions & 4 deletions crates/topos-tce-broadcast/src/task_manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ type RunningTasks =
/// or existing tasks will receive the messages.
pub struct TaskManager {
pub message_receiver: mpsc::Receiver<DoubleEchoCommand>,
pub task_completion_sender: mpsc::Sender<(CertificateId, TaskStatus)>,
pub subscriptions: SubscriptionsView,
pub event_sender: mpsc::Sender<ProtocolEvents>,
pub tasks: HashMap<CertificateId, TaskContext>,
Expand All @@ -63,7 +62,6 @@ impl TaskManager {
#[allow(clippy::too_many_arguments)]
pub fn new(
message_receiver: mpsc::Receiver<DoubleEchoCommand>,
task_completion_sender: mpsc::Sender<(CertificateId, TaskStatus)>,
subscriptions: SubscriptionsView,
event_sender: mpsc::Sender<ProtocolEvents>,
validator_id: ValidatorId,
Expand All @@ -74,7 +72,6 @@ impl TaskManager {
) -> Self {
Self {
message_receiver,
task_completion_sender,
subscriptions,
event_sender,
tasks: HashMap::new(),
Expand Down Expand Up @@ -138,7 +135,6 @@ impl TaskManager {
debug!("Task for certificate {} finished successfully", certificate_id);
self.tasks.remove(&certificate_id);
DOUBLE_ECHO_ACTIVE_TASKS_COUNT.dec();
let _ = self.task_completion_sender.send((certificate_id, status)).await;
} else {
debug!("Task for certificate {} finished unsuccessfully", certificate_id);
}
Expand Down
2 changes: 0 additions & 2 deletions crates/topos-tce-broadcast/src/tests/task_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use crate::{sampler::SubscriptionsView, task_manager::TaskManager};
async fn can_start(#[future] create_validator_store: Arc<ValidatorStore>) {
let validator_store = create_validator_store.await;
let (message_sender, message_receiver) = mpsc::channel(1);
let (task_completion_sender, _) = mpsc::channel(1);
let (event_sender, _) = mpsc::channel(1);
let (broadcast_sender, _) = broadcast::channel(1);
let shutdown = CancellationToken::new();
Expand All @@ -39,7 +38,6 @@ async fn can_start(#[future] create_validator_store: Arc<ValidatorStore>) {

let mut manager = TaskManager::new(
message_receiver,
task_completion_sender,
SubscriptionsView::default(),
event_sender,
validator_id,
Expand Down

0 comments on commit 43c6fe5

Please sign in to comment.