diff --git a/binaries/daemon/src/listener/mod.rs b/binaries/daemon/src/listener/mod.rs index 2b8102d28..4e8a22124 100644 --- a/binaries/daemon/src/listener/mod.rs +++ b/binaries/daemon/src/listener/mod.rs @@ -7,15 +7,13 @@ use dora_core::{ }, }; use eyre::{eyre, Context}; -use futures::{ - future::{self, Fuse}, - FutureExt, -}; +use futures::{future, task, Future}; use shared_memory_server::{ShmemConf, ShmemServer}; use std::{ collections::{BTreeMap, VecDeque}, mem, net::Ipv4Addr, + task::Poll, }; use tokio::{ net::TcpListener, @@ -183,11 +181,7 @@ impl Listener { loop { let mut next_message = connection.receive_message(); let message = loop { - let next_event = if let Some(events) = &mut self.subscribed_events { - Box::pin(events.recv()).fuse() - } else { - Fuse::terminated() - }; + let next_event = self.next_event(); let event = match future::select(next_event, next_message).await { future::Either::Left((event, n)) => { next_message = n; @@ -195,10 +189,9 @@ impl Listener { } future::Either::Right((message, _)) => break message, }; - if let Some(event) = event { - self.queue.push_back(Box::new(Some(event))); - self.handle_events().await?; - } + + self.queue.push_back(Box::new(Some(event))); + self.handle_events().await?; }; match message.wrap_err("failed to receive DaemonRequest") { @@ -418,6 +411,25 @@ impl Listener { .await .wrap_err_with(|| format!("failed to send reply to node `{}`", self.node_id)) } + + /// Awaits the next subscribed event if any. Never resolves if the event channel is closed. + /// + /// This is similar to `self.subscribed_events.recv()`. The difference is that the future + /// does not return `None` when the channel is closed and instead stays pending forever. + /// This behavior can be useful when waiting for multiple event sources at once. + fn next_event(&mut self) -> impl Future + Unpin + '_ { + let poll = |cx: &mut task::Context<'_>| { + if let Some(events) = &mut self.subscribed_events { + match events.poll_recv(cx) { + Poll::Ready(Some(event)) => Poll::Ready(event), + Poll::Ready(None) | Poll::Pending => Poll::Pending, + } + } else { + Poll::Pending + } + }; + future::poll_fn(poll) + } } #[async_trait::async_trait]