Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle MsgSnapshot sent before MsgAppend #389

Merged
merged 5 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 82 additions & 14 deletions proxy_components/engine_store_ffi/src/core/fast_add_peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,33 +78,91 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
)
}

// Returns whether we should ignore the MsgSnapshot.
#[allow(clippy::collapsible_if)]
fn snapshot_filter(&self, msg: &RaftMessage) -> bool {
let inner_msg = msg.get_message();
let region_id = msg.get_region_id();
let new_peer_id = msg.get_to_peer().get_id();
let mut should_skip = false;
let f = |info: MapEntry<u64, Arc<CachedRegionInfo>>| {
match info {
MapEntry::Occupied(mut o) => {
// If the peer is bootstrapped, we will accept the MsgSnapshot.
if o.get().inited_or_fallback.load(Ordering::SeqCst) {
return;
}
let has_already_inited = self.is_initialized(region_id);
if has_already_inited {
o.get_mut().inited_or_fallback.store(true, Ordering::SeqCst);
}
if o.get().fast_add_peer_start.load(Ordering::SeqCst) != 0 {
if o.get().snapshot_inflight.load(Ordering::SeqCst) == 0 {
// If the FAP snapshot is building, skip this MsgSnapshot.
// We will wait until the FAP is succeed or fallbacked.
info!("fast path: ongoing {}:{} {}, MsgSnapshot rejected",
self.store_id, region_id, new_peer_id;
"to_peer_id" => msg.get_to_peer().get_id(),
Comment on lines +103 to +105
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: format

Suggested change
info!("fast path: ongoing {}:{} {}, MsgSnapshot rejected",
self.store_id, region_id, new_peer_id;
"to_peer_id" => msg.get_to_peer().get_id(),
info!("fast path: ongoing {}:{} {}, MsgSnapshot rejected",
self.store_id, region_id, new_peer_id;
"to_peer_id" => msg.get_to_peer().get_id(),

"from_peer_id" => msg.get_from_peer().get_id(),
"region_id" => region_id,
"inner_msg" => Self::format_msg(inner_msg),
"has_already_inited" => has_already_inited,
"inited_or_fallback" => o.get().inited_or_fallback.load(Ordering::SeqCst),
"snapshot_inflight" => o.get().snapshot_inflight.load(Ordering::SeqCst),
"fast_add_peer_start" => o.get().fast_add_peer_start.load(Ordering::SeqCst),
);
should_skip = true;
}
// Otherwise, this snapshot could be either FAP
// snapshot, or normal snapshot.
// In each case, we should handle them.
}
}
MapEntry::Vacant(_) => {}
}
};

match self.get_cached_manager().get_inited_or_fallback(region_id) {
Some(true) => {
// Most cases, when the peer is already inited.
}
None | Some(false) => self
.get_cached_manager()
.access_cached_region_info_mut(region_id, f)
.unwrap(),
};

should_skip
}

// Returns whether we need to ignore this message and run fast path instead.
pub fn maybe_fast_path_tick(&self, msg: &RaftMessage) -> bool {
if !self.packed_envs.engine_store_cfg.enable_fast_add_peer {
// fast path not enabled
return false;
}
let inner_msg = msg.get_message();
let region_id = msg.get_region_id();
let new_peer_id = msg.get_to_peer().get_id();
if inner_msg.get_commit() == 0 && inner_msg.get_msg_type() == MessageType::MsgHeartbeat {
return false;
} else if inner_msg.get_msg_type() == MessageType::MsgAppend {
// Go on to following logic to see if we should filter.
} else if inner_msg.get_msg_type() == MessageType::MsgSnapshot {
return self.snapshot_filter(msg);
} else {
// We only handle the first MsgAppend.
return false;
}
// We don't need to recover all region infomation from restart,
// We don't need to recover all region information from restart,
// since we have `has_already_inited`.
let inner_msg = msg.get_message();
if inner_msg.get_msg_type() != MessageType::MsgAppend {
// we only handles the first MsgAppend
return false;
}
let region_id = msg.get_region_id();
let new_peer_id = msg.get_to_peer().get_id();

let cached_manager = self.get_cached_manager();
let mut is_first = false;
let mut is_replicated = false;
let mut has_already_inited = None;
let mut early_skip = false;

let f = |info: MapEntry<u64, Arc<CachedRegionInfo>>| {
let current = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
Expand Down Expand Up @@ -406,11 +464,15 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
self.store_id, region_id, new_peer_id, s;
"region_id" => region_id,
);
// We don't fallback if the fap snapshot is persisted,
// Because it has been sent, or has not been sent.
// So we can't decide whether to use fallback to clean the previous
// snapshot. Any later error will cause fap snapshot
// mismatch.
// We call fallback here even if the fap is persisted and sent.
// Because the sent snapshot is only to be handled if (index, term) matches,
// even if there is another normal snapshot. Both snapshots are
// identical. TODO: we could retry FAP
// several times before we fail. However,
// such cases are rare. We have only observed a few
// raft-logs-missing problems.
let cached_manager = self.get_cached_manager();
cached_manager.fallback_to_slow_path(region_id);
return false;
}
};
Expand All @@ -421,6 +483,8 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
self.store_id, region_id, new_peer_id, e;
"region_id" => region_id,
);
let cached_manager = self.get_cached_manager();
cached_manager.fallback_to_slow_path(region_id);
return false;
}
};
Expand Down Expand Up @@ -484,7 +548,11 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
// Find term of entry at applied_index.
let applied_index = apply_state.get_applied_index();
let applied_term =
self.check_entry_at_index(region_id, applied_index, new_peer_id, "applied_index")?;
match self.check_entry_at_index(region_id, applied_index, new_peer_id, "applied_index")
{
Ok(x) => x,
Err(e) => return Err(e),
};
// Will otherwise cause "got message with lower index than committed" loop.
// Maybe this can be removed, since fb0917bfa44ec1fc55967 can pass if we remove
// this constraint.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,12 @@ impl MockRegion {
// Per-region counters used by tests to observe mock engine-store behavior.
#[derive(Default)]
pub struct RegionStats {
    // NOTE(review): presumably counts pre-handle invocations — confirm against callers.
    pub pre_handle_count: AtomicU64,
    // Count of call to `ffi_fast_add_peer`.
    pub fast_add_peer_count: AtomicU64,
    // NOTE(review): presumably counts applied snapshots — confirm against callers.
    pub apply_snap_count: AtomicU64,
    // FAP is finished building. Whether succeed or not.
    pub finished_fast_add_peer_count: AtomicU64,
    // Region ids recorded when `ffi_fast_add_peer` is entered
    // (the region's own id is inserted on entry).
    pub started_fast_add_peers: std::sync::Mutex<HashSet<u64>>,
}

