diff --git a/openraft/src/engine/engine_impl.rs b/openraft/src/engine/engine_impl.rs index 21d532c21..1a43a2cb4 100644 --- a/openraft/src/engine/engine_impl.rs +++ b/openraft/src/engine/engine_impl.rs @@ -18,7 +18,6 @@ use crate::engine::handler::server_state_handler::ServerStateHandler; use crate::engine::handler::snapshot_handler::SnapshotHandler; use crate::engine::handler::vote_handler::VoteHandler; use crate::engine::Command; -use crate::engine::Condition; use crate::engine::EngineOutput; use crate::engine::Respond; use crate::entry::RaftPayload; @@ -462,17 +461,16 @@ where C: RaftTypeConfig }; let mut fh = self.following_handler(); - fh.install_full_snapshot(snapshot); + + // The condition to satisfy before running other command that depends on the snapshot. + // In this case, the response can only be sent when the snapshot is installed. + let cond = fh.install_full_snapshot(snapshot); let res = Ok(SnapshotResponse { vote: *self.state.vote_ref(), }); self.output.push_command(Command::Respond { - // When there is an error, there may still be queued IO, we need to run them before sending back - // response. - when: Some(Condition::StateMachineCommand { - command_seq: self.output.last_sm_seq(), - }), + when: cond, resp: Respond::new(res, tx), }); } diff --git a/openraft/src/engine/handler/following_handler/install_snapshot_test.rs b/openraft/src/engine/handler/following_handler/install_snapshot_test.rs index 3cfcbafce..9409497fc 100644 --- a/openraft/src/engine/handler/following_handler/install_snapshot_test.rs +++ b/openraft/src/engine/handler/following_handler/install_snapshot_test.rs @@ -7,6 +7,7 @@ use pretty_assertions::assert_eq; use crate::core::sm; use crate::engine::testing::UTConfig; use crate::engine::Command; +use crate::engine::Condition; use crate::engine::Engine; use crate::engine::LogIdList; use crate::raft_state::LogStateReader; @@ -56,7 +57,7 @@ fn test_install_snapshot_lt_last_snapshot() -> anyhow::Result<()> { // `snapshot_meta.last_log_id`. let mut eng = eng(); - eng.following_handler().install_full_snapshot(Snapshot { + let cond = eng.following_handler().install_full_snapshot(Snapshot { meta: SnapshotMeta { last_log_id: Some(log_id(2, 1, 2)), last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()), @@ -65,6 +66,8 @@ fn test_install_snapshot_lt_last_snapshot() -> anyhow::Result<()> { snapshot: Box::new(Cursor::new(vec![0u8])), }); + assert_eq!(None, cond); + assert_eq!( SnapshotMeta { last_log_id: Some(log_id(2, 1, 2)), @@ -86,7 +89,7 @@ fn test_install_snapshot_lt_committed() -> anyhow::Result<()> { // Although in this case the state machine is not affected. let mut eng = eng(); - eng.following_handler().install_full_snapshot(Snapshot { + let cond = eng.following_handler().install_full_snapshot(Snapshot { meta: SnapshotMeta { last_log_id: Some(log_id(4, 1, 5)), last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()), @@ -95,6 +98,8 @@ fn test_install_snapshot_lt_committed() -> anyhow::Result<()> { snapshot: Box::new(Cursor::new(vec![0u8])), }); + assert_eq!(None, cond); + assert_eq!( SnapshotMeta { last_log_id: Some(log_id(2, 1, 2)), @@ -113,7 +118,7 @@ fn test_install_snapshot_not_conflict() -> anyhow::Result<()> { // Snapshot will be installed and there are no conflicting logs. let mut eng = eng(); - eng.following_handler().install_full_snapshot(Snapshot { + let cond = eng.following_handler().install_full_snapshot(Snapshot { meta: SnapshotMeta { last_log_id: Some(log_id(4, 1, 6)), last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()), @@ -122,6 +127,8 @@ fn test_install_snapshot_not_conflict() -> anyhow::Result<()> { snapshot: Box::new(Cursor::new(vec![0u8])), }); + assert_eq!(Some(Condition::StateMachineCommand { command_seq: 1 }), cond); + assert_eq!( SnapshotMeta { last_log_id: Some(log_id(4, 1, 6)), @@ -187,7 +194,7 @@ fn test_install_snapshot_conflict() -> anyhow::Result<()> { eng }; - eng.following_handler().install_full_snapshot(Snapshot { + let cond = eng.following_handler().install_full_snapshot(Snapshot { meta: SnapshotMeta { last_log_id: Some(log_id(5, 1, 6)), last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()), @@ -196,6 +203,8 @@ fn test_install_snapshot_conflict() -> anyhow::Result<()> { snapshot: Box::new(Cursor::new(vec![0u8])), }); + assert_eq!(Some(Condition::StateMachineCommand { command_seq: 1 }), cond); + assert_eq!( SnapshotMeta { last_log_id: Some(log_id(5, 1, 6)), @@ -238,7 +247,7 @@ fn test_install_snapshot_advance_last_log_id() -> anyhow::Result<()> { // Snapshot will be installed and there are no conflicting logs. let mut eng = eng(); - eng.following_handler().install_full_snapshot(Snapshot { + let cond = eng.following_handler().install_full_snapshot(Snapshot { meta: SnapshotMeta { last_log_id: Some(log_id(100, 1, 100)), last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()), @@ -247,6 +256,8 @@ fn test_install_snapshot_advance_last_log_id() -> anyhow::Result<()> { snapshot: Box::new(Cursor::new(vec![0u8])), }); + assert_eq!(Some(Condition::StateMachineCommand { command_seq: 1 }), cond); + assert_eq!( SnapshotMeta { last_log_id: Some(log_id(100, 1, 100)), @@ -293,7 +304,7 @@ fn test_install_snapshot_update_accepted() -> anyhow::Result<()> { // Snapshot will be installed and `accepted` should be updated. let mut eng = eng(); - eng.following_handler().install_full_snapshot(Snapshot { + let cond = eng.following_handler().install_full_snapshot(Snapshot { meta: SnapshotMeta { last_log_id: Some(log_id(100, 1, 100)), last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()), @@ -302,6 +313,8 @@ fn test_install_snapshot_update_accepted() -> anyhow::Result<()> { snapshot: Box::new(Cursor::new(vec![0u8])), }); + assert_eq!(Some(Condition::StateMachineCommand { command_seq: 1 }), cond); + assert_eq!(Some(&log_id(100, 1, 100)), eng.state.accepted()); Ok(()) diff --git a/openraft/src/engine/handler/following_handler/mod.rs b/openraft/src/engine/handler/following_handler/mod.rs index c134461c3..8231cefdc 100644 --- a/openraft/src/engine/handler/following_handler/mod.rs +++ b/openraft/src/engine/handler/following_handler/mod.rs @@ -8,6 +8,7 @@ use crate::engine::handler::log_handler::LogHandler; use crate::engine::handler::server_state_handler::ServerStateHandler; use crate::engine::handler::snapshot_handler::SnapshotHandler; use crate::engine::Command; +use crate::engine::Condition; use crate::engine::EngineConfig; use crate::engine::EngineOutput; use crate::entry::RaftPayload; @@ -237,12 +238,26 @@ where C: RaftTypeConfig self.server_state_handler().update_server_state_if_changed(); } - /// Follower/Learner handles install-full-snapshot. + /// Installs a full snapshot on a follower or learner node. /// /// Refer to [`snapshot_replication`](crate::docs::protocol::replication::snapshot_replication) /// for the reason the following workflow is needed. + /// + /// The method processes the given `snapshot` and updates the internal state of the node based + /// on the snapshot's metadata. It checks if the `snapshot` is newer than the currently + /// committed snapshot. If not, it does nothing. + /// + /// It returns the condition about when the snapshot is installed and can proceed the commands + /// that depends on the state of snapshot. + /// + /// It returns an `Option>` indicating the next action: + /// - `Some(Condition::StateMachineCommand { command_seq })` if the snapshot will be installed. + /// Further commands that depend on snapshot state should use this condition so that these + /// command block until the condition is satisfied(`RaftCore` receives a `Notify`). + /// - Otherwise `None` if the snapshot will not be installed (e.g., if it is not newer than the + /// current state). #[tracing::instrument(level = "debug", skip_all)] - pub(crate) fn install_full_snapshot(&mut self, snapshot: Snapshot) { + pub(crate) fn install_full_snapshot(&mut self, snapshot: Snapshot) -> Option> { let meta = &snapshot.meta; tracing::info!("install_full_snapshot: meta:{:?}", meta); @@ -254,7 +269,7 @@ where C: RaftTypeConfig snap_last_log_id.display(), self.state.committed().display() ); - return; + return None; } // snapshot_last_log_id can not be None @@ -267,7 +282,7 @@ where C: RaftTypeConfig let mut snap_handler = self.snapshot_handler(); let updated = snap_handler.update_snapshot(meta.clone()); if !updated { - return; + return None; } let local = self.state.get_log_id(snap_last_log_id.index); @@ -285,9 +300,14 @@ where C: RaftTypeConfig )); self.output.push_command(Command::from(sm::Command::install_full_snapshot(snapshot))); + let last_sm_seq = self.output.last_sm_seq(); self.state.purge_upto = Some(snap_last_log_id); self.log_handler().purge_log(); + + Some(Condition::StateMachineCommand { + command_seq: last_sm_seq, + }) } /// Find the last 2 membership entries in a list of entries. diff --git a/openraft/src/engine/mod.rs b/openraft/src/engine/mod.rs index 09722b02f..8e13c481f 100644 --- a/openraft/src/engine/mod.rs +++ b/openraft/src/engine/mod.rs @@ -44,6 +44,7 @@ mod tests { mod handle_vote_req_test; mod handle_vote_resp_test; mod initialize_test; + mod install_full_snapshot_test; mod log_id_list_test; mod startup_test; mod trigger_purge_log_test; diff --git a/openraft/src/engine/tests/install_full_snapshot_test.rs b/openraft/src/engine/tests/install_full_snapshot_test.rs new file mode 100644 index 000000000..bfe5899cd --- /dev/null +++ b/openraft/src/engine/tests/install_full_snapshot_test.rs @@ -0,0 +1,163 @@ +use std::io::Cursor; + +use maplit::btreeset; +use pretty_assertions::assert_eq; + +use crate::core::sm; +use crate::engine::testing::UTConfig; +use crate::engine::Command; +use crate::engine::Condition; +use crate::engine::Engine; +use crate::engine::LogIdList; +use crate::engine::Respond; +use crate::raft::SnapshotResponse; +use crate::testing::log_id; +use crate::type_config::alias::AsyncRuntimeOf; +use crate::AsyncRuntime; +use crate::Membership; +use crate::Snapshot; +use crate::SnapshotMeta; +use crate::StoredMembership; +use crate::TokioInstant; +use crate::Vote; + +fn m12() -> Membership { + Membership::::new(vec![btreeset! {1,2}], None) +} + +fn m1234() -> Membership { + Membership::::new(vec![btreeset! {1,2,3,4}], None) +} + +fn eng() -> Engine { + let mut eng = Engine::testing_default(0); + eng.state.enable_validation(false); // Disable validation for incomplete state + + eng.state.vote.update(TokioInstant::now(), Vote::new_committed(2, 1)); + eng.state.committed = Some(log_id(4, 1, 5)); + eng.state.log_ids = LogIdList::new(vec![ + // + log_id(2, 1, 2), + log_id(3, 1, 5), + log_id(4, 1, 6), + log_id(4, 1, 8), + ]); + eng.state.snapshot_meta = SnapshotMeta { + last_log_id: Some(log_id(2, 1, 2)), + last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m12()), + snapshot_id: "1-2-3-4".to_string(), + }; + eng.state.server_state = eng.calc_server_state(); + + eng +} + +#[test] +fn test_handle_install_full_snapshot_lt_last_snapshot() -> anyhow::Result<()> { + // Snapshot will not be installed because new `last_log_id` is less or equal current + // `snapshot_meta.last_log_id`. + // + // It should respond at once. + + let mut eng = eng(); + + let curr_vote = *eng.state.vote_ref(); + + let (tx, _rx) = AsyncRuntimeOf::::oneshot(); + + eng.handle_install_full_snapshot( + curr_vote, + Snapshot { + meta: SnapshotMeta { + last_log_id: Some(log_id(1, 1, 2)), + last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()), + snapshot_id: "1-2-3-4".to_string(), + }, + snapshot: Box::new(Cursor::new(vec![0u8])), + }, + tx, + ); + + assert_eq!( + SnapshotMeta { + last_log_id: Some(log_id(2, 1, 2)), + last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m12()), + snapshot_id: "1-2-3-4".to_string(), + }, + eng.state.snapshot_meta + ); + + let (dummy_tx, _rx) = AsyncRuntimeOf::::oneshot(); + assert_eq!( + vec![ + // + Command::Respond { + when: None, + resp: Respond::new(Ok(SnapshotResponse::new(curr_vote)), dummy_tx), + }, + ], + eng.output.take_commands() + ); + + Ok(()) +} + +#[test] +fn test_handle_install_full_snapshot_no_conflict() -> anyhow::Result<()> { + // Snapshot will be installed and there are no conflicting logs. + // The response should be sent after the snapshot is installed. + + let mut eng = eng(); + + let curr_vote = *eng.state.vote_ref(); + + let (tx, _rx) = AsyncRuntimeOf::::oneshot(); + + eng.handle_install_full_snapshot( + curr_vote, + Snapshot { + meta: SnapshotMeta { + last_log_id: Some(log_id(4, 1, 6)), + last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()), + snapshot_id: "1-2-3-4".to_string(), + }, + snapshot: Box::new(Cursor::new(vec![0u8])), + }, + tx, + ); + + assert_eq!( + SnapshotMeta { + last_log_id: Some(log_id(4, 1, 6)), + last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()), + snapshot_id: "1-2-3-4".to_string(), + }, + eng.state.snapshot_meta + ); + + let (dummy_tx, _rx) = AsyncRuntimeOf::::oneshot(); + assert_eq!( + vec![ + // + Command::from( + sm::Command::install_full_snapshot(Snapshot { + meta: SnapshotMeta { + last_log_id: Some(log_id(4, 1, 6)), + last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()), + snapshot_id: "1-2-3-4".to_string(), + }, + snapshot: Box::new(Cursor::new(vec![0u8])), + }) + .with_seq(1) + ), + Command::PurgeLog { upto: log_id(4, 1, 6) }, + Command::Respond { + when: Some(Condition::StateMachineCommand { command_seq: 1 }), + resp: Respond::new(Ok(SnapshotResponse::new(curr_vote)), dummy_tx), + }, + ], + eng.output.take_commands() + ); + + Ok(()) +}