Skip to content

Commit

Permalink
Feature: Add RaftTypeConfig::Responder to customize returning clien…
Browse files Browse the repository at this point in the history
…t write response

This commit introduces the `Responder` trait that defines the mechanism
by which `RaftCore` sends responses back to the client after processing
write requests.  Applications can now customize response handling by
implementing their own version of the `RaftTypeConfig::Responder` trait.

The `Responder::from_app_data(RaftTypeConfig::D)` method is invoked to
create a new `Responder` instance when a client write request is
received.
Once the write operation is completed within `RaftCore`,
`Responder::send(WriteResult)` is called to dispatch the result
back to the client.

By default, `RaftTypeConfig::Responder` retains the existing
functionality using a oneshot channel, ensuring backward compatibility.

This change is non-breaking, requiring no modifications to existing
applications.

- Fix: databendlabs#1068
  • Loading branch information
drmingdrmer committed Apr 8, 2024
1 parent 4512491 commit d67fbce
Show file tree
Hide file tree
Showing 16 changed files with 420 additions and 206 deletions.
29 changes: 11 additions & 18 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use crate::core::notify::Notify;
use crate::core::raft_msg::external_command::ExternalCommand;
use crate::core::raft_msg::AppendEntriesTx;
use crate::core::raft_msg::ClientReadTx;
use crate::core::raft_msg::ClientWriteTx;
use crate::core::raft_msg::RaftMsg;
use crate::core::raft_msg::ResultSender;
use crate::core::raft_msg::VoteTx;
Expand Down Expand Up @@ -67,6 +66,7 @@ use crate::progress::entry::ProgressEntry;
use crate::progress::Inflight;
use crate::progress::Progress;
use crate::quorum::QuorumSet;
use crate::raft::responder::Responder;
use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
use crate::raft::ClientWriteResponse;
Expand All @@ -87,6 +87,7 @@ use crate::storage::RaftStateMachine;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::OneshotReceiverOf;
use crate::type_config::alias::ResponderOf;
use crate::AsyncRuntime;
use crate::ChangeMembers;
use crate::Instant;
Expand Down Expand Up @@ -181,7 +182,7 @@ where
pub(crate) engine: Engine<C>,

/// Channels to send result back to client when logs are applied.
pub(crate) client_resp_channels: BTreeMap<u64, ClientWriteTx<C>>,
pub(crate) client_resp_channels: BTreeMap<u64, ResponderOf<C>>,

pub(crate) leader_data: Option<LeaderData<C>>,

