diff --git a/openraft/src/core/tick.rs b/openraft/src/core/tick.rs index 3247a2058..28ee69cb4 100644 --- a/openraft/src/core/tick.rs +++ b/openraft/src/core/tick.rs @@ -156,9 +156,9 @@ mod tests { use tokio::time::Duration; use crate::core::Tick; + use crate::impls::TokioRuntime; use crate::type_config::TypeConfigExt; use crate::RaftTypeConfig; - use crate::TokioRuntime; #[derive(Debug, Clone, Copy, Default, Eq, PartialEq, Ord, PartialOrd)] #[cfg_attr(feature = "serde", derive(serde::Deserialize, serde::Serialize))] diff --git a/openraft/src/engine/testing.rs b/openraft/src/engine/testing.rs index b08e858d1..d14128894 100644 --- a/openraft/src/engine/testing.rs +++ b/openraft/src/engine/testing.rs @@ -1,8 +1,8 @@ use std::io::Cursor; +use crate::impls::TokioRuntime; use crate::Node; use crate::RaftTypeConfig; -use crate::TokioRuntime; /// Trivial Raft type config for Engine related unit tests, /// with an optional custom node type `N` for Node type. diff --git a/openraft/src/impls/mod.rs b/openraft/src/impls/mod.rs index e42daced5..f9b6522ad 100644 --- a/openraft/src/impls/mod.rs +++ b/openraft/src/impls/mod.rs @@ -1,7 +1,7 @@ //! Collection of implementations of usually used traits defined by Openraft -pub use crate::async_runtime::TokioRuntime; pub use crate::entry::Entry; pub use crate::node::BasicNode; pub use crate::node::EmptyNode; pub use crate::raft::responder::impls::OneshotResponder; +pub use crate::type_config::async_runtime::impls::TokioRuntime; diff --git a/openraft/src/lib.rs b/openraft/src/lib.rs index 8916c0963..98ed81a2a 100644 --- a/openraft/src/lib.rs +++ b/openraft/src/lib.rs @@ -48,7 +48,6 @@ pub(crate) mod raft_state; pub(crate) mod timer; pub(crate) mod utime; -pub mod async_runtime; #[cfg(feature = "compat")] pub mod compat; pub mod docs; @@ -70,9 +69,10 @@ mod feature_serde_test; pub use anyerror; pub use anyerror::AnyError; pub use openraft_macros::add_async_trait; +pub use type_config::async_runtime; +pub use type_config::async_runtime::impls::TokioRuntime; +pub use type_config::AsyncRuntime; -pub use crate::async_runtime::AsyncRuntime; -pub use crate::async_runtime::TokioRuntime; pub use crate::change_members::ChangeMembers; pub use crate::config::Config; pub use crate::config::ConfigError; diff --git a/openraft/src/raft/declare_raft_types_test.rs b/openraft/src/raft/declare_raft_types_test.rs index 593174f35..2f575a10f 100644 --- a/openraft/src/raft/declare_raft_types_test.rs +++ b/openraft/src/raft/declare_raft_types_test.rs @@ -3,7 +3,7 @@ use std::io::Cursor; use crate::declare_raft_types; -use crate::TokioRuntime; +use crate::impls::TokioRuntime; declare_raft_types!( All: diff --git a/openraft/src/raft/raft_inner.rs b/openraft/src/raft/raft_inner.rs index ab104d41f..ad51e76d8 100644 --- a/openraft/src/raft/raft_inner.rs +++ b/openraft/src/raft/raft_inner.rs @@ -19,7 +19,7 @@ use crate::metrics::RaftServerMetrics; use crate::raft::core_state::CoreState; use crate::type_config::alias::OneshotReceiverOf; use crate::type_config::alias::OneshotSenderOf; -use crate::AsyncRuntime; +use crate::type_config::AsyncRuntime; use crate::Config; use crate::OptionalSend; use crate::RaftMetrics; diff --git a/openraft/src/timer/timeout.rs b/openraft/src/timer/timeout.rs index cd2e2e9ad..7d48ebb87 100644 --- a/openraft/src/timer/timeout.rs +++ b/openraft/src/timer/timeout.rs @@ -11,7 +11,7 @@ use tokio::sync::oneshot::Sender; use tracing::trace_span; use tracing::Instrument; -use crate::AsyncRuntime; +use crate::type_config::AsyncRuntime; use crate::Instant; use crate::OptionalSend; diff --git a/openraft/src/timer/timeout_test.rs b/openraft/src/timer/timeout_test.rs index b9ccc48d4..735852af5 100644 --- a/openraft/src/timer/timeout_test.rs +++ b/openraft/src/timer/timeout_test.rs @@ -3,10 +3,10 @@ use std::time::Duration; use tokio::time::sleep; use tokio::time::Instant; +use crate::impls::TokioRuntime; use crate::timer::timeout::RaftTimer; use crate::timer::Timeout; -use crate::AsyncRuntime; -use crate::TokioRuntime; +use crate::type_config::AsyncRuntime; #[cfg(not(feature = "singlethreaded"))] #[async_entry::test(worker_threads = 3)] diff --git a/openraft/src/type_config.rs b/openraft/src/type_config.rs index 0c689e67d..db7c0ae67 100644 --- a/openraft/src/type_config.rs +++ b/openraft/src/type_config.rs @@ -3,10 +3,13 @@ //! [`NodeId`]: `RaftTypeConfig::NodeId` //! [`Entry`]: `RaftTypeConfig::Entry` +pub mod async_runtime; pub(crate) mod util; use std::fmt::Debug; +pub use async_runtime::AsyncRuntime; +pub use async_runtime::OneshotSender; pub use util::TypeConfigExt; use crate::entry::FromAppData; @@ -14,7 +17,6 @@ use crate::entry::RaftEntry; use crate::raft::responder::Responder; use crate::AppData; use crate::AppDataResponse; -use crate::AsyncRuntime; use crate::Node; use crate::NodeId; use crate::OptionalSend; @@ -92,7 +94,7 @@ pub trait RaftTypeConfig: /// [`type-alias`]: crate::docs::feature_flags#feature-flag-type-alias pub mod alias { use crate::raft::responder::Responder; - use crate::AsyncRuntime; + use crate::type_config::AsyncRuntime; use crate::RaftTypeConfig; pub type DOf = ::D; diff --git a/openraft/src/type_config/async_runtime/impls/tokio_runtime.rs b/openraft/src/type_config/async_runtime/impls/tokio_runtime.rs new file mode 100644 index 000000000..7008fdc2e --- /dev/null +++ b/openraft/src/type_config/async_runtime/impls/tokio_runtime.rs @@ -0,0 +1,84 @@ +use std::future::Future; +use std::time::Duration; + +use crate::type_config::OneshotSender; +use crate::AsyncRuntime; +use crate::OptionalSend; +use crate::TokioInstant; + +/// `Tokio` is the default asynchronous executor. +#[derive(Debug, Default, PartialEq, Eq)] +pub struct TokioRuntime; + +impl AsyncRuntime for TokioRuntime { + type JoinError = tokio::task::JoinError; + type JoinHandle = tokio::task::JoinHandle; + type Sleep = tokio::time::Sleep; + type Instant = TokioInstant; + type TimeoutError = tokio::time::error::Elapsed; + type Timeout + OptionalSend> = tokio::time::Timeout; + type ThreadLocalRng = rand::rngs::ThreadRng; + type OneshotSender = tokio::sync::oneshot::Sender; + type OneshotReceiver = tokio::sync::oneshot::Receiver; + type OneshotReceiverError = tokio::sync::oneshot::error::RecvError; + + #[inline] + fn spawn(future: T) -> Self::JoinHandle + where + T: Future + OptionalSend + 'static, + T::Output: OptionalSend + 'static, + { + #[cfg(feature = "singlethreaded")] + { + tokio::task::spawn_local(future) + } + #[cfg(not(feature = "singlethreaded"))] + { + tokio::task::spawn(future) + } + } + + #[inline] + fn sleep(duration: Duration) -> Self::Sleep { + tokio::time::sleep(duration) + } + + #[inline] + fn sleep_until(deadline: Self::Instant) -> Self::Sleep { + tokio::time::sleep_until(deadline) + } + + #[inline] + fn timeout + OptionalSend>(duration: Duration, future: F) -> Self::Timeout { + tokio::time::timeout(duration, future) + } + + #[inline] + fn timeout_at + OptionalSend>(deadline: Self::Instant, future: F) -> Self::Timeout { + tokio::time::timeout_at(deadline, future) + } + + #[inline] + fn is_panic(join_error: &Self::JoinError) -> bool { + join_error.is_panic() + } + + #[inline] + fn thread_rng() -> Self::ThreadLocalRng { + rand::thread_rng() + } + + #[inline] + fn oneshot() -> (Self::OneshotSender, Self::OneshotReceiver) + where T: OptionalSend { + let (tx, rx) = tokio::sync::oneshot::channel(); + (tx, rx) + } +} + +impl OneshotSender for tokio::sync::oneshot::Sender { + #[inline] + fn send(self, t: T) -> Result<(), T> { + self.send(t) + } +} diff --git a/openraft/src/async_runtime.rs b/openraft/src/type_config/async_runtime/mod.rs similarity index 57% rename from openraft/src/async_runtime.rs rename to openraft/src/type_config/async_runtime/mod.rs index a71a485b0..47a789c94 100644 --- a/openraft/src/async_runtime.rs +++ b/openraft/src/type_config/async_runtime/mod.rs @@ -2,15 +2,24 @@ //! //! `async` runtime is an abstraction over different asynchronous runtimes, such as `tokio`, //! `async-std`, etc. + +pub(crate) mod impls { + mod tokio_runtime; + + pub use tokio_runtime::TokioRuntime; +} +mod oneshot; + use std::fmt::Debug; use std::fmt::Display; use std::future::Future; use std::time::Duration; +pub use oneshot::OneshotSender; + use crate::Instant; use crate::OptionalSend; use crate::OptionalSync; -use crate::TokioInstant; /// A trait defining interfaces with an asynchronous runtime. /// @@ -99,92 +108,3 @@ pub trait AsyncRuntime: Debug + Default + PartialEq + Eq + OptionalSend + Option fn oneshot() -> (Self::OneshotSender, Self::OneshotReceiver) where T: OptionalSend; } - -/// `Tokio` is the default asynchronous executor. -#[derive(Debug, Default, PartialEq, Eq)] -pub struct TokioRuntime; - -impl AsyncRuntime for TokioRuntime { - type JoinError = tokio::task::JoinError; - type JoinHandle = tokio::task::JoinHandle; - type Sleep = tokio::time::Sleep; - type Instant = TokioInstant; - type TimeoutError = tokio::time::error::Elapsed; - type Timeout + OptionalSend> = tokio::time::Timeout; - type ThreadLocalRng = rand::rngs::ThreadRng; - type OneshotSender = tokio::sync::oneshot::Sender; - type OneshotReceiver = tokio::sync::oneshot::Receiver; - type OneshotReceiverError = tokio::sync::oneshot::error::RecvError; - - #[inline] - fn spawn(future: T) -> Self::JoinHandle - where - T: Future + OptionalSend + 'static, - T::Output: OptionalSend + 'static, - { - #[cfg(feature = "singlethreaded")] - { - tokio::task::spawn_local(future) - } - #[cfg(not(feature = "singlethreaded"))] - { - tokio::task::spawn(future) - } - } - - #[inline] - fn sleep(duration: Duration) -> Self::Sleep { - tokio::time::sleep(duration) - } - - #[inline] - fn sleep_until(deadline: Self::Instant) -> Self::Sleep { - tokio::time::sleep_until(deadline) - } - - #[inline] - fn timeout + OptionalSend>(duration: Duration, future: F) -> Self::Timeout { - tokio::time::timeout(duration, future) - } - - #[inline] - fn timeout_at + OptionalSend>(deadline: Self::Instant, future: F) -> Self::Timeout { - tokio::time::timeout_at(deadline, future) - } - - #[inline] - fn is_panic(join_error: &Self::JoinError) -> bool { - join_error.is_panic() - } - - #[inline] - fn thread_rng() -> Self::ThreadLocalRng { - rand::thread_rng() - } - - #[inline] - fn oneshot() -> (Self::OneshotSender, Self::OneshotReceiver) - where T: OptionalSend { - let (tx, rx) = tokio::sync::oneshot::channel(); - (tx, rx) - } -} - -pub trait OneshotSender { - /// Attempts to send a value on this channel, returning it back if it could - /// not be sent. - /// - /// This method consumes `self` as only one value may ever be sent on a `oneshot` - /// channel. It is not marked async because sending a message to an `oneshot` - /// channel never requires any form of waiting. Because of this, the `send` - /// method can be used in both synchronous and asynchronous code without - /// problems. - fn send(self, t: T) -> Result<(), T>; -} - -impl OneshotSender for tokio::sync::oneshot::Sender { - #[inline] - fn send(self, t: T) -> Result<(), T> { - self.send(t) - } -} diff --git a/openraft/src/type_config/async_runtime/oneshot.rs b/openraft/src/type_config/async_runtime/oneshot.rs new file mode 100644 index 000000000..8dda9a068 --- /dev/null +++ b/openraft/src/type_config/async_runtime/oneshot.rs @@ -0,0 +1,11 @@ +pub trait OneshotSender { + /// Attempts to send a value on this channel, returning it back if it could + /// not be sent. + /// + /// This method consumes `self` as only one value may ever be sent on a `oneshot` + /// channel. It is not marked async because sending a message to an `oneshot` + /// channel never requires any form of waiting. Because of this, the `send` + /// method can be used in both synchronous and asynchronous code without + /// problems. + fn send(self, t: T) -> Result<(), T>; +} diff --git a/openraft/src/type_config/util.rs b/openraft/src/type_config/util.rs index 12c8b7f68..c26e14038 100644 --- a/openraft/src/type_config/util.rs +++ b/openraft/src/type_config/util.rs @@ -10,7 +10,7 @@ use crate::type_config::alias::OneshotReceiverOf; use crate::type_config::alias::OneshotSenderOf; use crate::type_config::alias::SleepOf; use crate::type_config::alias::TimeoutOf; -use crate::AsyncRuntime; +use crate::type_config::AsyncRuntime; use crate::Instant; use crate::OptionalSend; use crate::RaftTypeConfig; diff --git a/tests/tests/append_entries/t60_enable_heartbeat.rs b/tests/tests/append_entries/t60_enable_heartbeat.rs index 3eba34e9d..60bd4a34f 100644 --- a/tests/tests/append_entries/t60_enable_heartbeat.rs +++ b/tests/tests/append_entries/t60_enable_heartbeat.rs @@ -3,10 +3,10 @@ use std::time::Duration; use anyhow::Result; use maplit::btreeset; +use openraft::impls::TokioRuntime; +use openraft::type_config::AsyncRuntime; use openraft::type_config::TypeConfigExt; -use openraft::AsyncRuntime; use openraft::Config; -use openraft::TokioRuntime; use openraft_memstore::TypeConfig; use crate::fixtures::init_default_ut_tracing;