Skip to content

Commit

Permalink
Refactor: Initiate replication per event loop
Browse files Browse the repository at this point in the history
When a replication response is handled, RaftCore should initiate next
replication if there are more logs to to replicate.

Before this commit this is done in response-handler.
In this commit it is moved to `RaftCore::runtime_loop`:
replications are re-initiated when all of the events are processed.

This way the replication response handler is simplified.
  • Loading branch information
drmingdrmer committed Jul 9, 2024
1 parent f1792dc commit 460d158
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 17 deletions.
7 changes: 7 additions & 0 deletions openraft/src/core/raft_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use crate::display_ext::DisplayInstantExt;
use crate::display_ext::DisplayOption;
use crate::display_ext::DisplayOptionExt;
use crate::display_ext::DisplaySlice;
use crate::engine::handler::replication_handler::SendNone;
use crate::engine::Command;
use crate::engine::Condition;
use crate::engine::Engine;
Expand Down Expand Up @@ -922,6 +923,12 @@ where
balancer.increase_raft_msg();
}
}

// Keep replicating to a target if the replication stream to it is idle.
if let Ok(mut lh) = self.engine.leader_handler() {
lh.replication_handler().initiate_replication(SendNone::False);
}
self.run_engine_commands().await?;
}
}

Expand Down
17 changes: 0 additions & 17 deletions openraft/src/engine/handler/replication_handler/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::ops::Deref;

use crate::display_ext::DisplayOptionExt;
use crate::engine::handler::log_handler::LogHandler;
use crate::engine::handler::snapshot_handler::SnapshotHandler;
Expand Down Expand Up @@ -308,21 +306,6 @@ where C: RaftTypeConfig
// The purge job may be postponed because a replication task is using them.
// Thus we just try again to purge when progress is updated.
self.try_purge_log();

// initialize next replication to this target

{
let p = self.leader.progress.get_mut(&target).unwrap();

let r = p.next_send(self.state.deref(), self.config.max_payload_entries);
tracing::debug!(next_send_res = debug(&r), "next_send");

if let Ok(inflight) = r {
Self::send_to_target(self.output, &target, inflight);
} else {
tracing::debug!("nothing to send to target={target}, progress:{}", p);
}
}
}

/// Update replication streams to reflect replication progress change.
Expand Down

0 comments on commit 460d158

Please sign in to comment.