Skip to content

Commit

Permalink
Refactor: RaftMetrics::heartbeat stores last-acked timestamp instea…
Browse files Browse the repository at this point in the history
…d of duration since last ack

Other changes: Display human readable Instant in log
  • Loading branch information
drmingdrmer committed Jul 25, 2024
1 parent 3b16d75 commit ec42d9a
Show file tree
Hide file tree
Showing 12 changed files with 127 additions and 69 deletions.
11 changes: 1 addition & 10 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,16 +526,7 @@ where
let replication = Some(replication_prog.iter().map(|(id, p)| (*id, *p.borrow())).collect());

let clock_prog = &leader.clock_progress;
let heartbeat = Some(
clock_prog
.iter()
.map(|(id, opt_t)| {
let millis_since_last_ack = opt_t.map(|t| t.elapsed().as_millis() as u64);

(*id, millis_since_last_ack)
})
.collect(),
);
let heartbeat = Some(clock_prog.iter().map(|(id, opt_t)| (*id, opt_t.map(SerdeInstant::new))).collect());

(replication, heartbeat)
} else {
Expand Down
13 changes: 9 additions & 4 deletions openraft/src/engine/handler/replication_handler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,19 +102,19 @@ where C: RaftTypeConfig

{
let end = self.state.last_log_id().next_index();
let default_v = ProgressEntry::empty(end);
let default_v = || ProgressEntry::empty(end);

let old_progress = self.leader.progress.clone();

self.leader.progress =
old_progress.upgrade_quorum_set(em.membership().to_quorum_set(), &learner_ids, default_v);
old_progress.upgrade_quorum_set(em.membership().to_quorum_set(), learner_ids.clone(), default_v);
}

{
let old_progress = self.leader.clock_progress.clone();

self.leader.clock_progress =
old_progress.upgrade_quorum_set(em.membership().to_quorum_set(), &learner_ids, None);
old_progress.upgrade_quorum_set(em.membership().to_quorum_set(), learner_ids, || None);
}
}

Expand Down Expand Up @@ -159,7 +159,12 @@ where C: RaftTypeConfig

tracing::debug!(
granted = display(granted.as_ref().map(|x| x.display()).display()),
clock_progress = debug(&self.leader.clock_progress),
clock_progress = display(
&self
.leader
.clock_progress
.display_with(|f, id, v| { write!(f, "{}: {}", id, v.as_ref().map(|x| x.display()).display()) })
),
"granted leader vote clock after updating"
);

Expand Down
3 changes: 2 additions & 1 deletion openraft/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ pub(crate) use wait_condition::Condition;

use crate::type_config::alias::LogIdOf;
use crate::type_config::alias::NodeIdOf;
use crate::type_config::alias::SerdeInstantOf;

pub(crate) type ReplicationMetrics<C> = BTreeMap<NodeIdOf<C>, Option<LogIdOf<C>>>;
/// Heartbeat metrics, a mapping between a node's ID and milliseconds since the
/// last acknowledged heartbeat or replication to this node.
pub(crate) type HeartbeatMetrics<C> = BTreeMap<NodeIdOf<C>, Option<u64>>;
pub(crate) type HeartbeatMetrics<C> = BTreeMap<NodeIdOf<C>, Option<SerdeInstantOf<C>>>;
3 changes: 2 additions & 1 deletion openraft/src/metrics/raft_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::metrics::HeartbeatMetrics;
use crate::metrics::ReplicationMetrics;
use crate::metrics::SerdeInstant;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::SerdeInstantOf;
use crate::Instant;
use crate::LogId;
use crate::RaftTypeConfig;
Expand Down Expand Up @@ -90,7 +91,7 @@ pub struct RaftMetrics<C: RaftTypeConfig> {
/// lost synchronization with the cluster.
/// An older value may suggest a higher probability of the leader being partitioned from the
/// cluster.
pub last_quorum_acked: Option<SerdeInstant<InstantOf<C>>>,
pub last_quorum_acked: Option<SerdeInstantOf<C>>,

/// The current membership config of the cluster.
pub membership_config: Arc<StoredMembership<C>>,
Expand Down
6 changes: 3 additions & 3 deletions openraft/src/metrics/serde_instant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ mod serde_impl {

use super::SerdeInstant;
use crate::engine::testing::UTConfig;
use crate::type_config::alias::InstantOf;
use crate::type_config::alias::SerdeInstantOf;
use crate::type_config::TypeConfigExt;

#[test]
Expand All @@ -158,7 +158,7 @@ mod serde_impl {
println!("json: {}", json);
println!("Now: {:?}", now);

let deserialized: SerdeInstant<InstantOf<UTConfig>> = serde_json::from_str(&json).unwrap();
let deserialized: SerdeInstantOf<UTConfig> = serde_json::from_str(&json).unwrap();
println!("Des: {:?}", *deserialized);

// Convert Instant to SerdeInstant is inaccurate.
Expand All @@ -171,7 +171,7 @@ mod serde_impl {
// Test serialization format

let nano = "1721829051211301916";
let deserialized: SerdeInstant<InstantOf<UTConfig>> = serde_json::from_str(nano).unwrap();
let deserialized: SerdeInstantOf<UTConfig> = serde_json::from_str(nano).unwrap();
let serialized = serde_json::to_string(&deserialized).unwrap();

assert_eq!(
Expand Down
2 changes: 1 addition & 1 deletion openraft/src/progress/bench/vec_progress_update.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use crate::quorum::Joint;
fn progress_update_01234_567(b: &mut Bencher) {
let membership: Vec<Vec<u64>> = vec![vec![0, 1, 2, 3, 4], vec![5, 6, 7]];
let quorum_set = Joint::from(membership);
let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, 0..=7, 0);
let mut progress = VecProgress::<u64, u64, u64, _>::new(quorum_set, 0..=7, || 0);

let mut id = 0u64;
let mut values = [0, 1, 2, 3, 4, 5, 6, 7];
Expand Down
Loading

0 comments on commit ec42d9a

Please sign in to comment.