From 5c83e1670269c61baf230da71c6c947512100748 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Sun, 5 May 2024 15:38:53 +0800 Subject: [PATCH] Fix: Immediate response when snapshot installation is unnecessary When `Engine::handle_install_full_snapshot()` is called and the provided snapshot is not up-to-date, the snapshot should not be installed, and the response should be sent back immediately. Previously, the method might delay the response unnecessarily, waiting for an installation process that would not proceed. This commit adjusts the logic so that if the snapshot is recognized as outdated, it immediately returns a `None` `Condition`, ensuring the caller is informed straightaway that no installation will occur. --- openraft/src/engine/engine_impl.rs | 12 +- .../install_snapshot_test.rs | 25 ++- .../engine/handler/following_handler/mod.rs | 28 ++- openraft/src/engine/mod.rs | 1 + .../tests/install_full_snapshot_test.rs | 163 ++++++++++++++++++ 5 files changed, 212 insertions(+), 17 deletions(-) create mode 100644 openraft/src/engine/tests/install_full_snapshot_test.rs diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index 21d532c21..1a43a2cb4 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -18,7 +18,6 @@ use crate::engine::handler::server_state_handler::ServerStateHandler; use crate::engine::handler::snapshot_handler::SnapshotHandler; use crate::engine::handler::vote_handler::VoteHandler; use crate::engine::Command; -use crate::engine::Condition; use crate::engine::EngineOutput; use crate::engine::Respond; use crate::entry::RaftPayload; @@ -462,17 +461,16 @@ where C: RaftTypeConfig }; let mut fh = self.following_handler(); - fh.install_full_snapshot(snapshot); + + // The condition to satisfy before running other command that depends on the snapshot. + // In this case, the response can only be sent when the snapshot is installed. + let cond = fh.install_full_snapshot(snapshot); let res = Ok(SnapshotResponse { vote: *self.state.vote_ref(), }); self.output.push_command(Command::Respond { - // When there is an error, there may still be queued IO, we need to run them before sending back - // response. - when: Some(Condition::StateMachineCommand { - command_seq: self.output.last_sm_seq(), - }), + when: cond, resp: Respond::new(res, tx), }); } diff --git a/openraft/src/engine/handler/following_handler/install_snapshot_test.rs b/openraft/src/engine/handler/following_handler/install_snapshot_test.rs index 3cfcbafce..9409497fc 100644 --- a/openraft/src/engine/handler/following_handler/install_snapshot_test.rs +++ b/openraft/src/engine/handler/following_handler/install_snapshot_test.rs @@ -7,6 +7,7 @@ use pretty_assertions::assert_eq; use crate::core::sm; use crate::engine::testing::UTConfig; use crate::engine::Command; +use crate::engine::Condition; use crate::engine::Engine; use crate::engine::LogIdList; use crate::raft_state::LogStateReader; @@ -56,7 +57,7 @@ fn test_install_snapshot_lt_last_snapshot() -> anyhow::Result<()> { // `snapshot_meta.last_log_id`. let mut eng = eng(); - eng.following_handler().install_full_snapshot(Snapshot { + let cond = eng.following_handler().install_full_snapshot(Snapshot { meta: SnapshotMeta { last_log_id: Some(log_id(2, 1, 2)), last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()), @@ -65,6 +66,8 @@ fn test_install_snapshot_lt_last_snapshot() -> anyhow::Result<()> { snapshot: Box::new(Cursor::new(vec![0u8])), }); + assert_eq!(None, cond); + assert_eq!( SnapshotMeta { last_log_id: Some(log_id(2, 1, 2)), @@ -86,7 +89,7 @@ fn test_install_snapshot_lt_committed() -> anyhow::Result<()> { // Although in this case the state machine is not affected. let mut eng = eng(); - eng.following_handler().install_full_snapshot(Snapshot { + let cond = eng.following_handler().install_full_snapshot(Snapshot { meta: SnapshotMeta { last_log_id: Some(log_id(4, 1, 5)), last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()), @@ -95,6 +98,8 @@ fn test_install_snapshot_lt_committed() -> anyhow::Result<()> { snapshot: Box::new(Cursor::new(vec![0u8])), }); + assert_eq!(None, cond); + assert_eq!( SnapshotMeta { last_log_id: Some(log_id(2, 1, 2)), @@ -113,7 +118,7 @@ fn test_install_snapshot_not_conflict() -> anyhow::Result<()> { // Snapshot will be installed and there are no conflicting logs. let mut eng = eng(); - eng.following_handler().install_full_snapshot(Snapshot { + let cond = eng.following_handler().install_full_snapshot(Snapshot { meta: SnapshotMeta { last_log_id: Some(log_id(4, 1, 6)), last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()), @@ -122,6 +127,8 @@ fn test_install_snapshot_not_conflict() -> anyhow::Result<()> { snapshot: Box::new(Cursor::new(vec![0u8])), }); + assert_eq!(Some(Condition::StateMachineCommand { command_seq: 1 }), cond); + assert_eq!( SnapshotMeta { last_log_id: Some(log_id(4, 1, 6)), @@ -187,7 +194,7 @@ fn test_install_snapshot_conflict() -> anyhow::Result<()> { eng }; - eng.following_handler().install_full_snapshot(Snapshot { + let cond = eng.following_handler().install_full_snapshot(Snapshot { meta: SnapshotMeta { last_log_id: Some(log_id(5, 1, 6)), last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()), @@ -196,6 +203,8 @@ fn test_install_snapshot_conflict() -> anyhow::Result<()> { snapshot: Box::new(Cursor::new(vec![0u8])), }); + assert_eq!(Some(Condition::StateMachineCommand { command_seq: 1 }), cond); + assert_eq!( SnapshotMeta { last_log_id: Some(log_id(5, 1, 6)), @@ -238,7 +247,7 @@ fn test_install_snapshot_advance_last_log_id() -> anyhow::Result<()> { // Snapshot will be installed and there are no conflicting logs. let mut eng = eng(); - eng.following_handler().install_full_snapshot(Snapshot { + let cond = eng.following_handler().install_full_snapshot(Snapshot { meta: SnapshotMeta { last_log_id: Some(log_id(100, 1, 100)), last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()), @@ -247,6 +256,8 @@ fn test_install_snapshot_advance_last_log_id() -> anyhow::Result<()> { snapshot: Box::new(Cursor::new(vec![0u8])), }); + assert_eq!(Some(Condition::StateMachineCommand { command_seq: 1 }), cond); + assert_eq!( SnapshotMeta { last_log_id: Some(log_id(100, 1, 100)), @@ -293,7 +304,7 @@ fn test_install_snapshot_update_accepted() -> anyhow::Result<()> { // Snapshot will be installed and `accepted` should be updated. let mut eng = eng(); - eng.following_handler().install_full_snapshot(Snapshot { + let cond = eng.following_handler().install_full_snapshot(Snapshot { meta: SnapshotMeta { last_log_id: Some(log_id(100, 1, 100)), last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()), @@ -302,6 +313,8 @@ fn test_install_snapshot_update_accepted() -> anyhow::Result<()> { snapshot: Box::new(Cursor::new(vec![0u8])), }); + assert_eq!(Some(Condition::StateMachineCommand { command_seq: 1 }), cond); + assert_eq!(Some(&log_id(100, 1, 100)), eng.state.accepted()); Ok(()) diff --git a/openraft/src/engine/handler/following_handler/mod.rs b/openraft/src/engine/handler/following_handler/mod.rs index c134461c3..8231cefdc 100644 --- a/openraft/src/engine/handler/following_handler/mod.rs +++ b/openraft/src/engine/handler/following_handler/mod.rs @@ -8,6 +8,7 @@ use crate::engine::handler::log_handler::LogHandler; use crate::engine::handler::server_state_handler::ServerStateHandler; use crate::engine::handler::snapshot_handler::SnapshotHandler; use crate::engine::Command; +use crate::engine::Condition; use crate::engine::EngineConfig; use crate::engine::EngineOutput; use crate::entry::RaftPayload; @@ -237,12 +238,26 @@ where C: RaftTypeConfig self.server_state_handler().update_server_state_if_changed(); } - /// Follower/Learner handles install-full-snapshot. + /// Installs a full snapshot on a follower or learner node. /// /// Refer to [`snapshot_replication`](crate::docs::protocol::replication::snapshot_replication) /// for the reason the following workflow is needed. + /// + /// The method processes the given `snapshot` and updates the internal state of the node based + /// on the snapshot's metadata. It checks if the `snapshot` is newer than the currently + /// committed snapshot. If not, it does nothing. + /// + /// It returns the condition about when the snapshot is installed and can proceed the commands + /// that depends on the state of snapshot. + /// + /// It returns an `Option>` indicating the next action: + /// - `Some(Condition::StateMachineCommand { command_seq })` if the snapshot will be installed. + /// Further commands that depend on snapshot state should use this condition so that these + /// command block until the condition is satisfied(`RaftCore` receives a `Notify`). + /// - Otherwise `None` if the snapshot will not be installed (e.g., if it is not newer than the + /// current state). #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn install_full_snapshot(&mut self, snapshot: Snapshot) { + pub(crate) fn install_full_snapshot(&mut self, snapshot: Snapshot) -> Option> { let meta = &snapshot.meta; tracing::info!("install_full_snapshot: meta:{:?}", meta); @@ -254,7 +269,7 @@ where C: RaftTypeConfig snap_last_log_id.display(), self.state.committed().display() ); - return; + return None; } // snapshot_last_log_id can not be None @@ -267,7 +282,7 @@ where C: RaftTypeConfig let mut snap_handler = self.snapshot_handler(); let updated = snap_handler.update_snapshot(meta.clone()); if !updated { - return; + return None; } let local = self.state.get_log_id(snap_last_log_id.index); @@ -285,9 +300,14 @@ where C: RaftTypeConfig )); self.output.push_command(Command::from(sm::Command::install_full_snapshot(snapshot))); + let last_sm_seq = self.output.last_sm_seq(); self.state.purge_upto = Some(snap_last_log_id); self.log_handler().purge_log(); + + Some(Condition::StateMachineCommand { + command_seq: last_sm_seq, + }) } /// Find the last 2 membership entries in a list of entries. diff --git a/openraft/src/engine/mod.rs b/openraft/src/engine/mod.rs index 09722b02f..8e13c481f 100644 --- a/openraft/src/engine/mod.rs +++ b/openraft/src/engine/mod.rs @@ -44,6 +44,7 @@ mod tests { mod handle_vote_req_test; mod handle_vote_resp_test; mod initialize_test; + mod install_full_snapshot_test; mod log_id_list_test; mod startup_test; mod trigger_purge_log_test; diff --git a/openraft/src/engine/tests/install_full_snapshot_test.rs b/openraft/src/engine/tests/install_full_snapshot_test.rs new file mode 100644 index 000000000..bfe5899cd --- /dev/null +++ b/openraft/src/engine/tests/install_full_snapshot_test.rs @@ -0,0 +1,163 @@ +use std::io::Cursor; + +use maplit::btreeset; +use pretty_assertions::assert_eq; + +use crate::core::sm; +use crate::engine::testing::UTConfig; +use crate::engine::Command; +use crate::engine::Condition; +use crate::engine::Engine; +use crate::engine::LogIdList; +use crate::engine::Respond; +use crate::raft::SnapshotResponse; +use crate::testing::log_id; +use crate::type_config::alias::AsyncRuntimeOf; +use crate::AsyncRuntime; +use crate::Membership; +use crate::Snapshot; +use crate::SnapshotMeta; +use crate::StoredMembership; +use crate::TokioInstant; +use crate::Vote; + +fn m12() -> Membership { + Membership::::new(vec![btreeset! {1,2}], None) +} + +fn m1234() -> Membership { + Membership::::new(vec![btreeset! {1,2,3,4}], None) +} + +fn eng() -> Engine { + let mut eng = Engine::testing_default(0); + eng.state.enable_validation(false); // Disable validation for incomplete state + + eng.state.vote.update(TokioInstant::now(), Vote::new_committed(2, 1)); + eng.state.committed = Some(log_id(4, 1, 5)); + eng.state.log_ids = LogIdList::new(vec![ + // + log_id(2, 1, 2), + log_id(3, 1, 5), + log_id(4, 1, 6), + log_id(4, 1, 8), + ]); + eng.state.snapshot_meta = SnapshotMeta { + last_log_id: Some(log_id(2, 1, 2)), + last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m12()), + snapshot_id: "1-2-3-4".to_string(), + }; + eng.state.server_state = eng.calc_server_state(); + + eng +} + +#[test] +fn test_handle_install_full_snapshot_lt_last_snapshot() -> anyhow::Result<()> { + // Snapshot will not be installed because new `last_log_id` is less or equal current + // `snapshot_meta.last_log_id`. + // + // It should respond at once. + + let mut eng = eng(); + + let curr_vote = *eng.state.vote_ref(); + + let (tx, _rx) = AsyncRuntimeOf::::oneshot(); + + eng.handle_install_full_snapshot( + curr_vote, + Snapshot { + meta: SnapshotMeta { + last_log_id: Some(log_id(1, 1, 2)), + last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()), + snapshot_id: "1-2-3-4".to_string(), + }, + snapshot: Box::new(Cursor::new(vec![0u8])), + }, + tx, + ); + + assert_eq!( + SnapshotMeta { + last_log_id: Some(log_id(2, 1, 2)), + last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m12()), + snapshot_id: "1-2-3-4".to_string(), + }, + eng.state.snapshot_meta + ); + + let (dummy_tx, _rx) = AsyncRuntimeOf::::oneshot(); + assert_eq!( + vec![ + // + Command::Respond { + when: None, + resp: Respond::new(Ok(SnapshotResponse::new(curr_vote)), dummy_tx), + }, + ], + eng.output.take_commands() + ); + + Ok(()) +} + +#[test] +fn test_handle_install_full_snapshot_no_conflict() -> anyhow::Result<()> { + // Snapshot will be installed and there are no conflicting logs. + // The response should be sent after the snapshot is installed. + + let mut eng = eng(); + + let curr_vote = *eng.state.vote_ref(); + + let (tx, _rx) = AsyncRuntimeOf::::oneshot(); + + eng.handle_install_full_snapshot( + curr_vote, + Snapshot { + meta: SnapshotMeta { + last_log_id: Some(log_id(4, 1, 6)), + last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()), + snapshot_id: "1-2-3-4".to_string(), + }, + snapshot: Box::new(Cursor::new(vec![0u8])), + }, + tx, + ); + + assert_eq!( + SnapshotMeta { + last_log_id: Some(log_id(4, 1, 6)), + last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()), + snapshot_id: "1-2-3-4".to_string(), + }, + eng.state.snapshot_meta + ); + + let (dummy_tx, _rx) = AsyncRuntimeOf::::oneshot(); + assert_eq!( + vec![ + // + Command::from( + sm::Command::install_full_snapshot(Snapshot { + meta: SnapshotMeta { + last_log_id: Some(log_id(4, 1, 6)), + last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()), + snapshot_id: "1-2-3-4".to_string(), + }, + snapshot: Box::new(Cursor::new(vec![0u8])), + }) + .with_seq(1) + ), + Command::PurgeLog { upto: log_id(4, 1, 6) }, + Command::Respond { + when: Some(Condition::StateMachineCommand { command_seq: 1 }), + resp: Respond::new(Ok(SnapshotResponse::new(curr_vote)), dummy_tx), + }, + ], + eng.output.take_commands() + ); + + Ok(()) +}