Skip to content

Commit

Permalink
Refactor: Tick::shutdown() returns a JoinHandle
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Feb 18, 2024
1 parent 0311b58 commit a27e04b
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 23 deletions.
66 changes: 44 additions & 22 deletions openraft/src/core/tick.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use tracing::Span;

use crate::core::notify::Notify;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::JoinHandleOf;
use crate::AsyncRuntime;
use crate::Instant;
Expand All @@ -36,9 +37,17 @@ pub(crate) struct TickHandle<C>
where C: RaftTypeConfig
{
enabled: Arc<AtomicBool>,
cancel: Mutex<Option<oneshot::Sender<()>>>,
#[allow(dead_code)]
join_handle: JoinHandleOf<C, ()>,
shutdown: Mutex<Option<oneshot::Sender<()>>>,
join_handle: Mutex<Option<JoinHandleOf<C, ()>>>,
}

impl<C> Drop for TickHandle<C>
where C: RaftTypeConfig
{
/// Signal the tick loop to stop, without waiting for it to stop.
fn drop(&mut self) {
let _ = self.shutdown();
}
}

impl<C> Tick<C>
Expand All @@ -52,17 +61,20 @@ where C: RaftTypeConfig
tx,
};

let (cancel, cancel_rx) = oneshot::channel();
let (shutdown, shutdown_rx) = oneshot::channel();

let join_handle = AsyncRuntimeOf::<C>::spawn(this.tick_loop(cancel_rx).instrument(tracing::span!(
let shutdown = Mutex::new(Some(shutdown));

let join_handle = AsyncRuntimeOf::<C>::spawn(this.tick_loop(shutdown_rx).instrument(tracing::span!(
parent: &Span::current(),
Level::DEBUG,
"tick"
)));

TickHandle {
enabled,
cancel: Mutex::new(Some(cancel)),
join_handle,
shutdown,
join_handle: Mutex::new(Some(join_handle)),
}
}

Expand All @@ -72,9 +84,7 @@ where C: RaftTypeConfig
let mut cancel = std::pin::pin!(cancel_rx);

loop {
i += 1;

let at = <C::AsyncRuntime as AsyncRuntime>::Instant::now() + self.interval;
let at = InstantOf::<C>::now() + self.interval;
let mut sleep_fut = AsyncRuntimeOf::<C>::sleep_until(at);
let sleep_fut = std::pin::pin!(sleep_fut);
let cancel_fut = cancel.as_mut();
Expand All @@ -90,10 +100,11 @@ where C: RaftTypeConfig
}

if !self.enabled.load(Ordering::Relaxed) {
i -= 1;
continue;
}

i += 1;

let send_res = self.tx.send(Notify::Tick { i });
if let Err(_e) = send_res {
tracing::info!("Stopping tick_loop(), main loop terminated");
Expand All @@ -112,18 +123,29 @@ where C: RaftTypeConfig
self.enabled.store(enabled, Ordering::Relaxed);
}

pub(crate) async fn shutdown(&self) {
let got = {
let mut x = self.cancel.lock().unwrap();
/// Signal the tick loop to stop. And return a JoinHandle to wait for the loop to stop.
///
/// If it is called twice, the second call will return None.
pub(crate) fn shutdown(&self) -> Option<JoinHandleOf<C, ()>> {
{
let shutdown = {
let mut x = self.shutdown.lock().unwrap();
x.take()
};

if let Some(shutdown) = shutdown {
let send_res = shutdown.send(());
tracing::info!("Timer shutdown signal sent: {send_res:?}");
} else {
tracing::warn!("Double call to Raft::shutdown()");
}
}

let jh = {
let mut x = self.join_handle.lock().unwrap();
x.take()
};

if let Some(cancel) = got {
let send_res = cancel.send(());
tracing::info!("Timer cancel signal is sent, result is ok: {}", send_res.is_ok());
} else {
tracing::info!("Timer cancel signal is already sent");
}
jh
}
}

Expand Down Expand Up @@ -162,7 +184,7 @@ mod tests {
let th = Tick::<TickUTConfig>::spawn(Duration::from_millis(100), tx, true);

AsyncRuntimeOf::<TickUTConfig>::sleep(Duration::from_millis(500)).await;
th.shutdown().await;
let _ = th.shutdown().unwrap().await;
AsyncRuntimeOf::<TickUTConfig>::sleep(Duration::from_millis(500)).await;

let mut received = vec![];
Expand Down
4 changes: 3 additions & 1 deletion openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,9 @@ where C: RaftTypeConfig
tracing::info!("sending shutdown signal to RaftCore, sending res: {:?}", send_res);
}
self.inner.join_core_task().await;
self.inner.tick_handle.shutdown().await;
if let Some(join_handle) = self.inner.tick_handle.shutdown() {
let _ = join_handle.await;
}

// TODO(xp): API change: replace `JoinError` with `Fatal`,
// to let the caller know the return value of RaftCore task.
Expand Down

0 comments on commit a27e04b

Please sign in to comment.