Skip to content

Commit cafa770

Browse files
authored
Make FAP FFIs aware the index and term of an fap snapshot (#365)
1 parent 23baffc commit cafa770

File tree

11 files changed

+266
-82
lines changed

11 files changed

+266
-82
lines changed

proxy_components/engine_store_ffi/src/core/fast_add_peer.rs

+25-21
Original file line numberDiff line numberDiff line change
@@ -214,7 +214,6 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
214214
Ordering::SeqCst,
215215
);
216216
}
217-
// TODO include create
218217
is_replicated = o.get().replicated_or_created.load(Ordering::SeqCst);
219218
}
220219
MapEntry::Vacant(v) => {
@@ -309,7 +308,7 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
309308
return true;
310309
}
311310

312-
info!("fast path: ongoing {}:{} {}, fetch data from remote peer", self.store_id, region_id, new_peer_id;
311+
debug!("fast path: ongoing {}:{} {}, fetch data from remote peer", self.store_id, region_id, new_peer_id;
313312
"to_peer_id" => msg.get_to_peer().get_id(),
314313
"from_peer_id" => msg.get_from_peer().get_id(),
315314
"region_id" => region_id,
@@ -407,7 +406,11 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
407406
self.store_id, region_id, new_peer_id, s;
408407
"region_id" => region_id,
409408
);
410-
self.fap_fallback_to_slow(region_id);
409+
// We don't fallback if the fap snapshot is persisted,
410+
// Because it has been sent, or has not been sent.
411+
// So we can't decide whether to use fallback to clean the previous
412+
// snapshot. Any later error will cause fap snapshot
413+
// mismatch.
411414
return false;
412415
}
413416
};
@@ -418,7 +421,6 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
418421
self.store_id, region_id, new_peer_id, e;
419422
"region_id" => region_id,
420423
);
421-
self.fap_fallback_to_slow(region_id);
422424
return false;
423425
}
424426
};
@@ -481,28 +483,30 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
481483
}
482484
}
483485

486+
// Find term of entry at applied_index.
487+
let applied_index = apply_state.get_applied_index();
488+
let applied_term =
489+
self.check_entry_at_index(region_id, applied_index, new_peer_id, "applied_index")?;
490+
// Will otherwise cause "got message with lower index than committed" loop.
491+
// Maybe this can be removed, since fb0917bfa44ec1fc55967 can pass if we remove
492+
// this constraint.
493+
self.check_entry_at_index(
494+
region_id,
495+
apply_state.get_commit_index(),
496+
new_peer_id,
497+
"commit_index",
498+
)?;
499+
484500
// Get a snapshot object.
485501
let (mut snapshot, key) = {
486-
// Find term of entry at applied_index.
487-
let applied_index = apply_state.get_applied_index();
488-
let applied_term =
489-
self.check_entry_at_index(region_id, applied_index, new_peer_id, "applied_index")?;
490-
// Will otherwise cause "got message with lower index than committed" loop.
491-
// Maybe this can be removed, since fb0917bfa44ec1fc55967 can pass if we remove
492-
// this constraint.
493-
self.check_entry_at_index(
494-
region_id,
495-
apply_state.get_commit_index(),
496-
new_peer_id,
497-
"commit_index",
498-
)?;
499-
500502
let key = SnapKey::new(region_id, applied_term, applied_index);
501503
self.snap_mgr.register(key.clone(), SnapEntry::Generating);
502-
defer!(self.snap_mgr.deregister(&key, &SnapEntry::Generating));
504+
// TODO(fap) could be "save meta file without metadata for" error, if generated
505+
// twice. See `do_build`.
503506
let snapshot = self.snap_mgr.get_snapshot_for_building(&key)?;
504-
(snapshot, key.clone())
507+
(snapshot, key)
505508
};
509+
defer!(self.snap_mgr.deregister(&key, &SnapEntry::Generating));
506510

507511
// Build snapshot by do_snapshot
508512
let mut pb_snapshot: eraftpb::Snapshot = Default::default();
@@ -522,7 +526,7 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
522526
// Create fake cf file.
523527
let mut path = cf_file.path.clone();
524528
path.push(cf_file.file_prefix.clone());
525-
path.set_extension("empty.sst");
529+
path.set_extension(".sst");
526530
// Something like `${prefix}/gen_1_6_15_write.sst`
527531
let mut f = std::fs::File::create(path.as_path())?;
528532
f.flush()?;

