Skip to content

Commit

Permalink
handle msgsnapshot before msgappend
Browse files Browse the repository at this point in the history
Signed-off-by: Calvin Neo <[email protected]>
  • Loading branch information
CalvinNeo committed Aug 12, 2024
1 parent 4ebe44d commit 51d55c1
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 17 deletions.
102 changes: 86 additions & 16 deletions proxy_components/engine_store_ffi/src/core/fast_add_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,33 +78,91 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
)
}

// Returns whether we should ignore the MsgSnapshot.
fn snapshot_filter(&self, msg: &RaftMessage) -> bool {
let inner_msg = msg.get_message();
let region_id = msg.get_region_id();
let new_peer_id = msg.get_to_peer().get_id();
let mut should_skip = false;
let f = |info: MapEntry<u64, Arc<CachedRegionInfo>>| {
match info {
MapEntry::Occupied(mut o) => {
// If the peer is bootstrapped, we will accept the MsgSnapshot.
if o.get().inited_or_fallback.load(Ordering::SeqCst) {
return;
}
let has_already_inited = self.is_initialized(region_id);
if has_already_inited {
o.get_mut().inited_or_fallback.store(true, Ordering::SeqCst);
}
if o.get().fast_add_peer_start.load(Ordering::SeqCst) != 0 {
if o.get().snapshot_inflight.load(Ordering::SeqCst) == 0 {
// If the FAP snapshot is building, skip this MsgSnapshot.
// We will wait until the FAP is succeed or fallbacked.
info!("fast path: ongoing {}:{} {}, MsgSnapshot rejected",
self.store_id, region_id, new_peer_id;
"to_peer_id" => msg.get_to_peer().get_id(),
"from_peer_id" => msg.get_from_peer().get_id(),
"region_id" => region_id,
"inner_msg" => Self::format_msg(inner_msg),
"has_already_inited" => has_already_inited,
"inited_or_fallback" => o.get().inited_or_fallback.load(Ordering::SeqCst),
"snapshot_inflight" => o.get().snapshot_inflight.load(Ordering::SeqCst),
"fast_add_peer_start" => o.get().fast_add_peer_start.load(Ordering::SeqCst),
);
should_skip = true;
return;
}
// Otherwise, this snapshot could be either FAP
// snapshot, or normal snapshot.
// In each case, we should handle them.
}
}
MapEntry::Vacant(_) => {}
}
};

match self.get_cached_manager().get_inited_or_fallback(region_id) {
Some(true) => {
// Most cases, when the peer is already inited.
}
None | Some(false) => self
.get_cached_manager()
.access_cached_region_info_mut(region_id, f)
.unwrap(),
};

should_skip
}

// Returns whether we need to ignore this message and run fast path instead.
pub fn maybe_fast_path_tick(&self, msg: &RaftMessage) -> bool {
if !self.packed_envs.engine_store_cfg.enable_fast_add_peer {
// fast path not enabled
return false;
}
let inner_msg = msg.get_message();
let region_id = msg.get_region_id();
let new_peer_id = msg.get_to_peer().get_id();
if inner_msg.get_commit() == 0 && inner_msg.get_msg_type() == MessageType::MsgHeartbeat {
return false;
} else if inner_msg.get_msg_type() == MessageType::MsgAppend {
// Go on to following logic to see if we should filter.
} else if inner_msg.get_msg_type() == MessageType::MsgSnapshot {
return self.snapshot_filter(msg);
} else {
// We only handles the first MsgAppend.
return false;
}
// We don't need to recover all region infomation from restart,
// We don't need to recover all region information from restart,
// since we have `has_already_inited`.
let inner_msg = msg.get_message();
if inner_msg.get_msg_type() != MessageType::MsgAppend {
// we only handles the first MsgAppend
return false;
}
let region_id = msg.get_region_id();
let new_peer_id = msg.get_to_peer().get_id();

let cached_manager = self.get_cached_manager();
let mut is_first = false;
let mut is_replicated = false;
let mut has_already_inited = None;
let mut early_skip = false;

let f = |info: MapEntry<u64, Arc<CachedRegionInfo>>| {
let current = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
Expand Down Expand Up @@ -406,11 +464,15 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
self.store_id, region_id, new_peer_id, s;
"region_id" => region_id,
);
// We don't fallback if the fap snapshot is persisted,
// Because it has been sent, or has not been sent.
// So we can't decide whether to use fallback to clean the previous
// snapshot. Any later error will cause fap snapshot
// mismatch.
// We call fallback here even if the fap is persisted and sent.
// Because the sent snapshot is only to be handled if (idnex, term) matches,
// even if there is another normal snapshot. Because both snapshots are
// idendical. TODO However, we can retry FAP for
// several times before we fail. However,
// the cases here is rare. We have only observed several raft logs missing
// problem.
let cached_manager = self.get_cached_manager();
cached_manager.fallback_to_slow_path(region_id);
return false;
}
};
Expand All @@ -421,6 +483,8 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
self.store_id, region_id, new_peer_id, e;
"region_id" => region_id,
);
let cached_manager = self.get_cached_manager();
cached_manager.fallback_to_slow_path(region_id);
return false;
}
};
Expand Down Expand Up @@ -484,16 +548,22 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
// Find term of entry at applied_index.
let applied_index = apply_state.get_applied_index();
let applied_term =
self.check_entry_at_index(region_id, applied_index, new_peer_id, "applied_index")?;
match self.check_entry_at_index(region_id, applied_index, new_peer_id, "applied_index")
{
Ok(x) => x,
Err(e) => return Err(e),
};
// Will otherwise cause "got message with lower index than committed" loop.
// Maybe this can be removed, since fb0917bfa44ec1fc55967 can pass if we remove
// this constraint.
self.check_entry_at_index(
if let Err(e) = self.check_entry_at_index(
region_id,
apply_state.get_commit_index(),
new_peer_id,
"commit_index",
)?;
) {
return Err(e);
}

// Get a snapshot object.
let (mut snapshot, key) = {
Expand Down
1 change: 1 addition & 0 deletions proxy_tests/proxy/shared/fast_add_peer/fp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,7 @@ fn test_fall_back_to_slow_path() {
fail::cfg("fap_core_no_fast_path", "panic").unwrap();

pd_client.must_add_peer(1, new_learner_peer(2, 2));
// FAP will fail for "can't find entry for index 9 of region 1".
check_key(&cluster, b"k2", b"v2", Some(true), None, Some(vec![1, 2]));
must_wait_until_cond_node(
&cluster.cluster_ext,
Expand Down
2 changes: 1 addition & 1 deletion proxy_tests/proxy/shared/replica_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ fn test_raft_cmd_request_learner_advanve_max_ts() {
r.set_end_key(e.to_vec());
read_index_request.mut_ranges().push(r);
}
let mut cmd =
let cmd =
proxy_ffi::read_index_helper::gen_read_index_raft_cmd_req(&mut read_index_request);

let (result_tx, result_rx) = oneshot::channel();
Expand Down

0 comments on commit 51d55c1

Please sign in to comment.