Skip to content

Commit

Permalink
Feature: Add MPSC channel to AsyncRuntime
Browse files Browse the repository at this point in the history
`AsyncRuntime` trait defines the async-runtime such as tokio to run
Openraft. This commit add MPSC abstraction to `AsyncRuntime` and
MPSC implementations to tokio based runtime and monoio based runtime.
  • Loading branch information
drmingdrmer committed Aug 18, 2024
1 parent e54a1fd commit c279fb8
Show file tree
Hide file tree
Showing 8 changed files with 366 additions and 7 deletions.
83 changes: 83 additions & 0 deletions openraft/src/testing/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ use std::task::Poll;

use crate::async_runtime::watch::WatchReceiver;
use crate::async_runtime::watch::WatchSender;
use crate::async_runtime::Mpsc;
use crate::async_runtime::MpscReceiver;
use crate::async_runtime::MpscSender;
use crate::async_runtime::MpscUnboundedWeakSender;
use crate::async_runtime::MpscWeakSender;
use crate::instant::Instant;
use crate::type_config::async_runtime::mpsc_unbounded::MpscUnbounded;
use crate::type_config::async_runtime::mpsc_unbounded::MpscUnboundedReceiver;
Expand Down Expand Up @@ -43,11 +47,19 @@ impl<Rt: AsyncRuntime> Suite<Rt> {
Self::test_sleep_until().await;
Self::test_timeout().await;
Self::test_timeout_at().await;

Self::test_mpsc_recv_empty().await;
Self::test_mpsc_recv_channel_closed().await;
Self::test_mpsc_weak_sender_wont_prevent_channel_close().await;
Self::test_mpsc_weak_sender_upgrade().await;
Self::test_mpsc_send().await;

Self::test_unbounded_mpsc_recv_empty().await;
Self::test_unbounded_mpsc_recv_channel_closed().await;
Self::test_unbounded_mpsc_weak_sender_wont_prevent_channel_close().await;
Self::test_unbounded_mpsc_weak_sender_upgrade().await;
Self::test_unbounded_mpsc_send().await;

Self::test_watch_init_value().await;
Self::test_watch_overwrite_init_value().await;
Self::test_watch_send_error_no_receiver().await;
Expand Down Expand Up @@ -131,6 +143,77 @@ impl<Rt: AsyncRuntime> Suite<Rt> {
assert!(timeout_result.is_err());
}

pub async fn test_mpsc_recv_empty() {
let (_tx, mut rx) = Rt::Mpsc::channel::<()>(5);
let recv_err = rx.try_recv().unwrap_err();
assert!(matches!(recv_err, TryRecvError::Empty));
}

pub async fn test_mpsc_recv_channel_closed() {
let (_, mut rx) = Rt::Mpsc::channel::<()>(5);
let recv_err = rx.try_recv().unwrap_err();
assert!(matches!(recv_err, TryRecvError::Disconnected));

let recv_result = rx.recv().await;
assert!(recv_result.is_none());
}

pub async fn test_mpsc_weak_sender_wont_prevent_channel_close() {
let (tx, mut rx) = Rt::Mpsc::channel::<()>(5);

let _weak_tx = tx.downgrade();
drop(tx);
let recv_err = rx.try_recv().unwrap_err();
assert!(matches!(recv_err, TryRecvError::Disconnected));

let recv_result = rx.recv().await;
assert!(recv_result.is_none());
}

pub async fn test_mpsc_weak_sender_upgrade() {
let (tx, _rx) = Rt::Mpsc::channel::<()>(5);

let weak_tx = tx.downgrade();
let opt_tx = weak_tx.upgrade();
assert!(opt_tx.is_some());

drop(tx);
drop(opt_tx);
// now there is no Sender instances alive

let opt_tx = weak_tx.upgrade();
assert!(opt_tx.is_none());
}

pub async fn test_mpsc_send() {
let (tx, mut rx) = Rt::Mpsc::channel::<usize>(5);
let tx = Arc::new(tx);

let n_senders = 10_usize;
let recv_expected = (0..n_senders).collect::<Vec<_>>();

for idx in 0..n_senders {
let tx = tx.clone();
// no need to wait for senders here, we wait by recv()ing
let _handle = Rt::spawn(async move {
tx.send(idx).await.unwrap();
});
}

let mut recv = Vec::with_capacity(n_senders);
while let Some(recv_number) = rx.recv().await {
recv.push(recv_number);

if recv.len() == n_senders {
break;
}
}

recv.sort();

assert_eq!(recv_expected, recv);
}

pub async fn test_unbounded_mpsc_recv_empty() {
let (_tx, mut rx) = Rt::MpscUnbounded::channel::<()>();
let recv_err = rx.try_recv().unwrap_err();
Expand Down
19 changes: 15 additions & 4 deletions openraft/src/type_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ pub trait RaftTypeConfig:
/// [`type-alias`]: crate::docs::feature_flags#feature-flag-type-alias
pub mod alias {
use crate::async_runtime::watch;
use crate::async_runtime::Mpsc;
use crate::async_runtime::MpscUnbounded;
use crate::async_runtime::Oneshot;
use crate::raft::responder::Responder;
Expand Down Expand Up @@ -125,13 +126,23 @@ pub mod alias {
pub type OneshotReceiverErrorOf<C> = <OneshotOf<C> as Oneshot>::ReceiverError;
pub type OneshotReceiverOf<C, T> = <OneshotOf<C> as Oneshot>::Receiver<T>;

pub type MpscOf<C> = <Rt<C> as AsyncRuntime>::Mpsc;

// MPSC bounded
type MpscB<C> = MpscOf<C>;

pub type MpscSenderOf<C, T> = <MpscB<C> as Mpsc>::Sender<T>;
pub type MpscReceiverOf<C, T> = <MpscB<C> as Mpsc>::Receiver<T>;
pub type MpscWeakSenderOf<C, T> = <MpscB<C> as Mpsc>::WeakSender<T>;

pub type MpscUnboundedOf<C> = <Rt<C> as AsyncRuntime>::MpscUnbounded;

type Mpsc<C> = MpscUnboundedOf<C>;
// MPSC unbounded
type MpscUB<C> = MpscUnboundedOf<C>;

pub type MpscUnboundedSenderOf<C, T> = <Mpsc<C> as MpscUnbounded>::Sender<T>;
pub type MpscUnboundedReceiverOf<C, T> = <Mpsc<C> as MpscUnbounded>::Receiver<T>;
pub type MpscUnboundedWeakSenderOf<C, T> = <Mpsc<C> as MpscUnbounded>::WeakSender<T>;
pub type MpscUnboundedSenderOf<C, T> = <MpscUB<C> as MpscUnbounded>::Sender<T>;
pub type MpscUnboundedReceiverOf<C, T> = <MpscUB<C> as MpscUnbounded>::Receiver<T>;
pub type MpscUnboundedWeakSenderOf<C, T> = <MpscUB<C> as MpscUnbounded>::WeakSender<T>;

pub type WatchOf<C> = <Rt<C> as AsyncRuntime>::Watch;
pub type WatchSenderOf<C, T> = <WatchOf<C> as watch::Watch>::Sender<T>;
Expand Down
7 changes: 7 additions & 0 deletions openraft/src/type_config/async_runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub(crate) mod tokio_impls {
mod tokio_runtime;
pub use tokio_runtime::TokioRuntime;
}
pub mod mpsc;
pub mod mpsc_unbounded;
pub mod mutex;
pub mod oneshot;
Expand All @@ -19,6 +20,10 @@ use std::fmt::Display;
use std::future::Future;
use std::time::Duration;

pub use mpsc::Mpsc;
pub use mpsc::MpscReceiver;
pub use mpsc::MpscSender;
pub use mpsc::MpscWeakSender;
pub use mpsc_unbounded::MpscUnbounded;
pub use mpsc_unbounded::MpscUnboundedReceiver;
pub use mpsc_unbounded::MpscUnboundedSender;
Expand Down Expand Up @@ -99,6 +104,8 @@ pub trait AsyncRuntime: Debug + Default + PartialEq + Eq + OptionalSend + Option
/// sent to another thread.
fn thread_rng() -> Self::ThreadLocalRng;

type Mpsc: Mpsc;

type MpscUnbounded: MpscUnbounded;

type Watch: Watch;
Expand Down
73 changes: 73 additions & 0 deletions openraft/src/type_config/async_runtime/mpsc/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use std::future::Future;

use base::OptionalSend;
use base::OptionalSync;

/// mpsc shares the same error types as mpsc_unbounded
pub use super::mpsc_unbounded::SendError;
pub use super::mpsc_unbounded::TryRecvError;
use crate::base;

/// Multi-producer, single-consumer channel.
pub trait Mpsc: Sized + OptionalSend {
type Sender<T: OptionalSend>: MpscSender<Self, T>;
type Receiver<T: OptionalSend>: MpscReceiver<T>;
type WeakSender<T: OptionalSend>: MpscWeakSender<Self, T>;

/// Creates a bounded mpsc channel for communicating between asynchronous tasks with
/// backpressure.
fn channel<T: OptionalSend>(buffer: usize) -> (Self::Sender<T>, Self::Receiver<T>);
}

/// Send values to the associated [`MpscReceiver`].
pub trait MpscSender<MU, T>: OptionalSend + OptionalSync + Clone
where
MU: Mpsc,
T: OptionalSend,
{
/// Attempts to send a message, blocks if there is no capacity.
///
/// If the receiving half of the channel is closed, this
/// function returns an error. The error includes the value passed to `send`.
fn send(&self, msg: T) -> impl Future<Output = Result<(), SendError<T>>> + OptionalSend;

/// Converts the [`MpscSender`] to a [`MpscWeakSender`] that does not count
/// towards RAII semantics, i.e. if all `Sender` instances of the
/// channel were dropped and only `WeakSender` instances remain,
/// the channel is closed.
fn downgrade(&self) -> MU::WeakSender<T>;
}

/// Receive values from the associated [`MpscSender`].
pub trait MpscReceiver<T>: OptionalSend + OptionalSync {
/// Receives the next value for this receiver.
///
/// This method returns `None` if the channel has been closed and there are
/// no remaining messages in the channel's buffer.
fn recv(&mut self) -> impl Future<Output = Option<T>> + OptionalSend;

/// Tries to receive the next value for this receiver.
///
/// This method returns the [`TryRecvError::Empty`] error if the channel is currently
/// empty, but there are still outstanding senders.
///
/// This method returns the [`TryRecvError::Disconnected`] error if the channel is
/// currently empty, and there are no outstanding senders.
fn try_recv(&mut self) -> Result<T, TryRecvError>;
}

/// A sender that does not prevent the channel from being closed.
///
/// If all [`MpscSender`] instances of a channel were dropped and only
/// `WeakSender` instances remain, the channel is closed.
pub trait MpscWeakSender<MU, T>: OptionalSend + OptionalSync + Clone
where
MU: Mpsc,
T: OptionalSend,
{
/// Tries to convert a [`MpscWeakSender`] into an [`MpscSender`].
///
/// This will return `Some` if there are other `Sender` instances alive and
/// the channel wasn't previously dropped, otherwise `None` is returned.
fn upgrade(&self) -> Option<MU::Sender<T>>;
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ impl AsyncRuntime for TokioRuntime {
rand::thread_rng()
}

type Mpsc = mpsc_impl::TokioMpsc;
type MpscUnbounded = TokioMpscUnbounded;
type Watch = TokioWatch;
type Oneshot = TokioOneshot;
Expand Down Expand Up @@ -134,6 +135,75 @@ where T: OptionalSend
}
}

mod mpsc_impl {
use std::future::Future;

use futures::TryFutureExt;
use tokio::sync::mpsc;

use crate::async_runtime::Mpsc;
use crate::async_runtime::MpscReceiver;
use crate::async_runtime::MpscSender;
use crate::async_runtime::MpscWeakSender;
use crate::async_runtime::SendError;
use crate::async_runtime::TryRecvError;
use crate::OptionalSend;

pub struct TokioMpsc;

impl Mpsc for TokioMpsc {
type Sender<T: OptionalSend> = mpsc::Sender<T>;
type Receiver<T: OptionalSend> = mpsc::Receiver<T>;
type WeakSender<T: OptionalSend> = mpsc::WeakSender<T>;

/// Creates a bounded mpsc channel for communicating between asynchronous
/// tasks with backpressure.
fn channel<T: OptionalSend>(buffer: usize) -> (Self::Sender<T>, Self::Receiver<T>) {
mpsc::channel(buffer)
}
}

impl<T> MpscSender<TokioMpsc, T> for mpsc::Sender<T>
where T: OptionalSend
{
#[inline]
fn send(&self, msg: T) -> impl Future<Output = Result<(), SendError<T>>> + OptionalSend {
self.send(msg).map_err(|e| SendError(e.0))
}

#[inline]
fn downgrade(&self) -> <TokioMpsc as Mpsc>::WeakSender<T> {
self.downgrade()
}
}

impl<T> MpscReceiver<T> for mpsc::Receiver<T>
where T: OptionalSend
{
#[inline]
fn recv(&mut self) -> impl Future<Output = Option<T>> + OptionalSend {
self.recv()
}

#[inline]
fn try_recv(&mut self) -> Result<T, TryRecvError> {
self.try_recv().map_err(|e| match e {
mpsc::error::TryRecvError::Empty => TryRecvError::Empty,
mpsc::error::TryRecvError::Disconnected => TryRecvError::Disconnected,
})
}
}

impl<T> MpscWeakSender<TokioMpsc, T> for mpsc::WeakSender<T>
where T: OptionalSend
{
#[inline]
fn upgrade(&self) -> Option<<TokioMpsc as Mpsc>::Sender<T>> {
self.upgrade()
}
}
}

pub struct TokioWatch;

impl watch::Watch for TokioWatch {
Expand Down
14 changes: 14 additions & 0 deletions openraft/src/type_config/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,15 @@ use openraft_macros::since;

use crate::async_runtime::mutex::Mutex;
use crate::async_runtime::watch::Watch;
use crate::async_runtime::Mpsc;
use crate::async_runtime::MpscUnbounded;
use crate::async_runtime::Oneshot;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::JoinHandleOf;
use crate::type_config::alias::MpscOf;
use crate::type_config::alias::MpscReceiverOf;
use crate::type_config::alias::MpscSenderOf;
use crate::type_config::alias::MpscUnboundedOf;
use crate::type_config::alias::MpscUnboundedReceiverOf;
use crate::type_config::alias::MpscUnboundedSenderOf;
Expand Down Expand Up @@ -72,6 +76,16 @@ pub trait TypeConfigExt: RaftTypeConfig {
OneshotOf::<Self>::channel()
}

/// Creates a mpsc channel for communicating between asynchronous
/// tasks with backpressure.
///
/// This is just a wrapper of
/// [`AsyncRuntime::Mpsc::channel()`](`crate::async_runtime::Mpsc::channel`).
fn mpsc<T>(buffer: usize) -> (MpscSenderOf<Self, T>, MpscReceiverOf<Self, T>)
where T: OptionalSend {
MpscOf::<Self>::channel(buffer)
}

/// Creates an unbounded mpsc channel for communicating between asynchronous
/// tasks without backpressure.
///
Expand Down
7 changes: 5 additions & 2 deletions rt-monoio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ repository = "https://github.com/datafuselabs/openraft"
openraft = { path = "../openraft", version = "0.10.0", default-features = false, features = ["singlethreaded"] }

rand = "0.8"
tokio = { version = "1.22", features = ["sync"] }
monoio = "0.2.3"

futures = { version = "0.3" }
local-sync = "0.1.1"

monoio = "0.2.3"
tokio = { version = "1.22", features = ["sync"] }
Loading

0 comments on commit c279fb8

Please sign in to comment.