Skip to content

Commit

Permalink
Fix: Immediate response when snapshot installation is unnecessary
Browse files Browse the repository at this point in the history
When `Engine::handle_install_full_snapshot()` is called and the provided
snapshot is not up-to-date, the snapshot should not be installed, and
the response should be sent back immediately. Previously, the method
might delay the response unnecessarily, waiting for an installation
process that would not proceed.

This commit adjusts the logic so that if the snapshot is recognized as
outdated, it immediately returns a `None` `Condition`, ensuring the
caller is informed straightaway that no installation will occur.
  • Loading branch information
drmingdrmer committed May 5, 2024
1 parent 946dc3f commit 5c83e16
Show file tree
Hide file tree
Showing 5 changed files with 212 additions and 17 deletions.
12 changes: 5 additions & 7 deletions openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ use crate::engine::handler::server_state_handler::ServerStateHandler;
use crate::engine::handler::snapshot_handler::SnapshotHandler;
use crate::engine::handler::vote_handler::VoteHandler;
use crate::engine::Command;
use crate::engine::Condition;
use crate::engine::EngineOutput;
use crate::engine::Respond;
use crate::entry::RaftPayload;
Expand Down Expand Up @@ -462,17 +461,16 @@ where C: RaftTypeConfig
};

let mut fh = self.following_handler();
fh.install_full_snapshot(snapshot);

// The condition to satisfy before running other command that depends on the snapshot.
// In this case, the response can only be sent when the snapshot is installed.
let cond = fh.install_full_snapshot(snapshot);
let res = Ok(SnapshotResponse {
vote: *self.state.vote_ref(),
});

