Skip to content

Commit

Permalink
Chore: remove SM type param from RaftCore
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Jul 5, 2024
1 parent 0e6efaa commit b60e7dc
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 49 deletions.
44 changes: 17 additions & 27 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ use std::collections::BTreeMap;
use std::fmt::Debug;
use std::fmt::Display;
use std::fmt::Formatter;
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::atomic::Ordering;
use std::sync::Arc;
Expand Down Expand Up @@ -34,7 +33,6 @@ use crate::core::raft_msg::RaftMsg;
use crate::core::raft_msg::ResultSender;
use crate::core::raft_msg::VoteTx;
use crate::core::sm;
use crate::core::sm::handle;
use crate::core::sm::CommandSeq;
use crate::core::ServerState;
use crate::display_ext::DisplayInstantExt;
Expand Down Expand Up @@ -88,12 +86,10 @@ use crate::runtime::RaftRuntime;
use crate::storage::LogFlushed;
use crate::storage::RaftLogReaderExt;
use crate::storage::RaftLogStorage;
use crate::storage::RaftStateMachine;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::OneshotReceiverOf;
use crate::type_config::alias::ResponderOf;
use crate::type_config::TypeConfigExt;
use crate::AsyncRuntime;
use crate::ChangeMembers;
use crate::Instant;
use crate::LogId;
Expand Down Expand Up @@ -136,14 +132,12 @@ impl<C: RaftTypeConfig> Debug for ApplyResult<C> {
}
}

// TODO: remove SM
/// The core type implementing the Raft protocol.
pub struct RaftCore<C, N, LS, SM>
pub struct RaftCore<C, NF, LS>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
NF: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
SM: RaftStateMachine<C>,
{
/// This node's ID.
pub(crate) id: C::NodeId,
Expand All @@ -154,13 +148,13 @@ where
pub(crate) runtime_config: Arc<RuntimeConfig>,

/// The `RaftNetworkFactory` implementation.
pub(crate) network: N,
pub(crate) network_factory: NF,

/// The [`RaftLogStorage`] implementation.
pub(crate) log_store: LS,

/// A controlling handle to the [`RaftStateMachine`] worker.
pub(crate) sm_handle: handle::Handle<C>,
pub(crate) sm_handle: sm::handle::Handle<C>,

pub(crate) engine: Engine<C>,

Expand Down Expand Up @@ -188,16 +182,13 @@ where
pub(crate) command_state: CommandState,

pub(crate) span: Span,

pub(crate) _p: PhantomData<SM>,
}

