diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index ce9cf159f..ea026e45f 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -29,7 +29,6 @@ use crate::core::notify::Notify; use crate::core::raft_msg::external_command::ExternalCommand; use crate::core::raft_msg::AppendEntriesTx; use crate::core::raft_msg::ClientReadTx; -use crate::core::raft_msg::ClientWriteTx; use crate::core::raft_msg::RaftMsg; use crate::core::raft_msg::ResultSender; use crate::core::raft_msg::VoteTx; @@ -67,6 +66,7 @@ use crate::progress::entry::ProgressEntry; use crate::progress::Inflight; use crate::progress::Progress; use crate::quorum::QuorumSet; +use crate::raft::responder::Responder; use crate::raft::AppendEntriesRequest; use crate::raft::AppendEntriesResponse; use crate::raft::ClientWriteResponse; @@ -87,6 +87,7 @@ use crate::storage::RaftStateMachine; use crate::type_config::alias::AsyncRuntimeOf; use crate::type_config::alias::InstantOf; use crate::type_config::alias::OneshotReceiverOf; +use crate::type_config::alias::ResponderOf; use crate::AsyncRuntime; use crate::ChangeMembers; use crate::Instant; @@ -181,7 +182,7 @@ where pub(crate) engine: Engine, /// Channels to send result back to client when logs are applied. - pub(crate) client_resp_channels: BTreeMap>, + pub(crate) client_resp_channels: BTreeMap>, pub(crate) leader_data: Option>, @@ -441,13 +442,13 @@ where &mut self, changes: ChangeMembers, retain: bool, - tx: ResultSender, ClientWriteError>, + tx: ResponderOf, ) { let res = self.engine.state.membership_state.change_handler().apply(changes, retain); let new_membership = match res { Ok(x) => x, Err(e) => { - let _ = tx.send(Err(ClientWriteError::ChangeMembershipError(e))); + tx.send(Err(ClientWriteError::ChangeMembershipError(e))); return; } }; @@ -464,7 +465,7 @@ where /// The result of applying it to state machine is sent to `resp_tx`, if it is not `None`. /// The calling side may not receive a result from `resp_tx`, if raft is shut down. #[tracing::instrument(level = "debug", skip_all, fields(id = display(self.id)))] - pub fn write_entry(&mut self, entry: C::Entry, resp_tx: Option>) -> bool { + pub fn write_entry(&mut self, entry: C::Entry, resp_tx: Option>) -> bool { tracing::debug!(payload = display(&entry), "write_entry"); let (mut lh, tx) = if let Some((lh, tx)) = self.engine.get_leader_handler_or_reject(resp_tx) { @@ -494,7 +495,7 @@ where pub fn send_heartbeat(&mut self, emitter: impl Display) -> bool { tracing::debug!(now = debug(InstantOf::::now()), "send_heartbeat"); - let mut lh = if let Some((lh, _)) = self.engine.get_leader_handler_or_reject::<(), ClientWriteError>(None) { + let mut lh = if let Some((lh, _)) = self.engine.get_leader_handler_or_reject(None) { lh } else { tracing::debug!( @@ -773,7 +774,7 @@ where /// Send result of applying a log entry to its client. #[tracing::instrument(level = "debug", skip_all)] - pub(super) fn send_response(entry: ApplyingEntry, resp: C::R, tx: Option>) { + pub(super) fn send_response(entry: ApplyingEntry, resp: C::R, tx: Option>) { tracing::debug!(entry = debug(&entry), "send_response"); let tx = match tx { @@ -789,11 +790,7 @@ where membership, }); - let send_res = tx.send(res); - tracing::debug!( - "send client response through tx, send_res is error: {}", - send_res.is_err() - ); + tx.send(res); } /// Spawn a new replication stream returning its replication state handle. @@ -1629,16 +1626,12 @@ where #[allow(clippy::let_underscore_future)] let _ = AsyncRuntimeOf::::spawn(async move { for (log_index, tx) in removed.into_iter() { - let res = tx.send(Err(ClientWriteError::ForwardToLeader(ForwardToLeader { + tx.send(Err(ClientWriteError::ForwardToLeader(ForwardToLeader { leader_id, leader_node: leader_node.clone(), }))); - tracing::debug!( - "sent ForwardToLeader for log_index: {}, is_ok: {}", - log_index, - res.is_ok() - ); + tracing::debug!("sent ForwardToLeader for log_index: {}", log_index,); } }); } diff --git a/openraft/src/core/raft_msg/mod.rs b/openraft/src/core/raft_msg/mod.rs index a358a6d36..9334fc0b7 100644 --- a/openraft/src/core/raft_msg/mod.rs +++ b/openraft/src/core/raft_msg/mod.rs @@ -3,18 +3,17 @@ use std::fmt; use crate::core::raft_msg::external_command::ExternalCommand; use crate::error::CheckIsLeaderError; -use crate::error::ClientWriteError; use crate::error::Infallible; use crate::error::InitializeError; use crate::raft::AppendEntriesRequest; use crate::raft::AppendEntriesResponse; use crate::raft::BoxCoreFn; -use crate::raft::ClientWriteResponse; use crate::raft::SnapshotResponse; use crate::raft::VoteRequest; use crate::raft::VoteResponse; use crate::type_config::alias::LogIdOf; use crate::type_config::alias::OneshotSenderOf; +use crate::type_config::alias::ResponderOf; use crate::type_config::alias::SnapshotDataOf; use crate::ChangeMembers; use crate::RaftTypeConfig; @@ -32,9 +31,6 @@ pub(crate) type VoteTx = ResultSender>; /// TX for Append Entries Response pub(crate) type AppendEntriesTx = ResultSender>; -/// TX for Client Write Response -pub(crate) type ClientWriteTx = ResultSender, ClientWriteError>; - /// TX for Linearizable Read Response pub(crate) type ClientReadTx = ResultSender>, Option>), CheckIsLeaderError>; @@ -72,7 +68,7 @@ where C: RaftTypeConfig ClientWriteRequest { app_data: C::D, - tx: ClientWriteTx, + tx: ResponderOf, }, CheckIsLeaderRequest { @@ -91,7 +87,7 @@ where C: RaftTypeConfig /// config will be converted into learners, otherwise they will be removed. retain: bool, - tx: ResultSender, ClientWriteError>, + tx: ResponderOf, }, ExternalCoreRequest { diff --git a/openraft/src/core/tick.rs b/openraft/src/core/tick.rs index 6c16c5cc4..b2091e217 100644 --- a/openraft/src/core/tick.rs +++ b/openraft/src/core/tick.rs @@ -175,6 +175,7 @@ mod tests { type Entry = crate::Entry; type SnapshotData = Cursor>; type AsyncRuntime = TokioRuntime; + type Responder = crate::impls::OneshotResponder; } // AsyncRuntime::spawn is `spawn_local` with singlethreaded enabled. diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index 57313f2ad..228b3d7cc 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -2,7 +2,6 @@ use std::time::Duration; use validit::Valid; -use crate::async_runtime::AsyncOneshotSendExt; use crate::core::raft_msg::AppendEntriesTx; use crate::core::raft_msg::ResultSender; use crate::core::sm; @@ -31,6 +30,7 @@ use crate::error::NotInMembers; use crate::error::RejectAppendEntries; use crate::internal_server_state::InternalServerState; use crate::membership::EffectiveMembership; +use crate::raft::responder::Responder; use crate::raft::AppendEntriesResponse; use crate::raft::SnapshotResponse; use crate::raft::VoteRequest; @@ -38,12 +38,12 @@ use crate::raft::VoteResponse; use crate::raft_state::LogStateReader; use crate::raft_state::RaftState; use crate::type_config::alias::InstantOf; +use crate::type_config::alias::ResponderOf; use crate::type_config::alias::SnapshotDataOf; use crate::Instant; use crate::LogId; use crate::LogIdOptionExt; use crate::Membership; -use crate::OptionalSend; use crate::RaftLogId; use crate::RaftTypeConfig; use crate::Snapshot; @@ -222,15 +222,10 @@ where C: RaftTypeConfig /// /// If tx is None, no response will be sent. #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn get_leader_handler_or_reject( + pub(crate) fn get_leader_handler_or_reject( &mut self, - tx: Option>, - ) -> Option<(LeaderHandler, Option>)> - where - T: OptionalSend, - E: OptionalSend, - E: From>, - { + tx: Option>, + ) -> Option<(LeaderHandler, Option>)> { let res = self.leader_handler(); let forward_err = match res { Ok(lh) => { @@ -241,7 +236,7 @@ where C: RaftTypeConfig }; if let Some(tx) = tx { - let _ = tx.send(Err(forward_err.into())); + tx.send(Err(forward_err.into())); } None diff --git a/openraft/src/engine/testing.rs b/openraft/src/engine/testing.rs index a549545a4..b08e858d1 100644 --- a/openraft/src/engine/testing.rs +++ b/openraft/src/engine/testing.rs @@ -30,4 +30,5 @@ where N: Node + Ord type Entry = crate::Entry; type SnapshotData = Cursor>; type AsyncRuntime = TokioRuntime; + type Responder = crate::impls::OneshotResponder; } diff --git a/openraft/src/impls/mod.rs b/openraft/src/impls/mod.rs new file mode 100644 index 000000000..e42daced5 --- /dev/null +++ b/openraft/src/impls/mod.rs @@ -0,0 +1,7 @@ +//! Collection of implementations of usually used traits defined by Openraft + +pub use crate::async_runtime::TokioRuntime; +pub use crate::entry::Entry; +pub use crate::node::BasicNode; +pub use crate::node::EmptyNode; +pub use crate::raft::responder::impls::OneshotResponder; diff --git a/openraft/src/lib.rs b/openraft/src/lib.rs index 14df6e1ac..bae568d7c 100644 --- a/openraft/src/lib.rs +++ b/openraft/src/lib.rs @@ -41,6 +41,7 @@ mod vote; pub mod async_runtime; pub mod entry; pub mod error; +pub mod impls; pub mod instant; pub mod log_id; pub mod metrics; diff --git a/openraft/src/raft/declare_raft_types_test.rs b/openraft/src/raft/declare_raft_types_test.rs index b33c0cbdd..4b978fcfb 100644 --- a/openraft/src/raft/declare_raft_types_test.rs +++ b/openraft/src/raft/declare_raft_types_test.rs @@ -14,6 +14,7 @@ declare_raft_types!( Entry = crate::Entry, SnapshotData = Cursor>, AsyncRuntime = TokioRuntime, + Responder = crate::impls::OneshotResponder, ); declare_raft_types!( diff --git a/openraft/src/raft/impl_raft_blocking_write.rs b/openraft/src/raft/impl_raft_blocking_write.rs new file mode 100644 index 000000000..dd32501b8 --- /dev/null +++ b/openraft/src/raft/impl_raft_blocking_write.rs @@ -0,0 +1,178 @@ +//! Implement blocking mode write operations for Raft. +//! Blocking mode write API blocks until the write operation is completed, +//! where [`RaftTypeConfig::Responder`] is a [`OneshotResponder`]. + +use maplit::btreemap; + +use crate::core::raft_msg::RaftMsg; +use crate::error::ClientWriteError; +use crate::error::RaftError; +use crate::raft::message::WriteResult; +use crate::raft::responder::OneshotResponder; +use crate::raft::ClientWriteResponse; +use crate::type_config::alias::OneshotReceiverOf; +use crate::AsyncRuntime; +use crate::ChangeMembers; +use crate::Raft; +use crate::RaftTypeConfig; + +/// Implement blocking mode write operations those reply on oneshot channel for communication +/// between Raft core and client. +impl Raft +where C: RaftTypeConfig> +{ + /// Propose a cluster configuration change. + /// + /// A node in the proposed config has to be a learner, otherwise it fails with LearnerNotFound + /// error. + /// + /// Internally: + /// - It proposes a **joint** config. + /// - When the **joint** config is committed, it proposes a uniform config. + /// + /// If `retain` is true, then all the members which not exists in the new membership, + /// will be turned into learners, otherwise will be removed. + /// + /// Example of `retain` usage: + /// If the original membership is {"voter":{1,2,3}, "learners":{}}, and call + /// `change_membership` with `voters` {3,4,5}, then: + /// - If `retain` is `true`, the committed new membership is {"voters":{3,4,5}, + /// "learners":{1,2}}. + /// - Otherwise if `retain` is `false`, then the new membership is {"voters":{3,4,5}, + /// "learners":{}}, in which the voters not exists in the new membership just be removed + /// from the cluster. + /// + /// If it loses leadership or crashed before committing the second **uniform** config log, the + /// cluster is left in the **joint** config. + #[tracing::instrument(level = "info", skip_all)] + pub async fn change_membership( + &self, + members: impl Into>, + retain: bool, + ) -> Result, RaftError>> { + let changes: ChangeMembers = members.into(); + + tracing::info!( + changes = debug(&changes), + retain = display(retain), + "change_membership: start to commit joint config" + ); + + let (tx, rx) = oneshot_channel::(); + + // res is error if membership can not be changed. + // If no error, it will enter a joint state + let res = self + .inner + .call_core( + RaftMsg::ChangeMembership { + changes: changes.clone(), + retain, + tx, + }, + rx, + ) + .await; + + if let Err(e) = &res { + tracing::error!("the first step error: {}", e); + } + let res = res?; + + tracing::debug!("res of first step: {}", res); + + let (log_id, joint) = (res.log_id, res.membership.clone().unwrap()); + + if joint.get_joint_config().len() == 1 { + return Ok(res); + } + + tracing::debug!("committed a joint config: {} {:?}", log_id, joint); + tracing::debug!("the second step is to change to uniform config: {:?}", changes); + + let (tx, rx) = oneshot_channel::(); + + let res = self.inner.call_core(RaftMsg::ChangeMembership { changes, retain, tx }, rx).await; + + if let Err(e) = &res { + tracing::error!("the second step error: {}", e); + } + let res = res?; + + tracing::info!("res of second step of do_change_membership: {}", res); + + Ok(res) + } + + /// Add a new learner raft node, optionally, blocking until up-to-speed. + /// + /// - Add a node as learner into the cluster. + /// - Setup replication from leader to it. + /// + /// If `blocking` is `true`, this function blocks until the leader believes the logs on the new + /// node is up to date, i.e., ready to join the cluster, as a voter, by calling + /// `change_membership`. + /// + /// If blocking is `false`, this function returns at once as successfully setting up the + /// replication. + /// + /// If the node to add is already a voter or learner, it will still re-add it. + /// + /// A `node` is able to store the network address of a node. Thus an application does not + /// need another store for mapping node-id to ip-addr when implementing the RaftNetwork. + #[tracing::instrument(level = "debug", skip(self, id), fields(target=display(id)))] + pub async fn add_learner( + &self, + id: C::NodeId, + node: C::Node, + blocking: bool, + ) -> Result, RaftError>> { + let (tx, rx) = oneshot_channel::(); + + let msg = RaftMsg::ChangeMembership { + changes: ChangeMembers::AddNodes(btreemap! {id=>node}), + retain: true, + tx, + }; + + let resp = self.inner.call_core(msg, rx).await?; + + if !blocking { + return Ok(resp); + } + + if self.inner.id == id { + return Ok(resp); + } + + // Otherwise, blocks until the replication to the new learner becomes up to date. + + // The log id of the membership that contains the added learner. + let membership_log_id = resp.log_id; + + let wait_res = self + .wait(None) + .metrics( + |metrics| match self.check_replication_upto_date(metrics, id, Some(membership_log_id)) { + Ok(_matching) => true, + // keep waiting + Err(_) => false, + }, + "wait new learner to become line-rate", + ) + .await; + + tracing::info!(wait_res = debug(&wait_res), "waiting for replication to new learner"); + + Ok(resp) + } +} + +fn oneshot_channel() -> (OneshotResponder, OneshotReceiverOf>) +where C: RaftTypeConfig { + let (tx, rx) = C::AsyncRuntime::oneshot(); + + let tx = OneshotResponder::new(tx); + + (tx, rx) +} diff --git a/openraft/src/raft/message/client_write.rs b/openraft/src/raft/message/client_write.rs index 9509283d8..00e0a34e4 100644 --- a/openraft/src/raft/message/client_write.rs +++ b/openraft/src/raft/message/client_write.rs @@ -2,10 +2,14 @@ use std::fmt; use std::fmt::Debug; use crate::display_ext::DisplayOptionExt; +use crate::error::ClientWriteError; use crate::LogId; use crate::Membership; use crate::RaftTypeConfig; +/// The result of a write request to Raft. +pub type WriteResult = Result, ClientWriteError>; + /// The response to a client-request. #[cfg_attr( feature = "serde", @@ -23,6 +27,33 @@ pub struct ClientWriteResponse { pub membership: Option>, } +impl ClientWriteResponse +where C: RaftTypeConfig +{ + /// Create a new instance of `ClientWriteResponse`. + #[allow(dead_code)] + pub(crate) fn new_app_response(log_id: LogId, data: C::R) -> Self { + Self { + log_id, + data, + membership: None, + } + } + + pub fn log_id(&self) -> &LogId { + &self.log_id + } + + pub fn response(&self) -> &C::R { + &self.data + } + + /// Return membership config if the log entry is a change-membership entry. + pub fn membership(&self) -> &Option> { + &self.membership + } +} + impl Debug for ClientWriteResponse where C::R: Debug { diff --git a/openraft/src/raft/message/mod.rs b/openraft/src/raft/message/mod.rs index 63d30d509..2b93d61c4 100644 --- a/openraft/src/raft/message/mod.rs +++ b/openraft/src/raft/message/mod.rs @@ -12,6 +12,7 @@ mod client_write; pub use append_entries::AppendEntriesRequest; pub use append_entries::AppendEntriesResponse; pub use client_write::ClientWriteResponse; +pub use client_write::WriteResult; pub use install_snapshot::InstallSnapshotRequest; pub use install_snapshot::InstallSnapshotResponse; pub use install_snapshot::SnapshotResponse; diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index 1967db606..9672e06af 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -2,23 +2,26 @@ #[cfg(test)] mod declare_raft_types_test; mod external_request; -mod message; +mod impl_raft_blocking_write; +pub(crate) mod message; mod raft_inner; +pub mod responder; mod runtime_config_handle; pub mod trigger; use std::collections::BTreeMap; +use std::error::Error; pub(crate) use self::external_request::BoxCoreFn; pub(in crate::raft) mod core_state; use std::fmt::Debug; +use std::future::Future; use std::sync::Arc; use std::time::Duration; use core_state::CoreState; -use maplit::btreemap; pub use message::AppendEntriesRequest; pub use message::AppendEntriesResponse; pub use message::ClientWriteResponse; @@ -27,6 +30,7 @@ pub use message::InstallSnapshotResponse; pub use message::SnapshotResponse; pub use message::VoteRequest; pub use message::VoteResponse; +pub use message::WriteResult; use tokio::sync::mpsc; use tokio::sync::watch; use tokio::sync::Mutex; @@ -60,15 +64,17 @@ use crate::metrics::Wait; use crate::metrics::WaitError; use crate::network::RaftNetworkFactory; use crate::raft::raft_inner::RaftInner; +use crate::raft::responder::Responder; pub use crate::raft::runtime_config_handle::RuntimeConfigHandle; use crate::raft::trigger::Trigger; use crate::storage::RaftLogStorage; use crate::storage::RaftStateMachine; use crate::type_config::alias::AsyncRuntimeOf; use crate::type_config::alias::JoinErrorOf; +use crate::type_config::alias::ResponderOf; +use crate::type_config::alias::ResponderReceiverOf; use crate::type_config::alias::SnapshotDataOf; use crate::AsyncRuntime; -use crate::ChangeMembers; use crate::LogId; use crate::LogIdOptionExt; use crate::OptionalSend; @@ -97,21 +103,23 @@ use crate::Vote; /// Entry = openraft::Entry, /// SnapshotData = Cursor>, /// AsyncRuntime = openraft::TokioRuntime, +/// Responder = openraft::impls::OneshotResponder, /// ); /// ``` /// /// **The types must be specified in the exact order**: -/// `D`, `R`, `NodeId`, `Node`, `Entry`, `SnapshotData`, `AsyncRuntime`. +/// `D`, `R`, `NodeId`, `Node`, `Entry`, `SnapshotData`, `AsyncRuntime`, `Responder` /// /// Types can be omitted, in which case the default type will be used. /// The default values for each type are: /// - `D`: `String` /// - `R`: `String` /// - `NodeId`: `u64` -/// - `Node`: `::openraft::BasicNode` -/// - `Entry`: `::openraft::Entry` +/// - `Node`: `::openraft::impls::BasicNode` +/// - `Entry`: `::openraft::impls::Entry` /// - `SnapshotData`: `Cursor>` -/// - `AsyncRuntime`: `::openraft::TokioRuntime` +/// - `AsyncRuntime`: `::openraft::impls::TokioRuntime` +/// - `Responder`: `::openraft::impls::OneshotResponder` /// /// For example, to declare with only `D` and `R` types: /// ```ignore @@ -188,7 +196,7 @@ macro_rules! declare_raft_types { }; (@F_3, $($(#[$inner:meta])* $type_id:ident = $type:ty,)* @T) => { - type Node = $crate::BasicNode; + type Node = $crate::impls::BasicNode; $crate::declare_raft_types!(@F_4, $($(#[$inner])* $type_id = $type,)* @T); }; @@ -199,7 +207,7 @@ macro_rules! declare_raft_types { }; (@F_4, $($(#[$inner:meta])* $type_id:ident = $type:ty,)* @T) => { - type Entry = $crate::Entry; + type Entry = $crate::impls::Entry; $crate::declare_raft_types!(@F_5, $($(#[$inner])* $type_id = $type,)* @T); }; @@ -221,19 +229,30 @@ macro_rules! declare_raft_types { }; (@F_6, $($(#[$inner:meta])* $type_id:ident = $type:ty,)* @T) => { - type AsyncRuntime = $crate::TokioRuntime; + type AsyncRuntime = $crate::impls::TokioRuntime; $crate::declare_raft_types!(@F_7, $($(#[$inner])* $type_id = $type,)* @T); }; - (@F_7, @T ) => {}; + (@F_7, $(#[$meta:meta])* Responder=$t:ty, $($(#[$inner:meta])* $type_id:ident = $type:ty,)* @T) => { + $(#[$meta])* + type Responder = $t; + $crate::declare_raft_types!(@F_8, $($(#[$inner])* $type_id = $type,)* @T); + }; + + (@F_7, $($(#[$inner:meta])* $type_id:ident = $type:ty,)* @T) => { + type Responder = $crate::impls::OneshotResponder; + $crate::declare_raft_types!(@F_8, $($(#[$inner])* $type_id = $type,)* @T); + }; + + (@F_8, @T ) => {}; // Match any non-captured items to raise compile error - (@F_7, $($(#[$inner:meta])* $type_id:ident = $type:ty,)* @T ) => { + (@F_8, $($(#[$inner:meta])* $type_id:ident = $type:ty,)* @T ) => { compile_error!( stringify!( Type not in its expected position: $($type_id=$type,)* - types must present in this order: D, R, NodeId, Node, Entry, SnapshotData, AsyncRuntime + types must present in this order: D, R, NodeId, Node, Entry, SnapshotData, AsyncRuntime, Responder ) ); }; @@ -662,12 +681,21 @@ where C: RaftTypeConfig /// These are application specific requirements, and must be implemented by the application /// which is being built on top of Raft. #[tracing::instrument(level = "debug", skip(self, app_data))] - pub async fn client_write( + pub async fn client_write( &self, app_data: C::D, - ) -> Result, RaftError>> { - let (tx, rx) = C::AsyncRuntime::oneshot(); - self.inner.call_core(RaftMsg::ClientWriteRequest { app_data, tx }, rx).await + ) -> Result, RaftError>> + where + ResponderReceiverOf: Future, E>>, + E: Error + OptionalSend, + { + let (app_data, tx, rx) = ResponderOf::::from_app_data(app_data); + + self.inner.send_msg(RaftMsg::ClientWriteRequest { app_data, tx }).await?; + let res: WriteResult = self.inner.recv_msg(rx).await?; + + let client_write_response = res.map_err(|e| RaftError::APIError(e))?; + Ok(client_write_response) } /// Initialize a pristine Raft node with the given config. @@ -706,72 +734,6 @@ where C: RaftTypeConfig .await } - /// Add a new learner raft node, optionally, blocking until up-to-speed. - /// - /// - Add a node as learner into the cluster. - /// - Setup replication from leader to it. - /// - /// If `blocking` is `true`, this function blocks until the leader believes the logs on the new - /// node is up to date, i.e., ready to join the cluster, as a voter, by calling - /// `change_membership`. - /// - /// If blocking is `false`, this function returns at once as successfully setting up the - /// replication. - /// - /// If the node to add is already a voter or learner, it will still re-add it. - /// - /// A `node` is able to store the network address of a node. Thus an application does not - /// need another store for mapping node-id to ip-addr when implementing the RaftNetwork. - #[tracing::instrument(level = "debug", skip(self, id), fields(target=display(id)))] - pub async fn add_learner( - &self, - id: C::NodeId, - node: C::Node, - blocking: bool, - ) -> Result, RaftError>> { - let (tx, rx) = C::AsyncRuntime::oneshot(); - let resp = self - .inner - .call_core( - RaftMsg::ChangeMembership { - changes: ChangeMembers::AddNodes(btreemap! {id=>node}), - retain: true, - tx, - }, - rx, - ) - .await?; - - if !blocking { - return Ok(resp); - } - - if self.inner.id == id { - return Ok(resp); - } - - // Otherwise, blocks until the replication to the new learner becomes up to date. - - // The log id of the membership that contains the added learner. - let membership_log_id = resp.log_id; - - let wait_res = self - .wait(None) - .metrics( - |metrics| match self.check_replication_upto_date(metrics, id, Some(membership_log_id)) { - Ok(_matching) => true, - // keep waiting - Err(_) => false, - }, - "wait new learner to become line-rate", - ) - .await; - - tracing::info!(wait_res = debug(&wait_res), "waiting for replication to new learner"); - - Ok(resp) - } - /// Returns Ok() with the latest known matched log id if it should quit waiting: leader change, /// node removed, or replication becomes upto date. /// @@ -822,87 +784,6 @@ where C: RaftTypeConfig Err(()) } - /// Propose a cluster configuration change. - /// - /// A node in the proposed config has to be a learner, otherwise it fails with LearnerNotFound - /// error. - /// - /// Internally: - /// - It proposes a **joint** config. - /// - When the **joint** config is committed, it proposes a uniform config. - /// - /// If `retain` is true, then all the members which not exists in the new membership, - /// will be turned into learners, otherwise will be removed. - /// - /// Example of `retain` usage: - /// If the original membership is {"voter":{1,2,3}, "learners":{}}, and call - /// `change_membership` with `voters` {3,4,5}, then: - /// - If `retain` is `true`, the committed new membership is {"voters":{3,4,5}, - /// "learners":{1,2}}. - /// - Otherwise if `retain` is `false`, then the new membership is {"voters":{3,4,5}, - /// "learners":{}}, in which the voters not exists in the new membership just be removed - /// from the cluster. - /// - /// If it loses leadership or crashed before committing the second **uniform** config log, the - /// cluster is left in the **joint** config. - #[tracing::instrument(level = "info", skip_all)] - pub async fn change_membership( - &self, - members: impl Into>, - retain: bool, - ) -> Result, RaftError>> { - let changes: ChangeMembers = members.into(); - - tracing::info!( - changes = debug(&changes), - retain = display(retain), - "change_membership: start to commit joint config" - ); - - let (tx, rx) = C::AsyncRuntime::oneshot(); - // res is error if membership can not be changed. - // If no error, it will enter a joint state - let res = self - .inner - .call_core( - RaftMsg::ChangeMembership { - changes: changes.clone(), - retain, - tx, - }, - rx, - ) - .await; - - if let Err(e) = &res { - tracing::error!("the first step error: {}", e); - } - let res = res?; - - tracing::debug!("res of first step: {}", res); - - let (log_id, joint) = (res.log_id, res.membership.clone().unwrap()); - - if joint.get_joint_config().len() == 1 { - return Ok(res); - } - - tracing::debug!("committed a joint config: {} {:?}", log_id, joint); - tracing::debug!("the second step is to change to uniform config: {:?}", changes); - - let (tx, rx) = C::AsyncRuntime::oneshot(); - let res = self.inner.call_core(RaftMsg::ChangeMembership { changes, retain, tx }, rx).await; - - if let Err(e) = &res { - tracing::error!("the second step error: {}", e); - } - let res = res?; - - tracing::info!("res of second step of do_change_membership: {}", res); - - Ok(res) - } - /// Provides read-only access to [`RaftState`] through a user-provided function. /// /// The function `func` is applied to the current [`RaftState`]. The result of this function, diff --git a/openraft/src/raft/raft_inner.rs b/openraft/src/raft/raft_inner.rs index 6d0292779..ab104d41f 100644 --- a/openraft/src/raft/raft_inner.rs +++ b/openraft/src/raft/raft_inner.rs @@ -1,5 +1,6 @@ use std::fmt; use std::fmt::Debug; +use std::future::Future; use std::sync::Arc; use tokio::sync::mpsc; @@ -50,6 +51,36 @@ where C: RaftTypeConfig impl RaftInner where C: RaftTypeConfig { + /// Send a RaftMsg to RaftCore + pub(crate) async fn send_msg(&self, mes: RaftMsg) -> Result<(), Fatal> { + let send_res = self.tx_api.send(mes); + + if let Err(e) = send_res { + let fatal = self.get_core_stopped_error("sending RaftMsg to RaftCore", Some(e.0.to_string())).await; + return Err(fatal); + } + Ok(()) + } + + /// Receive a message from RaftCore, return error if RaftCore has stopped. + pub(crate) async fn recv_msg(&self, rx: impl Future>) -> Result> + where + T: OptionalSend, + E: OptionalSend, + { + let recv_res = rx.await; + tracing::debug!("{} receives result is error: {:?}", func_name!(), recv_res.is_err()); + + match recv_res { + Ok(x) => Ok(x), + Err(_) => { + let fatal = self.get_core_stopped_error("receiving rx from RaftCore", None::<&'static str>).await; + tracing::error!(error = debug(&fatal), "error when {}", func_name!()); + Err(fatal) + } + } + } + /// Invoke RaftCore by sending a RaftMsg and blocks waiting for response. #[tracing::instrument(level = "debug", skip_all)] pub(crate) async fn call_core( @@ -67,12 +98,7 @@ where C: RaftTypeConfig None }; - let send_res = self.tx_api.send(mes); - - if send_res.is_err() { - let fatal = self.get_core_stopped_error("sending tx to RaftCore", sum).await; - return Err(RaftError::Fatal(fatal)); - } + self.send_msg(mes).await?; let recv_res = rx.await; tracing::debug!("call_core receives result is error: {:?}", recv_res.is_err()); diff --git a/openraft/src/raft/responder/impls.rs b/openraft/src/raft/responder/impls.rs new file mode 100644 index 000000000..b6e955f36 --- /dev/null +++ b/openraft/src/raft/responder/impls.rs @@ -0,0 +1,50 @@ +use crate::async_runtime::AsyncOneshotSendExt; +use crate::raft::message::WriteResult; +use crate::raft::responder::Responder; +use crate::type_config::alias::AsyncRuntimeOf; +use crate::type_config::alias::OneshotReceiverOf; +use crate::type_config::alias::OneshotSenderOf; +use crate::AsyncRuntime; +use crate::RaftTypeConfig; + +/// A [`Responder`] implementation that sends the response via a oneshot channel. +/// +/// This could be used when the [`Raft::client_write`] caller want to wait for the response. +/// +/// [`Raft::client_write`]: `crate::raft::Raft::client_write` +pub struct OneshotResponder +where C: RaftTypeConfig +{ + tx: OneshotSenderOf>, +} + +impl OneshotResponder +where C: RaftTypeConfig +{ + /// Create a new instance from a [`AsyncRuntime::OneshotSender`]. + pub fn new(tx: OneshotSenderOf>) -> Self { + Self { tx } + } +} + +impl Responder for OneshotResponder +where C: RaftTypeConfig +{ + type Receiver = OneshotReceiverOf>; + + fn from_app_data(app_data: C::D) -> (C::D, Self, Self::Receiver) + where Self: Sized { + let (tx, rx) = AsyncRuntimeOf::::oneshot(); + (app_data, Self { tx }, rx) + } + + fn send(self, res: WriteResult) { + let res = self.tx.send(res); + + if res.is_ok() { + tracing::debug!("OneshotConsumer.tx.send: is_ok: {}", res.is_ok()); + } else { + tracing::warn!("OneshotConsumer.tx.send: is_ok: {}", res.is_ok()); + } + } +} diff --git a/openraft/src/raft/responder/mod.rs b/openraft/src/raft/responder/mod.rs new file mode 100644 index 000000000..ba4daf371 --- /dev/null +++ b/openraft/src/raft/responder/mod.rs @@ -0,0 +1,39 @@ +//! API to consumer a response when a client write request is completed. + +pub(crate) mod impls; +pub use impls::OneshotResponder; + +use crate::raft::message::WriteResult; +use crate::OptionalSend; +use crate::RaftTypeConfig; + +/// A trait that lets `RaftCore` send the response or an error of a client write request back to the +/// client or to somewhere else. +/// +/// It is created for each request [`AppData`], and is sent to `RaftCore`. +/// Once the request is completed, +/// the `RaftCore` send the result [`WriteResult`] via it. +/// The implementation of the trait then forward the response to application. +/// There could optionally be a receiver to wait for the response. +/// +/// Usually an implementation of [`Responder`] is a oneshot channel Sender, +/// and [`Responder::Receiver`] is a oneshot channel Receiver. +/// +/// [`AppData`]: `crate::AppData` +pub trait Responder: OptionalSend + 'static +where C: RaftTypeConfig +{ + /// An optional receiver to receive the result sent by `RaftCore`. + /// + /// If the application does not need to wait for the response, it can be `()`. + type Receiver; + + /// Build a new instance from the application request. + fn from_app_data(app_data: C::D) -> (C::D, Self, Self::Receiver) + where Self: Sized; + + /// Send result when the request has been completed. + /// + /// This method is called by the `RaftCore` once the request has been applied to state machine. + fn send(self, result: WriteResult); +} diff --git a/openraft/src/type_config.rs b/openraft/src/type_config.rs index c14282449..444a579a3 100644 --- a/openraft/src/type_config.rs +++ b/openraft/src/type_config.rs @@ -2,6 +2,7 @@ use std::fmt::Debug; use crate::entry::FromAppData; use crate::entry::RaftEntry; +use crate::raft::responder::Responder; use crate::AppData; use crate::AppDataResponse; use crate::AsyncRuntime; @@ -71,11 +72,21 @@ pub trait RaftTypeConfig: /// Asynchronous runtime type. type AsyncRuntime: AsyncRuntime; + + /// Send the response or error of a client write request([`WriteResult`]). + /// + /// For example, return [`WriteResult`] the to the caller of [`Raft::client_write`], or send to + /// some application defined channel. + /// + /// [`Raft::client_write`]: `crate::raft::Raft::client_write` + /// [`WriteResult`]: `crate::raft::message::WriteResult` + type Responder: Responder; } #[allow(dead_code)] /// Type alias for types used in `RaftTypeConfig`. pub mod alias { + use crate::raft::responder::Responder; use crate::AsyncRuntime; use crate::RaftTypeConfig; @@ -86,6 +97,8 @@ pub mod alias { pub type EntryOf = ::Entry; pub type SnapshotDataOf = ::SnapshotData; pub type AsyncRuntimeOf = ::AsyncRuntime; + pub type ResponderOf = ::Responder; + pub type ResponderReceiverOf = as Responder>::Receiver; type Rt = AsyncRuntimeOf;