diff --git a/proxy_components/engine_store_ffi/src/core/fast_add_peer.rs b/proxy_components/engine_store_ffi/src/core/fast_add_peer.rs
index aa19b838828..49d201ba7cd 100644
--- a/proxy_components/engine_store_ffi/src/core/fast_add_peer.rs
+++ b/proxy_components/engine_store_ffi/src/core/fast_add_peer.rs
@@ -78,6 +78,63 @@ impl ProxyForwarder {
         )
     }
 
+    // Returns whether we should ignore the MsgSnapshot.
+    #[allow(clippy::collapsible_if)]
+    fn snapshot_filter(&self, msg: &RaftMessage) -> bool {
+        let inner_msg = msg.get_message();
+        let region_id = msg.get_region_id();
+        let new_peer_id = msg.get_to_peer().get_id();
+        let mut should_skip = false;
+        let f = |info: MapEntry<u64, Arc<CachedRegionInfo>>| {
+            match info {
+                MapEntry::Occupied(mut o) => {
+                    // If the peer is already initialized (or has fallen back),
+                    // we will accept the MsgSnapshot.
+                    if o.get().inited_or_fallback.load(Ordering::SeqCst) {
+                        return;
+                    }
+                    let has_already_inited = self.is_initialized(region_id);
+                    if has_already_inited {
+                        o.get_mut().inited_or_fallback.store(true, Ordering::SeqCst);
+                    }
+                    if o.get().fast_add_peer_start.load(Ordering::SeqCst) != 0 {
+                        if o.get().snapshot_inflight.load(Ordering::SeqCst) == 0 {
+                            // The FAP snapshot is still building, so skip this MsgSnapshot.
+                            // We will wait until FAP succeeds or falls back.
+                            info!("fast path: ongoing {}:{} {}, MsgSnapshot rejected",
+                                self.store_id, region_id, new_peer_id;
+                                "to_peer_id" => msg.get_to_peer().get_id(),
+                                "from_peer_id" => msg.get_from_peer().get_id(),
+                                "region_id" => region_id,
+                                "inner_msg" => Self::format_msg(inner_msg),
+                                "has_already_inited" => has_already_inited,
+                                "inited_or_fallback" => o.get().inited_or_fallback.load(Ordering::SeqCst),
+                                "snapshot_inflight" => o.get().snapshot_inflight.load(Ordering::SeqCst),
+                                "fast_add_peer_start" => o.get().fast_add_peer_start.load(Ordering::SeqCst),
+                            );
+                            should_skip = true;
+                        }
+                        // Otherwise, this snapshot could be either a FAP
+                        // snapshot or a normal snapshot.
+                        // In either case, we should handle it.
+                    }
+                }
+                MapEntry::Vacant(_) => {}
+            }
+        };
+
+        match self.get_cached_manager().get_inited_or_fallback(region_id) {
+            Some(true) => {
+                // The most common case: the peer is already initialized.
+            }
+            None | Some(false) => self
+                .get_cached_manager()
+                .access_cached_region_info_mut(region_id, f)
+                .unwrap(),
+        };
+
+        should_skip
+    }
+
     // Returns whether we need to ignore this message and run fast path instead.
     pub fn maybe_fast_path_tick(&self, msg: &RaftMessage) -> bool {
         if !self.packed_envs.engine_store_cfg.enable_fast_add_peer {
@@ -85,26 +142,27 @@ impl ProxyForwarder {
             return false;
         }
         let inner_msg = msg.get_message();
+        let region_id = msg.get_region_id();
+        let new_peer_id = msg.get_to_peer().get_id();
         if inner_msg.get_commit() == 0 && inner_msg.get_msg_type() == MessageType::MsgHeartbeat {
             return false;
         } else if inner_msg.get_msg_type() == MessageType::MsgAppend {
+            // Go on to the following logic to see if we should filter.
+        } else if inner_msg.get_msg_type() == MessageType::MsgSnapshot {
+            return self.snapshot_filter(msg);
         } else {
+            // We only handle the first MsgAppend.
             return false;
         }
-        // We don't need to recover all region infomation from restart,
+        // We don't need to recover all region information from restart,
         // since we have `has_already_inited`.
-        let inner_msg = msg.get_message();
-        if inner_msg.get_msg_type() != MessageType::MsgAppend {
-            // we only handles the first MsgAppend
-            return false;
-        }
-        let region_id = msg.get_region_id();
-        let new_peer_id = msg.get_to_peer().get_id();
+        let cached_manager = self.get_cached_manager();
         let mut is_first = false;
         let mut is_replicated = false;
         let mut has_already_inited = None;
         let mut early_skip = false;
+
         let f = |info: MapEntry<u64, Arc<CachedRegionInfo>>| {
             let current = SystemTime::now()
                 .duration_since(SystemTime::UNIX_EPOCH)
@@ -406,11 +464,15 @@ impl ProxyForwarder {
                     self.store_id, region_id, new_peer_id, s;
                     "region_id" => region_id,
                 );
-                // We don't fallback if the fap snapshot is persisted,
-                // Because it has been sent, or has not been sent.
-                // So we can't decide whether to use fallback to clean the previous
-                // snapshot. Any later error will cause fap snapshot
-                // mismatch.
+                // We call fallback here even if the FAP snapshot is persisted
+                // and sent, because the sent snapshot will only be handled if
+                // its (index, term) matches, even when there is another normal
+                // snapshot, since the two snapshots are identical.
+                // TODO: we could retry FAP several times before failing.
+                // However, such cases are rare; we have only observed a few
+                // raft-logs-missing problems.
+                let cached_manager = self.get_cached_manager();
+                cached_manager.fallback_to_slow_path(region_id);
                 return false;
             }
         };
@@ -421,6 +483,8 @@ impl ProxyForwarder {
                 self.store_id, region_id, new_peer_id, e;
                 "region_id" => region_id,
             );
+            let cached_manager = self.get_cached_manager();
+            cached_manager.fallback_to_slow_path(region_id);
             return false;
         }
     };
@@ -484,7 +548,11 @@ impl ProxyForwarder {
         // Find term of entry at applied_index.
         let applied_index = apply_state.get_applied_index();
         let applied_term =
-            self.check_entry_at_index(region_id, applied_index, new_peer_id, "applied_index")?;
+            match self.check_entry_at_index(region_id, applied_index, new_peer_id, "applied_index")
+            {
+                Ok(x) => x,
+                Err(e) => return Err(e),
+            };
         // Will otherwise cause "got message with lower index than committed" loop.
         // Maybe this can be removed, since fb0917bfa44ec1fc55967 can pass if we remove
         // this constraint.
diff --git a/proxy_components/mock-engine-store/src/mock_store/mock_core.rs b/proxy_components/mock-engine-store/src/mock_store/mock_core.rs
index fcbe37f43bb..bea14f04572 100644
--- a/proxy_components/mock-engine-store/src/mock_store/mock_core.rs
+++ b/proxy_components/mock-engine-store/src/mock_store/mock_core.rs
@@ -43,8 +43,12 @@ impl MockRegion {
 #[derive(Default)]
 pub struct RegionStats {
     pub pre_handle_count: AtomicU64,
+    // Count of calls to `ffi_fast_add_peer`.
     pub fast_add_peer_count: AtomicU64,
     pub apply_snap_count: AtomicU64,
+    // Incremented when FAP finishes building, whether it succeeded or not.
+    pub finished_fast_add_peer_count: AtomicU64,
+    pub started_fast_add_peers: std::sync::Mutex<std::collections::HashSet<u64>>,
 }
 
 // In case of newly added cfs.
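
Note (illustration, not part of the patch): the filtering rule added in `snapshot_filter` boils down to a small pure predicate over the three per-region flags it reads. A minimal, self-contained sketch, with hypothetical names (`FapPhase`, `should_skip_msg_snapshot`) that do not exist in the patch:

// Mirror of the three per-region values consulted by `snapshot_filter`,
// flattened from atomics to plain fields for illustration.
struct FapPhase {
    inited_or_fallback: bool, // peer already initialized, or FAP fell back
    fast_add_peer_start: u64, // 0 means FAP has not started
    snapshot_inflight: u64,   // 0 means the FAP snapshot is still building
}

// Returns true when an incoming MsgSnapshot should be dropped.
fn should_skip_msg_snapshot(p: &FapPhase) -> bool {
    // Once the peer is initialized (or FAP fell back), accept snapshots.
    if p.inited_or_fallback {
        return false;
    }
    // Drop only while FAP has started but its snapshot is not yet in flight;
    // in every other state the snapshot must be handled normally.
    p.fast_add_peer_start != 0 && p.snapshot_inflight == 0
}

fn main() {
    // While the FAP snapshot is building, MsgSnapshot is rejected.
    assert!(should_skip_msg_snapshot(&FapPhase {
        inited_or_fallback: false,
        fast_add_peer_start: 42,
        snapshot_inflight: 0,
    }));
    // Once the FAP snapshot is in flight, MsgSnapshot is accepted again.
    assert!(!should_skip_msg_snapshot(&FapPhase {
        inited_or_fallback: false,
        fast_add_peer_start: 42,
        snapshot_inflight: 7,
    }));
}

The real code evaluates this inside `access_cached_region_info_mut` so the check and any `inited_or_fallback` update happen under the cached-region entry, rather than on a detached copy like this sketch.
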
diff --git a/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs b/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs
index 7e7f68bab8e..ce722ab23f5 100644
--- a/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs
+++ b/proxy_components/mock-engine-store/src/mock_store/mock_engine_store_server.rs
@@ -76,6 +76,20 @@ impl EngineStoreServer {
         f(self.region_states.borrow_mut().get_mut(&region_id).unwrap())
     }
 
+    pub fn mutate_region_states_mut<F: FnMut(&mut RegionStats)>(
+        &self,
+        region_id: RegionId,
+        mut f: F,
+    ) {
+        let has = self.region_states.borrow().contains_key(&region_id);
+        if !has {
+            self.region_states
+                .borrow_mut()
+                .insert(region_id, Default::default());
+        }
+        f(self.region_states.borrow_mut().get_mut(&region_id).unwrap())
+    }
+
     pub fn get_mem(
         &self,
         region_id: u64,
diff --git a/proxy_components/mock-engine-store/src/mock_store/mock_fast_add_peer_impls.rs b/proxy_components/mock-engine-store/src/mock_store/mock_fast_add_peer_impls.rs
index 547e33dd5fc..520e7e09cdb 100644
--- a/proxy_components/mock-engine-store/src/mock_store/mock_fast_add_peer_impls.rs
+++ b/proxy_components/mock-engine-store/src/mock_store/mock_fast_add_peer_impls.rs
@@ -125,6 +125,7 @@ pub(crate) unsafe extern "C" fn ffi_fast_add_peer(
     let store_id = (*store.engine_store_server).id;
     (*store.engine_store_server).mutate_region_states(region_id, |e: &mut RegionStats| {
         e.fast_add_peer_count.fetch_add(1, Ordering::SeqCst);
+        e.started_fast_add_peers.lock().unwrap().insert(region_id);
     });
 
     let failed_add_peer_res =
@@ -147,6 +148,13 @@ pub(crate) unsafe extern "C" fn ffi_fast_add_peer(
         });
         0
     })() != 0;
+    let force_wait_for_data: bool = (|| {
+        fail::fail_point!("fap_mock_force_wait_for_data", |t| {
+            let t = t.unwrap().parse::<u64>().unwrap();
+            t
+        });
+        0
+    })() != 0;
     let fail_after_write: bool = (|| {
         fail::fail_point!("fap_mock_fail_after_write", |t| {
             let t = t.unwrap().parse::<u64>().unwrap();
             t
         });
         0
     })() != 0;
@@ -156,6 +164,10 @@ pub(crate) unsafe extern "C" fn ffi_fast_add_peer(
     debug!("recover from remote peer: enter from {} to {}", from_store, store_id; "region_id" => region_id);
+    if force_wait_for_data {
+        debug!("recover from remote peer: force_wait_for_data from {} to {}", from_store, store_id; "region_id" => region_id);
+        return failed_add_peer_res(interfaces_ffi::FastAddPeerStatus::WaitForData);
+    }
     for retry in 0..300 {
         let mut ret: Option<interfaces_ffi::FastAddPeerRes> = None;
         if retry > 0 {
@@ -309,13 +321,33 @@ pub(crate) unsafe extern "C" fn ffi_fast_add_peer(
                 if block_wait {
                     continue;
                 } else {
+                    (*store.engine_store_server).mutate_region_states(
+                        region_id,
+                        |e: &mut RegionStats| {
+                            e.finished_fast_add_peer_count
+                                .fetch_add(1, Ordering::SeqCst);
+                        },
+                    );
                     return r;
                 }
             }
-            _ => return r,
+            _ => {
+                (*store.engine_store_server).mutate_region_states(
+                    region_id,
+                    |e: &mut RegionStats| {
+                        e.finished_fast_add_peer_count
+                            .fetch_add(1, Ordering::SeqCst);
+                    },
+                );
+                return r;
+            }
         }
     }
     error!("recover from remote peer: failed after retry"; "region_id" => region_id);
+    (*store.engine_store_server).mutate_region_states(region_id, |e: &mut RegionStats| {
+        e.finished_fast_add_peer_count
+            .fetch_add(1, Ordering::SeqCst);
+    });
     failed_add_peer_res(interfaces_ffi::FastAddPeerStatus::BadData)
 }
diff --git a/proxy_tests/proxy/shared/fast_add_peer/fp.rs b/proxy_tests/proxy/shared/fast_add_peer/fp.rs
index 8c8941e8de2..5c5dbf5d240 100644
--- a/proxy_tests/proxy/shared/fast_add_peer/fp.rs
+++ b/proxy_tests/proxy/shared/fast_add_peer/fp.rs
@@ -853,6 +853,7 @@ fn test_fall_back_to_slow_path() {
fail::cfg("fap_core_no_fast_path", "panic").unwrap(); pd_client.must_add_peer(1, new_learner_peer(2, 2)); + // FAP will fail for "can't find entry for index 9 of region 1". check_key(&cluster, b"k2", b"v2", Some(true), None, Some(vec![1, 2])); must_wait_until_cond_node( &cluster.cluster_ext, @@ -943,3 +944,102 @@ fn test_single_replica_migrate() { fail::remove("on_pre_write_apply_state"); cluster.shutdown(); } + +// Test MsgSnapshot before MsgAppend +/// According to https://github.com/tikv/raft-rs/blob/2aefbf627f243dd261b7585ef1250d32efd9dfe7/src/raft.rs#L842, +/// if log is truncated in Leader, a MsgSnapshot may be sent directly before a +/// MsgAppend. If such MsgSnapshot is received when a FAP snapshot IS BUILDING, +/// then it will be dropped. +#[test] +fn test_msgsnapshot_before_msgappend() { + let (mut cluster, pd_client) = new_mock_cluster_snap(0, 2); + pd_client.disable_default_operator(); + fail::cfg("post_apply_snapshot_allow_no_unips", "return").unwrap(); + cluster.cfg.proxy_cfg.engine_store.enable_fast_add_peer = true; + + tikv_util::set_panic_hook(true, "./"); + // Can always apply snapshot immediately + fail::cfg("on_can_apply_snapshot", "return(true)").unwrap(); + fail::cfg("on_pre_write_apply_state", "return").unwrap(); + + let _ = cluster.run_conf_change(); + + cluster.must_put(b"k1", b"v1"); + check_key(&cluster, b"k1", b"v1", Some(true), None, Some(vec![1])); + cluster.must_put(b"k2", b"v2"); + + fail::cfg("fap_core_no_fallback", "panic").unwrap(); + fail::cfg("fap_mock_force_wait_for_data", "return(1)").unwrap(); + pd_client.must_add_peer(1, new_learner_peer(2, 2)); + + std::thread::sleep(Duration::from_secs(1)); + + // Trigger direct MsgSnapshot. + let region = cluster.get_region("k1".as_bytes()); + let prev_state = maybe_collect_states(&cluster.cluster_ext, 1, Some(vec![1])); + let (compact_index, compact_term) = get_valid_compact_index(&prev_state); + debug!("compact at index {}", compact_index); + let compact_log = test_raftstore::new_compact_log_request(compact_index, compact_term); + let req = test_raftstore::new_admin_request(1, region.get_region_epoch(), compact_log); + let res = cluster + .call_command_on_leader(req, Duration::from_secs(3)) + .unwrap(); + + let mut t = 0; + while true { + let mut buf = Vec::::new(); + cluster + .get_engines(1) + .raft + .get_all_entries_to(1, &mut buf) + .unwrap(); + if buf.len() == 1 { + break; + } + std::thread::sleep(std::time::Duration::from_secs(1)); + t += 1; + assert!(t < 11); + } + + // MsgSnapshot will be rejected before. + fail::remove("fap_mock_force_wait_for_data"); + cluster.clear_send_filters(); + + pd_client.must_add_peer(1, new_learner_peer(2, 2)); + + iter_ffi_helpers(&cluster, Some(vec![2]), &mut |_, ffi: &mut FFIHelperSet| { + let mut x: u64 = 0; + let mut y: u64 = 0; + (*ffi.engine_store_server).mutate_region_states_mut(1, |e: &mut RegionStats| { + x = e.finished_fast_add_peer_count.load(Ordering::SeqCst); + }); + (*ffi.engine_store_server).mutate_region_states_mut(1, |e: &mut RegionStats| { + y = e.started_fast_add_peers.lock().unwrap().len() as u64; + }); + assert_eq!(x, y); + }); + + // FAP will fail for "can't find entry for index 9 of region 1". 
+ check_key(&cluster, b"k2", b"v2", Some(true), None, Some(vec![1, 2])); + must_wait_until_cond_node( + &cluster.cluster_ext, + 1, + Some(vec![2]), + &|states: &States| -> bool { + find_peer_by_id(states.in_disk_region_state.get_region(), 2).is_some() + }, + ); + + iter_ffi_helpers(&cluster, Some(vec![2]), &mut |_, ffi: &mut FFIHelperSet| { + assert_eq!( + ffi.engine_store_server_helper + .query_fap_snapshot_state(1, 2, 0, 0), + proxy_ffi::interfaces_ffi::FapSnapshotState::NotFound + ); + }); + + fail::remove("on_can_apply_snapshot"); + fail::remove("on_pre_write_apply_state"); + fail::remove("fap_core_no_fallback"); + cluster.shutdown(); +} diff --git a/proxy_tests/proxy/shared/pprof_jemalloc.rs b/proxy_tests/proxy/shared/pprof_jemalloc.rs index 9aadeacc8eb..cdf985c83d2 100644 --- a/proxy_tests/proxy/shared/pprof_jemalloc.rs +++ b/proxy_tests/proxy/shared/pprof_jemalloc.rs @@ -4,8 +4,6 @@ use std::path::Path; use tempfile::NamedTempFile; -use crate::utils::v1::*; - #[test] fn test_adhoc_dump_prof() { use proxy_server::status_server::vendored_utils::{ @@ -17,8 +15,8 @@ fn test_adhoc_dump_prof() { let _ = activate_prof(); } - let x = vec![1; 1000]; - let y = vec![1; 1000]; + let _x = vec![1; 1000]; + let _y = vec![1; 1000]; let f = NamedTempFile::new().unwrap(); let path = f.path().to_str().unwrap(); diff --git a/proxy_tests/proxy/shared/replica_read.rs b/proxy_tests/proxy/shared/replica_read.rs index d998a173820..d87a6ab87d2 100644 --- a/proxy_tests/proxy/shared/replica_read.rs +++ b/proxy_tests/proxy/shared/replica_read.rs @@ -7,7 +7,7 @@ use engine_store_ffi::ffi::{ ProtoMsgBaseBuff, }; -use crate::{shared::ffi, utils::v1::*}; +use crate::utils::v1::*; #[derive(Default)] struct GcMonitor { @@ -230,7 +230,7 @@ fn test_read_index_applying() { cluster.must_put(b"k0", b"v"); { let prev_state = maybe_collect_states(&cluster.cluster_ext, r1, Some(vec![1])); - let (compact_index, compact_term) = get_valid_compact_index_by(&prev_state, Some(vec![1])); + let _ = get_valid_compact_index_by(&prev_state, Some(vec![1])); } cluster.pd_client.must_none_pending_peer(p2.clone()); // assert_eq!(cluster.pd_client.get_pending_peers().len(), 0); @@ -360,22 +360,21 @@ fn test_util() { use kvproto::{ kvrpcpb::{Context, DiskFullOpt, KeyRange}, raft_cmdpb::{CmdType, RaftCmdRequest, RaftRequestHeader, Request as RaftRequest}, - raft_serverpb::RaftMessage, }; use raftstore::{ router::RaftStoreRouter, store::{msg::Callback, RaftCmdExtraOpts, ReadIndexContext}, }; use tokio::sync::oneshot; -use txn_types::{Key, Lock, LockType, TimeStamp}; +use txn_types::{Key, Lock, LockType}; use uuid::Uuid; -use crate::utils::v1_server::{new_server_cluster, ChannelBuilder, Environment, TikvClient}; +use crate::utils::v1_server::new_server_cluster; // https://github.com/tikv/tikv/issues/16823 #[test] fn test_raft_cmd_request_cant_advanve_max_ts() { - use kvproto::kvrpcpb::{ReadIndexRequest, ReadIndexResponse}; + use kvproto::kvrpcpb::ReadIndexRequest; let mut cluster = new_server_cluster(0, 1); cluster.run(); @@ -384,10 +383,6 @@ fn test_raft_cmd_request_cant_advanve_max_ts() { let region = cluster.get_region(b""); let leader = region.get_peers()[0].clone(); - let addr = cluster.sim.rl().get_addr(leader.get_store_id()).to_owned(); - - let env = Arc::new(Environment::new(1)); - let channel = ChannelBuilder::new(env).connect(&addr); let mut ctx = Context::default(); let region_id = leader.get_id(); @@ -468,7 +463,6 @@ fn test_raft_cmd_request_learner_advanve_max_ts() { let region = cluster.get_region(b""); assert_eq!(region_id, 1); 
     assert_eq!(region.get_id(), 1);
-    let leader = region.get_peers()[0].clone();
 
     fail::cfg("on_pre_write_apply_state", "return(true)").unwrap();
     let learner = new_learner_peer(2, 2);
@@ -493,11 +487,6 @@ fn test_raft_cmd_request_learner_advanve_max_ts() {
     );
     guards[0].with_lock(|l| *l = Some(lock.clone()));
 
-    let addr = cluster.sim.rl().get_addr(learner.get_store_id()).to_owned();
-
-    let env = Arc::new(Environment::new(1));
-    let channel = ChannelBuilder::new(env).connect(&addr);
-
     // cluster.must_put(b"k", b"v");
 
     let read_index = |ranges: &[(&[u8], &[u8])]| {
@@ -518,7 +507,7 @@ fn test_raft_cmd_request_learner_advanve_max_ts() {
             r.set_end_key(e.to_vec());
             read_index_request.mut_ranges().push(r);
         }
-        let mut cmd =
+        let cmd =
             proxy_ffi::read_index_helper::gen_read_index_raft_cmd_req(&mut read_index_request);
 
         let (result_tx, result_rx) = oneshot::channel();
@@ -584,12 +573,6 @@ fn test_raft_message_can_advanve_max_ts() {
     let region = cluster.get_region(b"");
     let leader = region.get_peers()[0].clone();
     let follower = new_learner_peer(2, 2);
-    let addr = cluster.sim.rl().get_addr(leader.get_store_id()).to_owned();
-
-    let env = Arc::new(Environment::new(1));
-    let channel = ChannelBuilder::new(env).connect(&addr);
-
-    let region_id = leader.get_id();
 
     let read_index = |ranges: &[(&[u8], &[u8])]| {
         let mut m = raft::eraftpb::Message::default();
@@ -627,7 +610,7 @@ fn test_raft_message_can_advanve_max_ts() {
 
     // wait a while until the node updates its own max ts
     let prev_cm_max_ts = cm.max_ts();
-    let (resp, start_ts) = read_index(&[(b"l", b"yz")]);
+    let (_, start_ts) = read_index(&[(b"l", b"yz")]);
     cluster.must_put(b"a", b"b");
     std::thread::sleep(Duration::from_millis(2000));
     // assert!(!resp.has_locked());
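
Note (illustration, not part of the patch): `test_msgsnapshot_before_msgappend` relies on the raft-rs behavior cited in its doc comment: once the leader compacts its log past a follower's next index, it must replicate via MsgSnapshot instead of MsgAppend. A simplified sketch of that decision; the real raft-rs logic (around `maybe_send_append`) also handles term-lookup failures, and the names here are illustrative only:

// Simplified model of what a leader sends to one follower.
#[derive(Debug, PartialEq)]
enum ReplicationMsg {
    Append,   // MsgAppend carrying entries from the follower's next index
    Snapshot, // MsgSnapshot, because the needed entries were compacted away
}

fn pick_replication_msg(leader_first_index: u64, follower_next_idx: u64) -> ReplicationMsg {
    if follower_next_idx < leader_first_index {
        // Entries before first_index are gone; only a snapshot can catch up.
        ReplicationMsg::Snapshot
    } else {
        ReplicationMsg::Append
    }
}

fn main() {
    // After CompactLog, a brand-new learner (next index 1) can only be
    // caught up by a snapshot, which is exactly what the test provokes.
    assert_eq!(pick_replication_msg(10, 1), ReplicationMsg::Snapshot);
    // A follower that still overlaps the leader's log gets MsgAppend.
    assert_eq!(pick_replication_msg(10, 12), ReplicationMsg::Append);
}

This is why the test first compacts the leader's log and waits until only one entry remains: the subsequent add-peer then has to start with a direct MsgSnapshot, which exercises the new `snapshot_filter` path while the mocked FAP is held in WaitForData.
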