Skip to content

Commit 095a327

Browse files
committed
Change: add Mutex to AsyncRuntime
1 parent 5c82dfd commit 095a327

File tree

8 files changed

+62
-8
lines changed

8 files changed

+62
-8
lines changed

openraft/src/raft/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,11 @@ pub use message::InstallSnapshotResponse;
3737
pub use message::SnapshotResponse;
3838
pub use message::VoteRequest;
3939
pub use message::VoteResponse;
40-
use tokio::sync::Mutex;
4140
use tracing::trace_span;
4241
use tracing::Instrument;
4342
use tracing::Level;
4443

44+
use crate::async_runtime::mutex::Mutex;
4545
use crate::async_runtime::watch::WatchReceiver;
4646
use crate::async_runtime::MpscUnboundedSender;
4747
use crate::async_runtime::OneshotSender;
@@ -321,7 +321,7 @@ where C: RaftTypeConfig
321321
tx_shutdown: std::sync::Mutex::new(Some(tx_shutdown)),
322322
core_state: std::sync::Mutex::new(CoreState::Running(core_handle)),
323323

324-
snapshot: Mutex::new(None),
324+
snapshot: C::mutex(None),
325325
};
326326

327327
Ok(Self { inner: Arc::new(inner) })

openraft/src/raft/raft_inner.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ use std::fmt::Debug;
33
use std::future::Future;
44
use std::sync::Arc;
55

6-
use tokio::sync::Mutex;
76
use tracing::Level;
87

98
use crate::async_runtime::watch::WatchReceiver;
@@ -20,6 +19,7 @@ use crate::metrics::RaftServerMetrics;
2019
use crate::raft::core_state::CoreState;
2120
use crate::type_config::alias::AsyncRuntimeOf;
2221
use crate::type_config::alias::MpscUnboundedSenderOf;
22+
use crate::type_config::alias::MutexOf;
2323
use crate::type_config::alias::OneshotReceiverOf;
2424
use crate::type_config::alias::OneshotSenderOf;
2525
use crate::type_config::alias::WatchReceiverOf;
@@ -48,7 +48,7 @@ where C: RaftTypeConfig
4848
pub(in crate::raft) core_state: std::sync::Mutex<CoreState<C>>,
4949

5050
/// The ongoing snapshot transmission.
51-
pub(in crate::raft) snapshot: Mutex<Option<crate::network::snapshot_transport::Streaming<C>>>,
51+
pub(in crate::raft) snapshot: MutexOf<C, Option<crate::network::snapshot_transport::Streaming<C>>>,
5252
}
5353

5454
impl<C> RaftInner<C>

openraft/src/replication/mod.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ use request::Replicate;
1919
use response::ReplicationResult;
2020
pub(crate) use response::Response;
2121
use tokio::select;
22-
use tokio::sync::Mutex;
2322
use tracing_futures::Instrument;
2423

2524
use crate::async_runtime::MpscUnboundedReceiver;
@@ -56,8 +55,10 @@ use crate::type_config::alias::LogIdOf;
5655
use crate::type_config::alias::MpscUnboundedReceiverOf;
5756
use crate::type_config::alias::MpscUnboundedSenderOf;
5857
use crate::type_config::alias::MpscUnboundedWeakSenderOf;
58+
use crate::type_config::alias::MutexOf;
5959
use crate::type_config::alias::OneshotReceiverOf;
6060
use crate::type_config::alias::OneshotSenderOf;
61+
use crate::type_config::async_runtime::mutex::Mutex;
6162
use crate::type_config::TypeConfigExt;
6263
use crate::LogId;
6364
use crate::RaftLogId;
@@ -114,7 +115,7 @@ where
114115
/// Another `RaftNetwork` specific for snapshot replication.
115116
///
116117
/// Snapshot transmitting is a long running task, and is processed in a separate task.
117-
snapshot_network: Arc<Mutex<N::Network>>,
118+
snapshot_network: Arc<MutexOf<C, N::Network>>,
118119

119120
/// The current snapshot replication state.
120121
///
@@ -188,7 +189,7 @@ where
188189
target,
189190
session_id,
190191
network,
191-
snapshot_network: Arc::new(Mutex::new(snapshot_network)),
192+
snapshot_network: Arc::new(C::mutex(snapshot_network)),
192193
snapshot_state: None,
193194
backoff: None,
194195
log_reader,
@@ -754,7 +755,7 @@ where
754755

