Skip to content

Commit

Permalink
Merge pull request #244 from dora-rs/fix-daemon-listener-loop
Browse files Browse the repository at this point in the history
Fix looping in daemon listener loop
  • Loading branch information
haixuanTao authored Apr 3, 2023
2 parents 4f4dfab + 9b9bac2 commit 5686dee
Showing 1 changed file with 25 additions and 13 deletions.
38 changes: 25 additions & 13 deletions binaries/daemon/src/listener/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,13 @@ use dora_core::{
},
};
use eyre::{eyre, Context};
use futures::{
future::{self, Fuse},
FutureExt,
};
use futures::{future, task, Future};
use shared_memory_server::{ShmemConf, ShmemServer};
use std::{
collections::{BTreeMap, VecDeque},
mem,
net::Ipv4Addr,
task::Poll,
};
use tokio::{
net::TcpListener,
Expand Down Expand Up @@ -183,22 +181,17 @@ impl Listener {
loop {
let mut next_message = connection.receive_message();
let message = loop {
let next_event = if let Some(events) = &mut self.subscribed_events {
Box::pin(events.recv()).fuse()
} else {
Fuse::terminated()
};
let next_event = self.next_event();
let event = match future::select(next_event, next_message).await {
future::Either::Left((event, n)) => {
next_message = n;
event
}
future::Either::Right((message, _)) => break message,
};
if let Some(event) = event {
self.queue.push_back(Box::new(Some(event)));
self.handle_events().await?;
}

self.queue.push_back(Box::new(Some(event)));
self.handle_events().await?;
};

match message.wrap_err("failed to receive DaemonRequest") {
Expand Down Expand Up @@ -418,6 +411,25 @@ impl Listener {
.await
.wrap_err_with(|| format!("failed to send reply to node `{}`", self.node_id))
}

/// Awaits the next subscribed event if any. Never resolves if the event channel is closed.
///
/// This is similar to `self.subscribed_events.recv()`. The difference is that the future
/// does not return `None` when the channel is closed and instead stays pending forever.
/// This behavior can be useful when waiting for multiple event sources at once.
fn next_event(&mut self) -> impl Future<Output = NodeEvent> + Unpin + '_ {
let poll = |cx: &mut task::Context<'_>| {
if let Some(events) = &mut self.subscribed_events {
match events.poll_recv(cx) {
Poll::Ready(Some(event)) => Poll::Ready(event),
Poll::Ready(None) | Poll::Pending => Poll::Pending,
}
} else {
Poll::Pending
}
};
future::poll_fn(poll)
}
}

#[async_trait::async_trait]
Expand Down

0 comments on commit 5686dee

Please sign in to comment.