Skip to content

Commit 114b74c

Browse files
authored
Handle MsgSnapshot sent before MsgAppend (#389)
Signed-off-by: Calvin Neo <[email protected]>
1 parent 4ebe44d commit 114b74c

File tree

7 files changed

+242
-43
lines changed

7 files changed

+242
-43
lines changed

proxy_components/engine_store_ffi/src/core/fast_add_peer.rs

+82-14
Original file line numberDiff line numberDiff line change
@@ -78,33 +78,91 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
7878
)
7979
}
8080

81+
// Returns whether we should ignore the MsgSnapshot.
82+
#[allow(clippy::collapsible_if)]
83+
fn snapshot_filter(&self, msg: &RaftMessage) -> bool {
84+
let inner_msg = msg.get_message();
85+
let region_id = msg.get_region_id();
86+
let new_peer_id = msg.get_to_peer().get_id();
87+
let mut should_skip = false;
88+
let f = |info: MapEntry<u64, Arc<CachedRegionInfo>>| {
89+
match info {
90+
MapEntry::Occupied(mut o) => {
91+
// If the peer is bootstrapped, we will accept the MsgSnapshot.
92+
if o.get().inited_or_fallback.load(Ordering::SeqCst) {
93+
return;
94+
}
95+
let has_already_inited = self.is_initialized(region_id);
96+
if has_already_inited {
97+
o.get_mut().inited_or_fallback.store(true, Ordering::SeqCst);
98+
}
99+
if o.get().fast_add_peer_start.load(Ordering::SeqCst) != 0 {
100+
if o.get().snapshot_inflight.load(Ordering::SeqCst) == 0 {
101+
// If the FAP snapshot is building, skip this MsgSnapshot.
102+
// We will wait until the FAP succeeds or falls back.
103+
info!("fast path: ongoing {}:{} {}, MsgSnapshot rejected",
104+
self.store_id, region_id, new_peer_id;
105+
"to_peer_id" => msg.get_to_peer().get_id(),
106+
"from_peer_id" => msg.get_from_peer().get_id(),
107+
"region_id" => region_id,
108+
"inner_msg" => Self::format_msg(inner_msg),
109+
"has_already_inited" => has_already_inited,
110+
"inited_or_fallback" => o.get().inited_or_fallback.load(Ordering::SeqCst),
111+
"snapshot_inflight" => o.get().snapshot_inflight.load(Ordering::SeqCst),
112+
"fast_add_peer_start" => o.get().fast_add_peer_start.load(Ordering::SeqCst),
113+
);
114+
should_skip = true;
115+
}
116+
// Otherwise, this snapshot could be either FAP
117+
// snapshot, or normal snapshot.
118+
// In either case, we should handle it.
119+
}
120+
}
121+
MapEntry::Vacant(_) => {}
122+
}
123+
};
124+
125+
match self.get_cached_manager().get_inited_or_fallback(region_id) {
126+
Some(true) => {
127+
// Most cases, when the peer is already inited.
128+
}
129+
None | Some(false) => self
130+
.get_cached_manager()
131+
.access_cached_region_info_mut(region_id, f)
132+
.unwrap(),
133+
};
134+
135+
should_skip
136+
}
137+
81138
// Returns whether we need to ignore this message and run fast path instead.
82139
pub fn maybe_fast_path_tick(&self, msg: &RaftMessage) -> bool {
83140
if !self.packed_envs.engine_store_cfg.enable_fast_add_peer {
84141
// fast path not enabled
85142
return false;
86143
}
87144
let inner_msg = msg.get_message();
145+
let region_id = msg.get_region_id();
146+
let new_peer_id = msg.get_to_peer().get_id();
88147
if inner_msg.get_commit() == 0 && inner_msg.get_msg_type() == MessageType::MsgHeartbeat {
89148
return false;
90149
} else if inner_msg.get_msg_type() == MessageType::MsgAppend {
150+
// Go on to following logic to see if we should filter.
151+
} else if inner_msg.get_msg_type() == MessageType::MsgSnapshot {
152+
return self.snapshot_filter(msg);
91153
} else {
154+
// We only handle the first MsgAppend.
92155
return false;
93156
}
94-
// We don't need to recover all region infomation from restart,
157+
// We don't need to recover all region information from restart,
95158
// since we have `has_already_inited`.
96-
let inner_msg = msg.get_message();
97-
if inner_msg.get_msg_type() != MessageType::MsgAppend {
98-
// we only handles the first MsgAppend
99-
return false;
100-
}
101-
let region_id = msg.get_region_id();
102-
let new_peer_id = msg.get_to_peer().get_id();
159+
103160
let cached_manager = self.get_cached_manager();
104161
let mut is_first = false;
105162
let mut is_replicated = false;
106163
let mut has_already_inited = None;
107164
let mut early_skip = false;
165+
108166
let f = |info: MapEntry<u64, Arc<CachedRegionInfo>>| {
109167
let current = SystemTime::now()
110168
.duration_since(SystemTime::UNIX_EPOCH)
@@ -406,11 +464,15 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
406464
self.store_id, region_id, new_peer_id, s;
407465
"region_id" => region_id,
408466
);
409-
// We don't fallback if the fap snapshot is persisted,
410-
// Because it has been sent, or has not been sent.
411-
// So we can't decide whether to use fallback to clean the previous
412-
// snapshot. Any later error will cause fap snapshot
413-
// mismatch.
467+
// We call fallback here even if the fap is persisted and sent.
468+
// Because the sent snapshot is only to be handled if (index, term) matches,
469+
// even if there is another normal snapshot. Because both snapshots are
470+
// identical. TODO: however, we can retry FAP
471+
// several times before we fail. However,
472+
// the cases here are rare. We have only observed several raft-logs-missing
473+
// problem.
474+
let cached_manager = self.get_cached_manager();
475+
cached_manager.fallback_to_slow_path(region_id);
414476
return false;
415477
}
416478
};
@@ -421,6 +483,8 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
421483
self.store_id, region_id, new_peer_id, e;
422484
"region_id" => region_id,
423485
);
486+
let cached_manager = self.get_cached_manager();
487+
cached_manager.fallback_to_slow_path(region_id);
424488
return false;
425489
}
426490
};
@@ -484,7 +548,11 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
484548
// Find term of entry at applied_index.
485549
let applied_index = apply_state.get_applied_index();
486550
let applied_term =
487-
self.check_entry_at_index(region_id, applied_index, new_peer_id, "applied_index")?;
551+
match self.check_entry_at_index(region_id, applied_index, new_peer_id, "applied_index")
552+
{
553+
Ok(x) => x,
554+
Err(e) => return Err(e),
555+
};
488556
// Will otherwise cause "got message with lower index than committed" loop.
489557
// Maybe this can be removed, since fb0917bfa44ec1fc55967 can pass if we remove
490558
// this constraint.

