Skip to content

Commit

Permalink
Refactor: linearizable-read use noop_log_id as the first log proposed…
Browse files Browse the repository at this point in the history
… by current leader
  • Loading branch information
drmingdrmer committed Mar 24, 2024
1 parent 16d4f6f commit 519ffa6
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 72 deletions.
17 changes: 11 additions & 6 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,16 @@ where
// Setup sentinel values to track when we've received majority confirmation of leadership.

let resp = {
let read_log_id = self.engine.state.get_read_log_id().copied();
let l = self.engine.leader_handler();
let lh = match l {
Ok(leading_handler) => leading_handler,
Err(forward) => {
let _ = tx.send(Err(forward.into()));
return;
}
};

let read_log_id = lh.get_read_log_id();

// TODO: this applied is a little stale when being returned to client.
// Fix this when the following heartbeats are replaced with calling RaftNetwork.
Expand Down Expand Up @@ -1133,11 +1142,7 @@ where
self.engine.handle_install_full_snapshot(vote, snapshot, tx);
}
RaftMsg::CheckIsLeaderRequest { tx } => {
if self.engine.state.is_leader(&self.engine.config.id) {
self.handle_check_is_leader_request(tx).await;
} else {
self.reject_with_forward_to_leader(tx);
}
self.handle_check_is_leader_request(tx).await;
}
RaftMsg::ClientWriteRequest { app_data, tx } => {
self.write_entry(C::Entry::from_app_data(app_data), Some(tx));
Expand Down
60 changes: 60 additions & 0 deletions openraft/src/engine/handler/leader_handler/get_read_log_id_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
use std::sync::Arc;

use maplit::btreeset;
#[allow(unused_imports)] use pretty_assertions::assert_eq;
#[allow(unused_imports)] use pretty_assertions::assert_ne;
#[allow(unused_imports)] use pretty_assertions::assert_str_eq;

use crate::engine::testing::UTConfig;
use crate::engine::Engine;
use crate::testing::log_id;
use crate::utime::UTime;
use crate::EffectiveMembership;
use crate::Membership;
use crate::MembershipState;
use crate::TokioInstant;
use crate::Vote;

fn m01() -> Membership<UTConfig> {
Membership::<UTConfig>::new(vec![btreeset! {0,1}], None)
}

fn m23() -> Membership<UTConfig> {
Membership::<UTConfig>::new(vec![btreeset! {2,3}], btreeset! {1,2,3})
}

fn eng() -> Engine<UTConfig> {
let mut eng = Engine::testing_default(0);
eng.state.enable_validation(false); // Disable validation for incomplete state

eng.config.id = 1;
eng.state.committed = Some(log_id(0, 1, 0));
eng.state.vote = UTime::new(TokioInstant::now(), Vote::new_committed(3, 1));
eng.state.log_ids.append(log_id(1, 1, 1));
eng.state.log_ids.append(log_id(2, 1, 3));
eng.state.membership_state = MembershipState::new(
Arc::new(EffectiveMembership::new(Some(log_id(1, 1, 1)), m01())),
Arc::new(EffectiveMembership::new(Some(log_id(2, 1, 3)), m23())),
);
eng.state.server_state = eng.calc_server_state();

eng
}

#[test]
fn test_get_read_log_id() -> anyhow::Result<()> {
let mut eng = eng();
eng.vote_handler().become_leading();

eng.state.committed = Some(log_id(0, 1, 0));
eng.internal_server_state.leading_mut().unwrap().noop_log_id = Some(log_id(1, 1, 2));

let got = eng.leader_handler()?.get_read_log_id();
assert_eq!(Some(log_id(1, 1, 2)), got);

eng.state.committed = Some(log_id(2, 1, 3));
let got = eng.leader_handler()?.get_read_log_id();
assert_eq!(Some(log_id(2, 1, 3)), got);

Ok(())
}
13 changes: 12 additions & 1 deletion openraft/src/engine/handler/leader_handler/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#[allow(unused_imports)] use crate::docs;
use crate::engine::handler::replication_handler::ReplicationHandler;
use crate::engine::handler::replication_handler::SendNone;
use crate::engine::Command;
Expand All @@ -7,11 +6,14 @@ use crate::engine::EngineOutput;
use crate::entry::RaftPayload;
use crate::internal_server_state::LeaderQuorumSet;
use crate::leader::Leading;
use crate::raft_state::LogStateReader;
use crate::type_config::alias::LogIdOf;
use crate::RaftLogId;
use crate::RaftState;
use crate::RaftTypeConfig;

#[cfg(test)] mod append_entries_test;
#[cfg(test)] mod get_read_log_id_test;
#[cfg(test)] mod send_heartbeat_test;

/// Handle leader operations.
Expand Down Expand Up @@ -81,6 +83,15 @@ where C: RaftTypeConfig
rh.initiate_replication(SendNone::True);
}