755756
async fn send_snapshot(
756757
request_id: RequestId,
757-
network: Arc<Mutex<N::Network>>,
758+
network: Arc<MutexOf<C, N::Network>>,
758759
vote: Vote<C::NodeId>,
759760
snapshot: Snapshot<C>,
760761
option: RPCOption,

openraft/src/type_config.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ pub mod alias {
137137
pub type WatchSenderOf<C, T> = <WatchOf<C> as watch::Watch>::Sender<T>;
138138
pub type WatchReceiverOf<C, T> = <WatchOf<C> as watch::Watch>::Receiver<T>;
139139

140+
pub type MutexOf<C, T> = <Rt<C> as AsyncRuntime>::Mutex<T>;
141+
140142
// Usually used types
141143
pub type LogIdOf<C> = crate::LogId<NodeIdOf<C>>;
142144
pub type VoteOf<C> = crate::Vote<NodeIdOf<C>>;

openraft/src/type_config/async_runtime/impls/tokio_runtime.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use tokio::sync::watch as tokio_watch;
66

77
use crate::async_runtime::mpsc_unbounded;
88
use crate::async_runtime::mpsc_unbounded::MpscUnbounded;
9+
use crate::async_runtime::mutex;
910
use crate::async_runtime::oneshot;
1011
use crate::async_runtime::watch;
1112
use crate::type_config::OneshotSender;
@@ -76,6 +77,7 @@ impl AsyncRuntime for TokioRuntime {
7677
type MpscUnbounded = TokioMpscUnbounded;
7778
type Watch = TokioWatch;
7879
type Oneshot = TokioOneshot;
80+
type Mutex<T: OptionalSend + 'static> = TokioMutex<T>;
7981
}
8082

8183
pub struct TokioMpscUnbounded;
@@ -197,3 +199,19 @@ where T: OptionalSend
197199
self.send(t)
198200
}
199201
}
202+
203+
type TokioMutex<T> = tokio::sync::Mutex<T>;
204+
205+
impl<T> mutex::Mutex<T> for TokioMutex<T>
206+
where T: OptionalSend + 'static
207+
{
208+
type Guard<'a> = tokio::sync::MutexGuard<'a, T>;
209+
210+
fn new(value: T) -> Self {
211+
TokioMutex::new(value)
212+
}
213+
214+
fn lock(&self) -> impl Future<Output = Self::Guard<'_>> + OptionalSend {
215+
self.lock()
216+
}
217+
}

openraft/src/type_config/async_runtime/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ pub(crate) mod impls {
99
pub use tokio_runtime::TokioRuntime;
1010
}
1111
pub mod mpsc_unbounded;
12+
pub mod mutex;
1213
pub mod oneshot;
1314
pub mod watch;
1415

@@ -23,6 +24,7 @@ pub use mpsc_unbounded::MpscUnboundedSender;
2324
pub use mpsc_unbounded::MpscUnboundedWeakSender;
2425
pub use mpsc_unbounded::SendError;
2526
pub use mpsc_unbounded::TryRecvError;
27+
pub use mutex::Mutex;
2628
pub use oneshot::Oneshot;
2729
pub use oneshot::OneshotSender;
2830
pub use watch::Watch;
@@ -101,4 +103,6 @@ pub trait AsyncRuntime: Debug + Default + PartialEq + Eq + OptionalSend + Option
101103
type Watch: Watch;
102104

103105
type Oneshot: Oneshot;
106+
107+
type Mutex<T: OptionalSend + 'static>: Mutex<T>;
104108
}
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
use std::future::Future;
2+
use std::ops::DerefMut;
3+
4+
use crate::OptionalSend;
5+
use crate::OptionalSync;
6+
7+
/// Represents an implementation of an asynchronous Mutex.
8+
pub trait Mutex<T: OptionalSend + 'static>: OptionalSend + OptionalSync {
9+
/// Handle to an acquired lock, should release it when dropped.
10+
type Guard<'a>: DerefMut<Target = T> + OptionalSend
11+
where Self: 'a;
12+
13+
/// Creates a new lock.
14+
fn new(value: T) -> Self;
15+
16+
/// Locks this Mutex.
17+
fn lock(&self) -> impl Future<Output = Self::Guard<'_>> + OptionalSend;
18+
}

openraft/src/type_config/util.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use std::time::Duration;
33

44
use openraft_macros::since;
55

6+
use crate::async_runtime::mutex::Mutex;
67
use crate::async_runtime::watch::Watch;
78
use crate::async_runtime::MpscUnbounded;
89
use crate::async_runtime::Oneshot;
@@ -12,6 +13,7 @@ use crate::type_config::alias::JoinHandleOf;
1213
use crate::type_config::alias::MpscUnboundedOf;
1314
use crate::type_config::alias::MpscUnboundedReceiverOf;
1415
use crate::type_config::alias::MpscUnboundedSenderOf;
16+
use crate::type_config::alias::MutexOf;
1517
use crate::type_config::alias::OneshotOf;
1618
use crate::type_config::alias::OneshotReceiverOf;
1719
use crate::type_config::alias::OneshotSenderOf;
@@ -90,6 +92,15 @@ pub trait TypeConfigExt: RaftTypeConfig {
9092
WatchOf::<Self>::channel(init)
9193
}
9294

95+
/// Creates a Mutex lock.
96+
///
97+
/// This is just a wrapper of
98+
/// [`AsyncRuntime::Mutex::new()`](`crate::async_runtime::Mutex::new`).
99+
fn mutex<T>(value: T) -> MutexOf<Self, T>
100+
where T: OptionalSend {
101+
MutexOf::<Self, T>::new(value)
102+
}
103+
93104
// Task methods
94105

95106
/// Spawn a new task.

0 commit comments

Comments
 (0)