self.output.push_command(Command::Respond {
// When there is an error, there may still be queued IO, we need to run them before sending back
// response.
when: Some(Condition::StateMachineCommand {
command_seq: self.output.last_sm_seq(),
}),
when: cond,
resp: Respond::new(res, tx),
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use pretty_assertions::assert_eq;
use crate::core::sm;
use crate::engine::testing::UTConfig;
use crate::engine::Command;
use crate::engine::Condition;
use crate::engine::Engine;
use crate::engine::LogIdList;
use crate::raft_state::LogStateReader;
Expand Down Expand Up @@ -56,7 +57,7 @@ fn test_install_snapshot_lt_last_snapshot() -> anyhow::Result<()> {
// `snapshot_meta.last_log_id`.
let mut eng = eng();

eng.following_handler().install_full_snapshot(Snapshot {
let cond = eng.following_handler().install_full_snapshot(Snapshot {
meta: SnapshotMeta {
last_log_id: Some(log_id(2, 1, 2)),
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()),
Expand All @@ -65,6 +66,8 @@ fn test_install_snapshot_lt_last_snapshot() -> anyhow::Result<()> {
snapshot: Box::new(Cursor::new(vec![0u8])),
});

assert_eq!(None, cond);

assert_eq!(
SnapshotMeta {
last_log_id: Some(log_id(2, 1, 2)),
Expand All @@ -86,7 +89,7 @@ fn test_install_snapshot_lt_committed() -> anyhow::Result<()> {
// Although in this case the state machine is not affected.
let mut eng = eng();

eng.following_handler().install_full_snapshot(Snapshot {
let cond = eng.following_handler().install_full_snapshot(Snapshot {
meta: SnapshotMeta {
last_log_id: Some(log_id(4, 1, 5)),
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()),
Expand All @@ -95,6 +98,8 @@ fn test_install_snapshot_lt_committed() -> anyhow::Result<()> {
snapshot: Box::new(Cursor::new(vec![0u8])),
});

assert_eq!(None, cond);

assert_eq!(
SnapshotMeta {
last_log_id: Some(log_id(2, 1, 2)),
Expand All @@ -113,7 +118,7 @@ fn test_install_snapshot_not_conflict() -> anyhow::Result<()> {
// Snapshot will be installed and there are no conflicting logs.
let mut eng = eng();

eng.following_handler().install_full_snapshot(Snapshot {
let cond = eng.following_handler().install_full_snapshot(Snapshot {
meta: SnapshotMeta {
last_log_id: Some(log_id(4, 1, 6)),
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()),
Expand All @@ -122,6 +127,8 @@ fn test_install_snapshot_not_conflict() -> anyhow::Result<()> {
snapshot: Box::new(Cursor::new(vec![0u8])),
});

assert_eq!(Some(Condition::StateMachineCommand { command_seq: 1 }), cond);

assert_eq!(
SnapshotMeta {
last_log_id: Some(log_id(4, 1, 6)),
Expand Down Expand Up @@ -187,7 +194,7 @@ fn test_install_snapshot_conflict() -> anyhow::Result<()> {
eng
};

eng.following_handler().install_full_snapshot(Snapshot {
let cond = eng.following_handler().install_full_snapshot(Snapshot {
meta: SnapshotMeta {
last_log_id: Some(log_id(5, 1, 6)),
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()),
Expand All @@ -196,6 +203,8 @@ fn test_install_snapshot_conflict() -> anyhow::Result<()> {
snapshot: Box::new(Cursor::new(vec![0u8])),
});

assert_eq!(Some(Condition::StateMachineCommand { command_seq: 1 }), cond);

assert_eq!(
SnapshotMeta {
last_log_id: Some(log_id(5, 1, 6)),
Expand Down Expand Up @@ -238,7 +247,7 @@ fn test_install_snapshot_advance_last_log_id() -> anyhow::Result<()> {
// Snapshot will be installed and there are no conflicting logs.
let mut eng = eng();

eng.following_handler().install_full_snapshot(Snapshot {
let cond = eng.following_handler().install_full_snapshot(Snapshot {
meta: SnapshotMeta {
last_log_id: Some(log_id(100, 1, 100)),
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()),
Expand All @@ -247,6 +256,8 @@ fn test_install_snapshot_advance_last_log_id() -> anyhow::Result<()> {
snapshot: Box::new(Cursor::new(vec![0u8])),
});

assert_eq!(Some(Condition::StateMachineCommand { command_seq: 1 }), cond);

assert_eq!(
SnapshotMeta {
last_log_id: Some(log_id(100, 1, 100)),
Expand Down Expand Up @@ -293,7 +304,7 @@ fn test_install_snapshot_update_accepted() -> anyhow::Result<()> {
// Snapshot will be installed and `accepted` should be updated.
let mut eng = eng();

eng.following_handler().install_full_snapshot(Snapshot {
let cond = eng.following_handler().install_full_snapshot(Snapshot {
meta: SnapshotMeta {
last_log_id: Some(log_id(100, 1, 100)),
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()),
Expand All @@ -302,6 +313,8 @@ fn test_install_snapshot_update_accepted() -> anyhow::Result<()> {
snapshot: Box::new(Cursor::new(vec![0u8])),
});

assert_eq!(Some(Condition::StateMachineCommand { command_seq: 1 }), cond);

assert_eq!(Some(&log_id(100, 1, 100)), eng.state.accepted());

Ok(())
Expand Down
28 changes: 24 additions & 4 deletions openraft/src/engine/handler/following_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::engine::handler::log_handler::LogHandler;
use crate::engine::handler::server_state_handler::ServerStateHandler;
use crate::engine::handler::snapshot_handler::SnapshotHandler;
use crate::engine::Command;
use crate::engine::Condition;
use crate::engine::EngineConfig;
use crate::engine::EngineOutput;
use crate::entry::RaftPayload;
Expand Down Expand Up @@ -237,12 +238,26 @@ where C: RaftTypeConfig
self.server_state_handler().update_server_state_if_changed();
}

/// Follower/Learner handles install-full-snapshot.
/// Installs a full snapshot on a follower or learner node.
///
/// Refer to [`snapshot_replication`](crate::docs::protocol::replication::snapshot_replication)
/// for the reason the following workflow is needed.
///
/// The method processes the given `snapshot` and updates the internal state of the node based
/// on the snapshot's metadata. It checks if the `snapshot` is newer than the currently
/// committed snapshot. If not, it does nothing.
///
/// It returns the condition about when the snapshot is installed and can proceed the commands
/// that depends on the state of snapshot.
///
/// It returns an `Option<Condition<C>>` indicating the next action:
/// - `Some(Condition::StateMachineCommand { command_seq })` if the snapshot will be installed.
/// Further commands that depend on snapshot state should use this condition so that these
/// command block until the condition is satisfied(`RaftCore` receives a `Notify`).
/// - Otherwise `None` if the snapshot will not be installed (e.g., if it is not newer than the
/// current state).
#[tracing::instrument(level = "debug", skip_all)]
pub(crate) fn install_full_snapshot(&mut self, snapshot: Snapshot<C>) {
pub(crate) fn install_full_snapshot(&mut self, snapshot: Snapshot<C>) -> Option<Condition<C::NodeId>> {
let meta = &snapshot.meta;
tracing::info!("install_full_snapshot: meta:{:?}", meta);

Expand All @@ -254,7 +269,7 @@ where C: RaftTypeConfig
snap_last_log_id.display(),
self.state.committed().display()
);
return;
return None;
}

// snapshot_last_log_id can not be None
Expand All @@ -267,7 +282,7 @@ where C: RaftTypeConfig
let mut snap_handler = self.snapshot_handler();
let updated = snap_handler.update_snapshot(meta.clone());
if !updated {
return;
return None;
}

let local = self.state.get_log_id(snap_last_log_id.index);
Expand All @@ -285,9 +300,14 @@ where C: RaftTypeConfig
));

self.output.push_command(Command::from(sm::Command::install_full_snapshot(snapshot)));
let last_sm_seq = self.output.last_sm_seq();

self.state.purge_upto = Some(snap_last_log_id);
self.log_handler().purge_log();

Some(Condition::StateMachineCommand {
command_seq: last_sm_seq,
})
}

/// Find the last 2 membership entries in a list of entries.
Expand Down
1 change: 1 addition & 0 deletions openraft/src/engine/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ mod tests {
mod handle_vote_req_test;
mod handle_vote_resp_test;
mod initialize_test;
mod install_full_snapshot_test;
mod log_id_list_test;
mod startup_test;
mod trigger_purge_log_test;
Expand Down
163 changes: 163 additions & 0 deletions openraft/src/engine/tests/install_full_snapshot_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
use std::io::Cursor;

use maplit::btreeset;
use pretty_assertions::assert_eq;

use crate::core::sm;
use crate::engine::testing::UTConfig;
use crate::engine::Command;
use crate::engine::Condition;
use crate::engine::Engine;
use crate::engine::LogIdList;
use crate::engine::Respond;
use crate::raft::SnapshotResponse;
use crate::testing::log_id;
use crate::type_config::alias::AsyncRuntimeOf;
use crate::AsyncRuntime;
use crate::Membership;
use crate::Snapshot;
use crate::SnapshotMeta;
use crate::StoredMembership;
use crate::TokioInstant;
use crate::Vote;

fn m12() -> Membership<UTConfig> {
Membership::<UTConfig>::new(vec![btreeset! {1,2}], None)
}

fn m1234() -> Membership<UTConfig> {
Membership::<UTConfig>::new(vec![btreeset! {1,2,3,4}], None)
}

fn eng() -> Engine<UTConfig> {
let mut eng = Engine::testing_default(0);
eng.state.enable_validation(false); // Disable validation for incomplete state

eng.state.vote.update(TokioInstant::now(), Vote::new_committed(2, 1));
eng.state.committed = Some(log_id(4, 1, 5));
eng.state.log_ids = LogIdList::new(vec![
//
log_id(2, 1, 2),
log_id(3, 1, 5),
log_id(4, 1, 6),
log_id(4, 1, 8),
]);
eng.state.snapshot_meta = SnapshotMeta {
last_log_id: Some(log_id(2, 1, 2)),
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m12()),
snapshot_id: "1-2-3-4".to_string(),
};
eng.state.server_state = eng.calc_server_state();

eng
}

#[test]
fn test_handle_install_full_snapshot_lt_last_snapshot() -> anyhow::Result<()> {
// Snapshot will not be installed because new `last_log_id` is less or equal current
// `snapshot_meta.last_log_id`.
//
// It should respond at once.

let mut eng = eng();

let curr_vote = *eng.state.vote_ref();

let (tx, _rx) = AsyncRuntimeOf::<UTConfig>::oneshot();

eng.handle_install_full_snapshot(
curr_vote,
Snapshot {
meta: SnapshotMeta {
last_log_id: Some(log_id(1, 1, 2)),
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()),
snapshot_id: "1-2-3-4".to_string(),
},
snapshot: Box::new(Cursor::new(vec![0u8])),
},
tx,
);

assert_eq!(
SnapshotMeta {
last_log_id: Some(log_id(2, 1, 2)),
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m12()),
snapshot_id: "1-2-3-4".to_string(),
},
eng.state.snapshot_meta
);

let (dummy_tx, _rx) = AsyncRuntimeOf::<UTConfig>::oneshot();
assert_eq!(
vec![
//
Command::Respond {
when: None,
resp: Respond::new(Ok(SnapshotResponse::new(curr_vote)), dummy_tx),
},
],
eng.output.take_commands()
);

Ok(())
}

#[test]
fn test_handle_install_full_snapshot_no_conflict() -> anyhow::Result<()> {
// Snapshot will be installed and there are no conflicting logs.
// The response should be sent after the snapshot is installed.

let mut eng = eng();

let curr_vote = *eng.state.vote_ref();

let (tx, _rx) = AsyncRuntimeOf::<UTConfig>::oneshot();

eng.handle_install_full_snapshot(
curr_vote,
Snapshot {
meta: SnapshotMeta {
last_log_id: Some(log_id(4, 1, 6)),
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()),
snapshot_id: "1-2-3-4".to_string(),
},
snapshot: Box::new(Cursor::new(vec![0u8])),
},
tx,
);

assert_eq!(
SnapshotMeta {
last_log_id: Some(log_id(4, 1, 6)),
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()),
snapshot_id: "1-2-3-4".to_string(),
},
eng.state.snapshot_meta
);

let (dummy_tx, _rx) = AsyncRuntimeOf::<UTConfig>::oneshot();
assert_eq!(
vec![
//
Command::from(
sm::Command::install_full_snapshot(Snapshot {
meta: SnapshotMeta {
last_log_id: Some(log_id(4, 1, 6)),
last_membership: StoredMembership::new(Some(log_id(1, 1, 1)), m1234()),
snapshot_id: "1-2-3-4".to_string(),
},
snapshot: Box::new(Cursor::new(vec![0u8])),
})
.with_seq(1)
),
Command::PurgeLog { upto: log_id(4, 1, 6) },
Command::Respond {
when: Some(Condition::StateMachineCommand { command_seq: 1 }),
resp: Respond::new(Ok(SnapshotResponse::new(curr_vote)), dummy_tx),
},
],
eng.output.take_commands()
);

Ok(())
}

0 comments on commit 5c83e16

Please sign in to comment.