/// Get the log id for a linearizable read.
///
/// See: [Read Operation](crate::docs::protocol::read)
pub(crate) fn get_read_log_id(&self) -> Option<LogIdOf<C>> {
let committed = self.state.committed().copied();
// noop log id is the first log this leader proposed.
std::cmp::max(self.leader.noop_log_id, committed)
}

pub(crate) fn replication_handler(&mut self) -> ReplicationHandler<C> {
ReplicationHandler {
config: self.config,
Expand Down
1 change: 1 addition & 0 deletions openraft/src/engine/log_id_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,7 @@ impl<NID: NodeId> LogIdList<NID> {
///
/// Note that the 0-th log does not belong to any leader(but a membership log to initialize a
/// cluster) but this method does not differentiate between them.
#[allow(dead_code)]
pub(crate) fn by_last_leader(&self) -> &[LogId<NID>] {
let ks = &self.key_log_ids;
let l = ks.len();
Expand Down
25 changes: 0 additions & 25 deletions openraft/src/raft_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ mod tests {
mod accepted_test;
mod forward_to_leader_test;
mod log_state_reader_test;
mod read_log_id_test;
mod validate_test;
}

Expand All @@ -39,7 +38,6 @@ pub(crate) use log_state_reader::LogStateReader;
pub use membership_state::MembershipState;
pub(crate) use vote_state_reader::VoteStateReader;

use crate::display_ext::DisplayOptionExt;
pub(crate) use crate::raft_state::snapshot_streaming::StreamingState;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::LogIdOf;
Expand Down Expand Up @@ -203,29 +201,6 @@ where C: RaftTypeConfig
self.vote.utime()
}

/// Get the log id for a linearizable read.
///
/// See: [Read Operation](crate::docs::protocol::read)
pub(crate) fn get_read_log_id(&self) -> Option<&LogId<C::NodeId>> {
// Get the first known log id appended by the last leader.
// - This log may not be committed.
// - The leader blank log may have been purged and this could be the last purged log id.
// - There must be such an entry, which is guaranteed by `Engine::establish_leader()`.
let leader_first = self.log_ids.by_last_leader().first();

debug_assert_eq!(
leader_first.map(|log_id| *log_id.committed_leader_id()),
self.vote_ref().committed_leader_id(),
"leader_first must belong to a leader of current vote: leader_first: {}, vote.committed_leader_id: {}",
leader_first.map(|log_id| log_id.committed_leader_id()).display(),
self.vote_ref().committed_leader_id().display(),
);

let committed = self.committed();

std::cmp::max(leader_first, committed)
}

/// Return the accepted last log id of the current leader.
pub(crate) fn accepted(&self) -> Option<&LogId<C::NodeId>> {
self.accepted.last_accepted_log_id(self.vote_ref().leader_id())
Expand Down
40 changes: 0 additions & 40 deletions openraft/src/raft_state/tests/read_log_id_test.rs

This file was deleted.

0 comments on commit 519ffa6

Please sign in to comment.