Skip to content

Commit

Permalink
Fix: Load mebmership since last_applied, not `last_membership.index…
Browse files Browse the repository at this point in the history
…` on startup

Modify `StorageHelper::last_membership_in_log()` to scan the log
starting from the last applied index rather than the index of the last
applied membership log. This change reduces unnecessary I/O operations
during startup, previously caused by scanning from an incorrect starting
point.
  • Loading branch information
drmingdrmer committed Apr 25, 2024
1 parent be0e6fa commit 849b2d0
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 7 deletions.
9 changes: 5 additions & 4 deletions openraft/src/storage/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,11 +189,9 @@ where
///
/// Thus a raft node will only need to store at most two recent membership logs.
pub async fn get_membership(&mut self) -> Result<MembershipState<C>, StorageError<C::NodeId>> {
let (_, sm_mem) = self.state_machine.applied_state().await?;
let (last_applied, sm_mem) = self.state_machine.applied_state().await?;

let sm_mem_next_index = sm_mem.log_id().next_index();

let log_mem = self.last_membership_in_log(sm_mem_next_index).await?;
let log_mem = self.last_membership_in_log(last_applied.next_index()).await?;
tracing::debug!(membership_in_sm=?sm_mem, membership_in_log=?log_mem, "{}", func_name!());

// There 2 membership configs in logs.
Expand Down Expand Up @@ -231,6 +229,9 @@ where
let st = self.log_store.get_log_state().await?;

let mut end = st.last_log_id.next_index();

tracing::info!("load membership from log: [{}..{})", since_index, end);

let start = std::cmp::max(st.last_purged_log_id.next_index(), since_index);
let step = 64;

Expand Down
61 changes: 58 additions & 3 deletions openraft/src/testing/suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ where
run_fut(run_test(builder, Self::last_membership_in_log_multi_step))?;
run_fut(run_test(builder, Self::get_membership_initial))?;
run_fut(run_test(builder, Self::get_membership_from_log_and_empty_sm))?;
run_fut(run_test(builder, Self::get_membership_from_log_and_sm))?;
run_fut(run_test(builder, Self::get_membership_from_empty_log_and_sm))?;
run_fut(run_test(builder, Self::get_membership_from_log_le_sm_last_applied))?;
run_fut(run_test(builder, Self::get_membership_from_log_gt_sm_last_applied_1))?;
run_fut(run_test(builder, Self::get_membership_from_log_gt_sm_last_applied_2))?;
run_fut(run_test(builder, Self::get_initial_state_without_init))?;
run_fut(run_test(builder, Self::get_initial_state_membership_from_log_and_sm))?;
run_fut(run_test(builder, Self::get_initial_state_with_state))?;
Expand Down Expand Up @@ -260,7 +263,10 @@ where
Ok(())
}

pub async fn get_membership_from_log_and_sm(mut store: LS, mut sm: SM) -> Result<(), StorageError<C::NodeId>> {
pub async fn get_membership_from_empty_log_and_sm(
mut store: LS,
mut sm: SM,
) -> Result<(), StorageError<C::NodeId>> {
tracing::info!("--- no log, read membership from state machine");
{
apply(&mut sm, [
Expand All @@ -280,10 +286,31 @@ where
mem_state.effective().membership(),
);
}
Ok(())
}

pub async fn get_membership_from_log_le_sm_last_applied(
mut store: LS,
mut sm: SM,
) -> Result<(), StorageError<C::NodeId>> {
tracing::info!("--- membership presents in log, but smaller than last_applied, read from state machine");
{
append(&mut store, [membership_ent_0::<C>(1, 1, btreeset! {1,2,3})]).await?;
apply(&mut sm, [
blank_ent_0::<C>(1, 1),
membership_ent_0::<C>(1, 2, btreeset! {3,4,5}),
blank_ent_0::<C>(1, 3),
blank_ent_0::<C>(1, 4),
])
.await?;

// Intentionally append a membership entry that does not match the state machine,
// in order to see which membership is loaded.
append(&mut store, [
blank_ent_0::<C>(1, 1),
blank_ent_0::<C>(1, 2),
membership_ent_0::<C>(1, 3, btreeset! {1,2,3}),
])
.await?;

let mem_state = StorageHelper::new(&mut store, &mut sm).get_membership().await?;

Expand All @@ -296,10 +323,23 @@ where
mem_state.effective().membership(),
);
}
Ok(())
}

pub async fn get_membership_from_log_gt_sm_last_applied_1(
mut store: LS,
mut sm: SM,
) -> Result<(), StorageError<C::NodeId>> {
tracing::info!("--- membership presents in log and > sm.last_applied, read from log");
{
apply(&mut sm, [
blank_ent_0::<C>(1, 1),
membership_ent_0::<C>(1, 2, btreeset! {3,4,5}),
])
.await?;

append(&mut store, [
membership_ent_0::<C>(1, 1, btreeset! {1,2,3}),
blank_ent_0::<C>(1, 2),
membership_ent_0::<C>(1, 3, btreeset! {7,8,9}),
])
Expand All @@ -316,10 +356,25 @@ where
mem_state.effective().membership(),
);
}
Ok(())
}

pub async fn get_membership_from_log_gt_sm_last_applied_2(
mut store: LS,
mut sm: SM,
) -> Result<(), StorageError<C::NodeId>> {
tracing::info!("--- two membership present in log and > sm.last_applied, read 2 from log");
{
apply(&mut sm, [
blank_ent_0::<C>(1, 1),
membership_ent_0::<C>(1, 2, btreeset! {3,4,5}),
])
.await?;

append(&mut store, [
membership_ent_0::<C>(1, 1, btreeset! {1,2,3}),
blank_ent_0::<C>(1, 2),
membership_ent_0::<C>(1, 3, btreeset! {7,8,9}),
blank_ent_0::<C>(1, 4),
membership_ent_0::<C>(1, 5, btreeset! {10,11}),
])
Expand Down

0 comments on commit 849b2d0

Please sign in to comment.