Skip to content

Commit

Permalink
Refactor: Monotonic increasing LogIOId
Browse files Browse the repository at this point in the history
`LogIOId` is utilized to uniquely identify an append-entries IO
globally. It plays a crucial role in the log IO callback, informing
`RaftCore` about the latest log that has been successfully flushed to
disk.

In this commit, `LogIOId` is defined as a tuple comprising `leader_id`,
which submits the log IO, and `last_log_id`, which represents the final
log ID in the append-entries IO operation.

Given that the `leader_id` is monotonically increasing and a leader
consistently submits log entries in a sequential order, this design
ensures that `LogIOId` is also monotonically increasing. This feature
allows for effective tracking of IO progress.
  • Loading branch information
drmingdrmer committed May 10, 2024
1 parent 94600eb commit cc59bcf
Show file tree
Hide file tree
Showing 21 changed files with 158 additions and 41 deletions.
15 changes: 10 additions & 5 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ use crate::raft::AppendEntriesRequest;
use crate::raft::AppendEntriesResponse;
use crate::raft::ClientWriteResponse;
use crate::raft::VoteRequest;
use crate::raft_state::LogIOId;
use crate::raft_state::LogStateReader;
use crate::replication;
use crate::replication::request::Replicate;
Expand Down Expand Up @@ -709,6 +710,7 @@ where
pub(crate) async fn append_to_log<I>(
&mut self,
entries: I,
vote: Vote<C::NodeId>,
last_log_id: LogId<C::NodeId>,
) -> Result<(), StorageError<C::NodeId>>
where
Expand All @@ -718,7 +720,10 @@ where
tracing::debug!("append_to_log");

let (tx, rx) = C::AsyncRuntime::oneshot();
let callback = LogFlushed::new(Some(last_log_id), tx);
let log_io_id = LogIOId::new(vote, Some(last_log_id));

let callback = LogFlushed::new(log_io_id, tx);