// In case of newly added cfs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,20 @@ impl EngineStoreServer {
f(self.region_states.borrow_mut().get_mut(&region_id).unwrap())
}

// Runs `f` against the `RegionStats` of `region_id`, creating a default
// entry first when the region has not been seen yet.
pub fn mutate_region_states_mut<F: FnMut(&mut RegionStats)>(
    &self,
    region_id: RegionId,
    mut f: F,
) {
    // Make sure an entry exists before handing out a mutable reference.
    let exists = self.region_states.borrow().contains_key(&region_id);
    if !exists {
        self.region_states
            .borrow_mut()
            .insert(region_id, Default::default());
    }
    let mut states = self.region_states.borrow_mut();
    let stats = states.get_mut(&region_id).unwrap();
    f(stats)
}

pub fn get_mem(
&self,
region_id: u64,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ pub(crate) unsafe extern "C" fn ffi_fast_add_peer(
let store_id = (*store.engine_store_server).id;
(*store.engine_store_server).mutate_region_states(region_id, |e: &mut RegionStats| {
e.fast_add_peer_count.fetch_add(1, Ordering::SeqCst);
e.started_fast_add_peers.lock().unwrap().insert(region_id);
});

let failed_add_peer_res =
Expand All @@ -147,6 +148,13 @@ pub(crate) unsafe extern "C" fn ffi_fast_add_peer(
});
0
})() != 0;
let force_wait_for_data: bool = (|| {
fail::fail_point!("fap_mock_force_wait_for_data", |t| {
let t = t.unwrap().parse::<u64>().unwrap();
t
});
0
})() != 0;
let fail_after_write: bool = (|| {
fail::fail_point!("fap_mock_fail_after_write", |t| {
let t = t.unwrap().parse::<u64>().unwrap();
Expand All @@ -156,6 +164,10 @@ pub(crate) unsafe extern "C" fn ffi_fast_add_peer(
})() != 0;
debug!("recover from remote peer: enter from {} to {}", from_store, store_id; "region_id" => region_id);

if force_wait_for_data {
debug!("recover from remote peer: force_wait_for_data from {} to {}", from_store, store_id; "region_id" => region_id);
return failed_add_peer_res(interfaces_ffi::FastAddPeerStatus::WaitForData);
}
for retry in 0..300 {
let mut ret: Option<interfaces_ffi::FastAddPeerRes> = None;
if retry > 0 {
Expand Down Expand Up @@ -309,13 +321,33 @@ pub(crate) unsafe extern "C" fn ffi_fast_add_peer(
if block_wait {
continue;
} else {
(*store.engine_store_server).mutate_region_states(
region_id,
|e: &mut RegionStats| {
e.finished_fast_add_peer_count
.fetch_add(1, Ordering::SeqCst);
},
);
return r;
}
}
_ => return r,
_ => {
(*store.engine_store_server).mutate_region_states(
region_id,
|e: &mut RegionStats| {
e.finished_fast_add_peer_count
.fetch_add(1, Ordering::SeqCst);
},
);
return r;
}
}
}
}
error!("recover from remote peer: failed after retry"; "region_id" => region_id);
(*store.engine_store_server).mutate_region_states(region_id, |e: &mut RegionStats| {
e.finished_fast_add_peer_count
.fetch_add(1, Ordering::SeqCst);
});
failed_add_peer_res(interfaces_ffi::FastAddPeerStatus::BadData)
}
100 changes: 100 additions & 0 deletions proxy_tests/proxy/shared/fast_add_peer/fp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,7 @@ fn test_fall_back_to_slow_path() {
fail::cfg("fap_core_no_fast_path", "panic").unwrap();

pd_client.must_add_peer(1, new_learner_peer(2, 2));
// FAP will fail for "can't find entry for index 9 of region 1".
check_key(&cluster, b"k2", b"v2", Some(true), None, Some(vec![1, 2]));
must_wait_until_cond_node(
&cluster.cluster_ext,
Expand Down Expand Up @@ -943,3 +944,102 @@ fn test_single_replica_migrate() {
fail::remove("on_pre_write_apply_state");
cluster.shutdown();
}

/// Test MsgSnapshot sent before MsgAppend.
///
/// According to https://github.com/tikv/raft-rs/blob/2aefbf627f243dd261b7585ef1250d32efd9dfe7/src/raft.rs#L842,
/// if log is truncated in Leader, a MsgSnapshot may be sent directly before a
/// MsgAppend. If such MsgSnapshot is received when a FAP snapshot IS BUILDING,
/// then it will be dropped.
#[test]
fn test_msgsnapshot_before_msgappend() {
    let (mut cluster, pd_client) = new_mock_cluster_snap(0, 2);
    pd_client.disable_default_operator();
    fail::cfg("post_apply_snapshot_allow_no_unips", "return").unwrap();
    cluster.cfg.proxy_cfg.engine_store.enable_fast_add_peer = true;

    tikv_util::set_panic_hook(true, "./");
    // Can always apply snapshot immediately
    fail::cfg("on_can_apply_snapshot", "return(true)").unwrap();
    fail::cfg("on_pre_write_apply_state", "return").unwrap();

    let _ = cluster.run_conf_change();

    cluster.must_put(b"k1", b"v1");
    check_key(&cluster, b"k1", b"v1", Some(true), None, Some(vec![1]));
    cluster.must_put(b"k2", b"v2");

    fail::cfg("fap_core_no_fallback", "panic").unwrap();
    // Keep the FAP snapshot in "building" state so the direct MsgSnapshot
    // races with it and gets rejected by `snapshot_filter`.
    fail::cfg("fap_mock_force_wait_for_data", "return(1)").unwrap();
    pd_client.must_add_peer(1, new_learner_peer(2, 2));

    std::thread::sleep(Duration::from_secs(1));

    // Trigger direct MsgSnapshot by compacting the leader's log.
    let region = cluster.get_region("k1".as_bytes());
    let prev_state = maybe_collect_states(&cluster.cluster_ext, 1, Some(vec![1]));
    let (compact_index, compact_term) = get_valid_compact_index(&prev_state);
    debug!("compact at index {}", compact_index);
    let compact_log = test_raftstore::new_compact_log_request(compact_index, compact_term);
    let req = test_raftstore::new_admin_request(1, region.get_region_epoch(), compact_log);
    cluster
        .call_command_on_leader(req, Duration::from_secs(3))
        .unwrap();

    // Wait (up to ~10s) until the log is actually truncated on store 1.
    let mut t = 0;
    loop {
        let mut buf = Vec::<raft::eraftpb::Entry>::new();
        cluster
            .get_engines(1)
            .raft
            .get_all_entries_to(1, &mut buf)
            .unwrap();
        if buf.len() == 1 {
            break;
        }
        std::thread::sleep(std::time::Duration::from_secs(1));
        t += 1;
        assert!(t < 11);
    }

    // MsgSnapshot will be rejected before.
    fail::remove("fap_mock_force_wait_for_data");
    cluster.clear_send_filters();

    pd_client.must_add_peer(1, new_learner_peer(2, 2));

    // Every FAP attempt that started must also have finished (succeed or not).
    iter_ffi_helpers(&cluster, Some(vec![2]), &mut |_, ffi: &mut FFIHelperSet| {
        let mut x: u64 = 0;
        let mut y: u64 = 0;
        (*ffi.engine_store_server).mutate_region_states_mut(1, |e: &mut RegionStats| {
            x = e.finished_fast_add_peer_count.load(Ordering::SeqCst);
            y = e.started_fast_add_peers.lock().unwrap().len() as u64;
        });
        assert_eq!(x, y);
    });

    // FAP will fail for "can't find entry for index 9 of region 1".
    check_key(&cluster, b"k2", b"v2", Some(true), None, Some(vec![1, 2]));
    must_wait_until_cond_node(
        &cluster.cluster_ext,
        1,
        Some(vec![2]),
        &|states: &States| -> bool {
            find_peer_by_id(states.in_disk_region_state.get_region(), 2).is_some()
        },
    );

    // No FAP snapshot should remain for peer 2.
    iter_ffi_helpers(&cluster, Some(vec![2]), &mut |_, ffi: &mut FFIHelperSet| {
        assert_eq!(
            ffi.engine_store_server_helper
                .query_fap_snapshot_state(1, 2, 0, 0),
            proxy_ffi::interfaces_ffi::FapSnapshotState::NotFound
        );
    });

    fail::remove("on_can_apply_snapshot");
    fail::remove("on_pre_write_apply_state");
    fail::remove("fap_core_no_fallback");
    cluster.shutdown();
}
6 changes: 2 additions & 4 deletions proxy_tests/proxy/shared/pprof_jemalloc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ use std::path::Path;

use tempfile::NamedTempFile;

use crate::utils::v1::*;

#[test]
fn test_adhoc_dump_prof() {
use proxy_server::status_server::vendored_utils::{
Expand All @@ -17,8 +15,8 @@ fn test_adhoc_dump_prof() {
let _ = activate_prof();
}

let x = vec![1; 1000];
let y = vec![1; 1000];
let _x = vec![1; 1000];
let _y = vec![1; 1000];

let f = NamedTempFile::new().unwrap();
let path = f.path().to_str().unwrap();
Expand Down
Loading
Loading