diff --git a/openraft/src/core/balancer.rs b/openraft/src/core/balancer.rs index dda23d348..95ce11a9a 100644 --- a/openraft/src/core/balancer.rs +++ b/openraft/src/core/balancer.rs @@ -24,11 +24,11 @@ impl Balancer { self.raft_msg } - pub(crate) fn notify(&self) -> u64 { + pub(crate) fn notification(&self) -> u64 { self.total - self.raft_msg } - pub(crate) fn increase_notify(&mut self) { + pub(crate) fn increase_notification(&mut self) { self.raft_msg = self.raft_msg * 15 / 16; if self.raft_msg == 0 { self.raft_msg = 1; diff --git a/openraft/src/core/mod.rs b/openraft/src/core/mod.rs index d401e4707..9cb824774 100644 --- a/openraft/src/core/mod.rs +++ b/openraft/src/core/mod.rs @@ -6,7 +6,7 @@ pub(crate) mod balancer; pub(crate) mod command_state; -pub(crate) mod notify; +pub(crate) mod notification; mod raft_core; pub(crate) mod raft_msg; mod replication_state; diff --git a/openraft/src/core/notify.rs b/openraft/src/core/notification.rs similarity index 96% rename from openraft/src/core/notify.rs rename to openraft/src/core/notification.rs index b1670d27e..96939359b 100644 --- a/openraft/src/core/notify.rs +++ b/openraft/src/core/notification.rs @@ -9,7 +9,7 @@ use crate::StorageError; use crate::Vote; /// A message coming from the internal components. -pub(crate) enum Notify +pub(crate) enum Notification where C: RaftTypeConfig { VoteResponse { @@ -59,7 +59,7 @@ where C: RaftTypeConfig }, } -impl Notify +impl Notification where C: RaftTypeConfig { pub(crate) fn sm(command_result: sm::CommandResult) -> Self { @@ -67,7 +67,7 @@ where C: RaftTypeConfig } } -impl fmt::Display for Notify +impl fmt::Display for Notification where C: RaftTypeConfig { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { diff --git a/openraft/src/core/raft_core.rs b/openraft/src/core/raft_core.rs index e589be8db..0294a4a7b 100644 --- a/openraft/src/core/raft_core.rs +++ b/openraft/src/core/raft_core.rs @@ -26,7 +26,7 @@ use crate::config::Config; use crate::config::RuntimeConfig; use crate::core::balancer::Balancer; use crate::core::command_state::CommandState; -use crate::core::notify::Notify; +use crate::core::notification::Notification; use crate::core::raft_msg::external_command::ExternalCommand; use crate::core::raft_msg::AppendEntriesTx; use crate::core::raft_msg::ClientReadTx; @@ -175,10 +175,10 @@ where /// A Sender to send callback by other components to [`RaftCore`], when an action is finished, /// such as flushing log to disk, or applying log entries to state machine. - pub(crate) tx_notify: MpscUnboundedSenderOf>, + pub(crate) tx_notification: MpscUnboundedSenderOf>, /// A Receiver to receive callback from other components. - pub(crate) rx_notify: MpscUnboundedReceiverOf>, + pub(crate) rx_notification: MpscUnboundedReceiverOf>, pub(crate) tx_metrics: WatchSenderOf>, pub(crate) tx_data_metrics: WatchSenderOf>, @@ -278,7 +278,7 @@ where let my_vote = *self.engine.state.vote_ref(); let ttl = Duration::from_millis(self.config.heartbeat_interval); let eff_mem = self.engine.state.membership_state.effective().clone(); - let core_tx = self.tx_notify.clone(); + let core_tx = self.tx_notification.clone(); let mut granted = btreeset! {my_id}; @@ -366,7 +366,7 @@ where vote ); - let send_res = core_tx.send(Notify::HigherVote { + let send_res = core_tx.send(Notification::HigherVote { target, higher: vote, sender_vote: my_vote, @@ -765,7 +765,7 @@ where snapshot_network, self.log_store.get_log_reader().await, self.sm_handle.new_snapshot_reader(), - self.tx_notify.clone(), + self.tx_notification.clone(), tracing::span!(parent: &self.span, Level::DEBUG, "replication", id=display(self.id), target=display(target)), ) } @@ -858,9 +858,9 @@ where return Err(Fatal::Stopped); } - notify_res = self.rx_notify.recv() => { + notify_res = self.rx_notification.recv() => { match notify_res { - Some(notify) => self.handle_notify(notify)?, + Some(notify) => self.handle_notification(notify)?, None => { tracing::error!("all rx_notify senders are dropped"); return Err(Fatal::Stopped); @@ -884,14 +884,14 @@ where // There is a message waking up the loop, process channels one by one. let raft_msg_processed = self.process_raft_msg(balancer.raft_msg()).await?; - let notify_processed = self.process_notify(balancer.notify()).await?; + let notify_processed = self.process_notification(balancer.notification()).await?; // If one of the channel consumed all its budget, re-balance the budget ratio. #[allow(clippy::collapsible_else_if)] - if notify_processed == balancer.notify() { + if notify_processed == balancer.notification() { tracing::info!("there may be more Notify to process, increase Notify ratio"); - balancer.increase_notify(); + balancer.increase_notification(); } else { if raft_msg_processed == balancer.raft_msg() { tracing::info!("there may be more RaftMsg to process, increase RaftMsg ratio"); @@ -946,9 +946,9 @@ where /// /// It returns the number of processed notifications. /// If the input channel is closed, it returns `Fatal::Stopped`. - async fn process_notify(&mut self, at_most: u64) -> Result> { + async fn process_notification(&mut self, at_most: u64) -> Result> { for i in 0..at_most { - let res = self.rx_notify.try_recv(); + let res = self.rx_notification.try_recv(); let notify = match res { Ok(msg) => msg, Err(e) => match e { @@ -963,7 +963,7 @@ where }, }; - self.handle_notify(notify)?; + self.handle_notification(notify)?; // TODO: does run_engine_commands() run too frequently? // to run many commands in one shot, it is possible to batch more commands to gain @@ -995,7 +995,7 @@ where let target_node = self.engine.state.membership_state.effective().get_node(&target).unwrap().clone(); let mut client = self.network_factory.new_client(target, &target_node).await; - let tx = self.tx_notify.clone(); + let tx = self.tx_notification.clone(); let ttl = Duration::from_millis(self.config.election_timeout_min); let id = self.id; @@ -1023,7 +1023,7 @@ where match res { Ok(resp) => { - let _ = tx.send(Notify::VoteResponse { + let _ = tx.send(Notification::VoteResponse { target, resp, sender_vote: vote, @@ -1151,11 +1151,11 @@ where // TODO: Make this method non-async. It does not need to run any async command in it. #[tracing::instrument(level = "debug", skip_all, fields(state = debug(self.engine.state.server_state), id=display(self.id)))] - pub(crate) fn handle_notify(&mut self, notify: Notify) -> Result<(), Fatal> { + pub(crate) fn handle_notification(&mut self, notify: Notification) -> Result<(), Fatal> { tracing::debug!("recv from rx_notify: {}", notify); match notify { - Notify::VoteResponse { + Notification::VoteResponse { target, resp, sender_vote, @@ -1174,7 +1174,7 @@ where } } - Notify::HigherVote { + Notification::HigherVote { target, higher, sender_vote, @@ -1193,7 +1193,7 @@ where } } - Notify::Tick { i } => { + Notification::Tick { i } => { // check every timer let now = C::now(); @@ -1241,12 +1241,12 @@ where } } - Notify::StorageError { error } => { + Notification::StorageError { error } => { tracing::error!("RaftCore received Notify::StorageError: {}", error); return Err(Fatal::StorageError(error)); } - Notify::LocalIO { io_id } => { + Notification::LocalIO { io_id } => { match io_id { IOId::AppendLog(append_log_io_id) => { // No need to check against membership change, @@ -1259,7 +1259,7 @@ where } } - Notify::Network { response } => { + Notification::Network { response } => { // match response { replication::Response::Progress { @@ -1306,7 +1306,7 @@ where } } - Notify::StateMachine { command_result } => { + Notification::StateMachine { command_result } => { tracing::debug!("sm::StateMachine command result: {:?}", command_result); let seq = command_result.command_seq; @@ -1554,8 +1554,8 @@ where tracing::debug!("AppendInputEntries: {}", DisplaySlice::<_>(&entries),); let io_id = IOId::new_append_log(vote.into_committed(), last_log_id); - let notify = Notify::LocalIO { io_id }; - let callback = IOFlushed::new(notify, self.tx_notify.downgrade()); + let notify = Notification::LocalIO { io_id }; + let callback = IOFlushed::new(notify, self.tx_notification.downgrade()); // Submit IO request, do not wait for the response. self.log_store.append(entries, callback).await?; @@ -1564,7 +1564,7 @@ where self.log_store.save_vote(&vote).await?; self.engine.state.io_state_mut().update_vote(vote); - let _ = self.tx_notify.send(Notify::VoteResponse { + let _ = self.tx_notification.send(Notification::VoteResponse { target: self.id, // last_log_id is not used when sending VoteRequest to local node resp: VoteResponse::new(vote, None), diff --git a/openraft/src/core/sm/worker.rs b/openraft/src/core/sm/worker.rs index d45a04544..e068993fe 100644 --- a/openraft/src/core/sm/worker.rs +++ b/openraft/src/core/sm/worker.rs @@ -1,7 +1,7 @@ use crate::async_runtime::MpscUnboundedReceiver; use crate::async_runtime::MpscUnboundedSender; use crate::async_runtime::OneshotSender; -use crate::core::notify::Notify; +use crate::core::notification::Notification; use crate::core::raft_msg::ResultSender; use crate::core::sm::handle::Handle; use crate::core::sm::Command; @@ -46,7 +46,7 @@ where cmd_rx: MpscUnboundedReceiverOf>, /// Send back the result of the command to RaftCore. - resp_tx: MpscUnboundedSenderOf>, + resp_tx: MpscUnboundedSenderOf>, } impl Worker @@ -56,7 +56,11 @@ where LR: RaftLogReader, { /// Spawn a new state machine worker, return a controlling handle. - pub(crate) fn spawn(state_machine: SM, log_reader: LR, resp_tx: MpscUnboundedSenderOf>) -> Handle { + pub(crate) fn spawn( + state_machine: SM, + log_reader: LR, + resp_tx: MpscUnboundedSenderOf>, + ) -> Handle { let (cmd_tx, cmd_rx) = C::mpsc_unbounded(); let worker = Worker { @@ -78,7 +82,7 @@ where if let Err(err) = res { tracing::error!("{} while execute state machine command", err,); - let _ = self.resp_tx.send(Notify::StateMachine { + let _ = self.resp_tx.send(Notification::StateMachine { command_result: CommandResult { command_seq: 0, result: Err(err), @@ -124,7 +128,7 @@ where tracing::info!("Done install complete snapshot, meta: {}", meta); let res = CommandResult::new(cmd.seq, Ok(Response::InstallSnapshot(Some(meta)))); - let _ = self.resp_tx.send(Notify::sm(res)); + let _ = self.resp_tx.send(Notification::sm(res)); } CommandPayload::BeginReceivingSnapshot { tx } => { tracing::info!("{}: BeginReceivingSnapshot", func_name!()); @@ -137,7 +141,7 @@ where CommandPayload::Apply { first, last } => { let resp = self.apply(first, last).await?; let res = CommandResult::new(cmd.seq, Ok(Response::Apply(resp))); - let _ = self.resp_tx.send(Notify::sm(res)); + let _ = self.resp_tx.send(Notification::sm(res)); } }; } @@ -195,7 +199,7 @@ where /// as applying a log entry, /// - or it must be able to acquire a lock that prevents any write operations. #[tracing::instrument(level = "info", skip_all)] - async fn build_snapshot(&mut self, seq: CommandSeq, resp_tx: MpscUnboundedSenderOf>) { + async fn build_snapshot(&mut self, seq: CommandSeq, resp_tx: MpscUnboundedSenderOf>) { // TODO: need to be abortable? // use futures::future::abortable; // let (fu, abort_handle) = abortable(async move { builder.build_snapshot().await }); @@ -208,7 +212,7 @@ where let res = builder.build_snapshot().await; let res = res.map(|snap| Response::BuildSnapshot(snap.meta)); let cmd_res = CommandResult::new(seq, res); - let _ = resp_tx.send(Notify::sm(cmd_res)); + let _ = resp_tx.send(Notification::sm(cmd_res)); }); tracing::info!("{} returning; spawned building snapshot task", func_name!()); } diff --git a/openraft/src/core/tick.rs b/openraft/src/core/tick.rs index 7ec997231..9fa7c9c79 100644 --- a/openraft/src/core/tick.rs +++ b/openraft/src/core/tick.rs @@ -13,7 +13,7 @@ use tracing::Level; use tracing::Span; use crate::async_runtime::MpscUnboundedSender; -use crate::core::notify::Notify; +use crate::core::notification::Notification; use crate::type_config::alias::JoinHandleOf; use crate::type_config::alias::MpscUnboundedSenderOf; use crate::type_config::TypeConfigExt; @@ -25,7 +25,7 @@ where C: RaftTypeConfig { interval: Duration, - tx: MpscUnboundedSenderOf>, + tx: MpscUnboundedSenderOf>, /// Emit event or not enabled: Arc, @@ -54,7 +54,11 @@ where C: RaftTypeConfig impl Tick where C: RaftTypeConfig { - pub(crate) fn spawn(interval: Duration, tx: MpscUnboundedSenderOf>, enabled: bool) -> TickHandle { + pub(crate) fn spawn( + interval: Duration, + tx: MpscUnboundedSenderOf>, + enabled: bool, + ) -> TickHandle { let enabled = Arc::new(AtomicBool::from(enabled)); let this = Self { interval, @@ -106,7 +110,7 @@ where C: RaftTypeConfig i += 1; - let send_res = self.tx.send(Notify::Tick { i }); + let send_res = self.tx.send(Notification::Tick { i }); if let Err(_e) = send_res { tracing::info!("Stopping tick_loop(), main loop terminated"); break; diff --git a/openraft/src/raft/mod.rs b/openraft/src/raft/mod.rs index 16715b709..166440add 100644 --- a/openraft/src/raft/mod.rs +++ b/openraft/src/raft/mod.rs @@ -291,8 +291,8 @@ where C: RaftTypeConfig tx_api: tx_api.clone(), rx_api, - tx_notify, - rx_notify, + tx_notification: tx_notify, + rx_notification: rx_notify, tx_metrics, tx_data_metrics, diff --git a/openraft/src/replication/mod.rs b/openraft/src/replication/mod.rs index 4bb74f6c0..8d0b9b9ff 100644 --- a/openraft/src/replication/mod.rs +++ b/openraft/src/replication/mod.rs @@ -27,7 +27,7 @@ use crate::async_runtime::MpscUnboundedReceiver; use crate::async_runtime::MpscUnboundedSender; use crate::async_runtime::MpscUnboundedWeakSender; use crate::config::Config; -use crate::core::notify::Notify; +use crate::core::notification::Notification; use crate::core::sm::handle::SnapshotReader; use crate::display_ext::DisplayOptionExt; use crate::error::HigherVote; @@ -95,7 +95,7 @@ where /// A channel for sending events to the RaftCore. #[allow(clippy::type_complexity)] - tx_raft_core: MpscUnboundedSenderOf>, + tx_raft_core: MpscUnboundedSenderOf>, /// A channel for receiving events from the RaftCore and snapshot transmitting task. rx_event: MpscUnboundedReceiverOf>, @@ -169,7 +169,7 @@ where snapshot_network: N::Network, log_reader: LS::LogReader, snapshot_reader: SnapshotReader, - tx_raft_core: MpscUnboundedSenderOf>, + tx_raft_core: MpscUnboundedSenderOf>, span: tracing::Span, ) -> ReplicationHandle { tracing::debug!( @@ -264,7 +264,7 @@ where return Err(closed); } ReplicationError::HigherVote(h) => { - let _ = self.tx_raft_core.send(Notify::Network { + let _ = self.tx_raft_core.send(Notification::Network { response: Response::HigherVote { target: self.target, higher: h.higher, @@ -277,7 +277,7 @@ where tracing::error!(error=%error, "error replication to target={}", self.target); // TODO: report this error - let _ = self.tx_raft_core.send(Notify::Network { + let _ = self.tx_raft_core.send(Notification::Network { response: Response::StorageError { error }, }); return Ok(()); @@ -498,7 +498,7 @@ where /// Send the error result to RaftCore. /// RaftCore will then submit another replication command. fn send_progress_error(&mut self, request_id: RequestId, err: RPCError) { - let _ = self.tx_raft_core.send(Notify::Network { + let _ = self.tx_raft_core.send(Notification::Network { response: Response::Progress { target: self.target, request_id, @@ -530,7 +530,7 @@ where } let _ = self.tx_raft_core.send({ - Notify::Network { + Notification::Network { response: Response::Progress { session_id: self.session_id, request_id, diff --git a/openraft/src/storage/callback.rs b/openraft/src/storage/callback.rs index b717b5a7a..992035339 100644 --- a/openraft/src/storage/callback.rs +++ b/openraft/src/storage/callback.rs @@ -6,7 +6,7 @@ use tokio::sync::oneshot; use crate::async_runtime::MpscUnboundedSender; use crate::async_runtime::MpscUnboundedWeakSender; -use crate::core::notify::Notify; +use crate::core::notification::Notification; use crate::type_config::alias::MpscUnboundedWeakSenderOf; use crate::ErrorSubject; use crate::ErrorVerb; @@ -25,16 +25,19 @@ pub struct IOFlushed where C: RaftTypeConfig { /// The notify to send when the IO complete. - notify: Notify, + notification: Notification, - tx: MpscUnboundedWeakSenderOf>, + tx: MpscUnboundedWeakSenderOf>, } impl IOFlushed where C: RaftTypeConfig { - pub(crate) fn new(notify: Notify, tx: MpscUnboundedWeakSenderOf>) -> Self { - Self { notify, tx } + pub(crate) fn new(notify: Notification, tx: MpscUnboundedWeakSenderOf>) -> Self { + Self { + notification: notify, + tx, + } } #[deprecated(since = "0.10.0", note = "Use `io_completed` instead")] @@ -57,15 +60,15 @@ where C: RaftTypeConfig "{}: IOFlushed error: {}, while flushing IO: {}", func_name!(), e, - self.notify + self.notification ); let sto_err = self.make_storage_error(e); - tx.send(Notify::StorageError { error: sto_err }) + tx.send(Notification::StorageError { error: sto_err }) } Ok(_) => { - tracing::debug!("{}: IOFlushed completed: {}", func_name!(), self.notify); - tx.send(self.notify) + tracing::debug!("{}: IOFlushed completed: {}", func_name!(), self.notification); + tx.send(self.notification) } }; @@ -76,19 +79,19 @@ where C: RaftTypeConfig /// Figure out the error subject and verb from the kind of response `Notify`. fn make_storage_error(&self, e: io::Error) -> StorageError { - match &self.notify { - Notify::VoteResponse { .. } => StorageError::from_io_error(ErrorSubject::Vote, ErrorVerb::Write, e), - Notify::LocalIO { io_id } => { + match &self.notification { + Notification::VoteResponse { .. } => StorageError::from_io_error(ErrorSubject::Vote, ErrorVerb::Write, e), + Notification::LocalIO { io_id } => { let subject = io_id.subject(); let verb = io_id.verb(); StorageError::from_io_error(subject, verb, e) } - Notify::HigherVote { .. } - | Notify::StorageError { .. } - | Notify::Network { .. } - | Notify::StateMachine { .. } - | Notify::Tick { .. } => { - unreachable!("Unexpected notification: {}", self.notify) + Notification::HigherVote { .. } + | Notification::StorageError { .. } + | Notification::Network { .. } + | Notification::StateMachine { .. } + | Notification::Tick { .. } => { + unreachable!("Unexpected notification: {}", self.notification) } } } diff --git a/openraft/src/storage/v2/raft_log_storage_ext.rs b/openraft/src/storage/v2/raft_log_storage_ext.rs index f61760958..a8ac991ef 100644 --- a/openraft/src/storage/v2/raft_log_storage_ext.rs +++ b/openraft/src/storage/v2/raft_log_storage_ext.rs @@ -2,7 +2,7 @@ use openraft_macros::add_async_trait; use crate::async_runtime::MpscUnboundedReceiver; use crate::async_runtime::MpscUnboundedSender; -use crate::core::notify::Notify; +use crate::core::notification::Notification; use crate::log_id::RaftLogId; use crate::raft_state::io_state::io_id::IOId; use crate::storage::IOFlushed; @@ -35,13 +35,13 @@ where C: RaftTypeConfig let (tx, mut rx) = C::mpsc_unbounded(); let io_id = IOId::::new_append_log(Vote::::default().into_committed(), last_log_id); - let notify = Notify::LocalIO { io_id }; + let notify = Notification::LocalIO { io_id }; let callback = IOFlushed::::new(notify, tx.downgrade()); self.append(entries, callback).await?; let got = rx.recv().await.unwrap(); - if let Notify::StorageError { error } = got { + if let Notification::StorageError { error } = got { return Err(error); } diff --git a/openraft/src/testing/suite.rs b/openraft/src/testing/suite.rs index e04ac9b7b..74bf5a290 100644 --- a/openraft/src/testing/suite.rs +++ b/openraft/src/testing/suite.rs @@ -9,7 +9,7 @@ use maplit::btreeset; use crate::async_runtime::MpscUnboundedReceiver; use crate::async_runtime::MpscUnboundedSender; -use crate::core::notify::Notify; +use crate::core::notification::Notification; use crate::entry::RaftEntry; use crate::log_id::RaftLogId; use crate::membership::EffectiveMembership; @@ -1348,12 +1348,12 @@ where // Dummy log io id for blocking append let io_id = IOId::::new_append_log(Vote::::default().into_committed(), last_log_id); - let notify = Notify::LocalIO { io_id }; + let notify = Notification::LocalIO { io_id }; let cb = IOFlushed::new(notify, tx.downgrade()); store.append(entries, cb).await?; let got = rx.recv().await.unwrap(); - if let Notify::StorageError { error } = got { + if let Notification::StorageError { error } = got { return Err(error); } Ok(())