impl<C, N, LS, SM> RaftCore<C, N, LS, SM>
impl<C, NF, LS> RaftCore<C, NF, LS>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
NF: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
SM: RaftStateMachine<C>,
{
/// The main loop of the Raft protocol.
pub(crate) async fn main(mut self, rx_shutdown: OneshotReceiverOf<C, ()>) -> Result<Infallible, Fatal<C>> {
Expand Down Expand Up @@ -315,12 +306,12 @@ where

// Safe unwrap(): target is in membership
let target_node = eff_mem.get_node(&target).unwrap().clone();
let mut client = self.network.new_client(target, &target_node).await;
let mut client = self.network_factory.new_client(target, &target_node).await;

let option = RPCOption::new(ttl);

let fu = async move {
let outer_res = C::AsyncRuntime::timeout(ttl, client.append_entries(rpc, option)).await;
let outer_res = C::timeout(ttl, client.append_entries(rpc, option)).await;
match outer_res {
Ok(append_res) => match append_res {
Ok(x) => Ok((target, x)),
Expand All @@ -340,7 +331,7 @@ where
};

let fu = fu.instrument(tracing::debug_span!("spawn_is_leader", target = target.to_string()));
let task = C::AsyncRuntime::spawn(fu).map_err(move |err| (target, err));
let task = C::spawn(fu).map_err(move |err| (target, err));

pending.push(task);
}
Expand Down Expand Up @@ -408,7 +399,7 @@ where

// False positive lint warning(`non-binding `let` on a future`): https://github.com/rust-lang/rust-clippy/issues/9932
#[allow(clippy::let_underscore_future)]
let _ = C::AsyncRuntime::spawn(waiting_fu.instrument(tracing::debug_span!("spawn_is_leader_waiting")));
let _ = C::spawn(waiting_fu.instrument(tracing::debug_span!("spawn_is_leader_waiting")));
}

/// Submit change-membership by writing a Membership log entry.
Expand Down Expand Up @@ -791,14 +782,14 @@ where
let target_node = self.engine.state.membership_state.effective().get_node(&target).unwrap();

let membership_log_id = self.engine.state.membership_state.effective().log_id();
let network = self.network.new_client(target, target_node).await;
let snapshot_network = self.network.new_client(target, target_node).await;
let network = self.network_factory.new_client(target, target_node).await;
let snapshot_network = self.network_factory.new_client(target, target_node).await;

let leader = self.engine.leader.as_ref().unwrap();

let session_id = ReplicationSessionId::new(leader.vote, *membership_log_id);

ReplicationCore::<C, N, LS>::spawn(
ReplicationCore::<C, NF, LS>::spawn(
target,
session_id,
self.config.clone(),
Expand Down Expand Up @@ -1030,7 +1021,7 @@ where

// Safe unwrap(): target must be in membership
let target_node = self.engine.state.membership_state.effective().get_node(&target).unwrap().clone();
let mut client = self.network.new_client(target, &target_node).await;
let mut client = self.network_factory.new_client(target, &target_node).await;

let tx = self.tx_notify.clone();

Expand All @@ -1040,9 +1031,9 @@ where

// False positive lint warning(`non-binding `let` on a future`): https://github.com/rust-lang/rust-clippy/issues/9932
#[allow(clippy::let_underscore_future)]
let _ = C::AsyncRuntime::spawn(
let _ = C::spawn(
async move {
let tm_res = C::AsyncRuntime::timeout(ttl, client.vote(req, option)).await;
let tm_res = C::timeout(ttl, client.vote(req, option)).await;
let res = match tm_res {
Ok(res) => res,

Expand Down Expand Up @@ -1528,12 +1519,11 @@ where
}
}

impl<C, N, LS, SM> RaftRuntime<C> for RaftCore<C, N, LS, SM>
impl<C, N, LS> RaftRuntime<C> for RaftCore<C, N, LS>
where
C: RaftTypeConfig,
N: RaftNetworkFactory<C>,
LS: RaftLogStorage<C>,
SM: RaftStateMachine<C>,
{
async fn run_command<'e>(&mut self, cmd: Command<C>) -> Result<Option<Command<C>>, StorageError<C>> {
let condition = cmd.condition();
Expand Down
6 changes: 3 additions & 3 deletions openraft/src/core/sm/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use crate::display_ext::DisplayOptionExt;
use crate::entry::RaftPayload;
use crate::storage::RaftStateMachine;
use crate::type_config::alias::JoinHandleOf;
use crate::AsyncRuntime;
use crate::type_config::TypeConfigExt;
use crate::RaftLogId;
use crate::RaftSnapshotBuilder;
use crate::RaftTypeConfig;
Expand Down Expand Up @@ -55,7 +55,7 @@ where
}

fn do_spawn(mut self) -> JoinHandleOf<C, ()> {
C::AsyncRuntime::spawn(async move {
C::spawn(async move {
let res = self.worker_loop().await;

if let Err(err) = res {
Expand Down Expand Up @@ -183,7 +183,7 @@ where

let mut builder = self.state_machine.get_snapshot_builder().await;

let _handle = C::AsyncRuntime::spawn(async move {
let _handle = C::spawn(async move {
let res = builder.build_snapshot().await;
let res = res.map(|snap| Response::BuildSnapshot(snap.meta));
let cmd_res = CommandResult::new(seq, res);
Expand Down
4 changes: 2 additions & 2 deletions openraft/src/raft/impl_raft_blocking_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::raft::message::ClientWriteResult;
use crate::raft::responder::OneshotResponder;
use crate::raft::ClientWriteResponse;
use crate::type_config::alias::OneshotReceiverOf;
use crate::AsyncRuntime;
use crate::type_config::TypeConfigExt;
use crate::ChangeMembers;
use crate::Raft;
use crate::RaftTypeConfig;
Expand Down Expand Up @@ -170,7 +170,7 @@ where C: RaftTypeConfig<Responder = OneshotResponder<C>>

fn oneshot_channel<C>() -> (OneshotResponder<C>, OneshotReceiverOf<C, ClientWriteResult<C>>)
where C: RaftTypeConfig {
let (tx, rx) = C::AsyncRuntime::oneshot();
let (tx, rx) = C::oneshot();

let tx = OneshotResponder::new(tx);

Expand Down
27 changes: 12 additions & 15 deletions openraft/src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ use crate::type_config::alias::ResponderOf;
use crate::type_config::alias::ResponderReceiverOf;
use crate::type_config::alias::SnapshotDataOf;
use crate::type_config::TypeConfigExt;
use crate::AsyncRuntime;
use crate::LogId;
use crate::LogIdOptionExt;
use crate::OptionalSend;
Expand Down Expand Up @@ -245,7 +244,7 @@ where C: RaftTypeConfig
let (tx_metrics, rx_metrics) = watch::channel(RaftMetrics::new_initial(id));
let (tx_data_metrics, rx_data_metrics) = watch::channel(RaftDataMetrics::default());
let (tx_server_metrics, rx_server_metrics) = watch::channel(RaftServerMetrics::default());
let (tx_shutdown, rx_shutdown) = C::AsyncRuntime::oneshot();
let (tx_shutdown, rx_shutdown) = C::oneshot();

let tick_handle = Tick::spawn(
Duration::from_millis(config.heartbeat_interval * 3 / 2),
Expand Down Expand Up @@ -274,11 +273,11 @@ where C: RaftTypeConfig

let sm_handle = worker::Worker::spawn(state_machine, tx_notify.clone());

let core: RaftCore<C, N, LS, SM> = RaftCore {
let core: RaftCore<C, N, LS> = RaftCore {
id,
config: config.clone(),
runtime_config: runtime_config.clone(),
network,
network_factory: network,
log_store,
sm_handle,

Expand All @@ -300,11 +299,9 @@ where C: RaftTypeConfig

command_state: CommandState::default(),
span: core_span,

_p: Default::default(),
};

let core_handle = C::AsyncRuntime::spawn(core.main(rx_shutdown).instrument(trace_span!("spawn").or_current()));
let core_handle = C::spawn(core.main(rx_shutdown).instrument(trace_span!("spawn").or_current()));

let inner = RaftInner {
id,
Expand Down Expand Up @@ -363,7 +360,7 @@ where C: RaftTypeConfig
pub async fn append_entries(&self, rpc: AppendEntriesRequest<C>) -> Result<AppendEntriesResponse<C>, RaftError<C>> {
tracing::debug!(rpc = display(&rpc), "Raft::append_entries");

let (tx, rx) = C::AsyncRuntime::oneshot();
let (tx, rx) = C::oneshot();
self.inner.call_core(RaftMsg::AppendEntries { rpc, tx }, rx).await
}

Expand All @@ -375,7 +372,7 @@ where C: RaftTypeConfig
pub async fn vote(&self, rpc: VoteRequest<C>) -> Result<VoteResponse<C>, RaftError<C>> {
tracing::info!(rpc = display(&rpc), "Raft::vote()");

let (tx, rx) = C::AsyncRuntime::oneshot();
let (tx, rx) = C::oneshot();
self.inner.call_core(RaftMsg::RequestVote { rpc, tx }, rx).await
}

Expand All @@ -387,7 +384,7 @@ where C: RaftTypeConfig
pub async fn get_snapshot(&self) -> Result<Option<Snapshot<C>>, RaftError<C>> {
tracing::debug!("Raft::get_snapshot()");

let (tx, rx) = C::AsyncRuntime::oneshot();
let (tx, rx) = C::oneshot();
let cmd = ExternalCommand::GetSnapshot { tx };
self.inner.call_core(RaftMsg::ExternalCommand { cmd }, rx).await
}
Expand Down Expand Up @@ -415,7 +412,7 @@ where C: RaftTypeConfig
) -> Result<SnapshotResponse<C>, Fatal<C>> {
tracing::info!("Raft::install_full_snapshot()");

let (tx, rx) = C::AsyncRuntime::oneshot();
let (tx, rx) = C::oneshot();
let res = self.inner.call_core(RaftMsg::InstallFullSnapshot { vote, snapshot, tx }, rx).await;
match res {
Ok(x) => Ok(x),
Expand Down Expand Up @@ -492,7 +489,7 @@ where C: RaftTypeConfig
#[deprecated(since = "0.9.0", note = "use `Raft::ensure_linearizable()` instead")]
#[tracing::instrument(level = "debug", skip(self))]
pub async fn is_leader(&self) -> Result<(), RaftError<C, CheckIsLeaderError<C>>> {
let (tx, rx) = C::AsyncRuntime::oneshot();
let (tx, rx) = C::oneshot();
let _ = self.inner.call_core(RaftMsg::CheckIsLeaderRequest { tx }, rx).await?;
Ok(())
}
Expand Down Expand Up @@ -571,7 +568,7 @@ where C: RaftTypeConfig
pub async fn get_read_log_id(
&self,
) -> Result<(Option<LogId<C::NodeId>>, Option<LogId<C::NodeId>>), RaftError<C, CheckIsLeaderError<C>>> {
let (tx, rx) = C::AsyncRuntime::oneshot();
let (tx, rx) = C::oneshot();
let (read_log_id, applied) = self.inner.call_core(RaftMsg::CheckIsLeaderRequest { tx }, rx).await?;
Ok((read_log_id, applied))
}
Expand Down Expand Up @@ -658,7 +655,7 @@ where C: RaftTypeConfig
#[tracing::instrument(level = "debug", skip(self))]
pub async fn initialize<T>(&self, members: T) -> Result<(), RaftError<C, InitializeError<C>>>
where T: IntoNodes<C::NodeId, C::Node> + Debug {
let (tx, rx) = C::AsyncRuntime::oneshot();
let (tx, rx) = C::oneshot();
self.inner
.call_core(
RaftMsg::Initialize {
Expand Down Expand Up @@ -740,7 +737,7 @@ where C: RaftTypeConfig
F: FnOnce(&RaftState<C>) -> V + OptionalSend + 'static,
V: OptionalSend + 'static,
{
let (tx, rx) = C::AsyncRuntime::oneshot();
let (tx, rx) = C::oneshot();

self.external_request(|st| {
let result = func(st);
Expand Down
3 changes: 1 addition & 2 deletions openraft/src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ use crate::type_config::alias::InstantOf;
use crate::type_config::alias::JoinHandleOf;
use crate::type_config::alias::LogIdOf;
use crate::type_config::TypeConfigExt;
use crate::AsyncRuntime;
use crate::LogId;
use crate::RaftLogId;
use crate::RaftNetworkFactory;
Expand Down Expand Up @@ -198,7 +197,7 @@ where
entries_hint: Default::default(),
};

let join_handle = C::AsyncRuntime::spawn(this.main().instrument(span));
let join_handle = C::spawn(this.main().instrument(span));

ReplicationHandle {
join_handle,
Expand Down

0 comments on commit b60e7dc

Please sign in to comment.