diff --git a/include/libnuraft/raft_params.hxx b/include/libnuraft/raft_params.hxx
index 4c5a4d9e..86dddb38 100644
--- a/include/libnuraft/raft_params.hxx
+++ b/include/libnuraft/raft_params.hxx
@@ -74,6 +74,7 @@ struct raft_params {
         , log_sync_stop_gap_(99999)
         , snapshot_distance_(0)
         , snapshot_block_size_(0)
+        , snapshot_sync_ctx_timeout_(0)
         , enable_randomized_snapshot_creation_(false)
         , max_append_size_(100)
         , reserved_log_items_(100000)
@@ -221,6 +222,20 @@ struct raft_params {
         return *this;
     }
 
+    /**
+     * Timeout for a single snapshot sync request, after which the
+     * snapshot_sync_ctx is considered timed out and released.
+     *
+     * @param timeout_ms Timeout in milliseconds. 0 or a negative value
+     *                   selects the default
+     *                   `heart_beat_interval_ * response_limit_`.
+     * @return self
+     */
+    raft_params& with_snapshot_sync_ctx_timeout(int32 timeout_ms) {
+        snapshot_sync_ctx_timeout_ = timeout_ms;
+        return *this;
+    }
+
     /**
      * The number of reserved log items when doing log compaction.
      *
@@ -405,6 +420,14 @@ public:
      */
     int32 snapshot_block_size_;
 
+    /**
+     * Timeout(ms) for snapshot_sync_ctx. If a single snapshot syncing request
+     * exceeds this, it will be considered timed out and the ctx will be
+     * released. 0 or a negative value means the default value
+     * `heart_beat_interval_ * response_limit_` will be used.
+     */
+    int32 snapshot_sync_ctx_timeout_;
+
     /**
      * Enable randomized snapshot creation which will avoid
      * simultaneous snapshot creation among cluster members.
diff --git a/include/libnuraft/raft_server.hxx b/include/libnuraft/raft_server.hxx
index 65c34af8..b8cbb477 100644
--- a/include/libnuraft/raft_server.hxx
+++ b/include/libnuraft/raft_server.hxx
@@ -409,6 +409,14 @@ public:
      */
     std::string get_user_ctx() const;
 
+    /**
+     * Get the effective timeout (in ms) for snapshot_sync_ctx.
+     *
+     * @return `snapshot_sync_ctx_timeout_` if it is positive, otherwise
+     *         the default `heart_beat_interval_ * response_limit_`.
+     */
+    int32 get_snapshot_sync_ctx_timeout() const;
+
     /**
      * Get ID of this server.
      *
diff --git a/src/handle_join_leave.cxx b/src/handle_join_leave.cxx
index 30e45bab..0425c56d 100644
--- a/src/handle_join_leave.cxx
+++ b/src/handle_join_leave.cxx
@@ -91,9 +91,16 @@ ptr raft_server::handle_add_srv_req(req_msg& req) {
               srv_to_join_->get_id(),
               last_active_ms);
 
-        if ( last_active_ms <=
-                 (ulong)raft_server::raft_limits_.response_limit_ *
-                 ctx_->get_params()->heart_beat_interval_ ) {
+        // NOTE:
+        //   If snapshot transmission was in progress, we will follow the
+        //   snapshot timeout. Otherwise, we will follow the response timeout.
+        ulong sync_timeout = (ulong)raft_limits_.response_limit_ *
+                             ctx_->get_params()->heart_beat_interval_;
+        if (srv_to_join_->get_snapshot_sync_ctx()) {
+            sync_timeout = (ulong)get_snapshot_sync_ctx_timeout();
+        }
+
+        if (last_active_ms <= sync_timeout) {
             resp->set_result_code(cmd_result_code::SERVER_IS_JOINING);
             return resp;
         }
diff --git a/src/handle_snapshot_sync.cxx b/src/handle_snapshot_sync.cxx
index 87e25fa1..b9e90f49 100644
--- a/src/handle_snapshot_sync.cxx
+++ b/src/handle_snapshot_sync.cxx
@@ -152,10 +152,7 @@ ptr raft_server::create_sync_snapshot_req(ptr& pp,
             destroy_user_snp_ctx(sync_ctx);
         }
 
-        // Timeout: heartbeat * response limit.
-        ulong snp_timeout_ms = ctx_->get_params()->heart_beat_interval_ *
-                               raft_server::raft_limits_.response_limit_;
-        p.set_snapshot_in_sync(snp, snp_timeout_ms);
+        p.set_snapshot_in_sync(snp, ulong(get_snapshot_sync_ctx_timeout()));
     }
 
     if (params->use_bg_thread_for_snapshot_io_) {
diff --git a/src/raft_server.cxx b/src/raft_server.cxx
index ddf70501..82f3d9d5 100644
--- a/src/raft_server.cxx
+++ b/src/raft_server.cxx
@@ -1670,6 +1670,17 @@ std::string raft_server::get_user_ctx() const {
     return c_conf->get_user_ctx();
 }
 
+int32 raft_server::get_snapshot_sync_ctx_timeout() const {
+    // Fetch the params once instead of on every field access.
+    auto params = ctx_->get_params();
+    if (params->snapshot_sync_ctx_timeout_ <= 0) {
+        // Unset (0) or negative: fall back to the default. Callers cast the
+        // result to `ulong`, so a negative value must never be returned as-is.
+        return raft_limits_.response_limit_ * params->heart_beat_interval_;
+    }
+    return params->snapshot_sync_ctx_timeout_;
+}
+
 int32 raft_server::get_dc_id(int32 srv_id) const {
     ptr c_conf = get_config();
     ptr s_conf = c_conf->get_server(srv_id);