Expand Down Expand Up @@ -441,13 +442,13 @@ where
&mut self,
changes: ChangeMembers<C::NodeId, C::Node>,
retain: bool,
tx: ResultSender<C, ClientWriteResponse<C>, ClientWriteError<C>>,
tx: ResponderOf<C>,
) {
let res = self.engine.state.membership_state.change_handler().apply(changes, retain);
let new_membership = match res {
Ok(x) => x,
Err(e) => {
let _ = tx.send(Err(ClientWriteError::ChangeMembershipError(e)));
tx.send(Err(ClientWriteError::ChangeMembershipError(e)));
return;
}
};
Expand All @@ -464,7 +465,7 @@ where
/// The result of applying it to state machine is sent to `resp_tx`, if it is not `None`.
/// The calling side may not receive a result from `resp_tx`, if raft is shut down.
#[tracing::instrument(level = "debug", skip_all, fields(id = display(self.id)))]
pub fn write_entry(&mut self, entry: C::Entry, resp_tx: Option<ClientWriteTx<C>>) -> bool {
pub fn write_entry(&mut self, entry: C::Entry, resp_tx: Option<ResponderOf<C>>) -> bool {
tracing::debug!(payload = display(&entry), "write_entry");

let (mut lh, tx) = if let Some((lh, tx)) = self.engine.get_leader_handler_or_reject(resp_tx) {
Expand Down Expand Up @@ -494,7 +495,7 @@ where
pub fn send_heartbeat(&mut self, emitter: impl Display) -> bool {
tracing::debug!(now = debug(InstantOf::<C>::now()), "send_heartbeat");

let mut lh = if let Some((lh, _)) = self.engine.get_leader_handler_or_reject::<(), ClientWriteError<C>>(None) {
let mut lh = if let Some((lh, _)) = self.engine.get_leader_handler_or_reject(None) {
lh
} else {
tracing::debug!(
Expand Down Expand Up @@ -773,7 +774,7 @@ where

/// Send result of applying a log entry to its client.
#[tracing::instrument(level = "debug", skip_all)]
pub(super) fn send_response(entry: ApplyingEntry<C>, resp: C::R, tx: Option<ClientWriteTx<C>>) {
pub(super) fn send_response(entry: ApplyingEntry<C>, resp: C::R, tx: Option<ResponderOf<C>>) {
tracing::debug!(entry = debug(&entry), "send_response");

let tx = match tx {
Expand All @@ -789,11 +790,7 @@ where
membership,
});

let send_res = tx.send(res);
tracing::debug!(
"send client response through tx, send_res is error: {}",
send_res.is_err()
);
tx.send(res);
}

/// Spawn a new replication stream returning its replication state handle.
Expand Down Expand Up @@ -1629,16 +1626,12 @@ where
#[allow(clippy::let_underscore_future)]
let _ = AsyncRuntimeOf::<C>::spawn(async move {
for (log_index, tx) in removed.into_iter() {
let res = tx.send(Err(ClientWriteError::ForwardToLeader(ForwardToLeader {
tx.send(Err(ClientWriteError::ForwardToLeader(ForwardToLeader {
leader_id,
leader_node: leader_node.clone(),
})));

tracing::debug!(
"sent ForwardToLeader for log_index: {}, is_ok: {}",
log_index,
res.is_ok()
);
tracing::debug!("sent ForwardToLeader for log_index: {}", log_index,);
}
});
}
Expand Down
10 changes: 3 additions & 7 deletions openraft/src/core/raft_msg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,17 @@ use std::fmt;

use crate::core::raft_msg::external_command::ExternalCommand;
use crate::error::CheckIsLeaderError;
use crate::error::ClientWriteError;
use crate::error::Infallible;
use crate::error::InitializeError;
use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
use crate::raft::BoxCoreFn;
use crate::raft::ClientWriteResponse;
use crate::raft::SnapshotResponse;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::OneshotSenderOf;
use crate::type_config::alias::ResponderOf;
use crate::type_config::alias::SnapshotDataOf;
use crate::ChangeMembers;
use crate::RaftTypeConfig;
Expand All @@ -32,9 +31,6 @@ pub(crate) type VoteTx<C> = ResultSender<C, VoteResponse<C>>;
/// TX for Append Entries Response
pub(crate) type AppendEntriesTx<C> = ResultSender<C, AppendEntriesResponse<C>>;

/// TX for Client Write Response
pub(crate) type ClientWriteTx<C> = ResultSender<C, ClientWriteResponse<C>, ClientWriteError<C>>;

/// TX for Linearizable Read Response
pub(crate) type ClientReadTx<C> = ResultSender<C, (Option<LogIdOf<C>>, Option<LogIdOf<C>>), CheckIsLeaderError<C>>;

Expand Down Expand Up @@ -72,7 +68,7 @@ where C: RaftTypeConfig

ClientWriteRequest {
app_data: C::D,
tx: ClientWriteTx<C>,
tx: ResponderOf<C>,
},

CheckIsLeaderRequest {
Expand All @@ -91,7 +87,7 @@ where C: RaftTypeConfig
/// config will be converted into learners, otherwise they will be removed.
retain: bool,

tx: ResultSender<C, ClientWriteResponse<C>, ClientWriteError<C>>,
tx: ResponderOf<C>,
},

ExternalCoreRequest {
Expand Down
1 change: 1 addition & 0 deletions openraft/src/core/tick.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ mod tests {
type Entry = crate::Entry<TickUTConfig>;
type SnapshotData = Cursor<Vec<u8>>;
type AsyncRuntime = TokioRuntime;
type Responder = crate::impls::OneshotResponder<Self>;
}

// AsyncRuntime::spawn is `spawn_local` with singlethreaded enabled.
Expand Down
17 changes: 6 additions & 11 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use std::time::Duration;

use validit::Valid;

use crate::async_runtime::AsyncOneshotSendExt;
use crate::core::raft_msg::AppendEntriesTx;
use crate::core::raft_msg::ResultSender;
use crate::core::sm;
Expand Down Expand Up @@ -31,19 +30,20 @@ use crate::error::NotInMembers;
use crate::error::RejectAppendEntries;
use crate::internal_server_state::InternalServerState;
use crate::membership::EffectiveMembership;
use crate::raft::responder::Responder;
use crate::raft::AppendEntriesResponse;
use crate::raft::SnapshotResponse;
use crate::raft::VoteRequest;
use crate::raft::VoteResponse;
use crate::raft_state::LogStateReader;
use crate::raft_state::RaftState;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::ResponderOf;
use crate::type_config::alias::SnapshotDataOf;
use crate::Instant;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::Membership;
use crate::OptionalSend;
use crate::RaftLogId;
use crate::RaftTypeConfig;
use crate::Snapshot;
Expand Down Expand Up @@ -222,15 +222,10 @@ where C: RaftTypeConfig
///
/// If tx is None, no response will be sent.
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn get_leader_handler_or_reject<T, E>(
pub(crate) fn get_leader_handler_or_reject(
&mut self,
tx: Option<ResultSender<C, T, E>>,
) -> Option<(LeaderHandler<C>, Option<ResultSender<C, T, E>>)>
where
T: OptionalSend,
E: OptionalSend,
E: From<ForwardToLeader<C>>,
{
tx: Option<ResponderOf<C>>,
) -> Option<(LeaderHandler<C>, Option<ResponderOf<C>>)> {
let res = self.leader_handler();
let forward_err = match res {
Ok(lh) => {
Expand All @@ -241,7 +236,7 @@ where C: RaftTypeConfig
};

if let Some(tx) = tx {
let _ = tx.send(Err(forward_err.into()));
tx.send(Err(forward_err.into()));
}

None
Expand Down
1 change: 1 addition & 0 deletions openraft/src/engine/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,5 @@ where N: Node + Ord
type Entry = crate::Entry<Self>;
type SnapshotData = Cursor<Vec<u8>>;
type AsyncRuntime = TokioRuntime;
type Responder = crate::impls::OneshotResponder<Self>;
}
7 changes: 7 additions & 0 deletions openraft/src/impls/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
//! Collection of implementations of usually used traits defined by Openraft

pub use crate::async_runtime::TokioRuntime;
pub use crate::entry::Entry;
pub use crate::node::BasicNode;
pub use crate::node::EmptyNode;
pub use crate::raft::responder::impls::OneshotResponder;
1 change: 1 addition & 0 deletions openraft/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ mod vote;
pub mod async_runtime;
pub mod entry;
pub mod error;
pub mod impls;
pub mod instant;
pub mod log_id;
pub mod metrics;
Expand Down
1 change: 1 addition & 0 deletions openraft/src/raft/declare_raft_types_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ declare_raft_types!(
Entry = crate::Entry<Self>,
SnapshotData = Cursor<Vec<u8>>,
AsyncRuntime = TokioRuntime,
Responder = crate::impls::OneshotResponder<Self>,
);

declare_raft_types!(
Expand Down
Loading

0 comments on commit d67fbce

Please sign in to comment.