proxy_components/engine_store_ffi/src/core/forward_raft/fap_snapshot.rs

+85-33
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,19 @@ use crate::{
55
fatal,
66
};
77

8+
#[derive(PartialEq)]
9+
pub enum SnapshotDeducedType {
10+
Uncertain,
11+
Regular,
12+
Fap,
13+
}
14+
815
impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
16+
pub fn deduce_snapshot_type(_peer_id: u64, _snap: &store::Snapshot) -> SnapshotDeducedType {
17+
// TODO(fap) implement in seperated modes(serverless or op).
18+
SnapshotDeducedType::Uncertain
19+
}
20+
921
pub fn pre_apply_snapshot_for_fap_snapshot(
1022
&self,
1123
ob_region: &Region,
@@ -21,7 +33,7 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
2133
|info: MapEntry<u64, Arc<CachedRegionInfo>>| match info {
2234
MapEntry::Occupied(_) => {
2335
if !self.engine_store_server_helper.kvstore_region_exist(region_id) {
24-
if self.engine_store_server_helper.query_fap_snapshot_state(region_id, peer_id) == proxy_ffi::interfaces_ffi::FapSnapshotState::Persisted {
36+
if self.engine_store_server_helper.query_fap_snapshot_state(region_id, peer_id, snap_key.idx, snap_key.term) == proxy_ffi::interfaces_ffi::FapSnapshotState::Persisted {
2537
info!("fast path: prehandle first snapshot skipped {}:{} {}", self.store_id, region_id, peer_id;
2638
"snap_key" => ?snap_key,
2739
"region_id" => region_id,
@@ -32,7 +44,7 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
3244
}
3345
MapEntry::Vacant(_) => {
3446
// It won't go here because cached region info is inited after restart and on the first fap message.
35-
let pstate = self.engine_store_server_helper.query_fap_snapshot_state(region_id, peer_id);
47+
let pstate = self.engine_store_server_helper.query_fap_snapshot_state(region_id, peer_id, snap_key.idx, snap_key.term);
3648
if pstate == proxy_ffi::interfaces_ffi::FapSnapshotState::Persisted {
3749
// We have a fap snapshot now. skip
3850
info!("fast path: prehandle first snapshot skipped after restart {}:{} {}", self.store_id, region_id, peer_id;
@@ -63,10 +75,21 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
6375
ob_region: &Region,
6476
peer_id: u64,
6577
snap_key: &store::SnapKey,
78+
maybe_snap: Option<&store::Snapshot>,
6679
) -> bool {
6780
let region_id = ob_region.get_id();
68-
let try_apply_fap_snapshot = |c: Arc<CachedRegionInfo>, restarted: bool| {
69-
info!("fast path: start applying first snapshot {}:{} {}", self.store_id, region_id, peer_id;
81+
let try_apply_fap_snapshot = |c: Arc<CachedRegionInfo>| {
82+
let already_existed = self
83+
.engine_store_server_helper
84+
.kvstore_region_exist(region_id);
85+
if already_existed {
86+
debug!("fast path: skip apply snapshot because not first {}:{} {}", self.store_id, region_id, peer_id;
87+
"snap_key" => ?snap_key,
88+
"region_id" => region_id,
89+
);
90+
return false;
91+
}
92+
info!("fast path: start applying first fap snapshot {}:{} {}", self.store_id, region_id, peer_id;
7093
"snap_key" => ?snap_key,
7194
"region_id" => region_id,
7295
);
@@ -80,36 +103,78 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
80103
.duration_since(SystemTime::UNIX_EPOCH)
81104
.unwrap();
82105

83-
let assert_exist = if !restarted {
84-
snapshot_sent_time != 0
85-
} else {
86-
false
106+
let snap = match maybe_snap {
107+
Some(s) => s,
108+
None => {
109+
return false;
110+
}
87111
};
88-
if !self
89-
.engine_store_server_helper
90-
.apply_fap_snapshot(region_id, peer_id, assert_exist)
91-
{
92-
// This is not a fap snapshot.
93-
info!("fast path: this is not fap snapshot {}:{} {}, goto tikv snapshot", self.store_id, region_id, peer_id;
112+
113+
// We can't rely on `snapshot_inflight`, because it will be undetermined ZERO
114+
// after restart.
115+
let expected_snapshot_type = Self::deduce_snapshot_type(peer_id, snap);
116+
117+
let quit_apply_fap = |tag: &str| {
118+
info!("fast path: fap snapshot mismatch/nonexist {}:{} {}", self.store_id, region_id, peer_id;
94119
"snap_key" => ?snap_key,
95120
"region_id" => region_id,
96121
"cost_snapshot" => current.as_millis() - snapshot_sent_time,
97122
"cost_total" => current.as_millis() - fap_start_time,
98123
"current_enabled" => current_enabled,
99-
"from_restart" => restarted,
124+
"tag" => tag
100125
);
126+
if expected_snapshot_type == SnapshotDeducedType::Fap {
127+
// It won't actually happen because TiFlash will panic since `assert_exist` is
128+
// true in this case.
129+
fatal!(
130+
"fast path: fap snapshot apply failed {}:{} {}, which is assert to be fap snapshot",
131+
self.store_id,
132+
region_id,
133+
peer_id
134+
);
135+
}
101136
c.snapshot_inflight.store(0, Ordering::SeqCst);
102137
c.fast_add_peer_start.store(0, Ordering::SeqCst);
103138
c.inited_or_fallback.store(true, Ordering::SeqCst);
104-
return false;
139+
false
140+
};
141+
142+
// If there is no fap snapshot with given (index, term) we shall quit.
143+
if self.engine_store_server_helper.query_fap_snapshot_state(
144+
region_id,
145+
peer_id,
146+
snap_key.idx,
147+
snap_key.term,
148+
) != proxy_ffi::interfaces_ffi::FapSnapshotState::Persisted
149+
{
150+
return quit_apply_fap("pre check");
105151
}
106-
info!("fast path: finished applied first snapshot {}:{} {}, recover MsgAppend", self.store_id, region_id, peer_id;
152+
153+
// Only succeeds if (index, term) matches.
154+
// Returns false if `assert_exist` is false,
155+
// Panics if `assert_exist` is true.
156+
// The logic is kind of redundant, but we want to make it complete on both
157+
// sides.
158+
if !self.engine_store_server_helper.apply_fap_snapshot(
159+
region_id,
160+
peer_id,
161+
true,
162+
snap_key.idx,
163+
snap_key.term,
164+
) {
165+
return quit_apply_fap("apply");
166+
}
167+
// If it's a reguar snapshot have the same (index, term) as the fap snapshot,
168+
// it make no difference which snapshot we actually applied.
169+
// So we always choose to apply a fap snapshot, since it saves as from
170+
// prehandling work.
171+
info!("fast path: finished applied first fap snapshot {}:{} {}, recover MsgAppend", self.store_id, region_id, peer_id;
107172
"snap_key" => ?snap_key,
108173
"region_id" => region_id,
109174
"cost_snapshot" => current.as_millis() - snapshot_sent_time,
110175
"cost_total" => current.as_millis() - fap_start_time,
111176
"current_enabled" => current_enabled,
112-
"from_restart" => restarted,
177+
"replacement_of_regular" => expected_snapshot_type == SnapshotDeducedType::Regular
113178
);
114179
c.snapshot_inflight.store(0, Ordering::SeqCst);
115180
c.fast_add_peer_start.store(0, Ordering::SeqCst);
@@ -132,26 +197,13 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
132197
let mut applied_fap = false;
133198
#[allow(clippy::collapsible_if)]
134199
if should_check_fap_snapshot {
135-
let mut maybe_cached_info: Option<Arc<CachedRegionInfo>> = None;
136200
if self
137201
.get_cached_manager()
138202
.access_cached_region_info_mut(
139203
region_id,
140204
|info: MapEntry<u64, Arc<CachedRegionInfo>>| match info {
141205
MapEntry::Occupied(o) => {
142-
maybe_cached_info = Some(o.get().clone());
143-
let already_existed = self.engine_store_server_helper.kvstore_region_exist(region_id);
144-
debug!("fast path: check should apply fap snapshot {}:{} {}", self.store_id, region_id, peer_id;
145-
"snap_key" => ?snap_key,
146-
"region_id" => region_id,
147-
"inited_or_fallback" => o.get().inited_or_fallback.load(Ordering::SeqCst),
148-
"snapshot_inflight" => o.get().snapshot_inflight.load(Ordering::SeqCst),
149-
"already_existed" => already_existed,
150-
);
151-
if !already_existed {
152-
// May be a fap snapshot, try to apply.
153-
applied_fap = try_apply_fap_snapshot(o.get().clone(), false);
154-
}
206+
applied_fap = try_apply_fap_snapshot(o.get().clone());
155207
}
156208
MapEntry::Vacant(_) => {
157209
// It won't go here because cached region info is inited after restart and on the first fap message.
@@ -161,7 +213,7 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
161213
);
162214
assert!(self.is_initialized(region_id));
163215
let o = Arc::new(CachedRegionInfo::default());
164-
applied_fap = try_apply_fap_snapshot(o, true);
216+
applied_fap = try_apply_fap_snapshot(o);
165217
}
166218
},
167219
)

proxy_components/engine_store_ffi/src/core/forward_raft/region.rs

+20-6
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use proxy_ffi::fatal;
2+
13
// Copyright 2022 TiKV Project Authors. Licensed under Apache-2.0.
24
use crate::core::{common::*, ProxyForwarder};
35

@@ -40,10 +42,16 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
4042
v.insert(Arc::new(c));
4143
}
4244
};
43-
// TODO remove unwrap
44-
self.get_cached_manager()
45+
if self
46+
.get_cached_manager()
4547
.access_cached_region_info_mut(region_id, f)
46-
.unwrap();
48+
.is_err()
49+
{
50+
fatal!(
51+
"on_region_changed could not found region info region_id={}",
52+
region_id
53+
);
54+
}
4755
}
4856
}
4957

@@ -135,9 +143,15 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
135143
v.insert(Arc::new(c));
136144
}
137145
};
138-
// TODO remove unwrap
139-
self.get_cached_manager()
146+
if self
147+
.get_cached_manager()
140148
.access_cached_region_info_mut(region_id, f)
141-
.unwrap();
149+
.is_err()
150+
{
151+
fatal!(
152+
"on_role_change could not found region info region_id={}",
153+
region_id
154+
);
155+
}
142156
}
143157
}

proxy_components/engine_store_ffi/src/core/forward_raft/snapshot.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::{
99

1010
type SSTInfo = (String, ColumnFamilyType);
1111

12-
fn retrieve_sst_files(peer_id: u64, snap: &store::Snapshot) -> Vec<SSTInfo> {
12+
pub fn retrieve_sst_files(peer_id: u64, snap: &store::Snapshot) -> Vec<SSTInfo> {
1313
let mut sst_views: Vec<SSTInfo> = vec![];
1414
let mut ssts = vec![];
1515
let v2_db_path = snap.snapshot_meta().as_ref().and_then(|m| {
@@ -40,7 +40,7 @@ fn retrieve_sst_files(peer_id: u64, snap: &store::Snapshot) -> Vec<SSTInfo> {
4040
assert!(!full_paths.is_empty());
4141
if full_paths.len() != 1 {
4242
// Multi sst files for one cf.
43-
tikv_util::info!("observe multi-file snapshot";
43+
tikv_util::debug!("observe multi-file snapshot";
4444
"snap" => ?snap,
4545
"cf" => ?cf_file.cf,
4646
"total" => full_paths.len(),
@@ -232,7 +232,7 @@ impl<T: Transport + 'static, ER: RaftEngine> ProxyForwarder<T, ER> {
232232
return;
233233
});
234234
let region_id = ob_region.get_id();
235-
if self.post_apply_snapshot_for_fap_snapshot(ob_region, peer_id, snap_key) {
235+
if self.post_apply_snapshot_for_fap_snapshot(ob_region, peer_id, snap_key, snap) {
236236
// Already handled as an fap snapshot.
237237
return;
238238
}

0 commit comments

Comments
 (0)