proxy_components/mock-engine-store/src/mock_store/mock_core.rs

+4
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,12 @@ impl MockRegion {
4343
#[derive(Default)]
4444
pub struct RegionStats {
4545
pub pre_handle_count: AtomicU64,
46+
// Count of call to `ffi_fast_add_peer`.
4647
pub fast_add_peer_count: AtomicU64,
4748
pub apply_snap_count: AtomicU64,
49+
// FAP is finished building. Whether succeed or not.
50+
pub finished_fast_add_peer_count: AtomicU64,
51+
pub started_fast_add_peers: std::sync::Mutex<HashSet<u64>>,
4852
}
4953

5054
// In case of newly added cfs.

proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs

+14
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,20 @@ impl EngineStoreServer {
7676
f(self.region_states.borrow_mut().get_mut(&region_id).unwrap())
7777
}
7878

79+
pub fn mutate_region_states_mut<F: FnMut(&mut RegionStats)>(
80+
&self,
81+
region_id: RegionId,
82+
mut f: F,
83+
) {
84+
let has = self.region_states.borrow().contains_key(&region_id);
85+
if !has {
86+
self.region_states
87+
.borrow_mut()
88+
.insert(region_id, Default::default());
89+
}
90+
f(self.region_states.borrow_mut().get_mut(&region_id).unwrap())
91+
}
92+
7993
pub fn get_mem(
8094
&self,
8195
region_id: u64,

proxy_components/mock-engine-store/src/mock_store/mock_fast_add_peer_impls.rs

+33-1
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ pub(crate) unsafe extern "C" fn ffi_fast_add_peer(
125125
let store_id = (*store.engine_store_server).id;
126126
(*store.engine_store_server).mutate_region_states(region_id, |e: &mut RegionStats| {
127127
e.fast_add_peer_count.fetch_add(1, Ordering::SeqCst);
128+
e.started_fast_add_peers.lock().unwrap().insert(region_id);
128129
});
129130

130131
let failed_add_peer_res =
@@ -147,6 +148,13 @@ pub(crate) unsafe extern "C" fn ffi_fast_add_peer(
147148
});
148149
0
149150
})() != 0;
151+
let force_wait_for_data: bool = (|| {
152+
fail::fail_point!("fap_mock_force_wait_for_data", |t| {
153+
let t = t.unwrap().parse::<u64>().unwrap();
154+
t
155+
});
156+
0
157+
})() != 0;
150158
let fail_after_write: bool = (|| {
151159
fail::fail_point!("fap_mock_fail_after_write", |t| {
152160
let t = t.unwrap().parse::<u64>().unwrap();
@@ -156,6 +164,10 @@ pub(crate) unsafe extern "C" fn ffi_fast_add_peer(
156164
})() != 0;
157165
debug!("recover from remote peer: enter from {} to {}", from_store, store_id; "region_id" => region_id);
158166

167+
if force_wait_for_data {
168+
debug!("recover from remote peer: force_wait_for_data from {} to {}", from_store, store_id; "region_id" => region_id);
169+
return failed_add_peer_res(interfaces_ffi::FastAddPeerStatus::WaitForData);
170+
}
159171
for retry in 0..300 {
160172
let mut ret: Option<interfaces_ffi::FastAddPeerRes> = None;
161173
if retry > 0 {
@@ -309,13 +321,33 @@ pub(crate) unsafe extern "C" fn ffi_fast_add_peer(
309321
if block_wait {
310322
continue;
311323
} else {
324+
(*store.engine_store_server).mutate_region_states(
325+
region_id,
326+
|e: &mut RegionStats| {
327+
e.finished_fast_add_peer_count
328+
.fetch_add(1, Ordering::SeqCst);
329+
},
330+
);
312331
return r;
313332
}
314333
}
315-
_ => return r,
334+
_ => {
335+
(*store.engine_store_server).mutate_region_states(
336+
region_id,
337+
|e: &mut RegionStats| {
338+
e.finished_fast_add_peer_count
339+
.fetch_add(1, Ordering::SeqCst);
340+
},
341+
);
342+
return r;
343+
}
316344
}
317345
}
318346
}
319347
error!("recover from remote peer: failed after retry"; "region_id" => region_id);
348+
(*store.engine_store_server).mutate_region_states(region_id, |e: &mut RegionStats| {
349+
e.finished_fast_add_peer_count
350+
.fetch_add(1, Ordering::SeqCst);
351+
});
320352
failed_add_peer_res(interfaces_ffi::FastAddPeerStatus::BadData)
321353
}

proxy_tests/proxy/shared/fast_add_peer/fp.rs

+100
Original file line numberDiff line numberDiff line change
@@ -853,6 +853,7 @@ fn test_fall_back_to_slow_path() {
853853
fail::cfg("fap_core_no_fast_path", "panic").unwrap();
854854

855855
pd_client.must_add_peer(1, new_learner_peer(2, 2));
856+
// FAP will fail for "can't find entry for index 9 of region 1".
856857
check_key(&cluster, b"k2", b"v2", Some(true), None, Some(vec![1, 2]));
857858
must_wait_until_cond_node(
858859
&cluster.cluster_ext,
@@ -943,3 +944,102 @@ fn test_single_replica_migrate() {
943944
fail::remove("on_pre_write_apply_state");
944945
cluster.shutdown();
945946
}
947+
948+
// Test MsgSnapshot before MsgAppend
949+
/// According to https://github.com/tikv/raft-rs/blob/2aefbf627f243dd261b7585ef1250d32efd9dfe7/src/raft.rs#L842,
950+
/// if log is truncated in Leader, a MsgSnapshot may be sent directly before a
951+
/// MsgAppend. If such MsgSnapshot is received when a FAP snapshot IS BUILDING,
952+
/// then it will be dropped.
953+
#[test]
954+
fn test_msgsnapshot_before_msgappend() {
955+
let (mut cluster, pd_client) = new_mock_cluster_snap(0, 2);
956+
pd_client.disable_default_operator();
957+
fail::cfg("post_apply_snapshot_allow_no_unips", "return").unwrap();
958+
cluster.cfg.proxy_cfg.engine_store.enable_fast_add_peer = true;
959+
960+
tikv_util::set_panic_hook(true, "./");
961+
// Can always apply snapshot immediately
962+
fail::cfg("on_can_apply_snapshot", "return(true)").unwrap();
963+
fail::cfg("on_pre_write_apply_state", "return").unwrap();
964+
965+
let _ = cluster.run_conf_change();
966+
967+
cluster.must_put(b"k1", b"v1");
968+
check_key(&cluster, b"k1", b"v1", Some(true), None, Some(vec![1]));
969+
cluster.must_put(b"k2", b"v2");
970+
971+
fail::cfg("fap_core_no_fallback", "panic").unwrap();
972+
fail::cfg("fap_mock_force_wait_for_data", "return(1)").unwrap();
973+
pd_client.must_add_peer(1, new_learner_peer(2, 2));
974+
975+
std::thread::sleep(Duration::from_secs(1));
976+
977+
// Trigger direct MsgSnapshot.
978+
let region = cluster.get_region("k1".as_bytes());
979+
let prev_state = maybe_collect_states(&cluster.cluster_ext, 1, Some(vec![1]));
980+
let (compact_index, compact_term) = get_valid_compact_index(&prev_state);
981+
debug!("compact at index {}", compact_index);
982+
let compact_log = test_raftstore::new_compact_log_request(compact_index, compact_term);
983+
let req = test_raftstore::new_admin_request(1, region.get_region_epoch(), compact_log);
984+
let res = cluster
985+
.call_command_on_leader(req, Duration::from_secs(3))
986+
.unwrap();
987+
988+
let mut t = 0;
989+
while true {
990+
let mut buf = Vec::<raft::eraftpb::Entry>::new();
991+
cluster
992+
.get_engines(1)
993+
.raft
994+
.get_all_entries_to(1, &mut buf)
995+
.unwrap();
996+
if buf.len() == 1 {
997+
break;
998+
}
999+
std::thread::sleep(std::time::Duration::from_secs(1));
1000+
t += 1;
1001+
assert!(t < 11);
1002+
}
1003+
1004+
// MsgSnapshot will be rejected before.
1005+
fail::remove("fap_mock_force_wait_for_data");
1006+
cluster.clear_send_filters();
1007+
1008+
pd_client.must_add_peer(1, new_learner_peer(2, 2));
1009+
1010+
iter_ffi_helpers(&cluster, Some(vec![2]), &mut |_, ffi: &mut FFIHelperSet| {
1011+
let mut x: u64 = 0;
1012+
let mut y: u64 = 0;
1013+
(*ffi.engine_store_server).mutate_region_states_mut(1, |e: &mut RegionStats| {
1014+
x = e.finished_fast_add_peer_count.load(Ordering::SeqCst);
1015+
});
1016+
(*ffi.engine_store_server).mutate_region_states_mut(1, |e: &mut RegionStats| {
1017+
y = e.started_fast_add_peers.lock().unwrap().len() as u64;
1018+
});
1019+
assert_eq!(x, y);
1020+
});
1021+
1022+
// FAP will fail for "can't find entry for index 9 of region 1".
1023+
check_key(&cluster, b"k2", b"v2", Some(true), None, Some(vec![1, 2]));
1024+
must_wait_until_cond_node(
1025+
&cluster.cluster_ext,
1026+
1,
1027+
Some(vec![2]),
1028+
&|states: &States| -> bool {
1029+
find_peer_by_id(states.in_disk_region_state.get_region(), 2).is_some()
1030+
},
1031+
);
1032+
1033+
iter_ffi_helpers(&cluster, Some(vec![2]), &mut |_, ffi: &mut FFIHelperSet| {
1034+
assert_eq!(
1035+
ffi.engine_store_server_helper
1036+
.query_fap_snapshot_state(1, 2, 0, 0),
1037+
proxy_ffi::interfaces_ffi::FapSnapshotState::NotFound
1038+
);
1039+
});
1040+
1041+
fail::remove("on_can_apply_snapshot");
1042+
fail::remove("on_pre_write_apply_state");
1043+
fail::remove("fap_core_no_fallback");
1044+
cluster.shutdown();
1045+
}

proxy_tests/proxy/shared/pprof_jemalloc.rs

+2-4
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ use std::path::Path;
44

55
use tempfile::NamedTempFile;
66

7-
use crate::utils::v1::*;
8-
97
#[test]
108
fn test_adhoc_dump_prof() {
119
use proxy_server::status_server::vendored_utils::{
@@ -17,8 +15,8 @@ fn test_adhoc_dump_prof() {
1715
let _ = activate_prof();
1816
}
1917

20-
let x = vec![1; 1000];
21-
let y = vec![1; 1000];
18+
let _x = vec![1; 1000];
19+
let _y = vec![1; 1000];
2220

2321
let f = NamedTempFile::new().unwrap();
2422
let path = f.path().to_str().unwrap();

0 commit comments

Comments
 (0)