self.log_store.append(entries, callback).await?;
rx.await
.map_err(|e| StorageIOError::write_logs(AnyError::error(e)))?
Expand Down Expand Up @@ -1591,23 +1596,23 @@ where
Command::QuitLeader => {
self.leader_data = None;
}
Command::AppendEntry { entry } => {
Command::AppendEntry { vote, entry } => {
let log_id = *entry.get_log_id();
tracing::debug!("AppendEntry: {}", &entry);

self.append_to_log([entry], log_id).await?;
self.append_to_log([entry], vote, log_id).await?;

// The leader may have changed.
// But reporting to a different leader is not a problem.
if let Ok(mut lh) = self.engine.leader_handler() {
lh.replication_handler().update_local_progress(Some(log_id));
}
}
Command::AppendInputEntries { entries } => {
Command::AppendInputEntries { vote, entries } => {
let last_log_id = *entries.last().unwrap().get_log_id();
tracing::debug!("AppendInputEntries: {}", DisplaySlice::<_>(&entries),);

self.append_to_log(entries, last_log_id).await?;
self.append_to_log(entries, vote, last_log_id).await?;

// The leader may have changed.
// But reporting to a different leader is not a problem.
Expand Down
26 changes: 22 additions & 4 deletions openraft/src/engine/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,28 @@ where C: RaftTypeConfig
QuitLeader,

/// Append one entry.
AppendEntry { entry: C::Entry },
AppendEntry {
/// Same as the `vote` in [`Command::AppendInputEntries`].
vote: Vote<C::NodeId>,

entry: C::Entry,
},

/// Append a `range` of entries.
AppendInputEntries { entries: Vec<C::Entry> },
AppendInputEntries {
/// The vote of the leader that submits the entries to write.
///
/// The leader could be a local leader that appends entries to the local log store,
/// or a remote leader that replicates entries to this follower.
///
/// The leader id is used to generate a monotonic increasing IO id, such as: [`LogIOId`].
/// Where [`LogIOId`] is `(leader_id, log_id)`.
///
/// [`LogIOId`]: crate::raft_state::io_state::log_io_id::LogIOId
vote: Vote<C::NodeId>,

entries: Vec<C::Entry>,
},

/// Replicate the committed log id to other nodes
ReplicateCommitted { committed: Option<LogId<C::NodeId>> },
Expand Down Expand Up @@ -121,8 +139,8 @@ where
match (self, other) {
(Command::BecomeLeader, Command::BecomeLeader) => true,
(Command::QuitLeader, Command::QuitLeader) => true,
(Command::AppendEntry { entry }, Command::AppendEntry { entry: b }, ) => entry == b,
(Command::AppendInputEntries { entries }, Command::AppendInputEntries { entries: b }, ) => entries == b,
(Command::AppendEntry { vote, entry }, Command::AppendEntry { vote: vb, entry: b }, ) => vote == vb && entry == b,
(Command::AppendInputEntries { vote, entries }, Command::AppendInputEntries { vote: vb, entries: b }, ) => vote == vb && entries == b,
(Command::ReplicateCommitted { committed }, Command::ReplicateCommitted { committed: b }, ) => committed == b,
(Command::Commit { seq, already_committed, upto, }, Command::Commit { seq: b_seq, already_committed: b_committed, upto: b_upto, }, ) => seq == b_seq && already_committed == b_committed && upto == b_upto,
(Command::Replicate { target, req }, Command::Replicate { target: b_target, req: other_req, }, ) => target == b_target && req == other_req,
Expand Down
6 changes: 5 additions & 1 deletion openraft/src/engine/engine_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,11 @@ where C: RaftTypeConfig
let em = EffectiveMembership::new_arc(Some(log_id), m.clone());
self.state.membership_state.append(em);

self.output.push_command(Command::AppendEntry { entry });
self.output.push_command(Command::AppendEntry {
// When initialize, it behaves as a learner.
vote: *self.state.vote_ref(),
entry,
});

self.server_state_handler().update_server_state_if_changed();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ use crate::entry::RaftEntry;
use crate::raft_state::LogStateReader;
use crate::testing::blank_ent;
use crate::testing::log_id;
use crate::utime::UTime;
use crate::EffectiveMembership;
use crate::Entry;
use crate::EntryPayload;
use crate::Membership;
use crate::MembershipState;
use crate::Vote;

fn m01() -> Membership<UTConfig> {
Membership::new(vec![btreeset! {0,1}], None)
Expand Down Expand Up @@ -78,6 +80,7 @@ fn test_follower_do_append_entries_empty() -> anyhow::Result<()> {
#[test]
fn test_follower_do_append_entries_no_membership_entries() -> anyhow::Result<()> {
let mut eng = eng();
eng.state.vote = UTime::without_utime(Vote::new(1, 1));

eng.following_handler().do_append_entries(
vec![
Expand Down Expand Up @@ -108,6 +111,7 @@ fn test_follower_do_append_entries_no_membership_entries() -> anyhow::Result<()>
vec![
//
Command::AppendInputEntries {
vote: Vote::new(1, 1),
entries: vec![blank_ent(3, 1, 4)]
},
],
Expand All @@ -124,6 +128,7 @@ fn test_follower_do_append_entries_one_membership_entry() -> anyhow::Result<()>
// - Follower become Learner, since it is not in the new effective membership.
let mut eng = eng();
eng.config.id = 2; // make it a member, the become learner
eng.state.vote = UTime::without_utime(Vote::new(1, 1));

eng.following_handler().do_append_entries(
vec![
Expand Down Expand Up @@ -164,6 +169,7 @@ fn test_follower_do_append_entries_one_membership_entry() -> anyhow::Result<()>
);
assert_eq!(
vec![Command::AppendInputEntries {
vote: Vote::new(1, 1),
entries: vec![
//
blank_ent(3, 1, 4),
Expand All @@ -187,6 +193,7 @@ fn test_follower_do_append_entries_three_membership_entries() -> anyhow::Result<
let mut eng = eng();
eng.config.id = 5; // make it a learner, then become follower
eng.state.server_state = eng.calc_server_state();
eng.state.vote = UTime::without_utime(Vote::new(1, 1));

eng.following_handler().do_append_entries(
vec![
Expand Down Expand Up @@ -225,6 +232,7 @@ fn test_follower_do_append_entries_three_membership_entries() -> anyhow::Result<
);
assert_eq!(
vec![Command::AppendInputEntries {
vote: Vote::new(1, 1),
entries: vec![
blank_ent(3, 1, 4),
Entry::<UTConfig>::new_membership(log_id(3, 1, 5), m01()),
Expand Down
9 changes: 8 additions & 1 deletion openraft/src/engine/handler/following_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,14 @@ where C: RaftTypeConfig
self.state.extend_log_ids(&entries);
self.append_membership(entries.iter());

self.output.push_command(Command::AppendInputEntries { entries });
// TODO: with asynchronous IO in future,
// do not write log until vote being committed,
// or consistency is broken.
self.output.push_command(Command::AppendInputEntries {
// A follower should always use the node's vote.
vote: *self.state.vote_ref(),
entries,
});
}

/// Commit entries that are already committed by the leader.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ fn test_leader_append_entries_normal() -> anyhow::Result<()> {
assert_eq!(
vec![
Command::AppendInputEntries {
vote: Vote::new_committed(3, 1),
entries: vec![
blank_ent(3, 1, 4), //
blank_ent(3, 1, 5),
Expand Down Expand Up @@ -180,6 +181,7 @@ fn test_leader_append_entries_single_node_leader() -> anyhow::Result<()> {

assert_eq!(
vec![Command::AppendInputEntries {
vote: Vote::new_committed(3, 1),
entries: vec![
blank_ent(3, 1, 4), //
blank_ent(3, 1, 5),
Expand Down Expand Up @@ -234,6 +236,7 @@ fn test_leader_append_entries_with_membership_log() -> anyhow::Result<()> {
assert_eq!(
vec![
Command::AppendInputEntries {
vote: Vote::new_committed(3, 1),
entries: vec![
blank_ent(3, 1, 4), //
Entry::new_membership(log_id(3, 1, 5), m1_2()),
Expand Down
10 changes: 9 additions & 1 deletion openraft/src/engine/handler/leader_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,15 @@ where C: RaftTypeConfig
}
}

self.output.push_command(Command::AppendInputEntries { entries });
// TODO: with asynchronous IO in future,
// do not write log until vote being committed,
// or consistency is broken.
self.output.push_command(Command::AppendInputEntries {
// A leader should always use the leader's vote.
// It is allowed to be different from local vote.
vote: self.leader.vote,
entries,
});

let mut rh = self.replication_handler();

Expand Down
10 changes: 9 additions & 1 deletion openraft/src/engine/handler/replication_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,15 @@ where C: RaftTypeConfig
);
self.state.log_ids.append(log_id);
let entry = C::Entry::new_blank(log_id);
self.output.push_command(Command::AppendEntry { entry });

// TODO: with asynchronous IO in future,
// do not write log until vote being committed,
// or consistency is broken.
self.output.push_command(Command::AppendEntry {
// A leader should always use the leader's vote.
vote: self.leader.vote,
entry,
});

self.update_local_progress(Some(log_id));
}
Expand Down
2 changes: 2 additions & 0 deletions openraft/src/engine/tests/append_entries_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ fn test_append_entries_prev_log_id_is_committed() -> anyhow::Result<()> {
},
Command::DeleteConflictLog { since: log_id(1, 1, 2) },
Command::AppendInputEntries {
vote: Vote::new_committed(2, 1),
entries: vec![blank_ent(2, 1, 2)]
},
],
Expand Down Expand Up @@ -292,6 +293,7 @@ fn test_append_entries_conflict() -> anyhow::Result<()> {
},
Command::DeleteConflictLog { since: log_id(2, 1, 3) },
Command::AppendInputEntries {
vote: Vote::new_committed(2, 1),
entries: vec![Entry::new_membership(log_id(3, 1, 3), m34())]
},
],
Expand Down
2 changes: 2 additions & 0 deletions openraft/src/engine/tests/elect_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ fn test_elect() -> anyhow::Result<()> {
Command::BecomeLeader,
Command::RebuildReplicationStreams { targets: vec![] },
Command::AppendEntry {
vote: Vote::new_committed(1, 1),
entry: Entry::<UTConfig>::new_blank(log_id(1, 1, 1))
},
Command::ReplicateCommitted {
Expand Down Expand Up @@ -127,6 +128,7 @@ fn test_elect() -> anyhow::Result<()> {
Command::BecomeLeader,
Command::RebuildReplicationStreams { targets: vec![] },
Command::AppendEntry {
vote: Vote::new_committed(2, 1),
entry: Entry::<UTConfig>::new_blank(log_id(2, 1, 1))
},
Command::ReplicateCommitted {
Expand Down
1 change: 1 addition & 0 deletions openraft/src/engine/tests/handle_vote_resp_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ fn test_handle_vote_resp() -> anyhow::Result<()> {
targets: vec![(2, ProgressEntry::empty(1))]
},
Command::AppendEntry {
vote: Vote::new_committed(2, 1),
entry: Entry::<UTConfig>::new_blank(log_id(2, 1, 1)),
},
Command::Replicate {
Expand Down
3 changes: 3 additions & 0 deletions openraft/src/engine/tests/initialize_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ fn test_initialize_single_node() -> anyhow::Result<()> {
assert_eq!(
vec![
Command::AppendEntry {
vote: Vote::default(),
entry: Entry::<UTConfig>::new_membership(LogId::default(), m1())
},
// When update the effective membership, the engine set it to Follower.
Expand All @@ -71,6 +72,7 @@ fn test_initialize_single_node() -> anyhow::Result<()> {
Command::BecomeLeader,
Command::RebuildReplicationStreams { targets: vec![] },
Command::AppendEntry {
vote: Vote::new_committed(1, 1),
entry: Entry::<UTConfig>::new_blank(log_id(1, 1, 1))
},
Command::ReplicateCommitted {
Expand Down Expand Up @@ -130,6 +132,7 @@ fn test_initialize() -> anyhow::Result<()> {
assert_eq!(
vec![
Command::AppendEntry {
vote: Vote::default(),
entry: Entry::new_membership(LogId::default(), m12())
},
// When update the effective membership, the engine set it to Follower.
Expand Down
11 changes: 3 additions & 8 deletions openraft/src/raft_state/io_state.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,11 @@
use log_io_id::LogIOId;

use crate::display_ext::DisplayOption;
use crate::LeaderId;
use crate::LogId;
use crate::NodeId;
use crate::Vote;

#[derive(Debug, Clone, Copy)]
#[derive(Default)]
#[derive(PartialEq, Eq)]
pub(crate) struct LogIOId<NID: NodeId> {
pub(crate) leader_id: LeaderId<NID>,
pub(crate) log_id: Option<LogId<NID>>,
}
pub(crate) mod log_io_id;

/// IOState tracks the state of actually happened io including log flushed, applying log to state
/// machine or snapshot building.
Expand Down
40 changes: 40 additions & 0 deletions openraft/src/raft_state/io_state/log_io_id.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use std::fmt;

use crate::display_ext::DisplayOptionExt;
use crate::LeaderId;
use crate::LogId;
use crate::NodeId;

/// A monotonic increasing id for log io operation.
///
/// The leader could be a local leader that appends entries to the local log store,
/// or a remote leader that replicates entries to this follower.
///
/// It is monotonic increasing because:
/// - Leader id increase monotonically in the entire cluster.
/// - Leader propose or replicate log entries in order.
#[derive(Debug, Clone, Copy)]
#[derive(Default)]
#[derive(PartialEq, Eq)]
pub(crate) struct LogIOId<NID: NodeId> {
/// The id of the leader that performs the log io operation.
pub(crate) leader_id: LeaderId<NID>,

/// The last log id that has been flushed to storage.
pub(crate) log_id: Option<LogId<NID>>,
}

impl<NID: NodeId> fmt::Display for LogIOId<NID> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "by_leader({}):{}", self.leader_id, self.log_id.display())
}
}

impl<NID: NodeId> LogIOId<NID> {
pub(crate) fn new(leader_id: impl Into<LeaderId<NID>>, log_id: Option<LogId<NID>>) -> Self {
Self {
leader_id: leader_id.into(),
log_id,
}
}
}
2 changes: 1 addition & 1 deletion openraft/src/raft_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ mod membership_state;
pub(crate) mod snapshot_streaming;
mod vote_state_reader;

#[allow(unused)] pub(crate) use io_state::log_io_id::LogIOId;
pub(crate) use io_state::IOState;
#[allow(unused)] pub(crate) use io_state::LogIOId;

#[cfg(test)]
mod tests {
Expand Down
Loading

0 comments on commit cc59bcf

Please sign in to comment.