Skip to content

Commit

Permalink
Add timeout for snapshot_sync_ctx (#561)
Browse files Browse the repository at this point in the history
* Add timeout for snapshot_sync_ctx

* [Update PR] Indent and comments

---------

Co-authored-by: Jung-Sang Ahn <[email protected]>
  • Loading branch information
yuwmao and greensky00 authored Jan 16, 2025
1 parent 50e2f94 commit bf5e149
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 7 deletions.
20 changes: 20 additions & 0 deletions include/libnuraft/raft_params.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ struct raft_params {
, log_sync_stop_gap_(99999)
, snapshot_distance_(0)
, snapshot_block_size_(0)
, snapshot_sync_ctx_timeout_(0)
, enable_randomized_snapshot_creation_(false)
, max_append_size_(100)
, reserved_log_items_(100000)
Expand Down Expand Up @@ -221,6 +222,17 @@ struct raft_params {
return *this;
}

/**
* Timeout for syncing the snapshot requests.
*
* @param timeout_ms
* @return self
*/
raft_params& with_snapshot_sync_ctx_timeout(int32 timeout_ms) {
snapshot_sync_ctx_timeout_ = timeout_ms;
return *this;
}

/**
* The number of reserved log items when doing log compaction.
*
Expand Down Expand Up @@ -405,6 +417,14 @@ public:
*/
int32 snapshot_block_size_;

/**
* Timeout(ms) for snapshot_sync_ctx, if a single snapshot syncing request
* exceeds this, it will be considered as timeout and ctx will be released.
* 0 means it will be set to the default value
* `heart_beat_interval_ * response_limit_`.
*/
int32 snapshot_sync_ctx_timeout_;

/**
* Enable randomized snapshot creation which will avoid
* simultaneous snapshot creation among cluster members.
Expand Down
7 changes: 7 additions & 0 deletions include/libnuraft/raft_server.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,13 @@ public:
*/
std::string get_user_ctx() const;

/**
* Get timeout for snapshot_sync_ctx
*
* @return snapshot_sync_ctx_timeout.
*/
int32 get_snapshot_sync_ctx_timeout() const;

/**
* Get ID of this server.
*
Expand Down
13 changes: 10 additions & 3 deletions src/handle_join_leave.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -91,9 +91,16 @@ ptr<resp_msg> raft_server::handle_add_srv_req(req_msg& req) {
srv_to_join_->get_id(),
last_active_ms);

if ( last_active_ms <=
(ulong)raft_server::raft_limits_.response_limit_ *
ctx_->get_params()->heart_beat_interval_ ) {
// NOTE:
// If snapshot transmission was in progress, we will follow the
// snapshot timeout. Otherwise, we will follow the response timeout.
ulong sync_timeout = (ulong)raft_limits_.response_limit_ *
ctx_->get_params()->heart_beat_interval_;
if (srv_to_join_->get_snapshot_sync_ctx()) {
sync_timeout = (ulong)get_snapshot_sync_ctx_timeout();
}

if (last_active_ms <= sync_timeout) {
resp->set_result_code(cmd_result_code::SERVER_IS_JOINING);
return resp;
}
Expand Down
5 changes: 1 addition & 4 deletions src/handle_snapshot_sync.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,7 @@ ptr<req_msg> raft_server::create_sync_snapshot_req(ptr<peer>& pp,
destroy_user_snp_ctx(sync_ctx);
}

// Timeout: heartbeat * response limit.
ulong snp_timeout_ms = ctx_->get_params()->heart_beat_interval_ *
raft_server::raft_limits_.response_limit_;
p.set_snapshot_in_sync(snp, snp_timeout_ms);
p.set_snapshot_in_sync(snp, ulong(get_snapshot_sync_ctx_timeout()));
}

if (params->use_bg_thread_for_snapshot_io_) {
Expand Down
7 changes: 7 additions & 0 deletions src/raft_server.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1670,6 +1670,13 @@ std::string raft_server::get_user_ctx() const {
return c_conf->get_user_ctx();
}

int32 raft_server::get_snapshot_sync_ctx_timeout() const {
if (ctx_->get_params()->snapshot_sync_ctx_timeout_ == 0) {
return raft_limits_.response_limit_ * ctx_->get_params()->heart_beat_interval_;
}
return ctx_->get_params()->snapshot_sync_ctx_timeout_;
}

int32 raft_server::get_dc_id(int32 srv_id) const {
ptr<cluster_config> c_conf = get_config();
ptr<srv_config> s_conf = c_conf->get_server(srv_id);
Expand Down

0 comments on commit bf5e149

Please sign in to comment.